Commit cf1657c8 authored by data爬虫-冯 军凯's avatar data爬虫-冯 军凯

支持发送三大运营商数据至kafka(清洗前数据)

parent 21a1efbc
......@@ -6,10 +6,14 @@
<groupId>cn.quantgroup</groupId>
<artifactId>qg-data-service</artifactId>
<!--<version>1.0.0-qa-SNAPSHOT</version>-->
<version>1.3.0-SNAPSHOT</version>
<version>1.4.2-SNAPSHOT</version>
<packaging>jar</packaging>
<name>qg-data-service</name>
<properties>
</properties>
<dependencies>
<dependency>
......@@ -48,6 +52,12 @@
<!--</exclusions>-->
<!--</dependency>-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0-kafka-2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
......
package cn.quantgroup.qgdataservice.config.kafka;
import cn.quantgroup.qgdataservice.constant.Constant;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
import java.util.ResourceBundle;
/**
* @Author fengjunkai
* @Date 2019-08-20 19:01
*/
/**
 * Process-wide Kafka producer, implemented as a single-element enum so the
 * JVM guarantees exactly one instance with thread-safe initialization.
 * KafkaProducer itself is thread-safe, so sharing one instance is correct.
 *
 * @Author fengjunkai
 * @Date 2019-08-20 19:01
 */
public enum KafkaProducers {

    KAFKA_PRODUCER_API;

    // Shared producer; final so callers cannot replace it. Typed <String, String>
    // to match the configured StringSerializer for both key and value.
    public final KafkaProducer<String, String> kafkaProducer;

    KafkaProducers() {
        // NOTE: the original null-check on kafkaProducer was dead code — the field
        // is always null when the constructor runs — so it has been removed.
        // Bootstrap servers are read from kafka_application.properties on the classpath.
        ResourceBundle rs = ResourceBundle.getBundle(Constant.KAFKA.APPLICATION);
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, rs.getString(Constant.KAFKA.BOOT_STRAPSERVER));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 30000);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);    // 64 MB send buffer
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);        // max wait when buffer is full
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 20000);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.ACKS_CONFIG, "all");                // wait for all in-sync replicas
        kafkaProducer = new KafkaProducer<>(props);
        System.out.println("初始化kafka配置完成" + JSON.toJSONString(props));
    }
}
......@@ -103,4 +103,10 @@ public class Constant {
public static final int BATCH_SIZE = 1000;
}
// Kafka-related configuration keys and topic names.
public static class KAFKA{
// Topic receiving raw (pre-cleaning) operator call-detail records.
public static final String OPERATOR_CALL_TOPIC = "rc.comservice.call";
// Property key for the bootstrap-server list inside the kafka properties bundle.
public static final String BOOT_STRAPSERVER = "kafka.bootStrapServer";
// ResourceBundle base name: kafka_application.properties on the classpath.
public static final String APPLICATION = "kafka_application";
}
}
package cn.quantgroup.qgdataservice.service.hbase;
import cn.quantgroup.qgdataservice.service.kafka.OperatorCallDetailListSendToKafkaService;
import cn.quantgroup.qgdataservice.service.tidb.OperatorInfoCleaningService;
import com.lkb.data.hbase.dataservice.SpiderUserItemDataService;
import com.lkb.data.hbase.dataservice.operators.PhoneBillDataService;
......@@ -66,6 +67,8 @@ public class OperatorInfoService {
else
MobileCallInfoDataService.put(mobileCallInfoRows);
OperatorCallDetailListSendToKafkaService.sendMobileCallDetailListMessage(mobileCallInfoRows);
OperatorInfoCleaningService.cleaningAndSaveMobileCallDetailInfo(mobileCallInfoRows, uuid, ka);
}
......@@ -117,6 +120,8 @@ public class OperatorInfoService {
else
TelecomCallInfoDataService.put(telecomCallInfoRows);
OperatorCallDetailListSendToKafkaService.sendTelecomCallDetailListMessage(telecomCallInfoRows);
OperatorInfoCleaningService.cleaningAndSaveTelecomCallDetailInfo(telecomCallInfoRows, uuid, ka);
}
......@@ -168,6 +173,8 @@ public class OperatorInfoService {
else
UnicomCallInfoDataService.put(unicomCallInfoRows);
OperatorCallDetailListSendToKafkaService.sendUnicomCallDetailListMessage(unicomCallInfoRows);
OperatorInfoCleaningService.cleaningAndSaveUnicomCallDetailInfo(unicomCallInfoRows, uuid, ka);
}
......
package cn.quantgroup.qgdataservice.service.kafka;
import cn.quantgroup.qgdataservice.config.kafka.KafkaProducers;
import cn.quantgroup.qgdataservice.constant.Constant;
import cn.quantgroup.qgdataservice.utils.ThreadPoolExecutor2Utils;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Stopwatch;
import com.lkb.data.hbase.row.operators.mobile.MobileCallInfoRow;
import com.lkb.data.hbase.row.operators.telecom.TelecomCallInfoRow;
import com.lkb.data.hbase.row.operators.unicom.UnicomCallInfoRow;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
 * Publishes raw (pre-cleaning) operator call-detail records to Kafka.
 * Each row is JSON-serialized and sent individually to the shared
 * operator-call topic on a background thread, so callers are never blocked
 * by Kafka latency.
 *
 * @Author fengjunkai
 * @Date 2019-08-21 19:09
 */
public class OperatorCallDetailListSendToKafkaService {

    private static final Logger log = LoggerFactory.getLogger(OperatorCallDetailListSendToKafkaService.class);

    /** Asynchronously sends China Mobile (移动) call-detail rows to Kafka. */
    public static void sendMobileCallDetailListMessage(List<MobileCallInfoRow> mobileCallInfoRows) {
        sendCallDetailListMessage(mobileCallInfoRows, "移动");
    }

    /** Asynchronously sends China Telecom (电信) call-detail rows to Kafka. */
    public static void sendTelecomCallDetailListMessage(List<TelecomCallInfoRow> telecomCallInfoRows) {
        sendCallDetailListMessage(telecomCallInfoRows, "电信");
    }

    /** Asynchronously sends China Unicom (联通) call-detail rows to Kafka. */
    public static void sendUnicomCallDetailListMessage(List<UnicomCallInfoRow> unicomCallInfoRows) {
        sendCallDetailListMessage(unicomCallInfoRows, "联通");
    }

    /**
     * Common implementation for all three carriers: on a pool thread, JSON-encode
     * each row and send it to {@link Constant.KAFKA#OPERATOR_CALL_TOPIC},
     * logging any per-record callback failure and the total elapsed time.
     *
     * @param rows     call-detail rows to publish (each serialized with fastjson)
     * @param operator carrier name used as the log-message prefix (e.g. "移动")
     */
    private static void sendCallDetailListMessage(List<?> rows, String operator) {
        ThreadPoolExecutor2Utils.getThreadPoolUtil().execute(() -> {
            Stopwatch stopwatch = Stopwatch.createStarted();
            try {
                for (Object row : rows) {
                    ProducerRecord<String, String> producerRecord =
                            new ProducerRecord<>(Constant.KAFKA.OPERATOR_CALL_TOPIC, JSON.toJSONString(row));
                    KafkaProducers.KAFKA_PRODUCER_API.kafkaProducer.send(producerRecord, (RecordMetadata metadata, Exception exception) -> {
                        if (Objects.nonNull(exception)) {
                            // Pass the Throwable as the final argument with no "{}"
                            // placeholder so SLF4J records the full stack trace
                            // (the original placeholder form discarded it).
                            log.error(operator + "通话详单发送至kafka(清洗前)回调提示异常", exception);
                        }
                    });
                }
                log.info("{}通话详单发送至kafka(清洗前)数据结束, 耗时: {} ", operator, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
            } catch (Exception e) {
                log.error(operator + "通话详单发送至kafka(清洗前)数据异常", e);
            }
        });
    }
}
package cn.quantgroup.qgdataservice.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Singleton thread pool used to push operator call-detail data to Kafka off
 * the main data-save path. Delivery is best-effort: a saturated pool drops
 * the task with an error log instead of throwing into the caller.
 */
public class ThreadPoolExecutor2Utils {

    private static final Logger log = LoggerFactory.getLogger(ThreadPoolExecutor2Utils.class);

    // Eagerly-created singleton; class loading guarantees thread safety.
    private static final ThreadPoolExecutor2Utils threadPool = new ThreadPoolExecutor2Utils();

    private final ThreadPoolExecutor executor;

    private ThreadPoolExecutor2Utils() {
        // 50 core / 200 max threads, 50 ms idle keep-alive, bounded queue of 20.
        executor = new ThreadPoolExecutor(50, 200, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(20));
    }

    /** @return the process-wide pool instance */
    public static ThreadPoolExecutor2Utils getThreadPoolUtil() {
        return threadPool;
    }

    /**
     * Submits a task after logging current pool utilization.
     *
     * <p>The original code let {@link RejectedExecutionException} (thrown once
     * all 200 threads are busy and the queue of 20 is full) propagate into the
     * caller's save flow; it is now caught and logged so Kafka publishing can
     * never break the primary data path.
     *
     * @param r task to run on the pool
     */
    public void execute(Runnable r) {
        try {
            int activeCount = executor.getActiveCount();
            int size = executor.getQueue().size();
            log.info("发送kafka线程池活跃数: {} , queueSize: {} ", activeCount, size);
        } catch (Exception e) {
            // Pass the Throwable directly (no "{}") so the stack trace is kept.
            log.error("TASK_POOL KA Exception2", e);
        }
        try {
            executor.execute(r);
        } catch (RejectedExecutionException rejected) {
            // Pool saturated: drop this best-effort Kafka send rather than crash the caller.
            log.error("发送kafka线程池已饱和, 任务被丢弃", rejected);
        }
    }

    /** Initiates an orderly shutdown; previously submitted tasks still run. */
    public void shutDown() {
        executor.shutdown();
    }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment