Commit a599264b authored by 杨锐's avatar 杨锐

add mq-plus-spring-boot-starter.

parent 8d2beb6b
package cn.quantgroup.tech.mq.aop;
import cn.quantgroup.tech.mq.common.Constants;
import cn.quantgroup.tech.mq.entity.KoalaRabbitmqMessage;
import cn.quantgroup.tech.mq.mapper.KoalaRabbitmqMessageMapperV1;
import com.rabbitmq.client.Channel;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
@Aspect
@Component
public class RabbitListenerAspect {
@Resource
private RabbitTemplate moNotifyRabbitTemplate;
@Resource
private KoalaRabbitmqMessageMapperV1 koalaRabbitmqMessageMapperV1;
/**
* arg[0] message
* arg[1] channel
* arg[2] deliveryTag
* arg[3] correlationId -> unique_id
* arg[4] scenario -> 场景
*
* @param joinPoint
* @return
*/
@Around("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")
public void aroundAdvice(ProceedingJoinPoint joinPoint) throws IOException {
Object rtValue = null;
Object obj = joinPoint.getArgs()[0];
Channel channel = (Channel) joinPoint.getArgs()[1];
Long deliveryTag = (Long) joinPoint.getArgs()[2];
String uniqueId = (String) joinPoint.getArgs()[3];
String scenario = (String) joinPoint.getArgs()[4];
koalaRabbitmqMessageMapperV1.addConsumeCountByUniqueId(uniqueId);
try {
rtValue = joinPoint.proceed();
} catch (Throwable throwable) {
throwable.printStackTrace();
}
if (scenario.equals("GRADIENT_NOTICE")) {
if ("success".equals(rtValue)) {
channel.basicAck(deliveryTag, false);
} else {
KoalaRabbitmqMessage koalaRabbitmqMessage = koalaRabbitmqMessageMapperV1.selectByUniqueId(uniqueId);
CorrelationData correlationData = new CorrelationData(uniqueId);
moNotifyRabbitTemplate.convertAndSend(Constants.MO_TTL_EXCHANGE, koalaRabbitmqMessage.getRoutingKey(), obj, correlationData);
channel.basicNack(deliveryTag, false, false);
}
} else {
channel.basicAck(deliveryTag, false);
}
}
}
package cn.quantgroup.tech.mq.common;
public interface Constants {
/**
* 消息状态
*/
Integer MSG_STATUS_SUCCESS = 1;
Integer MSG_STATUS_FAILURE = 2;
/**
* 消息最大消费次数
*/
Integer DEFAULT_CONSUME_COUNT = 5;
/**
* 字符串
*/
String CROSS = "-";
String DEFAULT = "";
/**
* routing message to ttl queue
*/
String MO_TTL_EXCHANGE = "mo_TTL_exchange";
/**
* routing key
*/
String MO_TTL_ROUTING_KEY = "mo.#";
/**
* receive ttl message, no receiver
*/
String MO_TTL_QUEUE = "mo_TTL_queue";
/**
* dlx, routing ttl message
*/
String MO_TTL_DLX_EXCHANGE = "mo_TTL_DLX_exchange";
}
package cn.quantgroup.tech.mq.config;
import cn.quantgroup.tech.mq.common.Constants;
import cn.quantgroup.tech.mq.entity.KoalaRabbitmqMessage;
import cn.quantgroup.tech.mq.mapper.KoalaRabbitmqMessageMapperV1;
import cn.quantgroup.tech.mq.service.MONotify;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.CorrelationAwareMessagePostProcessor;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.Correlation;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
public class MORabbitMQConfiguration {
@Value("${rabbitmq.connection.host}")
private String hostname;
@Value("${rabbitmq.connection.port}")
private Integer port;
@Value("${rabbitmq.connection.user}")
private String userName;
@Value("${rabbitmq.connection.password}")
private String password;
@Value("${rabbitmq.connection.virtual-host}")
private String virtualHost;
/* 15s/15s/30s/3m/10m */
final int[] notifyFrequency = {15, 15, 30, 180, 600};
@Resource
private KoalaRabbitmqMessageMapperV1 koalaRabbitmqMessageMapperV1;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(hostname, port);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitTemplate moNotifyRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
koalaRabbitmqMessageMapperV1.updateMsgStatusByUniqueId(correlationData.getId(), Constants.MSG_STATUS_SUCCESS);
}
});
rabbitTemplate.setBeforePublishPostProcessors(new CorrelationAwareMessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
return message;
}
@Override
public Message postProcessMessage(Message message, Correlation correlation) {
String uniqueId = ((CorrelationData) correlation).getId();
KoalaRabbitmqMessage koalaRabbitmqMessage = koalaRabbitmqMessageMapperV1.selectByUniqueId(uniqueId);
if (koalaRabbitmqMessage == null) {
throw new RuntimeException("数据异常,消息不存在。");
}
if (koalaRabbitmqMessage.getConsumeCount() >= koalaRabbitmqMessage.getMaxConsumeCount()) {
throw new RuntimeException("消息id = 【" + uniqueId + "】消费已达最大消费次数");
}
message.getMessageProperties().setExpiration(String.valueOf(notifyFrequency[koalaRabbitmqMessage.getConsumeCount()] * 1000));
message.getMessageProperties().getHeaders().put(AmqpHeaders.CORRELATION_ID, uniqueId);
message.getMessageProperties().getHeaders().put("SCENARIO", "GRADIENT_NOTICE");
return message;
}
});
return rabbitTemplate;
}
@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
public MONotify moNotify() {
return new MONotify();
}
}
package cn.quantgroup.tech.mq.config;
import cn.quantgroup.tech.mq.common.Constants;
import cn.quantgroup.tech.mq.enums.MOEnums;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MOTTLConfiguration {
@Bean
public TopicExchange moTTLExchange() {
return new TopicExchange(Constants.MO_TTL_EXCHANGE);
}
@Bean
public TopicExchange moTTLDLXExchange() {
return new TopicExchange(Constants.MO_TTL_DLX_EXCHANGE);
}
@Bean
public Queue moTTLQueue() {
Map<String, Object> agruments = new HashMap<>();
agruments.put("x-dead-letter-exchange", Constants.MO_TTL_DLX_EXCHANGE);
return new Queue(Constants.MO_TTL_QUEUE, true, false, false, agruments);
}
@Bean
public Binding bindTTLQueueToTTLExchange() {
return BindingBuilder.bind(moTTLQueue()).to(moTTLExchange()).with(Constants.MO_TTL_ROUTING_KEY);
}
/*---------- START EXCHANGE START ----------*/
@Bean
public TopicExchange moBlackHoleExchange() {
return new TopicExchange(MOEnums.BLACK_HOLE.getExchange());
}
@Bean
public TopicExchange moTradeExchange() {
return new TopicExchange(MOEnums.TRADE.getExchange());
}
@Bean
public TopicExchange moClothoExchange() {
return new TopicExchange(MOEnums.CLOTHO.getExchange());
}
/*---------- END EXCHANGE END ----------*/
/*---------- START BINDING START ----------*/
@Bean
public Binding bindBlackHoleExchangeToTTLDLXExchange() {
return BindingBuilder.bind(moBlackHoleExchange()).to(moTTLDLXExchange()).with(MOEnums.BLACK_HOLE.getRoutingKey());
}
@Bean
public Binding bindTradeExchangeToTTLDLXExchange() {
return BindingBuilder.bind(moTradeExchange()).to(moTTLDLXExchange()).with(MOEnums.TRADE.getRoutingKey());
}
@Bean
public Binding bindClothoExchangeToTTLDLXExchange() {
return BindingBuilder.bind(moClothoExchange()).to(moTTLDLXExchange()).with(MOEnums.CLOTHO.getRoutingKey());
}
/*---------- END BINDING END ----------*/
}
package cn.quantgroup.tech.mq.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import lombok.*;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* <p>
* 考拉消息补偿表
* </p>
*
* @author rui
* @since 2020-04-28
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class KoalaRabbitmqMessage implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* 消息唯一id
*/
private String uniqueId;
/**
* 消息内容
*/
private String msgContent;
/**
* 消息状态 0未发送 1已发送 2失败(发送三次仍未ack则失败)
*/
private Integer msgStatus;
/**
* 交换机名称
*/
private String exchange;
/**
* 路由键
*/
private String routingKey;
/**
* 重试次数
*/
private Integer retryCount;
/**
* 消费次数
*/
private Integer consumeCount;
/**
* 最大消费次数
*/
private Integer maxConsumeCount;
/**
* 业务类型
*/
private Integer middleBusinessType;
/**
* 创建时间
*/
private LocalDateTime createdAt;
/**
* 更新时间
*/
private LocalDateTime updatedAt;
}
package cn.quantgroup.tech.mq.enums;
import lombok.Getter;
@Getter
public enum MOEnums {
BLACK_HOLE(1, "合同中心", "mo_black_hole_exchange", "mo.black_hole.#"),
TRADE(2, "交易系统", "mo_trade_exchange", "mo.trade.#"),
CLOTHO(3, "资金系统", "mo_clotho_exchange", "mo.clotho.#"),
;
int businessType;
String description;
String exchange;
String routingKey;
MOEnums(int businessType, String description, String exchange, String routingKey) {
this.businessType = businessType;
this.description = description;
this.exchange = exchange;
this.routingKey = routingKey;
}
}
package cn.quantgroup.tech.mq.mapper;
import cn.quantgroup.tech.mq.entity.KoalaRabbitmqMessage;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import java.time.LocalDate;
/**
* <p>
* 考拉消息补偿表 Mapper 接口
* </p>
*
* @author rui
* @since 2020-04-28
*/
public interface KoalaRabbitmqMessageMapperV1 extends BaseMapper<KoalaRabbitmqMessage> {
@Update("update koala_rabbitmq_message set msg_status = #{msgStatus} where unique_id = #{uniqueId}")
void updateMsgStatusByUniqueId(@Param("uniqueId") String uniqueId, @Param("msgStatus") Integer msgStatus);
@Update("update koala_rabbitmq_message set consume_count = consume_count + 1 where unique_id = #{uniqueId}")
void addConsumeCountByUniqueId(String uniqueId);
@Select("select consume_count consumeCount, max_consume_count maxConsumeCount, routing_key routingKey from koala_rabbitmq_message where unique_id = #{unique_id} limit 1")
KoalaRabbitmqMessage selectByUniqueId(String uniqueId);
@Update("update koala_rabbitmq_message set msg_content = #{msgContent} where unique_id = #{uniqueId}")
void updateMsgContentByUniqueId(@Param("uniqueId") String uniqueId, @Param("msgContent") String msgContent);
@Delete("delete from koala_rabbitmq_message where msg_status = 1 and created_at < #{createdAt}")
void deleteMsgStatusSuccessByCreatedAt(@Param("createdAt") LocalDate localDate);
}
package cn.quantgroup.tech.mq.service;
import cn.quantgroup.tech.mq.common.Constants;
import cn.quantgroup.tech.mq.entity.KoalaRabbitmqMessage;
import cn.quantgroup.tech.mq.enums.MOEnums;
import cn.quantgroup.tech.mq.mapper.KoalaRabbitmqMessageMapperV1;
import com.google.gson.Gson;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import javax.annotation.Resource;
import java.util.UUID;
public class MONotify {
@Resource
private RabbitTemplate moNotifyRabbitTemplate;
@Resource
private KoalaRabbitmqMessageMapperV1 koalaRabbitmqMessageMapperV1;
public void send(MOEnums moEnums, String routingKey, final Object object) {
String uniqueId = UUID.randomUUID().toString().replaceAll(Constants.CROSS, Constants.DEFAULT);
Gson gson = new Gson();
koalaRabbitmqMessageMapperV1.insert(KoalaRabbitmqMessage.builder()
.uniqueId(uniqueId)
.msgContent(gson.toJson(object))
.exchange(Constants.MO_TTL_EXCHANGE)
.routingKey(routingKey)
.maxConsumeCount(Constants.DEFAULT_CONSUME_COUNT)
.middleBusinessType(moEnums.getBusinessType())
.build());
CorrelationData correlationData = new CorrelationData(uniqueId);
moNotifyRabbitTemplate.convertAndSend(Constants.MO_TTL_EXCHANGE, routingKey, object, correlationData);
}
}
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.quantgroup.tech.mq.config.MORabbitMQConfiguration,\
cn.quantgroup.tech.mq.config.MOTTLConfiguration,\
cn.quantgroup.tech.mq.aop.RabbitListenerAspect
\ No newline at end of file
......@@ -22,6 +22,7 @@
<module>idgenerator-spring-boot-starter</module>
<module>enoch-agent-spring-boot-starter</module>
<module>elastic-job-lite-spring-boot-starter</module>
<module>mq-plus-spring-boot-starter</module>
</modules>
<packaging>pom</packaging>
......@@ -101,6 +102,11 @@
<artifactId>elastic-job-lite-spring-boot-starter</artifactId>
<version>${common.parent.version}</version>
</dependency>
<dependency>
<groupId>cn.quantgroup</groupId>
<artifactId>mq-plus-spring-boot-starter</artifactId>
<version>${common.parent.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment