Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Q
qg-rt-dc
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
data-spider
qg-rt-dc
Commits
4005affd
Commit
4005affd
authored
Aug 28, 2019
by
data爬虫-冯 军凯
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
支持发送三大运营商数据至kafka(清洗后数据)
parent
cf1657c8
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
558 additions
and
31 deletions
+558
-31
pom.xml
qg-data-service/pom.xml
+1
-1
Constant.java
...n/java/cn/quantgroup/qgdataservice/constant/Constant.java
+4
-0
MobileCallDetailInfo.java
...roup/qgdataservice/model/mobile/MobileCallDetailInfo.java
+131
-0
TelecomCallDetailInfo.java
...oup/qgdataservice/model/telcom/TelecomCallDetailInfo.java
+156
-0
UnicomCallDetailInfo.java
...roup/qgdataservice/model/unicom/UnicomCallDetailInfo.java
+166
-0
OperatorInfoService.java
...roup/qgdataservice/service/hbase/OperatorInfoService.java
+0
-7
OperatorCallDetailListSendToKafkaService.java
...rvice/kafka/OperatorCallDetailListSendToKafkaService.java
+21
-21
OperatorInfoCleaningService.java
...dataservice/service/tidb/OperatorInfoCleaningService.java
+79
-2
No files found.
qg-data-service/pom.xml
View file @
4005affd
...
@@ -6,7 +6,7 @@
...
@@ -6,7 +6,7 @@
<groupId>
cn.quantgroup
</groupId>
<groupId>
cn.quantgroup
</groupId>
<artifactId>
qg-data-service
</artifactId>
<artifactId>
qg-data-service
</artifactId>
<!--<version>1.0.0-qa-SNAPSHOT</version>-->
<!--<version>1.0.0-qa-SNAPSHOT</version>-->
<version>
1.4.
2
-SNAPSHOT
</version>
<version>
1.4.
5
-SNAPSHOT
</version>
<packaging>
jar
</packaging>
<packaging>
jar
</packaging>
<name>
qg-data-service
</name>
<name>
qg-data-service
</name>
...
...
qg-data-service/src/main/java/cn/quantgroup/qgdataservice/constant/Constant.java
View file @
4005affd
...
@@ -76,6 +76,10 @@ public class Constant {
...
@@ -76,6 +76,10 @@ public class Constant {
public
static
final
String
PHONE_RECHARGE_RECORDS
=
"PHONE_RECHARGE_RECORDS"
;
public
static
final
String
PHONE_RECHARGE_RECORDS
=
"PHONE_RECHARGE_RECORDS"
;
public
static
final
String
MOBILE
=
"MOBILE"
;
public
static
final
String
TELECOM
=
"TELECOM"
;
public
static
final
String
UNICOM
=
"UNICOM"
;
}
}
...
...
qg-data-service/src/main/java/cn/quantgroup/qgdataservice/model/mobile/MobileCallDetailInfo.java
0 → 100644
View file @
4005affd
package
cn
.
quantgroup
.
qgdataservice
.
model
.
mobile
;
import
java.io.Serializable
;
/**
* 清洗后的移动通话详单bean
*
* @Author fengjunkai
* @Date 2019-08-28 10:25
*/
public
class
MobileCallDetailInfo
implements
Serializable
{
private
static
final
long
serialVersionUID
=
-
5777037271728788149L
;
private
String
uuid
;
private
String
cTime
;
private
String
tradeAddr
;
private
String
tradeWay
;
private
String
tradeType
;
private
String
receiverPhone
;
private
String
tradeTime
;
private
String
taocan
;
private
String
onlinePay
;
private
String
phone
;
private
String
iscm
;
private
String
timestamp
;
private
String
source
;
public
String
getUuid
()
{
return
uuid
;
}
public
void
setUuid
(
String
uuid
)
{
this
.
uuid
=
uuid
;
}
public
String
getcTime
()
{
return
cTime
;
}
public
void
setcTime
(
String
cTime
)
{
this
.
cTime
=
cTime
;
}
public
String
getTradeAddr
()
{
return
tradeAddr
;
}
public
void
setTradeAddr
(
String
tradeAddr
)
{
this
.
tradeAddr
=
tradeAddr
;
}
public
String
getTradeWay
()
{
return
tradeWay
;
}
public
void
setTradeWay
(
String
tradeWay
)
{
this
.
tradeWay
=
tradeWay
;
}
public
String
getTradeType
()
{
return
tradeType
;
}
public
void
setTradeType
(
String
tradeType
)
{
this
.
tradeType
=
tradeType
;
}
public
String
getReceiverPhone
()
{
return
receiverPhone
;
}
public
void
setReceiverPhone
(
String
receiverPhone
)
{
this
.
receiverPhone
=
receiverPhone
;
}
public
String
getTradeTime
()
{
return
tradeTime
;
}
public
void
setTradeTime
(
String
tradeTime
)
{
this
.
tradeTime
=
tradeTime
;
}
public
String
getTaocan
()
{
return
taocan
;
}
public
void
setTaocan
(
String
taocan
)
{
this
.
taocan
=
taocan
;
}
public
String
getOnlinePay
()
{
return
onlinePay
;
}
public
void
setOnlinePay
(
String
onlinePay
)
{
this
.
onlinePay
=
onlinePay
;
}
public
String
getPhone
()
{
return
phone
;
}
public
void
setPhone
(
String
phone
)
{
this
.
phone
=
phone
;
}
public
String
getIscm
()
{
return
iscm
;
}
public
void
setIscm
(
String
iscm
)
{
this
.
iscm
=
iscm
;
}
public
String
getTimestamp
()
{
return
timestamp
;
}
public
void
setTimestamp
(
String
timestamp
)
{
this
.
timestamp
=
timestamp
;
}
public
String
getSource
()
{
return
source
;
}
public
void
setSource
(
String
source
)
{
this
.
source
=
source
;
}
}
qg-data-service/src/main/java/cn/quantgroup/qgdataservice/model/telcom/TelecomCallDetailInfo.java
0 → 100644
View file @
4005affd
package
cn
.
quantgroup
.
qgdataservice
.
model
.
telcom
;
import
java.io.Serializable
;
/**
* @Author fengjunkai
* @Date 2019-08-28 10:28
*/
public
class
TelecomCallDetailInfo
implements
Serializable
{
private
static
final
long
serialVersionUID
=
-
8864372587346649363L
;
private
String
uuid
;
private
String
tradeType
;
private
String
cTime
;
private
String
tradeTime
;
private
String
callWay
;
private
String
receiverPhone
;
private
String
tradeAddr
;
private
String
basePay
;
private
String
longPay
;
private
String
infoPay
;
private
String
otherPay
;
private
String
allPay
;
private
String
phone
;
private
String
iscm
;
private
String
timestamp
;
private
String
source
;
public
String
getUuid
()
{
return
uuid
;
}
public
void
setUuid
(
String
uuid
)
{
this
.
uuid
=
uuid
;
}
public
String
getTradeType
()
{
return
tradeType
;
}
public
void
setTradeType
(
String
tradeType
)
{
this
.
tradeType
=
tradeType
;
}
public
String
getcTime
()
{
return
cTime
;
}
public
void
setcTime
(
String
cTime
)
{
this
.
cTime
=
cTime
;
}
public
String
getTradeTime
()
{
return
tradeTime
;
}
public
void
setTradeTime
(
String
tradeTime
)
{
this
.
tradeTime
=
tradeTime
;
}
public
String
getCallWay
()
{
return
callWay
;
}
public
void
setCallWay
(
String
callWay
)
{
this
.
callWay
=
callWay
;
}
public
String
getReceiverPhone
()
{
return
receiverPhone
;
}
public
void
setReceiverPhone
(
String
receiverPhone
)
{
this
.
receiverPhone
=
receiverPhone
;
}
public
String
getTradeAddr
()
{
return
tradeAddr
;
}
public
void
setTradeAddr
(
String
tradeAddr
)
{
this
.
tradeAddr
=
tradeAddr
;
}
public
String
getBasePay
()
{
return
basePay
;
}
public
void
setBasePay
(
String
basePay
)
{
this
.
basePay
=
basePay
;
}
public
String
getLongPay
()
{
return
longPay
;
}
public
void
setLongPay
(
String
longPay
)
{
this
.
longPay
=
longPay
;
}
public
String
getInfoPay
()
{
return
infoPay
;
}
public
void
setInfoPay
(
String
infoPay
)
{
this
.
infoPay
=
infoPay
;
}
public
String
getOtherPay
()
{
return
otherPay
;
}
public
void
setOtherPay
(
String
otherPay
)
{
this
.
otherPay
=
otherPay
;
}
public
String
getAllPay
()
{
return
allPay
;
}
public
void
setAllPay
(
String
allPay
)
{
this
.
allPay
=
allPay
;
}
public
String
getPhone
()
{
return
phone
;
}
public
void
setPhone
(
String
phone
)
{
this
.
phone
=
phone
;
}
public
String
getIscm
()
{
return
iscm
;
}
public
void
setIscm
(
String
iscm
)
{
this
.
iscm
=
iscm
;
}
public
String
getTimestamp
()
{
return
timestamp
;
}
public
void
setTimestamp
(
String
timestamp
)
{
this
.
timestamp
=
timestamp
;
}
public
String
getSource
()
{
return
source
;
}
public
void
setSource
(
String
source
)
{
this
.
source
=
source
;
}
}
qg-data-service/src/main/java/cn/quantgroup/qgdataservice/model/unicom/UnicomCallDetailInfo.java
0 → 100644
View file @
4005affd
package
cn
.
quantgroup
.
qgdataservice
.
model
.
unicom
;
import
java.io.Serializable
;
import
java.util.List
;
/**
* @Author fengjunkai
* @Date 2019-08-28 10:29
*/
public
class
UnicomCallDetailInfo
implements
Serializable
{
private
static
final
long
serialVersionUID
=
3138935392599159952L
;
private
String
uuid
;
private
String
businessType
;
private
String
cTime
;
private
String
tradeTime
;
private
String
callType
;
private
String
receiverPhone
;
private
String
tradeAddr
;
private
String
tradeType
;
private
String
basePay
;
private
String
ldPay
;
private
String
otherPay
;
private
String
totalPay
;
private
String
phone
;
private
String
iscm
;
private
String
reductionPay
;
private
String
timestamp
;
private
String
source
;
public
String
getUuid
()
{
return
uuid
;
}
public
void
setUuid
(
String
uuid
)
{
this
.
uuid
=
uuid
;
}
public
String
getBusinessType
()
{
return
businessType
;
}
public
void
setBusinessType
(
String
businessType
)
{
this
.
businessType
=
businessType
;
}
public
String
getcTime
()
{
return
cTime
;
}
public
void
setcTime
(
String
cTime
)
{
this
.
cTime
=
cTime
;
}
public
String
getTradeTime
()
{
return
tradeTime
;
}
public
void
setTradeTime
(
String
tradeTime
)
{
this
.
tradeTime
=
tradeTime
;
}
public
String
getCallType
()
{
return
callType
;
}
public
void
setCallType
(
String
callType
)
{
this
.
callType
=
callType
;
}
public
String
getReceiverPhone
()
{
return
receiverPhone
;
}
public
void
setReceiverPhone
(
String
receiverPhone
)
{
this
.
receiverPhone
=
receiverPhone
;
}
public
String
getTradeAddr
()
{
return
tradeAddr
;
}
public
void
setTradeAddr
(
String
tradeAddr
)
{
this
.
tradeAddr
=
tradeAddr
;
}
public
String
getTradeType
()
{
return
tradeType
;
}
public
void
setTradeType
(
String
tradeType
)
{
this
.
tradeType
=
tradeType
;
}
public
String
getBasePay
()
{
return
basePay
;
}
public
void
setBasePay
(
String
basePay
)
{
this
.
basePay
=
basePay
;
}
public
String
getLdPay
()
{
return
ldPay
;
}
public
void
setLdPay
(
String
ldPay
)
{
this
.
ldPay
=
ldPay
;
}
public
String
getOtherPay
()
{
return
otherPay
;
}
public
void
setOtherPay
(
String
otherPay
)
{
this
.
otherPay
=
otherPay
;
}
public
String
getTotalPay
()
{
return
totalPay
;
}
public
void
setTotalPay
(
String
totalPay
)
{
this
.
totalPay
=
totalPay
;
}
public
String
getPhone
()
{
return
phone
;
}
public
void
setPhone
(
String
phone
)
{
this
.
phone
=
phone
;
}
public
String
getIscm
()
{
return
iscm
;
}
public
void
setIscm
(
String
iscm
)
{
this
.
iscm
=
iscm
;
}
public
String
getReductionPay
()
{
return
reductionPay
;
}
public
void
setReductionPay
(
String
reductionPay
)
{
this
.
reductionPay
=
reductionPay
;
}
public
String
getTimestamp
()
{
return
timestamp
;
}
public
void
setTimestamp
(
String
timestamp
)
{
this
.
timestamp
=
timestamp
;
}
public
String
getSource
()
{
return
source
;
}
public
void
setSource
(
String
source
)
{
this
.
source
=
source
;
}
}
qg-data-service/src/main/java/cn/quantgroup/qgdataservice/service/hbase/OperatorInfoService.java
View file @
4005affd
package
cn
.
quantgroup
.
qgdataservice
.
service
.
hbase
;
package
cn
.
quantgroup
.
qgdataservice
.
service
.
hbase
;
import
cn.quantgroup.qgdataservice.service.kafka.OperatorCallDetailListSendToKafkaService
;
import
cn.quantgroup.qgdataservice.service.tidb.OperatorInfoCleaningService
;
import
cn.quantgroup.qgdataservice.service.tidb.OperatorInfoCleaningService
;
import
com.lkb.data.hbase.dataservice.SpiderUserItemDataService
;
import
com.lkb.data.hbase.dataservice.SpiderUserItemDataService
;
import
com.lkb.data.hbase.dataservice.operators.PhoneBillDataService
;
import
com.lkb.data.hbase.dataservice.operators.PhoneBillDataService
;
...
@@ -67,8 +66,6 @@ public class OperatorInfoService {
...
@@ -67,8 +66,6 @@ public class OperatorInfoService {
else
else
MobileCallInfoDataService
.
put
(
mobileCallInfoRows
);
MobileCallInfoDataService
.
put
(
mobileCallInfoRows
);
OperatorCallDetailListSendToKafkaService
.
sendMobileCallDetailListMessage
(
mobileCallInfoRows
);
OperatorInfoCleaningService
.
cleaningAndSaveMobileCallDetailInfo
(
mobileCallInfoRows
,
uuid
,
ka
);
OperatorInfoCleaningService
.
cleaningAndSaveMobileCallDetailInfo
(
mobileCallInfoRows
,
uuid
,
ka
);
}
}
...
@@ -120,8 +117,6 @@ public class OperatorInfoService {
...
@@ -120,8 +117,6 @@ public class OperatorInfoService {
else
else
TelecomCallInfoDataService
.
put
(
telecomCallInfoRows
);
TelecomCallInfoDataService
.
put
(
telecomCallInfoRows
);
OperatorCallDetailListSendToKafkaService
.
sendTelecomCallDetailListMessage
(
telecomCallInfoRows
);
OperatorInfoCleaningService
.
cleaningAndSaveTelecomCallDetailInfo
(
telecomCallInfoRows
,
uuid
,
ka
);
OperatorInfoCleaningService
.
cleaningAndSaveTelecomCallDetailInfo
(
telecomCallInfoRows
,
uuid
,
ka
);
}
}
...
@@ -173,8 +168,6 @@ public class OperatorInfoService {
...
@@ -173,8 +168,6 @@ public class OperatorInfoService {
else
else
UnicomCallInfoDataService
.
put
(
unicomCallInfoRows
);
UnicomCallInfoDataService
.
put
(
unicomCallInfoRows
);
OperatorCallDetailListSendToKafkaService
.
sendUnicomCallDetailListMessage
(
unicomCallInfoRows
);
OperatorInfoCleaningService
.
cleaningAndSaveUnicomCallDetailInfo
(
unicomCallInfoRows
,
uuid
,
ka
);
OperatorInfoCleaningService
.
cleaningAndSaveUnicomCallDetailInfo
(
unicomCallInfoRows
,
uuid
,
ka
);
}
}
...
...
qg-data-service/src/main/java/cn/quantgroup/qgdataservice/service/kafka/OperatorCallDetailListSendToKafkaService.java
View file @
4005affd
...
@@ -2,12 +2,12 @@ package cn.quantgroup.qgdataservice.service.kafka;
...
@@ -2,12 +2,12 @@ package cn.quantgroup.qgdataservice.service.kafka;
import
cn.quantgroup.qgdataservice.config.kafka.KafkaProducers
;
import
cn.quantgroup.qgdataservice.config.kafka.KafkaProducers
;
import
cn.quantgroup.qgdataservice.constant.Constant
;
import
cn.quantgroup.qgdataservice.constant.Constant
;
import
cn.quantgroup.qgdataservice.model.mobile.MobileCallDetailInfo
;
import
cn.quantgroup.qgdataservice.model.telcom.TelecomCallDetailInfo
;
import
cn.quantgroup.qgdataservice.model.unicom.UnicomCallDetailInfo
;
import
cn.quantgroup.qgdataservice.utils.ThreadPoolExecutor2Utils
;
import
cn.quantgroup.qgdataservice.utils.ThreadPoolExecutor2Utils
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSON
;
import
com.google.common.base.Stopwatch
;
import
com.google.common.base.Stopwatch
;
import
com.lkb.data.hbase.row.operators.mobile.MobileCallInfoRow
;
import
com.lkb.data.hbase.row.operators.telecom.TelecomCallInfoRow
;
import
com.lkb.data.hbase.row.operators.unicom.UnicomCallInfoRow
;
import
org.apache.kafka.clients.producer.Callback
;
import
org.apache.kafka.clients.producer.Callback
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.clients.producer.ProducerRecord
;
import
org.apache.kafka.clients.producer.RecordMetadata
;
import
org.apache.kafka.clients.producer.RecordMetadata
;
...
@@ -26,7 +26,7 @@ public class OperatorCallDetailListSendToKafkaService {
...
@@ -26,7 +26,7 @@ public class OperatorCallDetailListSendToKafkaService {
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
OperatorCallDetailListSendToKafkaService
.
class
);
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
OperatorCallDetailListSendToKafkaService
.
class
);
public
static
void
sendMobileCallDetailListMessage
(
List
<
MobileCall
InfoRow
>
mobileCallInfoRow
s
)
{
public
static
void
sendMobileCallDetailListMessage
(
List
<
MobileCall
DetailInfo
>
mobileCallDetailInfo
s
)
{
ThreadPoolExecutor2Utils
.
getThreadPoolUtil
().
execute
(
new
Runnable
()
{
ThreadPoolExecutor2Utils
.
getThreadPoolUtil
().
execute
(
new
Runnable
()
{
@Override
@Override
...
@@ -36,24 +36,24 @@ public class OperatorCallDetailListSendToKafkaService {
...
@@ -36,24 +36,24 @@ public class OperatorCallDetailListSendToKafkaService {
Stopwatch
stopwatch
=
Stopwatch
.
createStarted
();
Stopwatch
stopwatch
=
Stopwatch
.
createStarted
();
for
(
int
i
=
0
;
i
<
mobileCall
InfoRow
s
.
size
();
i
++){
for
(
int
i
=
0
;
i
<
mobileCall
DetailInfo
s
.
size
();
i
++){
ProducerRecord
producerRecord
=
new
ProducerRecord
(
Constant
.
KAFKA
.
OPERATOR_CALL_TOPIC
,
JSON
.
toJSONString
(
mobileCall
InfoRow
s
.
get
(
i
)));
ProducerRecord
producerRecord
=
new
ProducerRecord
(
Constant
.
KAFKA
.
OPERATOR_CALL_TOPIC
,
JSON
.
toJSONString
(
mobileCall
DetailInfo
s
.
get
(
i
)));
KafkaProducers
.
KAFKA_PRODUCER_API
.
kafkaProducer
.
send
(
producerRecord
,
new
Callback
()
{
KafkaProducers
.
KAFKA_PRODUCER_API
.
kafkaProducer
.
send
(
producerRecord
,
new
Callback
()
{
@Override
@Override
public
void
onCompletion
(
RecordMetadata
metadata
,
Exception
exception
)
{
public
void
onCompletion
(
RecordMetadata
metadata
,
Exception
exception
)
{
if
(
Objects
.
nonNull
(
exception
))
{
if
(
Objects
.
nonNull
(
exception
))
{
log
.
error
(
"移动通话详单发送至kafka(清洗
前
)回调提示异常, 异常: {} "
,
exception
);
log
.
error
(
"移动通话详单发送至kafka(清洗
后
)回调提示异常, 异常: {} "
,
exception
);
}
}
}
}
});
});
}
}
log
.
info
(
"移动通话详单发送至kafka(清洗
前
)数据结束, 耗时: {} "
,
stopwatch
.
stop
().
elapsed
(
TimeUnit
.
MILLISECONDS
));
log
.
info
(
"移动通话详单发送至kafka(清洗
后
)数据结束, 耗时: {} "
,
stopwatch
.
stop
().
elapsed
(
TimeUnit
.
MILLISECONDS
));
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"移动通话详单发送至kafka(清洗
前
)数据异常"
,
e
);
log
.
error
(
"移动通话详单发送至kafka(清洗
后
)数据异常"
,
e
);
}
}
}
}
...
@@ -61,7 +61,7 @@ public class OperatorCallDetailListSendToKafkaService {
...
@@ -61,7 +61,7 @@ public class OperatorCallDetailListSendToKafkaService {
}
}
public
static
void
sendTelecomCallDetailListMessage
(
List
<
TelecomCall
InfoRow
>
telecomCallInfoRow
s
)
{
public
static
void
sendTelecomCallDetailListMessage
(
List
<
TelecomCall
DetailInfo
>
telecomCallDetailInfo
s
)
{
ThreadPoolExecutor2Utils
.
getThreadPoolUtil
().
execute
(
new
Runnable
()
{
ThreadPoolExecutor2Utils
.
getThreadPoolUtil
().
execute
(
new
Runnable
()
{
@Override
@Override
...
@@ -69,24 +69,24 @@ public class OperatorCallDetailListSendToKafkaService {
...
@@ -69,24 +69,24 @@ public class OperatorCallDetailListSendToKafkaService {
Stopwatch
stopwatch
=
Stopwatch
.
createStarted
();
Stopwatch
stopwatch
=
Stopwatch
.
createStarted
();
try
{
try
{
for
(
int
i
=
0
;
i
<
telecomCall
InfoRow
s
.
size
();
i
++){
for
(
int
i
=
0
;
i
<
telecomCall
DetailInfo
s
.
size
();
i
++){
ProducerRecord
producerRecord
=
new
ProducerRecord
(
Constant
.
KAFKA
.
OPERATOR_CALL_TOPIC
,
JSON
.
toJSONString
(
telecomCall
InfoRow
s
.
get
(
i
)));
ProducerRecord
producerRecord
=
new
ProducerRecord
(
Constant
.
KAFKA
.
OPERATOR_CALL_TOPIC
,
JSON
.
toJSONString
(
telecomCall
DetailInfo
s
.
get
(
i
)));
KafkaProducers
.
KAFKA_PRODUCER_API
.
kafkaProducer
.
send
(
producerRecord
,
new
Callback
()
{
KafkaProducers
.
KAFKA_PRODUCER_API
.
kafkaProducer
.
send
(
producerRecord
,
new
Callback
()
{
@Override
@Override
public
void
onCompletion
(
RecordMetadata
metadata
,
Exception
exception
)
{
public
void
onCompletion
(
RecordMetadata
metadata
,
Exception
exception
)
{
if
(
Objects
.
nonNull
(
exception
)){
if
(
Objects
.
nonNull
(
exception
)){
log
.
error
(
"电信通话详单发送至kafka(清洗
前
)回调结果提示异常, {}"
,
exception
);
log
.
error
(
"电信通话详单发送至kafka(清洗
后
)回调结果提示异常, {}"
,
exception
);
}
}
}
}
});
});
}
}
log
.
info
(
"电信通话详单发送至kafka(清洗
前
)数据结束, 耗时: {} "
,
stopwatch
.
stop
().
elapsed
(
TimeUnit
.
MILLISECONDS
));
log
.
info
(
"电信通话详单发送至kafka(清洗
后
)数据结束, 耗时: {} "
,
stopwatch
.
stop
().
elapsed
(
TimeUnit
.
MILLISECONDS
));
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"电信通话详单发送至kafka(清洗
前
)数据异常"
,
e
);
log
.
error
(
"电信通话详单发送至kafka(清洗
后
)数据异常"
,
e
);
}
}
}
}
...
@@ -94,7 +94,7 @@ public class OperatorCallDetailListSendToKafkaService {
...
@@ -94,7 +94,7 @@ public class OperatorCallDetailListSendToKafkaService {
}
}
public
static
void
sendUnicomCallDetailListMessage
(
List
<
UnicomCall
InfoRow
>
unicomCallInfoRow
s
)
{
public
static
void
sendUnicomCallDetailListMessage
(
List
<
UnicomCall
DetailInfo
>
unicomCallDetailInfo
s
)
{
ThreadPoolExecutor2Utils
.
getThreadPoolUtil
().
execute
(
new
Runnable
()
{
ThreadPoolExecutor2Utils
.
getThreadPoolUtil
().
execute
(
new
Runnable
()
{
@Override
@Override
...
@@ -104,24 +104,24 @@ public class OperatorCallDetailListSendToKafkaService {
...
@@ -104,24 +104,24 @@ public class OperatorCallDetailListSendToKafkaService {
try
{
try
{
for
(
int
i
=
0
;
i
<
unicomCall
InfoRow
s
.
size
();
i
++){
for
(
int
i
=
0
;
i
<
unicomCall
DetailInfo
s
.
size
();
i
++){
ProducerRecord
producerRecord
=
new
ProducerRecord
(
Constant
.
KAFKA
.
OPERATOR_CALL_TOPIC
,
JSON
.
toJSONString
(
unicomCall
InfoRow
s
.
get
(
i
)));
ProducerRecord
producerRecord
=
new
ProducerRecord
(
Constant
.
KAFKA
.
OPERATOR_CALL_TOPIC
,
JSON
.
toJSONString
(
unicomCall
DetailInfo
s
.
get
(
i
)));
KafkaProducers
.
KAFKA_PRODUCER_API
.
kafkaProducer
.
send
(
producerRecord
,
new
Callback
()
{
KafkaProducers
.
KAFKA_PRODUCER_API
.
kafkaProducer
.
send
(
producerRecord
,
new
Callback
()
{
@Override
@Override
public
void
onCompletion
(
RecordMetadata
metadata
,
Exception
exception
)
{
public
void
onCompletion
(
RecordMetadata
metadata
,
Exception
exception
)
{
if
(
Objects
.
nonNull
(
exception
)){
if
(
Objects
.
nonNull
(
exception
)){
log
.
error
(
"联通通话详单发送至kafka(清洗
前
)回调结果提示异常, {}"
,
exception
);
log
.
error
(
"联通通话详单发送至kafka(清洗
后
)回调结果提示异常, {}"
,
exception
);
}
}
}
}
});
});
}
}
log
.
info
(
"联通通话详单发送至kafka(清洗
前
)数据结束, 耗时: {} "
,
stopwatch
.
stop
().
elapsed
(
TimeUnit
.
MILLISECONDS
));
log
.
info
(
"联通通话详单发送至kafka(清洗
后
)数据结束, 耗时: {} "
,
stopwatch
.
stop
().
elapsed
(
TimeUnit
.
MILLISECONDS
));
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"联通通话详单发送至kafka(清洗
前
)数据记异常"
,
e
);
log
.
error
(
"联通通话详单发送至kafka(清洗
后
)数据记异常"
,
e
);
}
}
}
}
...
...
qg-data-service/src/main/java/cn/quantgroup/qgdataservice/service/tidb/OperatorInfoCleaningService.java
View file @
4005affd
package
cn
.
quantgroup
.
qgdataservice
.
service
.
tidb
;
package
cn
.
quantgroup
.
qgdataservice
.
service
.
tidb
;
import
cn.quantgroup.qgdataservice.constant.Constant
;
import
cn.quantgroup.qgdataservice.constant.Constant
;
import
cn.quantgroup.qgdataservice.model.mobile.MobileCallDetailInfo
;
import
cn.quantgroup.qgdataservice.model.telcom.TelecomCallDetailInfo
;
import
cn.quantgroup.qgdataservice.model.unicom.UnicomCallDetailInfo
;
import
cn.quantgroup.qgdataservice.service.kafka.OperatorCallDetailListSendToKafkaService
;
import
cn.quantgroup.qgdataservice.utils.*
;
import
cn.quantgroup.qgdataservice.utils.*
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSON
;
import
com.google.common.base.Stopwatch
;
import
com.google.common.base.Stopwatch
;
...
@@ -116,6 +120,7 @@ public class OperatorInfoCleaningService {
...
@@ -116,6 +120,7 @@ public class OperatorInfoCleaningService {
Stopwatch
stopwatch
=
Stopwatch
.
createStarted
();
Stopwatch
stopwatch
=
Stopwatch
.
createStarted
();
List
<
List
<
String
>>
sqls
=
new
ArrayList
<>();
List
<
List
<
String
>>
sqls
=
new
ArrayList
<>();
List
<
MobileCallDetailInfo
>
mobileCallDetailInfos
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
mobileCallInfoRows
.
size
();
i
++)
{
for
(
int
i
=
0
;
i
<
mobileCallInfoRows
.
size
();
i
++)
{
...
@@ -160,6 +165,8 @@ public class OperatorInfoCleaningService {
...
@@ -160,6 +165,8 @@ public class OperatorInfoCleaningService {
sqls
.
add
(
list
);
sqls
.
add
(
list
);
mobileCallDetailInfos
.
add
(
mobileList2Bean
(
list
));
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"移动通话详单清洗异常, uuid: {} , ka: {} , param: {} "
,
uuid
,
ka
,
JSON
.
toJSONString
(
mobileCallInfoRow
),
e
);
log
.
error
(
"移动通话详单清洗异常, uuid: {} , ka: {} , param: {} "
,
uuid
,
ka
,
JSON
.
toJSONString
(
mobileCallInfoRow
),
e
);
}
}
...
@@ -168,6 +175,8 @@ public class OperatorInfoCleaningService {
...
@@ -168,6 +175,8 @@ public class OperatorInfoCleaningService {
int
size
=
JdbcExecuters
.
batchExecute
(
sqls
,
Constant
.
SQL
.
MOBILE_DETAIL_INFOS
);
int
size
=
JdbcExecuters
.
batchExecute
(
sqls
,
Constant
.
SQL
.
MOBILE_DETAIL_INFOS
);
OperatorCallDetailListSendToKafkaService
.
sendMobileCallDetailListMessage
(
mobileCallDetailInfos
);
log
.
info
(
"移动通话详单清洗完成, uuid: {} , 原始大小: {} , 清洗后大小: {} , 入库大小: {} , ka: {} , 耗时: {} "
,
uuid
,
mobileCallInfoRows
.
size
(),
sqls
.
size
(),
size
,
ka
,
stopwatch
.
stop
().
elapsed
(
TimeUnit
.
MILLISECONDS
));
log
.
info
(
"移动通话详单清洗完成, uuid: {} , 原始大小: {} , 清洗后大小: {} , 入库大小: {} , ka: {} , 耗时: {} "
,
uuid
,
mobileCallInfoRows
.
size
(),
sqls
.
size
(),
size
,
ka
,
stopwatch
.
stop
().
elapsed
(
TimeUnit
.
MILLISECONDS
));
}
}
});
});
...
@@ -299,6 +308,7 @@ public class OperatorInfoCleaningService {
...
@@ -299,6 +308,7 @@ public class OperatorInfoCleaningService {
Stopwatch
stopwatch
=
Stopwatch
.
createStarted
();
Stopwatch
stopwatch
=
Stopwatch
.
createStarted
();
List
<
List
<
String
>>
sqls
=
new
ArrayList
<>();
List
<
List
<
String
>>
sqls
=
new
ArrayList
<>();
List
<
TelecomCallDetailInfo
>
telecomCallDetailInfos
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
telecomCallInfoRows
.
size
();
i
++)
{
for
(
int
i
=
0
;
i
<
telecomCallInfoRows
.
size
();
i
++)
{
TelecomCallInfoRow
telecomCallInfoRow
=
telecomCallInfoRows
.
get
(
i
);
TelecomCallInfoRow
telecomCallInfoRow
=
telecomCallInfoRows
.
get
(
i
);
...
@@ -351,7 +361,7 @@ public class OperatorInfoCleaningService {
...
@@ -351,7 +361,7 @@ public class OperatorInfoCleaningService {
list
.
add
(
TimeUtils
.
timeStamp2Date
(
String
.
valueOf
(
System
.
currentTimeMillis
()),
Constant
.
OPERATOR
.
TELECOM_USERSOURCE_CALL
,
phoneNo
));
list
.
add
(
TimeUtils
.
timeStamp2Date
(
String
.
valueOf
(
System
.
currentTimeMillis
()),
Constant
.
OPERATOR
.
TELECOM_USERSOURCE_CALL
,
phoneNo
));
sqls
.
add
(
list
);
sqls
.
add
(
list
);
telecomCallDetailInfos
.
add
(
telecomList2Bean
(
list
));
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"电信通话详单清洗异常, uuid: {} , ka: {} , param: {} "
,
uuid
,
ka
,
JSON
.
toJSONString
(
telecomCallInfoRow
),
e
);
log
.
error
(
"电信通话详单清洗异常, uuid: {} , ka: {} , param: {} "
,
uuid
,
ka
,
JSON
.
toJSONString
(
telecomCallInfoRow
),
e
);
}
}
...
@@ -491,6 +501,7 @@ public class OperatorInfoCleaningService {
...
@@ -491,6 +501,7 @@ public class OperatorInfoCleaningService {
Stopwatch
stopwatch
=
Stopwatch
.
createStarted
();
Stopwatch
stopwatch
=
Stopwatch
.
createStarted
();
List
<
List
<
String
>>
sqls
=
new
ArrayList
<>();
List
<
List
<
String
>>
sqls
=
new
ArrayList
<>();
List
<
UnicomCallDetailInfo
>
unicomCallDetailInfos
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
unicomCallInfoRows
.
size
();
i
++)
{
for
(
int
i
=
0
;
i
<
unicomCallInfoRows
.
size
();
i
++)
{
UnicomCallInfoRow
unicomCallInfoRow
=
unicomCallInfoRows
.
get
(
i
);
UnicomCallInfoRow
unicomCallInfoRow
=
unicomCallInfoRows
.
get
(
i
);
...
@@ -546,7 +557,7 @@ public class OperatorInfoCleaningService {
...
@@ -546,7 +557,7 @@ public class OperatorInfoCleaningService {
list
.
add
(
TimeUtils
.
timeStamp2Date
(
String
.
valueOf
(
System
.
currentTimeMillis
()),
Constant
.
OPERATOR
.
UNICOM_USERSOURCE_CALL
,
phone
));
list
.
add
(
TimeUtils
.
timeStamp2Date
(
String
.
valueOf
(
System
.
currentTimeMillis
()),
Constant
.
OPERATOR
.
UNICOM_USERSOURCE_CALL
,
phone
));
sqls
.
add
(
list
);
sqls
.
add
(
list
);
unicomCallDetailInfos
.
add
(
unicomList2Bean
(
list
));
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"联通通话详单清洗异常, uuid: {} , ka: {} , param: {} "
,
uuid
,
ka
,
JSON
.
toJSONString
(
unicomCallInfoRow
),
e
);
log
.
error
(
"联通通话详单清洗异常, uuid: {} , ka: {} , param: {} "
,
uuid
,
ka
,
JSON
.
toJSONString
(
unicomCallInfoRow
),
e
);
}
}
...
@@ -787,4 +798,70 @@ public class OperatorInfoCleaningService {
...
@@ -787,4 +798,70 @@ public class OperatorInfoCleaningService {
}
}
public
static
MobileCallDetailInfo
mobileList2Bean
(
List
<
String
>
list
){
// uuid,cTime,tradeAddr,tradeWay,tradeType,receiverPhone,tradeTime,taocan,onlinePay,phone,iscm,timestamp
MobileCallDetailInfo
mobileCallDetailInfo
=
new
MobileCallDetailInfo
();
mobileCallDetailInfo
.
setUuid
(
list
.
get
(
0
));
mobileCallDetailInfo
.
setcTime
(
list
.
get
(
1
));
mobileCallDetailInfo
.
setTradeAddr
(
list
.
get
(
2
));
mobileCallDetailInfo
.
setTradeWay
(
list
.
get
(
3
));
mobileCallDetailInfo
.
setTradeType
(
list
.
get
(
4
));
mobileCallDetailInfo
.
setReceiverPhone
(
list
.
get
(
5
));
mobileCallDetailInfo
.
setTradeTime
(
list
.
get
(
6
));
mobileCallDetailInfo
.
setTaocan
(
list
.
get
(
7
));
mobileCallDetailInfo
.
setOnlinePay
(
list
.
get
(
8
));
mobileCallDetailInfo
.
setPhone
(
list
.
get
(
9
));
mobileCallDetailInfo
.
setIscm
(
list
.
get
(
10
));
mobileCallDetailInfo
.
setTimestamp
(
list
.
get
(
11
));
mobileCallDetailInfo
.
setSource
(
Constant
.
OPERATOR
.
MOBILE
);
return
mobileCallDetailInfo
;
}
public
static
TelecomCallDetailInfo
telecomList2Bean
(
List
<
String
>
list
){
//uuid,tradeType,cTime,tradeTime,callWay,receiverPhone,tradeAddr,basePay,longPay,infoPay,otherPay,allPay,phone,iscm,timestamp
TelecomCallDetailInfo
telecomCallDetailInfo
=
new
TelecomCallDetailInfo
();
telecomCallDetailInfo
.
setUuid
(
list
.
get
(
0
));
telecomCallDetailInfo
.
setTradeType
(
list
.
get
(
1
));
telecomCallDetailInfo
.
setcTime
(
list
.
get
(
2
));
telecomCallDetailInfo
.
setTradeTime
(
list
.
get
(
3
));
telecomCallDetailInfo
.
setCallWay
(
list
.
get
(
4
));
telecomCallDetailInfo
.
setReceiverPhone
(
list
.
get
(
5
));
telecomCallDetailInfo
.
setTradeAddr
(
list
.
get
(
6
));
telecomCallDetailInfo
.
setBasePay
(
list
.
get
(
7
));
telecomCallDetailInfo
.
setLongPay
(
list
.
get
(
8
));
telecomCallDetailInfo
.
setInfoPay
(
list
.
get
(
9
));
telecomCallDetailInfo
.
setOtherPay
(
list
.
get
(
10
));
telecomCallDetailInfo
.
setAllPay
(
list
.
get
(
11
));
telecomCallDetailInfo
.
setPhone
(
list
.
get
(
12
));
telecomCallDetailInfo
.
setIscm
(
list
.
get
(
13
));
telecomCallDetailInfo
.
setTimestamp
(
list
.
get
(
14
));
telecomCallDetailInfo
.
setSource
(
Constant
.
OPERATOR
.
TELECOM
);
return
telecomCallDetailInfo
;
}
public
static
UnicomCallDetailInfo
unicomList2Bean
(
List
<
String
>
list
){
//uuid,businessType,cTime,tradeTime,callType,receiverPhone,tradeAddr,tradeType,basePay,ldPay,otherPay,totalPay,phone,iscm,reductionPay,timestamp
UnicomCallDetailInfo
unicomCallDetailInfo
=
new
UnicomCallDetailInfo
();
unicomCallDetailInfo
.
setUuid
(
list
.
get
(
0
));
unicomCallDetailInfo
.
setBusinessType
(
list
.
get
(
1
));
unicomCallDetailInfo
.
setcTime
(
list
.
get
(
2
));
unicomCallDetailInfo
.
setTradeTime
(
list
.
get
(
3
));
unicomCallDetailInfo
.
setReceiverPhone
(
list
.
get
(
5
));
unicomCallDetailInfo
.
setCallType
(
list
.
get
(
4
));
unicomCallDetailInfo
.
setTradeAddr
(
list
.
get
(
6
));
unicomCallDetailInfo
.
setTradeType
(
list
.
get
(
7
));
unicomCallDetailInfo
.
setBasePay
(
list
.
get
(
8
));
unicomCallDetailInfo
.
setLdPay
(
list
.
get
(
9
));
unicomCallDetailInfo
.
setOtherPay
(
list
.
get
(
10
));
unicomCallDetailInfo
.
setTotalPay
(
list
.
get
(
11
));
unicomCallDetailInfo
.
setPhone
(
list
.
get
(
12
));
unicomCallDetailInfo
.
setIscm
(
list
.
get
(
13
));
unicomCallDetailInfo
.
setReductionPay
(
list
.
get
(
14
));
unicomCallDetailInfo
.
setTimestamp
(
list
.
get
(
15
));
unicomCallDetailInfo
.
setSource
(
Constant
.
OPERATOR
.
UNICOM
);
return
unicomCallDetailInfo
;
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment