Commit 26196b09 authored by 郝彦辉's avatar 郝彦辉

检测tidb的call_record表数据是否有重复数据工具

parent fdb213c5
......@@ -171,4 +171,10 @@ public class ManualToolController {
return "checkCallRecordCFByTransactionLog调度完成";
}
@RequestMapping("/checkCallRecordCFByTransactionLog2")
public String checkCallRecordCFByTransactionLog2(String newYnrTime,String isExecuteOnce){
cleanningTransactionLogService.checkCallRecordCFByTransactionLog2(newYnrTime,isExecuteOnce);
return "checkCallRecordCFByTransactionLog2调度完成";
}
}
......@@ -4,9 +4,11 @@ import cn.quantgroup.report.domain.master.CallRecord1;
import cn.quantgroup.report.domain.master.TransactionLogPO;
import cn.quantgroup.report.domain.tidbrisk.CallRecord3;
import cn.quantgroup.report.utils.JdbcUtils;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
......@@ -16,6 +18,8 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.sql.DataSource;
import java.io.File;
import java.io.IOException;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
......@@ -654,4 +658,129 @@ public class CleanningTransactionLogService {
}
}
@Async
public void checkCallRecordCFByTransactionLog2(String newYnrTime, String isExecuteOnce) {//yyyy-MM-dd
try {
boolean executeOnce = false;
if(StringUtils.isNotEmpty(isExecuteOnce) && "true".equals(isExecuteOnce)){
executeOnce = true;
}
SimpleDateFormat simpleDateFormat1 = new SimpleDateFormat("yyyy-MM-dd");
LocalDateTime now = new Timestamp(simpleDateFormat1.parse(newYnrTime).getTime()).toLocalDateTime();
//开始时间 2018-10-17 11:30:39
//结束时间 2019-12-31 10:36:34
//相差440天
for (int i = 0; i < 500; i++) {
String startTime = now.minusDays(i+1).format(DateTimeFormatter.ISO_DATE);
String endTime = now.minusDays(i).format(DateTimeFormatter.ISO_DATE);
String TRANSACTION_SQL = "select transaction_id, uuid, url_type, code, time_created " +
" from transaction_log where time_created >= '" + startTime + "' and time_created < '" + endTime + "'";
Stopwatch queryStopwatch = Stopwatch.createStarted();
List<TransactionLogPO> transactionLogPOList = riskDatasourceJdbcTemplate.query(TRANSACTION_SQL, new BeanPropertyRowMapper<>(TransactionLogPO.class));
log.info("transactionLog查询数据结束, startTime: {} , endTime: {} , size: {} , 耗时: {} ", startTime, endTime, CollectionUtils.isEmpty(transactionLogPOList) ? 0 : transactionLogPOList.size(), queryStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
if (CollectionUtils.isEmpty(transactionLogPOList)) {
log.info("查询数据为空跳过, startTime: {} , endTime: {} ",startTime, endTime);
continue;
}
Map<String, TransactionLogPO> transactionLogPOMap = new HashMap<>();
Stopwatch stopwatch = Stopwatch.createStarted();
for (int tran = 0; tran < transactionLogPOList.size(); tran++) {
TransactionLogPO transactionLogPO = transactionLogPOList.get(tran);
String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(transactionLogPO.getTimeCreated());
String key = new StringBuffer(transactionLogPO.getTransactionId()).append(",")
.append(StringUtils.isNotBlank(transactionLogPO.getUuid()) ? transactionLogPO.getUuid() : "").append(",")
.append(transactionLogPO.getUrlType()).append(",")
.append(transactionLogPO.getCode()).append(",")
.append(format).toString();
transactionLogPOMap.put(key, transactionLogPO);
}
log.info("transactionLog组装数据完成, startTime: {} , 查询大小: {} , 组装后大小: {} , 耗时: {} ", startTime, transactionLogPOList.size(), transactionLogPOMap.size(), stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
Stopwatch callRStopwatch = Stopwatch.createStarted();
String CALL_SQL = "select transaction_id, uuid, url_type, code, created_at " +
" from call_record where created_at >= '" + startTime + "' and created_at < '" + endTime + "' order by created_at desc;";
List<CallRecord1> queryResult = tidbRiskJdbcTemplate.query(CALL_SQL, new BeanPropertyRowMapper<>(CallRecord1.class));
log.info("callRecord查询数据结束, startTime: {} , endTime: {} , size: {} , 耗时: {} ", startTime, endTime, CollectionUtils.isEmpty(queryResult) ? 0 : queryResult.size(), callRStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
Map<String, CallRecord1> callRecord1Map = new HashMap<>();
if (CollectionUtils.isNotEmpty(queryResult)) {
Stopwatch callStopwatch = Stopwatch.createStarted();
for (int call = 0; call < queryResult.size(); call++) {
CallRecord1 callRecord1 = queryResult.get(call);
String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(callRecord1.getCreated_at().getTime()));
String key = new StringBuffer(callRecord1.getTransactionId()).append(",")
.append(StringUtils.isNotBlank(callRecord1.getUuid()) ? callRecord1.getUuid() : "").append(",")
.append(callRecord1.getUrlType()).append(",")
.append(callRecord1.getCode()).append(",")
.append(format).toString();
callRecord1Map.put(key, callRecord1);
}
log.info("callRecord组装数据完成, startTime: {} , 查询大小: {} , 组装后大小: {} , 耗时: {} ", startTime, queryResult.size(), callRecord1Map.size(), callStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
Stopwatch delStopwatch = Stopwatch.createStarted();
Iterator<Map.Entry<String, TransactionLogPO>> iterator = transactionLogPOMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, TransactionLogPO> next = iterator.next();
String key = next.getKey();
if (callRecord1Map.containsKey(key)) {
iterator.remove();
callRecord1Map.remove(key);
}
}
log.info("去相同据完成, startTime: {} , callRecord1Map大小: {} , transactionLogPOMap大小: {} , 耗时: {} ", startTime, callRecord1Map.size(), transactionLogPOMap.size(), delStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
String path = "/home/quant_group/baihang-report/logs/transaction_call/";
if (transactionLogPOMap.size()>0) {
Stopwatch stopwatch1222 = Stopwatch.createStarted();
AtomicInteger countNum = new AtomicInteger();
transactionLogPOMap.forEach((k, v)->{
countNum.getAndIncrement();
try {
FileUtils.write(new File(path +"transaction_start_"+startTime+".txt"), k+"\r\n", "UTF-8", true);
} catch (IOException e) {
e.printStackTrace();
}
});
log.info("写入transaction_log多出的数据结束, startTime: {} , countNum: {} , Map大小: {} , 耗时: {} ",startTime, countNum.get(), transactionLogPOMap.size(), stopwatch1222.stop().elapsed(TimeUnit.MILLISECONDS));
}else{
log.info("写入transaction_log多出的数据结束, startTime: {} , transactionLogPOMap size is Empty!",startTime);
}
if (callRecord1Map.size()>0) {
Stopwatch stopwatch3 = Stopwatch.createStarted();
AtomicInteger countNum3 = new AtomicInteger();
callRecord1Map.forEach((key, v)->{
countNum3.getAndIncrement();
try {
FileUtils.write(new File(path +"callRecord_start_"+startTime+".txt"), key+"\r\n", "UTF-8", true);
} catch (IOException e) {
e.printStackTrace();
}
});
log.info("写入call_record多出的数据结束, startTime: {} , countNum: {} , Map大小: {} , 耗时: {} ",startTime, countNum3.get(), callRecord1Map.size(), stopwatch3.stop().elapsed(TimeUnit.MILLISECONDS));
}else{
log.info("写入call_record多出的数据结束, startTime: {} , transactionLogPOMap size is Empty!",startTime);
}
if(executeOnce){
log.info("比较数据完成, newYnrTime: {} , isExecuteOnce: {} , startTime: {} , endTime: {} ,",newYnrTime, isExecuteOnce, startTime, endTime);
break;
}
}
log.info("----All比较数据完成----");
} catch (Exception e) {
log.error("比较数据异常", e);
}
}
}
......@@ -5,6 +5,7 @@ import cn.quantgroup.report.config.aop.DistributedLock;
import cn.quantgroup.report.config.aop.Monitor;
import cn.quantgroup.report.config.aop.MonitorType;
import cn.quantgroup.report.domain.baihang.*;
import cn.quantgroup.report.domain.master.TransactionLogPO;
import cn.quantgroup.report.mapper.baihang.LoanInfoMapper;
import cn.quantgroup.report.mapper.master.ApplyLoanInfoDbMapper;
import cn.quantgroup.report.mapper.master.LoanInfoDbMapper;
......@@ -2554,6 +2555,61 @@ public class ManualToolService implements CommonSuperService {
//---------------------------------------------------------------------
private static void transaction_call() {
try{
String path = "D:\\JavaTeam\\test\\transaction_call\\";
List<String> jsonList_1 = ReadOrWriteTxt.readTxtList(path + "transaction_log.txt");
HashMap<String,String> map_transaction = new HashMap<>();
int i=0;
for(String str : jsonList_1){
map_transaction.put(str,""+(++i));
}
log.info("read files jsonList_1 size:{} , map_transaction siez:{} ", jsonList_1.size(), map_transaction.size());
jsonList_1 = null;
manualGC();
List<String> jsonList_2 = ReadOrWriteTxt.readTxtList(path + "call_record.txt");
HashMap<String,String> map_call = new HashMap<>();
int j=0;
for(String str : jsonList_2){
map_call.put(str,""+(++j));
}
log.info("read files jsonList_2 size:{} , map_call siez:{} ", jsonList_2.size(), map_call.size());
jsonList_2 = null;
manualGC();
Iterator<Map.Entry<String, String>> iterator_call = map_call.entrySet().iterator();
while (iterator_call.hasNext()) {
Map.Entry<String, String> next = iterator_call.next();
String key = next.getKey();
if (map_transaction.containsKey(key)) {
iterator_call.remove();
map_transaction.remove(key);
}
}
log.info("去重过滤后 map_transaction size:{} , map_call siez:{} ", map_transaction.size(), map_call.size());
for(String str1 : map_transaction.keySet()){
FileUtils.write(new File(path + "not_transaction_log.txt"), str1+"\n", "UTF-8", true);
}
for(String str2 : map_call.keySet()){
FileUtils.write(new File(path + "not_call_record.txt"), str2+"\n", "UTF-8", true);
}
log.info("All去重过滤后保存放款记录结束");
}catch (Exception e){
log.error("处理放款记录异常!", e);
}
}
public static void main(String[] args) {
......@@ -2623,7 +2679,10 @@ public class ManualToolService implements CommonSuperService {
// build_reqID_recordId();
build_d2_1226();
//build_d2_1226();
transaction_call();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment