Commit 3ed51ff1 authored by data-爬虫-任锋's avatar data-爬虫-任锋

添加rabbitmq

parent 115bc0dd
...@@ -80,35 +80,12 @@ ...@@ -80,35 +80,12 @@
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
<version>1.16.16</version> <version>1.16.16</version>
</dependency> </dependency>
<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-compiler</artifactId>
<version>${drools-version}</version>
</dependency>
<!-- 添加权限 --> <!-- 添加权限 -->
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId> <artifactId>spring-boot-starter-security</artifactId>
</dependency> </dependency>
<!--swagger -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.0.3</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.0.3</version>
</dependency>
<!--swagger -->
<!--spring--> <!--spring-->
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
...@@ -135,6 +112,10 @@ ...@@ -135,6 +112,10 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId> <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency> <dependency>
<groupId>redis.clients</groupId> <groupId>redis.clients</groupId>
...@@ -299,10 +280,10 @@ ...@@ -299,10 +280,10 @@
<artifactId>shutdown-spring-boot-starter</artifactId> <artifactId>shutdown-spring-boot-starter</artifactId>
</dependency> </dependency>
<!--调用链追踪. 开启吧, 以后的监控,数据统计. 都从这里出数--> <!--调用链追踪. 开启吧, 以后的监控,数据统计. 都从这里出数-->
<!-- <dependency> <dependency>
<groupId>cn.quantgroup</groupId> <groupId>cn.quantgroup</groupId>
<artifactId>brave-spring-boot-starter</artifactId> <artifactId>brave-spring-boot-starter</artifactId>
</dependency> --> </dependency>
<!--全局唯一的IDGenerator, 试运行阶段. 百度开发者+刘志国结合量化派场景, 倾情定制.--> <!--全局唯一的IDGenerator, 试运行阶段. 百度开发者+刘志国结合量化派场景, 倾情定制.-->
<dependency> <dependency>
<groupId>cn.quantgroup</groupId> <groupId>cn.quantgroup</groupId>
......
package com.quantgroup.asset.distribution.config.rabbitmq;
import cn.quantgroup.tech.brave.service.ITechRabbitBuilder;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import javax.annotation.Resource;
import java.io.IOException;
@Slf4j
@Configuration
@EnableRabbit
public class ConsumerConfig implements RabbitListenerConfigurer {
@Resource
private ITechRabbitBuilder techRabbitBuilder;
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}
@Bean(name="rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("assetDistributionConnectionFactory")CachingConnectionFactory assetDistributionConnectionFactory) {
SimpleRabbitListenerContainerFactory factory = techRabbitBuilder.createSimpleRabbitListenerContainerFactory(assetDistributionConnectionFactory);
factory.setPrefetchCount(100);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "funding.state"), exchange = @Exchange(value = "xyqb.change.fanout"))})
public void process(Message message, Channel channel) throws IOException {
try{
log.info("======接收到消息====== : {}",new String(message.getBody(),"utf-8"));
// 采用手动应答模式, 手动确认应答更为安全稳定
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}catch(IOException e){
//重新放回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
}
package com.quantgroup.asset.distribution.config.rabbitmq;
import cn.quantgroup.tech.brave.service.ITechRabbitBuilder;
import com.google.common.collect.ImmutableMap;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.annotation.Resource;
/**
* Created by Miraculous on 16/5/18.
*/
@Configuration
@EnableRabbit
public class RabbitMqConfig {
@Value("${rabbitmq.connection.host}")
private String host;
@Value("${rabbitmq.connection.port}")
private Integer port;
@Value("${rabbitmq.connection.user}")
private String user;
@Value("${rabbitmq.connection.password}")
private String password;
@Value("${rabbitmq.connection.virtual-host}")
private String virtualHost;
@Value("${rabbitmq.connection.funding.exchange}")
private String fundingStateChangeExchange;
@Value("${rabbitmq.connection.funding.statechange.queue}")
private String fundingStateChangeQueue;
@Resource
private ITechRabbitBuilder techRabbitBuilder;
@Bean(name = "assetDistributionConnectionFactory")
@Primary
public CachingConnectionFactory cachingConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setChannelCacheSize(1024);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
connectionFactory.setChannelCacheSize(180 * 1000);
connectionFactory.setConnectionCacheSize(1024);
connectionFactory.setUsername(user);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherReturns(false);
connectionFactory.setPublisherConfirms(false);
return connectionFactory;
}
@Bean(name="assetDistributionRabbitAdmin")
@Primary
public RabbitAdmin rabbitAdmin(@Qualifier("assetDistributionConnectionFactory")CachingConnectionFactory assetDistributionConnectionFactory) {
return new RabbitAdmin(assetDistributionConnectionFactory);
}
/**
* 声明一个交换机
* @param assetDistributionRabbitAdmin
* @return
*/
@Bean(name = "fundingStateExchange")
@Primary
public FanoutExchange fundingStateExchange(@Qualifier("assetDistributionRabbitAdmin")RabbitAdmin assetDistributionRabbitAdmin) {
FanoutExchange fanoutExchange = new FanoutExchange(fundingStateChangeExchange);
assetDistributionRabbitAdmin.declareExchange(fanoutExchange);
return fanoutExchange;
}
/**
* 声明一个队列
* @param assetDistributionRabbitAdmin
* @return
*/
@Bean(name = "fundingStateChangeQueue")
@Primary
public Queue loanOrderQueue(@Qualifier("assetDistributionRabbitAdmin")RabbitAdmin assetDistributionRabbitAdmin){
//消息过期时间
ImmutableMap<String,Object> map = ImmutableMap.of("x-message-ttl", 300000);//5分钟
Queue queue = new Queue(fundingStateChangeQueue, true,false,false,map);
assetDistributionRabbitAdmin.declareQueue(queue);
return queue;
}
/**
* 队列绑定到交换机上
* @param assetDistributionRabbitAdmin
* @param fundingStateChangeQueue
* @param fundingStateExchange
* @return
*/
@Bean(name = "fundingStateQueueBinding")
@Primary
public Binding bindingLoanOrderQueue(@Qualifier("assetDistributionRabbitAdmin")AmqpAdmin assetDistributionRabbitAdmin,
@Qualifier("fundingStateChangeQueue")Queue fundingStateChangeQueue,
@Qualifier("fundingStateExchange")FanoutExchange fundingStateExchange) {
Binding binding = BindingBuilder.bind(fundingStateChangeQueue).to(fundingStateExchange);
assetDistributionRabbitAdmin.declareBinding(binding);
return binding;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment