为用户中心统计提高mq服务

parent fd161ca8
......@@ -31,7 +31,16 @@
<artifactId>gson</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.2</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.6.RELEASE</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
......
package cn.quantgroup.xyqb.config.mq;
import cn.quantgroup.xyqb.service.mq.IVestService;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* //马甲包 xuran
*/
@Configuration
public class LoanVestMQConfig {
@Value("${loanvest.rabbitmq.queue}")
private String queueName;
@Value("${loanvest.rabbitmq.exchange}")
private String loanVestExchange;
@Value("${loanvest.rabbitmq.connection.host}")
private String host;
@Value("${loanvest.rabbitmq.connection.port}")
private Integer port;
@Value("${loanvest.rabbitmq.connection.user}")
private String user;
@Value("${loanvest.rabbitmq.connection.password}")
private String password;
@Value("${loanvest.rabbitmq.connection.virtual-host}")
private String virtualHost;
//@Value("${loanvest.rabbitmq.stateMsgQueue}")
//private String stateQueueName;
@Bean(name = "vestFactory")
public ConnectionFactory vestFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setChannelCacheSize(1024);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
connectionFactory.setChannelCacheSize(180 * 1000);
connectionFactory.setConnectionCacheSize(1024);
connectionFactory.setUsername(user);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherReturns(false);
connectionFactory.setPublisherConfirms(false);
return connectionFactory;
}
@Bean(name= "loanVestAmqpAdmin")
public AmqpAdmin loanVestAdmin(@Qualifier("vestFactory") ConnectionFactory vestFactory) {
return new RabbitAdmin(vestFactory);
}
@Bean(name = "loanVestExchange")
public FanoutExchange loanVestExchange() {
return new FanoutExchange(loanVestExchange);
}
@Bean(name = "loanVestQueue")
public Queue loanVestQueue() {
return new Queue(queueName);
}
@Bean(name = "loanVestBinding")
public Binding bindingLoanVest(@Qualifier("loanVestAmqpAdmin")AmqpAdmin loanVestAdmin, @Qualifier("loanVestQueue")Queue loanVestQueue, @Qualifier("loanVestExchange")FanoutExchange loanVestExchange) {
Binding binding = BindingBuilder.bind(loanVestQueue).to(loanVestExchange);
loanVestAdmin.declareBinding(binding);
return binding;
}
@Bean(name = "loanVestRabbitTemplate")
public RabbitTemplate loanVestTemplate(@Qualifier("vestFactory") ConnectionFactory vestFactory) {
RabbitTemplate template = new RabbitTemplate(vestFactory);
template.setExchange(loanVestExchange);
return template;
}
//@Bean(name = "loanVestMsgQueue")
//public Queue loanStateMsgQueue() {
// return new Queue(stateQueueName);
//}
//@Bean(name = "loanVestListenerContainer")
//public SimpleMessageListenerContainer loanVestSimpleMessageListenerContainer(@Qualifier("loanVestMQService") IVestService vestService, @Qualifier("vestFactory") ConnectionFactory vestFactory, @Qualifier("loanVestMsgQueue")Queue loanVestMsgQueue) {
// SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
// container.setConnectionFactory(vestFactory);
// container.setQueues(loanVestMsgQueue);
// container.setAcknowledgeMode(AcknowledgeMode.NONE);
// container.setMessageListener(new MessageListenerAdapter(vestService));
// container.start();
// return container;
//
//}
}
......@@ -21,6 +21,7 @@ import cn.quantgroup.xyqb.service.user.IUserBtRegisterService;
import cn.quantgroup.xyqb.service.user.IUserDetailService;
import cn.quantgroup.xyqb.service.user.IUserService;
import cn.quantgroup.xyqb.util.IPUtil;
import cn.quantgroup.xyqb.util.MqUtils;
import cn.quantgroup.xyqb.util.PasswordUtil;
import cn.quantgroup.xyqb.util.ValidationUtil;
import org.apache.commons.lang3.StringUtils;
......@@ -73,7 +74,7 @@ public class AppController implements IBaseController {
String phoneNo,
@RequestParam(required = false, defaultValue = "1") Long registerFrom,
@RequestParam(required = false, defaultValue = "1") Long channelId,
String idNo, String name, String key, @RequestParam(required = false, defaultValue = "") String appChannel,@RequestParam(required = false)Long btRegisterChannelId, HttpServletRequest request
String idNo, String name, String key, @RequestParam(required = false, defaultValue = "") String appChannel,@RequestParam(required = false)Long btRegisterChannelId,HttpServletRequest request
) {
if (!ValidationUtil.validatePhoneNo(phoneNo)) {
return JsonResult.buildErrorStateResult(USER_ERROR_OR_PASSWORD_ERROR, null);
......@@ -84,6 +85,7 @@ public class AppController implements IBaseController {
User user = userService.findByPhoneInDb(phoneNo);
if (user == null) {
user = register(registerFrom, phoneNo, idNo, name, channelId,btRegisterChannelId);
}
if (user == null) {
return JsonResult.buildErrorStateResult(USER_ERROR_OR_PASSWORD_ERROR, null);
......@@ -105,6 +107,9 @@ public class AppController implements IBaseController {
bean.setToken(sessionStruct.getSid());
bean.setPhoneNo(phoneNo);
LOGGER.info("第三方用户登录成功, loginFrom:{}, phoneNo:{},appChannel:{}", registerFrom, phoneNo, appChannel);
//增加登陆统计发送
UserStatistics statistics=new UserStatistics(user,null,4);
MqUtils.sendLoanVest(statistics);
return new JsonResult(bean);
}
......@@ -157,7 +162,11 @@ public class AppController implements IBaseController {
loginInfo.setLoginContext(context);
LOGGER.info("第三方用户获取信息登录成功, loginFrom:{}, phoneNo:{},appChannel:{}", registerFrom, phoneNo, appChannel);
//增加登陆统计发送
UserStatistics statistics=new UserStatistics(user,null,4);
MqUtils.sendLoanVest(statistics);
return JsonResult.buildSuccessResult("", loginInfo);
}
/**
......@@ -240,7 +249,9 @@ public class AppController implements IBaseController {
LOGGER.error("保存 UserDetail 出现异常", e);
}
}
//增加登陆统计发送
UserStatistics statistics=new UserStatistics(user,null,4);
MqUtils.sendLoanVest(statistics);
return user;
}
......
......@@ -9,12 +9,14 @@ import cn.quantgroup.xyqb.entity.WechatUserInfo;
import cn.quantgroup.xyqb.exception.UserNotExistException;
import cn.quantgroup.xyqb.model.JsonResult;
import cn.quantgroup.xyqb.model.UserModel;
import cn.quantgroup.xyqb.model.UserStatistics;
import cn.quantgroup.xyqb.service.merchant.IMerchantService;
import cn.quantgroup.xyqb.service.session.ISessionService;
import cn.quantgroup.xyqb.service.sms.ISmsService;
import cn.quantgroup.xyqb.service.user.IUserDetailService;
import cn.quantgroup.xyqb.service.user.IUserService;
import cn.quantgroup.xyqb.service.wechat.IWechatService;
import cn.quantgroup.xyqb.util.MqUtils;
import cn.quantgroup.xyqb.util.PasswordUtil;
import cn.quantgroup.xyqb.util.ValidationUtil;
import org.apache.commons.codec.binary.Base64;
......@@ -74,13 +76,14 @@ public class UserController implements IBaseController {
public JsonResult login(
@RequestParam(required = false, defaultValue = "1") Long channelId, String appChannel,
@RequestParam(required = false, defaultValue = "1") Long createdFrom,
@RequestParam(required = false, defaultValue = "") String userId, String key, HttpServletRequest request, String openId) {
@RequestParam(required = false, defaultValue = "") String userId, String key, HttpServletRequest request, String openId,@RequestParam(required = false) String dimension) {
Merchant merchant = merchantService.findMerchantByName(key);
if (merchant == null) {
return JsonResult.buildErrorStateResult("未知的连接", null);
}
if (!StringUtils.isEmpty(userId) && userId.length() > 10) {
return loginWithUserId(channelId, appChannel, createdFrom, userId, merchant);
return loginWithUserId(channelId, appChannel, createdFrom, userId, merchant,dimension);
} else {
return loginWithHttpBasic(channelId, appChannel, createdFrom, merchant, request, openId);
}
......@@ -94,7 +97,7 @@ public class UserController implements IBaseController {
@RequestMapping("/login/fast")
public JsonResult loginFast(
@RequestParam(required = false, defaultValue = "1") Long channelId, String appChannel,
@RequestParam(required = false, defaultValue = "1") Long createdFrom, String key,@RequestParam(required = false)Long btRegisterChannelId ,HttpServletRequest request) {
@RequestParam(required = false, defaultValue = "1") Long createdFrom, String key,@RequestParam(required = false)Long btRegisterChannelId,@RequestParam(required = false) String dimension ,HttpServletRequest request) {
Map<String, JsonResult> validMap = getHeaderParam(request);
if (null != validMap.get("fail")) {
return validMap.get("fail");
......@@ -115,6 +118,9 @@ public class UserController implements IBaseController {
throw new UserNotExistException("用户未找到");
}
}
//增加登陆统计发送
UserStatistics statistics=new UserStatistics(user,null,3);
MqUtils.sendLoanVest(statistics);
return new JsonResult(sessionService.createSession(channelId, createdFrom, appChannel, user, merchant));
// return createSession(channelId, createdFrom, appChannel, user);
}
......@@ -216,7 +222,8 @@ public class UserController implements IBaseController {
LOGGER.info("用户快速注册失败,短信验证码错误, registerFrom:{}, phoneNo:{}, verificationCode:{}", registerFrom, phoneNo, verificationCode);
return JsonResult.buildErrorStateResult("短信验证码错误", null);
}
if (!userService.register(phoneNo, password, registerFrom, getIp(), channelId,btRegisterChannelId)) {
User user=userService.register(phoneNo, password, registerFrom, getIp(), channelId,btRegisterChannelId);
if (null!=user) {
LOGGER.info("用户快速注册失败,请稍后重试, registerFrom:{}, phoneNo:{}", registerFrom, phoneNo);
return JsonResult.buildErrorStateResult("注册失败,请稍后重试", null);
}
......@@ -265,11 +272,14 @@ public class UserController implements IBaseController {
LOGGER.info("用户注册失败,短信验证码错误, registerFrom:{}, phoneNo:{}, verificationCode:{}", registerFrom, phoneNo, verificationCode);
return JsonResult.buildErrorStateResult("短信验证码错误", null);
}
if (!userService.register(phoneNo, password, registerFrom, getIp(), channelId,btRegisterChannelId)) {
User user=userService.register(phoneNo, password, registerFrom, getIp(), channelId,btRegisterChannelId);
if (null!=user) {
LOGGER.info("用户快速注册失败,请稍后重试, registerFrom:{}, phoneNo:{}", registerFrom, phoneNo);
return JsonResult.buildErrorStateResult("注册失败,请稍后重试", null);
}
//增加登陆统计发送
UserStatistics statistics=new UserStatistics(user,null,3);
MqUtils.sendLoanVest(statistics);
LOGGER.info("用户注册成功, registerFrom:{}, phoneNo:{}", registerFrom, phoneNo);
return JsonResult.buildSuccessResult(null, null);
......@@ -440,11 +450,14 @@ public class UserController implements IBaseController {
return StringUtils.defaultString(targetPassword, "").equals(PasswordUtil.MD5(paramPass.toLowerCase() + pwdSalt));
}
private JsonResult loginWithUserId(Long channelId, String appChannel, Long createdFrom, String userId, Merchant merchant) {
private JsonResult loginWithUserId(Long channelId, String appChannel, Long createdFrom, String userId, Merchant merchant,String dimension) {
//查询用户,存在则保存用户session信息,userId为uuid
User user = userService.findByUuidInDb(userId);
//用户信息存在,更新session中的最后访问时间,重新写入缓存.
if (null != user || !user.getEnable()) {
//增加登陆统计发送
UserStatistics statistics=new UserStatistics(user,dimension,1);
MqUtils.sendLoanVest(statistics);
return new JsonResult(sessionService.createSession(channelId, createdFrom, appChannel, user, merchant));
} else {
return JsonResult.buildErrorStateResult("登录失败", null);
......
package cn.quantgroup.xyqb.model;
import cn.quantgroup.xyqb.entity.User;
import lombok.Data;
/**
* Created by xuran on 2017/6/23.
* 用户统计使用
*/
@Data
public class UserStatistics {
private String uuid;
private Long registerFrom;
private String dimension;
private int action;//登录0,注册1,快捷登录2,免密登录3
public UserStatistics() {
}
public UserStatistics(User user, String dimension, int action) {
this.uuid = user.getUuid();
this.registerFrom = user.getRegisteredFrom();
this.action = action;
this.dimension = dimension;
}
}
package cn.quantgroup.xyqb.service.mq;
import cn.quantgroup.xyqb.model.UserStatistics;
/**
* Created by xuran on 2017/6/21.
* 马甲包
*/
public interface IVestService {
void send(UserStatistics message);
}
package cn.quantgroup.xyqb.service.mq.Impl;
import cn.quantgroup.xyqb.Constants;;
import cn.quantgroup.xyqb.model.UserStatistics;
import cn.quantgroup.xyqb.service.mq.IVestService;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* Created by xuran on 2017/6/21.
* 用户统计信息
*/
@Service("loanVestMQService")
public class LoanVestMQServiceImpl implements IVestService {
private static final Logger LOGGER = LoggerFactory.getLogger(LoanVestMQServiceImpl.class);
@Autowired
@Qualifier(value = "loanVestRabbitTemplate")
RabbitTemplate loanVestRabbitTemplate;
@Autowired
@Qualifier(value = "loanVestQueue")
Queue loanVestQueue;
/**
* 发送用登陆统计信息
* @param message 订单信息
*/
@Async
public void send(UserStatistics message){
if(null==message){
LOGGER.error("用登陆统计消息不能为空");
}
LOGGER.info("用登陆统计发送,message={}",message);
String msg = JSONObject.toJSONString(message);
loanVestRabbitTemplate.convertAndSend(loanVestQueue.getName(), msg);
LOGGER.info("用登陆统计成功,message={}",msg);
}
}
......@@ -32,7 +32,7 @@ public interface IUserService {
User findById(Long userId);
boolean register(String phoneNo, String password, Long registerFrom, String ip, Long channelId, Long btRegisterChannelId);
User register(String phoneNo, String password, Long registerFrom, String ip, Long channelId, Long btRegisterChannelId);
List<User> findByPhones(List<String> phones);
User registerAndReturn(String phoneNo, String password, Long registerFrom, Long btRegisterChannelId);
......
......@@ -150,7 +150,7 @@ public class UserServiceImpl implements IUserService {
@Override
public boolean register(String phoneNo, String password, Long registerFrom, String ip, Long channelId, Long btRegisterChannelId) {
public User register(String phoneNo, String password, Long registerFrom, String ip, Long channelId, Long btRegisterChannelId) {
String uuid = lkbUserService.registerApp(phoneNo, password);
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
......@@ -178,7 +178,7 @@ public class UserServiceImpl implements IUserService {
log.info("白条注册渠道信息保存完成");
}
smsService.sendAfterRegister(phoneNo);
return user != null;
return user;
}
......
package cn.quantgroup.xyqb.util;
import cn.quantgroup.xyqb.model.UserStatistics;
import cn.quantgroup.xyqb.service.mq.IVestService;
import lombok.extern.slf4j.Slf4j;
/**
* Created by xuran on 2017/6/23.
*/
@Slf4j
public class MqUtils {
/**
* 发送用户统计信息
* @param vest
*/
public static void sendLoanVest(UserStatistics vest){
try {
IVestService mqService = ApplicationContextHolder.getBean("loanVestMQService");
mqService.send(vest);
} catch (Exception e) {
log.error("[MQUtils][MQUtils_exception]发送马甲信息,vest={},error={}",
vest, e);
}
}
}
......@@ -96,3 +96,12 @@ xyqb.paycenter.url=http://payapi.xyqb.com/
xyqb.paycenter.id=3
#内部运营系统
xyqb.yunying.url=http://192.168.4.50:7047
#马甲包
loanvest.rabbitmq.connection.virtual-host=/floan_order
#loanvest.rabbitmq.stateMsgQueue=loan_vest_msg_queue
loanvest.rabbitmq.queue=loan_user_queue
loanvest.rabbitmq.exchange=loan_vest_exchange
loanvest.rabbitmq.connection.host=192.168.4.153
loanvest.rabbitmq.connection.port=5672
loanvest.rabbitmq.connection.user=qa
loanvest.rabbitmq.connection.password=qatest
\ No newline at end of file
......@@ -60,3 +60,13 @@ xyqb.paycenter.url=http://payapi.xyqb.com/
xyqb.paycenter.id=3
#内部运营系统
xyqb.yunying.url=https://opapi.xyqb.com
#马甲包
loanvest.rabbitmq.connection.virtual-host=/floan_order
loanvest.rabbitmq.queue=loan_user_queue
#loanvest.rabbitmq.stateMsgQueue=loan_vest_msg_queue
loanvest.rabbitmq.exchange=loan_vest_exchange
loanvest.rabbitmq.connection.host=172.16.1.242
loanvest.rabbitmq.connection.port=56720
loanvest.rabbitmq.connection.user=rabbit_admin
loanvest.rabbitmq.connection.password=abc1234
\ No newline at end of file
......@@ -55,3 +55,13 @@ xyqb.user.domain=passport.xyqb.com
xyqb.user.query.url=http://userqry.xyqb.com
xyqb.domain = http://192.168.4.153:7003
#马甲包
loanvest.rabbitmq.connection.virtual-host=/floan_order
#loanvest.rabbitmq.stateMsgQueue=loan_vest_msg_queue
loanvest.rabbitmq.queue=loan_user_queue
loanvest.rabbitmq.exchange=loan_vest_exchange
loanvest.rabbitmq.connection.host=192.168.4.153
loanvest.rabbitmq.connection.port=5672
loanvest.rabbitmq.connection.user=qa
loanvest.rabbitmq.connection.password=qatest
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment