Commit 057bd538 authored by 郝彦辉's avatar 郝彦辉

检测tidb的call_record表数据是否有重复数据工具

parent cc6fe2d0
......@@ -165,4 +165,10 @@ public class ManualToolController {
return "checkCallRecordCF调度完成";
}
@RequestMapping("/checkCallRecordCFByTransactionLog")
public String checkCallRecordCFByTransactionLog(String newYnrTime,String isExecuteOnce){
cleanningTransactionLogService.checkCallRecordCFByTransactionLog(newYnrTime,isExecuteOnce);
return "checkCallRecordCFByTransactionLog调度完成";
}
}
......@@ -394,7 +394,7 @@ public class BaiHangZhuDaiService {
RepaymentInfoZhuDai repaymentLoanInfo1 = repaymentLoanInfoDbMapper.findLastOne(BaiHangRepayment.builder().loanId(repaymentLoanInfo.getLoanId()).termNo(repaymentLoanInfo.getTermNo()).build());
if (j > 0 && Objects.nonNull(repaymentLoanInfo1)) {
tg_atomicInt.getAndIncrement();
log.info("量化派助贷TO百行报送-实时还款逾期跳过报送, reqId_log: {} , loanId: {} , termNo: {} , startnyr: {} , endnyr: {} , bean: {} ", reqId_log, repaymentLoanInfo.getLoanId(), repaymentLoanInfo.getTermNo(), startnyr, endnyr, JSON.toJSONString(repaymentLoanInfo1));
log.info("量化派助贷TO百行报送-实时还款逾期跳过报送, reqId_log: {} , loanId: {} , termNo: {} , startnyr: {} , endnyr: {} , bean: {} , param: {} ", reqId_log, repaymentLoanInfo.getLoanId(), repaymentLoanInfo.getTermNo(), startnyr, endnyr, JSON.toJSONString(repaymentLoanInfo1),JSON.toJSONString(repaymentLoanInfo));
continue;
}
totail_atomicInt.getAndIncrement();
......
package cn.quantgroup.report.service.manualTool;
import cn.quantgroup.report.domain.master.CallRecord1;
import cn.quantgroup.report.domain.master.TransactionLogPO;
import cn.quantgroup.report.domain.tidbrisk.CallRecord3;
import cn.quantgroup.report.utils.JdbcUtils;
import com.google.common.base.Stopwatch;
......@@ -546,4 +547,111 @@ public class CleanningTransactionLogService {
log.error("检测重复数据异常", e);
}
}
@Async
public void checkCallRecordCFByTransactionLog(String newYnrTime, String isExecuteOnce) {//yyyy-MM-dd
try {
boolean executeOnce = false;
if(StringUtils.isNotEmpty(isExecuteOnce) && "true".equals(isExecuteOnce)){
executeOnce = true;
}
SimpleDateFormat simpleDateFormat1 = new SimpleDateFormat("yyyy-MM-dd");
LocalDateTime now = new Timestamp(simpleDateFormat1.parse(newYnrTime).getTime()).toLocalDateTime();
//开始时间 2018-10-17 11:30:39
//结束时间 2019-12-31 10:36:34
//相差440天
for (int i = 0; i < 500; i++) {
String startTime = now.minusDays(i+1).format(DateTimeFormatter.ISO_DATE);
String endTime = now.minusDays(i).format(DateTimeFormatter.ISO_DATE);
String TRANSACTION_SQL = "select transaction_id, uuid, url_type, code, time_created " +
" from transaction_log where time_created >= '" + startTime + "' and time_created < '" + endTime + "'";
Stopwatch queryStopwatch = Stopwatch.createStarted();
List<TransactionLogPO> transactionLogPOList = riskDatasourceJdbcTemplate.query(TRANSACTION_SQL, new BeanPropertyRowMapper<>(TransactionLogPO.class));
log.info("transactionLog查询数据结束, startTime: {} , endTime: {} , size: {} , sql: {} , 耗时: {} ", startTime, endTime, CollectionUtils.isEmpty(transactionLogPOList) ? 0 : transactionLogPOList.size(), TRANSACTION_SQL, queryStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
if (CollectionUtils.isEmpty(transactionLogPOList)) {
log.info("查询数据为空跳过, startTime: {} , endTime: {} ",startTime, endTime);
continue;
}
Map<String, TransactionLogPO> transactionLogPOMap = new HashMap<>();
Stopwatch stopwatch = Stopwatch.createStarted();
for (int tran = 0; tran < transactionLogPOList.size(); tran++) {
TransactionLogPO transactionLogPO = transactionLogPOList.get(tran);
String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(transactionLogPO.getTimeCreated());
String key = new StringBuffer(transactionLogPO.getTransactionId())
.append(StringUtils.isNotBlank(transactionLogPO.getUuid()) ? transactionLogPO.getUuid() : "")
.append(transactionLogPO.getUrlType())
.append(transactionLogPO.getCode())
.append(format).toString();
transactionLogPOMap.put(key, transactionLogPO);
}
log.info("transactionLog组装数据完成, startTime: {} , 查询大小: {} , 组装后大小: {} , 耗时: {} ", startTime, transactionLogPOList.size(), transactionLogPOMap.size(), stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
Stopwatch callRStopwatch = Stopwatch.createStarted();
String CALL_SQL = "select transaction_id, uuid, url_type, code, created_at " +
" from call_record where created_at >= '" + startTime + "' and created_at < '" + endTime + "' order by created_at desc;";
List<CallRecord1> queryResult = tidbRiskJdbcTemplate.query(CALL_SQL, new BeanPropertyRowMapper<>(CallRecord1.class));
log.info("callRecord查询数据结束, startTime: {} , endTime: {} , size: {} , sql: {} , 耗时: {} ", startTime, endTime, CollectionUtils.isEmpty(queryResult) ? 0 : queryResult.size(), CALL_SQL, callRStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
Map<String, CallRecord1> callRecord1Map = new HashMap<>();
if (CollectionUtils.isNotEmpty(queryResult)) {
Stopwatch callStopwatch = Stopwatch.createStarted();
for (int call = 0; call < queryResult.size(); call++) {
CallRecord1 callRecord1 = queryResult.get(call);
String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(callRecord1.getCreated_at().getTime()));
String key = new StringBuffer(callRecord1.getTransactionId())
.append(StringUtils.isNotBlank(callRecord1.getUuid()) ? callRecord1.getUuid() : "")
.append(callRecord1.getUrlType())
.append(callRecord1.getCode())
.append(format).toString();
callRecord1Map.put(key, callRecord1);
}
log.info("callRecord组装数据完成, startTime: {} , 查询大小: {} , 组装后大小: {} , 耗时: {} ", startTime, queryResult.size(), callRecord1Map.size(), callStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
Stopwatch delStopwatch = Stopwatch.createStarted();
Iterator<Map.Entry<String, TransactionLogPO>> iterator = transactionLogPOMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, TransactionLogPO> next = iterator.next();
String key = next.getKey();
if (callRecord1Map.containsKey(key)) {
iterator.remove();
}
}
log.info("去重组装数据完成, startTime: {} , callRecord查询大小: {} , 组装后transactionLogPOMap大小: {} , 耗时: {} ", startTime, queryResult.size(), transactionLogPOMap.size(), delStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
if (transactionLogPOMap.size()>0) {
Stopwatch stopwatch1222 = Stopwatch.createStarted();
AtomicInteger countNum = new AtomicInteger();
transactionLogPOMap.forEach((k, v)->{
countNum.getAndIncrement();
log.info("检测transactionLogPOMap不为空, startTime: {} , ST: {} , mapkey: {} ",startTime, countNum.get(), k);
});
log.info("一天数据插入完成, startTime: {} , countNum: {} , Map大小: {} , 耗时: {} ",startTime, countNum.get(), transactionLogPOMap.size(), stopwatch1222.stop().elapsed(TimeUnit.MILLISECONDS));
}else{
log.info("一天数据插入完成, startTime: {} , transactionLogPOMap size is Empty!",startTime);
}
if(executeOnce){
log.info("一天数据插入完成, newYnrTime: {} , isExecuteOnce: {} , startTime: {} , endTime: {} ,",newYnrTime, isExecuteOnce, startTime, endTime);
break;
}
}
log.info("----All清洗数据结束----");
} catch (Exception e) {
log.error("清洗数据异常", e);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment