修改 增加用户注册广播

parent 2c64b6b5
package cn.quantgroup.xyqb.config.mq;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by xuran on 2017/9/7.
*/
@Configuration
public class IRegisterMqConfig {
@Value("${register.rabbitmq.queue}")
private String queueName;
@Value("${register.rabbitmq.exchange}")
private String loanVestExchange;
@Value("${register.rabbitmq.connection.host}")
private String host;
@Value("${register.rabbitmq.connection.port}")
private Integer port;
@Value("${register.rabbitmq.connection.user}")
private String user;
@Value("${register.rabbitmq.connection.password}")
private String password;
@Value("${register.rabbitmq.connection.virtual-host}")
private String virtualHost;
//@Value("${loanvest.rabbitmq.stateMsgQueue}")
//private String stateQueueName;
@Bean(name = "registerMqFactory")
public ConnectionFactory vestFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setChannelCacheSize(1024);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
connectionFactory.setChannelCacheSize(180 * 1000);
connectionFactory.setConnectionCacheSize(1024);
connectionFactory.setUsername(user);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherReturns(false);
connectionFactory.setPublisherConfirms(false);
return connectionFactory;
}
@Bean(name= "registerMqAmqpAdmin")
public AmqpAdmin loanVestAdmin(@Qualifier("registerMqFactory") ConnectionFactory vestFactory) {
return new RabbitAdmin(vestFactory);
}
@Bean(name = "registerMqExchange")
public FanoutExchange loanVestExchange() {
return new FanoutExchange(loanVestExchange);
}
@Bean(name = "registerMqQueue")
public Queue loanVestQueue() {
return new Queue(queueName);
}
@Bean(name = "registerMqBinding")
public Binding bindingLoanVest(@Qualifier("registerMqAmqpAdmin")AmqpAdmin loanVestAdmin, @Qualifier("registerMqQueue")Queue loanVestQueue, @Qualifier("registerMqExchange")FanoutExchange loanVestExchange) {
Binding binding = BindingBuilder.bind(loanVestQueue).to(loanVestExchange);
loanVestAdmin.declareBinding(binding);
return binding;
}
@Bean(name = "registerRabbitTemplate")
public RabbitTemplate loanVestTemplate(@Qualifier("registerMqFactory") ConnectionFactory vestFactory) {
RabbitTemplate template = new RabbitTemplate(vestFactory);
template.setExchange(loanVestExchange);
return template;
}
}
......@@ -288,6 +288,9 @@ public class AppController implements IBaseController {
//增加登陆统计发送
UserStatistics statistics=new UserStatistics(user,null,2,channelId);
MqUtils.sendLoanVest(statistics);
//增加用户注册广播
UserRegisterMqMessage registerMqMessage=new UserRegisterMqMessage(user);
MqUtils.sendRegisterMessage(registerMqMessage);
return user;
}
......
......@@ -23,6 +23,7 @@ import cn.quantgroup.xyqb.model.UserAssociation;
import cn.quantgroup.xyqb.model.UserDetailRet;
import cn.quantgroup.xyqb.model.UserExtInfoRet;
import cn.quantgroup.xyqb.model.UserInfo;
import cn.quantgroup.xyqb.model.UserRegisterMqMessage;
import cn.quantgroup.xyqb.model.UserRet;
import cn.quantgroup.xyqb.model.UserSpouseRet;
import cn.quantgroup.xyqb.model.UserStatistics;
......@@ -248,6 +249,9 @@ public class InnerController implements IBaseController {
UserStatistics statistics=new UserStatistics(user,null,2,registeredFrom);
MqUtils.sendLoanVest(statistics);
userRet = new UserRet(user);
//增加用户注册广播
UserRegisterMqMessage registerMqMessage=new UserRegisterMqMessage(user);
MqUtils.sendRegisterMessage(registerMqMessage);
}
return JsonResult.buildSuccessResult(null, userRet);
}
......
......@@ -10,6 +10,7 @@ import cn.quantgroup.xyqb.entity.WechatUserInfo;
import cn.quantgroup.xyqb.exception.UserNotExistException;
import cn.quantgroup.xyqb.model.JsonResult;
import cn.quantgroup.xyqb.model.UserModel;
import cn.quantgroup.xyqb.model.UserRegisterMqMessage;
import cn.quantgroup.xyqb.model.UserStatistics;
import cn.quantgroup.xyqb.service.merchant.IMerchantService;
import cn.quantgroup.xyqb.service.session.ISessionService;
......@@ -158,6 +159,9 @@ public class UserController implements IBaseController {
LOGGER.info("用户快速注册成功, registerFrom:{}, phoneNo:{}", registerFrom, phoneNo);
UserStatistics statistics=new UserStatistics(user,dimension,2,channelId);
MqUtils.sendLoanVest(statistics);
//增加用户注册广播
UserRegisterMqMessage registerMqMessage=new UserRegisterMqMessage(user);
MqUtils.sendRegisterMessage(registerMqMessage);
return user;
}
......
package cn.quantgroup.xyqb.model;
import cn.quantgroup.xyqb.entity.User;
import java.io.Serializable;
import lombok.Data;
import org.apache.commons.lang.StringUtils;
/**
* Created by xuran on 2017/9/7.
*/
@Data
public class UserRegisterMqMessage implements Serializable {
private static final long serialVersionUID = -1L;
private Long id;
//手机号
private String phoneNo;
//第一次用户来源 channel_id
private Long registeredFrom;
//uuid
private String uuid;
private Boolean enable;
//创建时间
private Long createdAt;
//上一次修改时间
private Long updatedAt;
public UserRegisterMqMessage(User user) {
Long createTimeStamp = user.getCreatedAt().getTime();
Long updateTimeStamp = user.getUpdatedAt().getTime();
this.setId(user.getId());
this.setPhoneNo(user.getPhoneNo());
this.setEnable(user.getEnable());
this.setRegisteredFrom(user.getRegisteredFrom());
this.setUuid(user.getUuid());
this.setCreatedAt(createTimeStamp);
this.setUpdatedAt(updateTimeStamp);
}
}
......@@ -6,6 +6,7 @@ import cn.quantgroup.xyqb.entity.UserDetail;
import cn.quantgroup.xyqb.entity.UserJr58;
import cn.quantgroup.xyqb.model.IdCardInfo;
import cn.quantgroup.xyqb.model.IdType;
import cn.quantgroup.xyqb.model.UserRegisterMqMessage;
import cn.quantgroup.xyqb.model.UserStatistics;
import cn.quantgroup.xyqb.model.jr58.Jr58Authorization;
import cn.quantgroup.xyqb.model.jr58.Jr58RegisterParam;
......@@ -112,6 +113,9 @@ public class Jr58ServiceImpl implements Jr58Service {
UserStatistics statistics=new UserStatistics(user,null,2,user.getRegisteredFrom());
MqUtils.sendLoanVest(statistics);
LOGGER.info("用户注册成功, registerFrom:{}, phoneNo:{}", Constants.Channel.JR58, param.getPhone());
//增加用户注册广播
UserRegisterMqMessage registerMqMessage=new UserRegisterMqMessage(user);
MqUtils.sendRegisterMessage(registerMqMessage);
return uuid;
}
......
package cn.quantgroup.xyqb.service.mq;
import cn.quantgroup.xyqb.model.UserRegisterMqMessage;
import cn.quantgroup.xyqb.model.UserStatistics;
/**
* Created by xuran on 2017/9/7.
* 用户注册信息广播
*/
public interface IRegisterMqService {
void send(UserRegisterMqMessage message);
}
package cn.quantgroup.xyqb.service.mq.Impl;
import cn.quantgroup.xyqb.model.UserRegisterMqMessage;
import cn.quantgroup.xyqb.model.UserStatistics;
import cn.quantgroup.xyqb.service.mq.IRegisterMqService;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* Created by xuran on 2017/9/7.
*/
@Service("registerMqService")
public class IRegisterMqServiceImpl implements IRegisterMqService {
private static final Logger LOGGER = LoggerFactory.getLogger(IRegisterMqServiceImpl.class);
@Autowired
@Qualifier(value = "registerRabbitTemplate")
RabbitTemplate registerRabbitTemplate;
@Autowired
@Qualifier(value = "registerMqQueue")
Queue loanVestQueue;
/**
* 发送用登陆统计信息
* @param message 订单信息
*/
@Async
public void send(UserRegisterMqMessage message){
if(null==message){
LOGGER.error("广播用户注册消息不能为空");
}
LOGGER.info("广播用户注册消息,message={}",message);
String msg = JSONObject.toJSONString(message);
registerRabbitTemplate.convertAndSend(msg);
LOGGER.info("广播用户注册消息,message={}",msg);
}
}
......@@ -6,6 +6,7 @@ import cn.quantgroup.xyqb.entity.*;
import cn.quantgroup.xyqb.model.IdCardInfo;
import cn.quantgroup.xyqb.model.IdType;
import cn.quantgroup.xyqb.model.JsonResult;
import cn.quantgroup.xyqb.model.UserRegisterMqMessage;
import cn.quantgroup.xyqb.model.UserStatistics;
import cn.quantgroup.xyqb.repository.IAddressRepository;
import cn.quantgroup.xyqb.repository.IContactRepository;
......@@ -194,6 +195,9 @@ public class UserServiceImpl implements IUserService {
//增加登陆统计发送
UserStatistics statistics=new UserStatistics(user,dimension,2,channelId);
MqUtils.sendLoanVest(statistics);
//增加用户注册广播
UserRegisterMqMessage registerMqMessage=new UserRegisterMqMessage(user);
MqUtils.sendRegisterMessage(registerMqMessage);
}
return user != null;
}
......
package cn.quantgroup.xyqb.util;
import cn.quantgroup.xyqb.model.UserRegisterMqMessage;
import cn.quantgroup.xyqb.model.UserStatistics;
import cn.quantgroup.xyqb.service.mq.IRegisterMqService;
import cn.quantgroup.xyqb.service.mq.IVestService;
import lombok.extern.slf4j.Slf4j;
......@@ -22,4 +24,17 @@ public class MqUtils {
vest, e);
}
}
/**
* 用户注册消息进行广播
* @param message
*/
public static void sendRegisterMessage(UserRegisterMqMessage message){
try {
IRegisterMqService mqService = ApplicationContextHolder.getBean("registerMqService");
mqService.send(message);
} catch (Exception e) {
log.error("[MQUtils][MQUtils_exception]发送用户注册广播信息,message={},error={}",
message, e);
}
}
}
......@@ -104,4 +104,13 @@ loanvest.rabbitmq.exchange=loan_vest_exchange
loanvest.rabbitmq.connection.host=192.168.4.46
loanvest.rabbitmq.connection.port=5672
loanvest.rabbitmq.connection.user=qa
loanvest.rabbitmq.connection.password=qatest
\ No newline at end of file
loanvest.rabbitmq.connection.password=qatest
#用户注册广播
register.rabbitmq.connection.virtual-host=/user_register
register.rabbitmq.queue=user_register_queue
register.rabbitmq.exchange=user_register_exchange
register.rabbitmq.connection.host=192.168.4.46
register.rabbitmq.connection.port=5672
register.rabbitmq.connection.user=qa
register.rabbitmq.connection.password=qatest
\ No newline at end of file
......@@ -69,4 +69,14 @@ loanvest.rabbitmq.exchange=loan_vest_exchange
loanvest.rabbitmq.connection.host=172.16.1.242
loanvest.rabbitmq.connection.port=56720
loanvest.rabbitmq.connection.user=rabbit_admin
loanvest.rabbitmq.connection.password=abc1234
\ No newline at end of file
loanvest.rabbitmq.connection.password=abc1234
#用户注册广播
register.rabbitmq.connection.virtual-host=/user_register
register.rabbitmq.queue=user_register_queue
register.rabbitmq.exchange=user_register_exchange
register.rabbitmq.connection.host=172.16.1.242
register.rabbitmq.connection.port=56720
register.rabbitmq.connection.user=rabbit_admin
register.rabbitmq.connection.password=abc1234
\ No newline at end of file
......@@ -64,4 +64,12 @@ loanvest.rabbitmq.exchange=loan_vest_exchange
loanvest.rabbitmq.connection.host=192.168.4.46
loanvest.rabbitmq.connection.port=5672
loanvest.rabbitmq.connection.user=qa
loanvest.rabbitmq.connection.password=qatest
\ No newline at end of file
loanvest.rabbitmq.connection.password=qatest
#用户注册广播
register.rabbitmq.connection.virtual-host=/user_register
register.rabbitmq.queue=user_register_queue
register.rabbitmq.exchange=user_register_exchange
register.rabbitmq.connection.host=192.168.4.153
register.rabbitmq.connection.port=5672
register.rabbitmq.connection.user=qa
register.rabbitmq.connection.password=qatest
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment