Commit 8d65ad12 authored by data爬虫-冯 军凯's avatar data爬虫-冯 军凯

支持发送三大运营商数据至kafka(清洗后数据)

parent 4005affd
......@@ -6,7 +6,7 @@
<groupId>cn.quantgroup</groupId>
<artifactId>qg-data-service</artifactId>
<!--<version>1.0.0-qa-SNAPSHOT</version>-->
<version>1.4.5-SNAPSHOT</version>
<version>1.4.6-SNAPSHOT</version>
<packaging>jar</packaging>
<name>qg-data-service</name>
......
......@@ -108,7 +108,8 @@ public class Constant {
}
public static class KAFKA{
public static final String OPERATOR_CALL_TOPIC = "rc.comservice.call";
// public static final String OPERATOR_CALL_TOPIC = "rc.comservice.call";
public static final String OPERATOR_CALL_TOPIC = "rc.comservice.calls";
public static final String BOOT_STRAPSERVER = "kafka.bootStrapServer";
public static final String APPLICATION = "kafka_application";
}
......
......@@ -8,6 +8,9 @@ import cn.quantgroup.qgdataservice.service.kafka.OperatorCallDetailListSendToKaf
import cn.quantgroup.qgdataservice.utils.*;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Stopwatch;
import com.lkb.data.hbase.dataservice.operators.mobile.MobileCallInfoDataService;
import com.lkb.data.hbase.dataservice.operators.telecom.TelecomCallInfoDataService;
import com.lkb.data.hbase.dataservice.operators.unicom.UnicomCallInfoDataService;
import com.lkb.data.hbase.row.SpiderUserItemDataRow;
import com.lkb.data.hbase.row.operators.PhoneBillDataRow;
import com.lkb.data.hbase.row.operators.PhonePaymentDataRow;
......@@ -369,7 +372,7 @@ public class OperatorInfoCleaningService {
}
int size = JdbcExecuters.batchExecute(sqls, Constant.SQL.TELECOM_DETAIL_INFOS);
OperatorCallDetailListSendToKafkaService.sendTelecomCallDetailListMessage(telecomCallDetailInfos);
log.info("电信通话详单清洗完成, uuid: {} , 原始大小: {} , 清洗后大小: {} , 入库大小: {} , ka: {} , 耗时: {} ", uuid, telecomCallInfoRows.size(), sqls.size(), size, ka, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
......@@ -565,7 +568,7 @@ public class OperatorInfoCleaningService {
}
int size = JdbcExecuters.batchExecute(sqls, Constant.SQL.UNICOM_DETAIL_INFOS);
OperatorCallDetailListSendToKafkaService.sendUnicomCallDetailListMessage(unicomCallDetailInfos);
log.info("联通通话详单清洗完成, uuid: {} , 原始大小: {} , 清洗后大小: {} , 入库大小: {} , ka: {} , 耗时: {} ", uuid, unicomCallInfoRows.size(), sqls.size(), size, ka, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
......@@ -863,5 +866,4 @@ public class OperatorInfoCleaningService {
return unicomCallDetailInfo;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment