Commit 61a46fb6 authored by 郝彦辉's avatar 郝彦辉

tidb的call_record表历史数据初始化

parent a5b78873
......@@ -14,6 +14,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
......@@ -75,4 +76,9 @@ public class RiskDataSourceConfig {
public SqlSessionTemplate masterSqlSessionTemplate(@Qualifier("masterSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
@Bean(name = "riskDatasourceJdbcTemplate")
public JdbcTemplate primaryJdbcTemplate(@Qualifier("masterDataSource") DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
}
package cn.quantgroup.report.config.datasource.tidbrisk;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
/**
* -----------------------------------------------------------------------------<br>
* 描述: tidb中call_record日志表 <br>
* 作者:yanhui.Hao <br>
* 时间:2019.12.30 <br>
* 授权: (C) Copyright (c) 2017 <br>
* 公司: 北京众信利民信息技术有限公司 <br>
* -----------------------------------------------------------------------------
*/
@Import(TidbRiskDatasourceProperties.class)
@Configuration
@Slf4j
@MapperScan(basePackages = TidbRiskDataSourceConfig.PACKAGE, sqlSessionFactoryRef = "tidbRiskSqlSessionFactory")
public class TidbRiskDataSourceConfig {
static final String PACKAGE = "cn.quantgroup.report.mapper.tidbrisk";
//@Value("${tidb.risk_datasource.mapper-locations}")
private String mapperLocations="classpath:cn/quantgroup/report/mapper/tidbrisk/*.xml";//resources *.xml
//@Value("${tidb.risk_datasource.type-aliases-package}")
private String typeAliasesPackage="cn.quantgroup.report.domain.tidbrisk";//实体pojo
@Value("${config-location}")
private String configLocation;
@Autowired
private TidbRiskDatasourceProperties tidbRiskDatasourceProperties;
@Bean(name = "tidbRiskDataSource")
public DataSource tidbRiskDataSource() {
HikariConfig config = new HikariConfig();
log.info("tidb.risk_datasource数据库地址:{}",tidbRiskDatasourceProperties.getTidbRiskDatasourceJdbcUrl());
config.setJdbcUrl(tidbRiskDatasourceProperties.getTidbRiskDatasourceJdbcUrl());
config.setPassword(tidbRiskDatasourceProperties.getTidbRiskDatasourcePassword());
config.setUsername(tidbRiskDatasourceProperties.getTidbRiskDatasourceUsername());
config.setMaximumPoolSize(tidbRiskDatasourceProperties.getMaxPoolSize());
config.setMinimumIdle(tidbRiskDatasourceProperties.getMinPoolSize());
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
return new HikariDataSource(config);
}
@Bean(name = "tidbRiskTransactionManager")
public DataSourceTransactionManager tidbRiskTransactionManager(@Qualifier("tidbRiskDataSource") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
@Bean(name = "tidbRiskSqlSessionFactory")
public SqlSessionFactory tidbRiskSqlSessionFactory(@Qualifier("tidbRiskDataSource") DataSource tidbRiskDataSource) throws Exception {
final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
sessionFactory.setDataSource(tidbRiskDataSource);
sessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver()
.getResources(mapperLocations));
sessionFactory.setTypeAliasesPackage(typeAliasesPackage);
sessionFactory.setConfigLocation(new PathMatchingResourcePatternResolver()
.getResource(configLocation));
return sessionFactory.getObject();
}
@Bean(name = "tidbRiskSqlSessionTemplate")
public SqlSessionTemplate tidbRiskSqlSessionTemplate(@Qualifier("tidbRiskSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
@Bean(name = "tidbRiskJdbcTemplate")
public JdbcTemplate tidbRiskJdbcTemplate(@Qualifier("tidbRiskDataSource") DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
}
package cn.quantgroup.report.config.datasource.tidbrisk;
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
* -----------------------------------------------------------------------------<br>
* 描述: tidb中call_record日志表 <br>
* 作者:yanhui.Hao <br>
* 时间:2019.12.30 <br>
* 授权: (C) Copyright (c) 2017 <br>
* 公司: 北京众信利民信息技术有限公司 <br>
* -----------------------------------------------------------------------------
*/
@Getter
@Setter
@Configuration
public class TidbRiskDatasourceProperties {
@Value("${db.driver}")
private String driverClass;
@Value("${db.minPoolSize}")
private int minPoolSize;
@Value("${db.maxPoolSize}")
private int maxPoolSize;
/**
* tidb.risk_datasource
*/
//@Value("${db.tidb.risk_datasource.fullurl}")
private String tidbRiskDatasourceJdbcUrl="jdbc:mysql://fengkong-tidb.quantgroups.com:4010/risk_datasource?useUnicode=true&characterEncoding=UTF8";
//@Value("${db.tidb.risk_datasource.username}")
private String tidbRiskDatasourceUsername="risk_datasource_w";
//@Value("${db.tidb.risk_datasource.password}")
private String tidbRiskDatasourcePassword="50GjQLd6hUOSeTMB";
}
......@@ -2,6 +2,7 @@ package cn.quantgroup.report.controller;
import cn.quantgroup.report.response.GlobalResponse;
import cn.quantgroup.report.service.common.CommonQueryService;
import cn.quantgroup.report.service.manualTool.CleanningTransactionLogService;
import cn.quantgroup.report.service.manualTool.ManualToolService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -19,6 +20,9 @@ public class ManualToolController {
@Autowired
private CommonQueryService queryService;
@Autowired
private CleanningTransactionLogService cleanningTransactionLogService;
/* @Autowired
private RedisTemplate redisTemplate;*/
......@@ -149,5 +153,10 @@ public class ManualToolController {
}
@RequestMapping("/synCallRecordNew")
public String synCallRecordNew(String newYnrTime){
cleanningTransactionLogService.synCallRecordNew(newYnrTime);
return "synCallRecordNew调度完成";
}
}
package cn.quantgroup.report.domain.master;
import lombok.Data;
import java.io.Serializable;
import java.sql.Timestamp;
/**
* @Author fengjunkai
*/
@Data
public class CallRecord1 implements Serializable {
private static final long serialVersionUID = 3380282142711506937L;
private String transactionId;
private String uuid;
private String urlType;
private String code;
private Timestamp created_at;
private Timestamp updated_at;
}
package cn.quantgroup.report.domain.tidbrisk;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
/**
* -----------------------------------------------------------------------------<br>
* 描述: 调用日志实体bean <br>
* 作者:yanhui.Hao <br>
* 时间:2019.12.30 <br>
* 授权: (C) Copyright (c) 2017 <br>
* 公司: 北京众信利民信息技术有限公司 <br>
* -----------------------------------------------------------------------------
*/
@Setter
@Getter
@Builder
public class CallRecord2 {
private String channelType;//数据源调用批次号
private String requestUrl;//数据源调用批次号
private String channelId;//渠道号, 融360-333, 国美-159843
private String transactionId;//数据源调用批次号
private String uuid;//用户uuid
private String urlType;//数据源, QCloud43Md5AntiFraud-腾讯43版反欺诈模型分
private Integer code;//1001:请求成功且命中;1002:请求成功且未命中;1003:请求失败;1004:命中HBase;
//private String createdAt;//创建时间
}
package cn.quantgroup.report.domain.tidbrisk;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.sql.Timestamp;
/**
* -----------------------------------------------------------------------------<br>
* 描述: 调用日志实体bean <br>
* 作者:yanhui.Hao <br>
* 时间:2019.12.30 <br>
* 授权: (C) Copyright (c) 2017 <br>
* 公司: 北京众信利民信息技术有限公司 <br>
* -----------------------------------------------------------------------------
*/
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CallRecord3 implements Serializable {
private String channelType;//数据源调用批次号
private String requestUrl;//数据源调用批次号
private String channelId;//渠道号, 融360-333, 国美-159843
private String transactionId;//数据源调用批次号
private String uuid;//用户uuid
private String urlType;//数据源, QCloud43Md5AntiFraud-腾讯43版反欺诈模型分
private Integer code;//1001:请求成功且命中;1002:请求成功且未命中;1003:请求失败;1004:命中HBase;
private Timestamp createdAt;
private Timestamp updatedAt;
}
package cn.quantgroup.report.mapper.tidbrisk;
import cn.quantgroup.report.domain.tidbrisk.CallRecord2;
import org.springframework.stereotype.Repository;
@Repository
public interface CallRecordMapper {
/**
* 描述: call_record表
* 迁移至fengkong-tidb.quantgroups.com的risk_datasource库<br/>
* 参数: <br/>
* 返回值: <br/>
* 创建人: yanhui.Hao <br/>
* 创建时间: 2019.12.30 <br/>
*/
int insertCallRecord(CallRecord2 callRecord);
}
\ No newline at end of file
package cn.quantgroup.report.service.manualTool;
import cn.quantgroup.report.domain.master.CallRecord1;
import cn.quantgroup.report.domain.tidbrisk.CallRecord3;
import cn.quantgroup.report.utils.JdbcUtils;
import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* -----------------------------------------------------------------------------<br>
* 描述: <br>
* 作者:yanhui.Hao <br>
* 时间:2019.12.31 <br>
* 授权: (C) Copyright (c) 2017 <br>
* 公司: 北京众信利民信息技术有限公司 <br>
* -----------------------------------------------------------------------------
*/
@Slf4j
@Service
public class CleanningTransactionLogService {
@Autowired
private JdbcTemplate riskDatasourceJdbcTemplate;
@Autowired
private JdbcTemplate tidbRiskJdbcTemplate;
@Autowired
@Qualifier("tidbRiskDataSource")
private DataSource tidbRiskDataSource;
@Resource(name = "tidbRiskDataSource")
private DataSource tidbRiskDataSource2;
public static void main(String[] args) {
// LocalDateTime now = LocalDateTime.now();
try {
Stopwatch stopwatch = Stopwatch.createStarted();
Map<String, String> map = new HashMap<>();
Map<String, String> map1 = new HashMap<>();
for (int i = 0; i < 500000; i++) {
map.put(new StringBuffer(String.valueOf(i)).append(String.valueOf(i)).append(String.valueOf(i)).append(String.valueOf(i)).toString(), String.valueOf(i));
}
for (int i = 0; i < 500000; i++) {
map1.put(new StringBuffer(String.valueOf(i)).append(String.valueOf(i)).append(String.valueOf(i)).append(String.valueOf(i)).toString(), String.valueOf(i));
}
System.out.println(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
} catch (Exception e) {
e.printStackTrace();
}
}
/* @Async
public void start() {
try {
String tmpTime = "2019-11-23";
SimpleDateFormat simpleDateFormat1 = new SimpleDateFormat("yyyy-MM-dd");
LocalDateTime now = new Timestamp(simpleDateFormat1.parse(tmpTime).getTime()).toLocalDateTime();
for (int i = 0; i < 800; i++) {
String startTime = now.minusDays(i).format(DateTimeFormatter.ISO_DATE);
String endTime = now.minusDays(i + 1).format(DateTimeFormatter.ISO_DATE);
String sql = "select * from transaction_log where time_created > '" + endTime + "' and time_created < '" + startTime + "'";
Stopwatch queryStopwatch = Stopwatch.createStarted();
List<TransactionLogPO> transactionLogPOList = riskDatasourceJdbcTemplate.query(sql, new BeanPropertyRowMapper<>(TransactionLogPO.class));
log.info("transactionLog查询数据结束, startTime: {} , endTime: {} , size: {} , sql: {} , 耗时: {} ", startTime, endTime, CollectionUtils.isEmpty(transactionLogPOList) ? 0 : transactionLogPOList.size(), sql, queryStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
if (CollectionUtils.isEmpty(transactionLogPOList)) {
log.info("查询数据为空跳过");
continue;
}
Map<String, TransactionLogPO> transactionLogPOMap = new HashMap<>();
Map<String, CallRecord1> callRecord1Map = new HashMap<>();
Stopwatch stopwatch = Stopwatch.createStarted();
for (int tran = 0; tran < transactionLogPOList.size(); tran++) {
TransactionLogPO transactionLogPO = transactionLogPOList.get(tran);
String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(transactionLogPO.getTimeCreated());
String key = new StringBuffer(transactionLogPO.getTransactionId()).append(StringUtils.isNotBlank(transactionLogPO.getUuid()) ? transactionLogPO.getUuid() : "").append(transactionLogPO.getUrlType()).append(transactionLogPO.getCode()).append(format).toString();
// System.out.println(key);
transactionLogPOMap.put(key, transactionLogPO);
}
log.info("transactionLog 组装数据完成 查询大小: {} , 组装后大小: {} , 耗时: {} ", transactionLogPOList.size(), transactionLogPOMap.size(), stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
Stopwatch callRStopwatch = Stopwatch.createStarted();
String sql1 = "select * from call_record where created_at > '" + endTime + "' and created_at < '" + startTime + "' order by created_at desc;";
List<CallRecord1> queryResult = riskDatasourceJdbcTemplate.query(sql1, new BeanPropertyRowMapper<>(CallRecord1.class));
log.info("callRecord查询数据结束, startTime: {} , endTime: {} , size: {} , sql: {} , 耗时: {} ", startTime, endTime, CollectionUtils.isEmpty(queryResult) ? 0 : queryResult.size(), sql1, callRStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
if (CollectionUtils.isNotEmpty(queryResult)) {
Stopwatch callStopwatch = Stopwatch.createStarted();
for (int call = 0; call < queryResult.size(); call++) {
CallRecord1 callRecord1 = queryResult.get(call);
String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(callRecord1.getCreated_at().getTime()));
String key = new StringBuffer(callRecord1.getTransactionId()).append(StringUtils.isNotBlank(callRecord1.getUuid()) ? callRecord1.getUuid() : "").append(callRecord1.getUrlType()).append(callRecord1.getCode()).append(format).toString();
callRecord1Map.put(key, callRecord1);
}
log.info("callRecord 组装数据完成 查询大小: {} , 组装后大小: {} , 耗时: {} ", queryResult.size(), callRecord1Map.size(), callStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
Stopwatch delStopwatch = Stopwatch.createStarted();
Iterator<Map.Entry<String, TransactionLogPO>> iterator = transactionLogPOMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, TransactionLogPO> next = iterator.next();
String key = next.getKey();
if (callRecord1Map.containsKey(key)) {
iterator.remove();
}
}
log.info("去重 组装数据完成 查询大小: {} , 组装后大小: {} , 耗时: {} ", queryResult.size(), transactionLogPOMap.size(), delStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
if (transactionLogPOMap.size()>0) {
List<CallRecord1> callRecord1s = new ArrayList<>();
Stopwatch stopwatch1222 = Stopwatch.createStarted();
transactionLogPOMap.forEach((k, v)->{
TransactionLogPO transactionLogPO = v;
CallRecord1 callRecord1 = new CallRecord1();
callRecord1.setCode(String.valueOf(transactionLogPO.getCode()));
callRecord1.setUrlType(transactionLogPO.getUrlType());
callRecord1.setTransactionId(transactionLogPO.getTransactionId());
callRecord1.setUuid(transactionLogPO.getUuid());
Timestamp time = new Timestamp(transactionLogPO.getTimeCreated().getTime());
callRecord1.setCreated_at(time);
callRecord1.setUpdated_at(time);
callRecord1s.add(callRecord1);
if (callRecord1s.size() > 0 && callRecord1s.size() % 1000 == 0) {
Stopwatch stopwatch1 = Stopwatch.createStarted();
JdbcUtils.prepareBatchUpdateExecuteTransactionid("INSERT INTO `call_record` (`transaction_id`, `uuid`, `url_type`, `code`, `created_at`, `updated_at`) VALUES (?, ?, ?, ?, ?, ?)", callRecord1s);
callRecord1s.clear();
log.info("插入数据完成, 耗时: {}", stopwatch1.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
if (callRecord1s.size() > 0) {
JdbcUtils.prepareBatchUpdateExecuteTransactionid("INSERT INTO `call_record` (`transaction_id`, `uuid`, `url_type`, `code`, `created_at`, `updated_at`) VALUES (?, ?, ?, ?, ?, ?)", callRecord1s);
}
log.info("一天数据插入完成, 大小: {} , 耗时: {} ", transactionLogPOMap.size(), stopwatch1222.stop().elapsed(TimeUnit.MILLISECONDS));
}
}
log.info("完事");
} catch (Exception e) {
log.error("清洗数据异常", e);
}
}*/
// @Async
// public void start() {
//
// try {
// String tmpTime = "2019-12-21";
// SimpleDateFormat simpleDateFormat1 = new SimpleDateFormat("yyyy-MM-dd");
// LocalDateTime now = new Timestamp(simpleDateFormat1.parse(tmpTime).getTime()).toLocalDateTime();
// for (int i = 0; i < 800; i++) {
// String startTime = now.minusDays(i).format(DateTimeFormatter.ISO_DATE);
// String endTime = now.minusDays(i + 1).format(DateTimeFormatter.ISO_DATE);
// String sql = "select * from transaction_log where time_created > '"+ endTime +"' and time_created < '"+startTime+"'";
// Stopwatch queryStopwatch = Stopwatch.createStarted();
// List<TransactionLogPO> transactionLogPOList = riskDatasourceJdbcTemplate.query(sql, new BeanPropertyRowMapper<>(TransactionLogPO.class));
// log.info("查询数据结束, startTime: {} , endTime: {} , size: {} , sql: {} , 耗时: {} ", startTime, endTime, CollectionUtils.isEmpty(transactionLogPOList)?0:transactionLogPOList.size(), sql, queryStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
// if(CollectionUtils.isEmpty(transactionLogPOList)){
// log.info("查询数据为空跳过");
// continue;
// }
// Stopwatch oneStopwatch = Stopwatch.createStarted();
// List<CallRecord1> callRecord1s = new ArrayList<>();
// for (int j = 0; j < transactionLogPOList.size(); j++) {
// try{
// TransactionLogPO transactionLogPO = transactionLogPOList.get(j);
// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// String d = simpleDateFormat.format(transactionLogPO.getTimeCreated());
// String sql1 = "";
// if(StringUtils.isNotBlank(transactionLogPO.getUuid())){
// sql1 = "select * from call_record where transaction_id = '"+transactionLogPO.getTransactionId()+"' and uuid = '" + transactionLogPO.getUuid() + "' and url_type = '" + transactionLogPO.getUrlType() + "' and code = '" + transactionLogPO.getCode() + "' and created_at = '"+d+"';";
// }else{
// sql1 = "select * from call_record where transaction_id = '"+transactionLogPO.getTransactionId()+"' and url_type = '" + transactionLogPO.getUrlType() + "' and code = '" + transactionLogPO.getCode() + "' and created_at = '"+d+"';";;
// }
// List<CallRecord1> queryResult = riskDatasourceJdbcTemplate.query(sql1, new BeanPropertyRowMapper<>(CallRecord1.class));
// if(CollectionUtils.isEmpty(queryResult)){
//// log.info("查询结果不为空,重新插入 sql: {} , resutl: {} ", sql1, JSON.toJSONString(queryResult));
// CallRecord1 callRecord1 = new CallRecord1();
// callRecord1.setCode(String.valueOf(transactionLogPO.getCode()));
// callRecord1.setUrlType(transactionLogPO.getUrlType());
// callRecord1.setTransactionId(transactionLogPO.getTransactionId());
// callRecord1.setUuid(transactionLogPO.getUuid());
// Timestamp time = new Timestamp(transactionLogPO.getTimeCreated().getTime());
// callRecord1.setCreated_at(time);
// callRecord1.setUpdated_at(time);
//
// callRecord1s.add(callRecord1);
// if(callRecord1s.size()>0 && callRecord1s.size()%1000==0){
// Stopwatch stopwatch = Stopwatch.createStarted();
// JdbcUtils.prepareBatchUpdateExecuteTransactionid("INSERT INTO `call_record` (`transaction_id`, `uuid`, `url_type`, `code`, `created_at`, `updated_at`) VALUES (?, ?, ?, ?, ?, ?)", callRecord1s);
// callRecord1s.clear();
// log.info("插入数据完成, 耗时: {}", stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
// }
// }else{
// log.info("命中原有数据不插入");
// }
//
// }catch(Exception e){
// log.error("清洗插入callRecord异常", e);
// }
//
// }
// if(callRecord1s.size()>0){
// JdbcUtils.prepareBatchUpdateExecuteTransactionid("INSERT INTO `call_record` (`transaction_id`, `uuid`, `url_type`, `code`, `created_at`, `updated_at`) VALUES (?, ?, ?, ?, ?, ?)", callRecord1s);
// log.info("插入数据完成1, startTime: {} , endTime: {} , 大小: {} , 耗时: {} ", startTime, endTime, transactionLogPOList.size(), oneStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
// }
//
// }
// log.info("完事");
// } catch (Exception e) {
// log.error("清洗数据异常", e);
// }
// }
/*
@Async
public void synCallRecord(String newYnrTime) {//yyyy-MM-dd
try {
SimpleDateFormat simpleDateFormat1 = new SimpleDateFormat("yyyy-MM-dd");
LocalDateTime now = new Timestamp(simpleDateFormat1.parse(newYnrTime).getTime()).toLocalDateTime();
//开始时间 2018-10-17 11:30:39
//结束时间 2019-12-31 10:36:34
//相差440天
for (int i = 0; i < 500; i++) {
String startTime = now.minusDays(i+1).format(DateTimeFormatter.ISO_DATE);
String endTime = now.minusDays(i).format(DateTimeFormatter.ISO_DATE);
String TRANSACTION_SQL = "select transaction_id, uuid, url_type, code, time_created " +
" from transaction_log where time_created >= '" + startTime + "' and time_created < '" + endTime + "'";
Stopwatch queryStopwatch = Stopwatch.createStarted();
List<TransactionLogPO> transactionLogPOList = riskDatasourceJdbcTemplate.query(TRANSACTION_SQL, new BeanPropertyRowMapper<>(TransactionLogPO.class));
log.info("transactionLog查询数据结束, startTime: {} , endTime: {} , size: {} , sql: {} , 耗时: {} ", startTime, endTime, CollectionUtils.isEmpty(transactionLogPOList) ? 0 : transactionLogPOList.size(), TRANSACTION_SQL, queryStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
if (CollectionUtils.isEmpty(transactionLogPOList)) {
log.info("查询数据为空跳过, startTime: {} , endTime: {} ",startTime, endTime);
continue;
}
Map<String, TransactionLogPO> transactionLogPOMap = new HashMap<>();
Stopwatch stopwatch = Stopwatch.createStarted();
for (int tran = 0; tran < transactionLogPOList.size(); tran++) {
TransactionLogPO transactionLogPO = transactionLogPOList.get(tran);
String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(transactionLogPO.getTimeCreated());
String key = new StringBuffer(transactionLogPO.getTransactionId())
.append(StringUtils.isNotBlank(transactionLogPO.getUuid()) ? transactionLogPO.getUuid() : "")
.append(transactionLogPO.getUrlType())
.append(transactionLogPO.getCode())
.append(format).toString();
transactionLogPOMap.put(key, transactionLogPO);
}
log.info("transactionLog组装数据完成, startTime: {} , 查询大小: {} , 组装后大小: {} , 耗时: {} ", startTime, transactionLogPOList.size(), transactionLogPOMap.size(), stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
Stopwatch callRStopwatch = Stopwatch.createStarted();
String CALL_SQL = "select request_url, channel_type, channel_id, transaction_id, uuid, url_type, code, created_at, updated_at " +
" from call_record where created_at >= '" + startTime + "' and created_at < '" + endTime + "' order by created_at desc;";
List<CallRecord1> queryResult = riskDatasourceJdbcTemplate.query(CALL_SQL, new BeanPropertyRowMapper<>(CallRecord1.class));
log.info("callRecord查询数据结束, startTime: {} , endTime: {} , size: {} , sql: {} , 耗时: {} ", startTime, endTime, CollectionUtils.isEmpty(queryResult) ? 0 : queryResult.size(), CALL_SQL, callRStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
Map<String, CallRecord1> callRecord1Map = new HashMap<>();
if (CollectionUtils.isNotEmpty(queryResult)) {
Stopwatch callStopwatch = Stopwatch.createStarted();
for (int call = 0; call < queryResult.size(); call++) {
CallRecord1 callRecord1 = queryResult.get(call);
String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(callRecord1.getCreated_at().getTime()));
String key = new StringBuffer(callRecord1.getTransactionId())
.append(StringUtils.isNotBlank(callRecord1.getUuid()) ? callRecord1.getUuid() : "")
.append(callRecord1.getUrlType())
.append(callRecord1.getCode())
.append(format).toString();
callRecord1Map.put(key, callRecord1);
}
log.info("callRecord组装数据完成, startTime: {} , 查询大小: {} , 组装后大小: {} , 耗时: {} ", startTime, queryResult.size(), callRecord1Map.size(), callStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
Stopwatch delStopwatch = Stopwatch.createStarted();
Iterator<Map.Entry<String, TransactionLogPO>> iterator = transactionLogPOMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, TransactionLogPO> next = iterator.next();
String key = next.getKey();
if (callRecord1Map.containsKey(key)) {
iterator.remove();
}
}
log.info("去重组装数据完成, startTime: {} , callRecord查询大小: {} , 组装后transactionLogPOMap大小: {} , 耗时: {} ", startTime, queryResult.size(), transactionLogPOMap.size(), delStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
if (transactionLogPOMap.size()>0) {
List<CallRecord1> callRecord1s = new ArrayList<>();
Stopwatch stopwatch1222 = Stopwatch.createStarted();
transactionLogPOMap.forEach((k, v)->{
TransactionLogPO transactionLogPO = v;
//c.request_url, c.channel_type, c.channel_id
CallRecord1 callRecord1 = new CallRecord1();
callRecord1.setCode(String.valueOf(transactionLogPO.getCode()));
callRecord1.setUrlType(transactionLogPO.getUrlType());
callRecord1.setTransactionId(transactionLogPO.getTransactionId());
callRecord1.setUuid(transactionLogPO.getUuid());
Timestamp time = new Timestamp(transactionLogPO.getTimeCreated().getTime());
callRecord1.setCreated_at(time);
callRecord1.setUpdated_at(time);
callRecord1s.add(callRecord1);
if (callRecord1s.size() > 0 && callRecord1s.size() % 1000 == 0) {
Stopwatch stopwatch1 = Stopwatch.createStarted();
JdbcUtils.prepareBatchUpdateExecuteTransactionid("INSERT INTO `call_record` (`transaction_id`, `uuid`, `url_type`, `code`, `created_at`, `updated_at`) VALUES (?, ?, ?, ?, ?, ?)", callRecord1s);
callRecord1s.clear();
log.info("插入数据完成, 耗时: {}", stopwatch1.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
if (callRecord1s.size() > 0) {
JdbcUtils.prepareBatchUpdateExecuteTransactionid("INSERT INTO `call_record` (`transaction_id`, `uuid`, `url_type`, `code`, `created_at`, `updated_at`) VALUES (?, ?, ?, ?, ?, ?)", callRecord1s);
}
log.info("一天数据插入完成, 大小: {} , 耗时: {} ", transactionLogPOMap.size(), stopwatch1222.stop().elapsed(TimeUnit.MILLISECONDS));
}
}
log.info("完事");
} catch (Exception e) {
log.error("清洗数据异常", e);
}
}
*/
@Async
public void synCallRecordNew(String newYnrTime, String isExecuteOnce) {//yyyy-MM-dd
try {
boolean executeOnce = false;
if(StringUtils.isNotEmpty(isExecuteOnce) && "true".equals(isExecuteOnce)){
executeOnce = true;
}
SimpleDateFormat simpleDateFormat1 = new SimpleDateFormat("yyyy-MM-dd");
LocalDateTime now = new Timestamp(simpleDateFormat1.parse(newYnrTime).getTime()).toLocalDateTime();
//开始时间 2018-10-17 11:30:39
//结束时间 2019-12-31 10:36:34
//相差440天
for (int i = 0; i < 500; i++) {
String startTime = now.minusDays(i+1).format(DateTimeFormatter.ISO_DATE);
String endTime = now.minusDays(i).format(DateTimeFormatter.ISO_DATE);
String TRANSACTION_SQL = "select request_url, channel_type, channel_id, transaction_id, uuid, url_type, code, created_at, updated_at " +
" from call_record where created_at >= '" + startTime + "' and created_at < '" + endTime + "'";
Stopwatch queryStopwatch = Stopwatch.createStarted();
List<CallRecord3> transactionLogPOList = riskDatasourceJdbcTemplate.query(TRANSACTION_SQL, new BeanPropertyRowMapper<>(CallRecord3.class));
log.info("transactionLog查询数据结束, startTime: {} , endTime: {} , size: {} , sql: {} , 耗时: {} ", startTime, endTime, CollectionUtils.isEmpty(transactionLogPOList) ? 0 : transactionLogPOList.size(), TRANSACTION_SQL, queryStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
if (CollectionUtils.isEmpty(transactionLogPOList)) {
log.info("查询数据为空跳过, startTime: {} , endTime: {} ",startTime, endTime);
continue;
}
Map<String, CallRecord3> transactionLogPOMap = new HashMap<>();
Stopwatch stopwatch = Stopwatch.createStarted();
for (int tran = 0; tran < transactionLogPOList.size(); tran++) {
CallRecord3 transactionLogPO = transactionLogPOList.get(tran);
String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(transactionLogPO.getCreatedAt());
String key = new StringBuffer(transactionLogPO.getTransactionId())
.append(StringUtils.isNotBlank(transactionLogPO.getUuid()) ? transactionLogPO.getUuid() : "")
.append(transactionLogPO.getUrlType())
.append(transactionLogPO.getCode())
.append(format).toString();
transactionLogPOMap.put(key, transactionLogPO);
}
log.info("transactionLog组装数据完成, startTime: {} , 查询大小: {} , 组装后大小: {} , 耗时: {} ", startTime, transactionLogPOList.size(), transactionLogPOMap.size(), stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
Stopwatch callRStopwatch = Stopwatch.createStarted();
String CALL_SQL = "select transaction_id, uuid, url_type, code, created_at " +
" from call_record where created_at >= '" + startTime + "' and created_at < '" + endTime + "' order by created_at desc;";
List<CallRecord1> queryResult = tidbRiskJdbcTemplate.query(CALL_SQL, new BeanPropertyRowMapper<>(CallRecord1.class));
log.info("callRecord查询数据结束, startTime: {} , endTime: {} , size: {} , sql: {} , 耗时: {} ", startTime, endTime, CollectionUtils.isEmpty(queryResult) ? 0 : queryResult.size(), CALL_SQL, callRStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
Map<String, CallRecord1> callRecord1Map = new HashMap<>();
if (CollectionUtils.isNotEmpty(queryResult)) {
Stopwatch callStopwatch = Stopwatch.createStarted();
for (int call = 0; call < queryResult.size(); call++) {
CallRecord1 callRecord1 = queryResult.get(call);
String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(callRecord1.getCreated_at().getTime()));
String key = new StringBuffer(callRecord1.getTransactionId())
.append(StringUtils.isNotBlank(callRecord1.getUuid()) ? callRecord1.getUuid() : "")
.append(callRecord1.getUrlType())
.append(callRecord1.getCode())
.append(format).toString();
callRecord1Map.put(key, callRecord1);
}
log.info("callRecord组装数据完成, startTime: {} , 查询大小: {} , 组装后大小: {} , 耗时: {} ", startTime, queryResult.size(), callRecord1Map.size(), callStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
Stopwatch delStopwatch = Stopwatch.createStarted();
Iterator<Map.Entry<String, CallRecord3>> iterator = transactionLogPOMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, CallRecord3> next = iterator.next();
String key = next.getKey();
if (callRecord1Map.containsKey(key)) {
iterator.remove();
}
}
log.info("去重组装数据完成, startTime: {} , callRecord查询大小: {} , 组装后transactionLogPOMap大小: {} , 耗时: {} ", startTime, queryResult.size(), transactionLogPOMap.size(), delStopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
if (transactionLogPOMap.size()>0) {
List<CallRecord3> callRecord3List = new ArrayList<>();
Stopwatch stopwatch1222 = Stopwatch.createStarted();
AtomicInteger pageNum = new AtomicInteger();
String INSERT_SQL = "INSERT INTO `call_record` (`request_url`, `channel_type`, `channel_id`, `transaction_id`, `uuid`, `url_type`, `code`, `created_at`, `updated_at`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ";
transactionLogPOMap.forEach((k, v)->{
callRecord3List.add(v);
if (callRecord3List.size() > 0 && callRecord3List.size() % 1000 == 0) {
Stopwatch stopwatch1 = Stopwatch.createStarted();
JdbcUtils.batchUpdateExecuteCallRecord(tidbRiskDataSource,INSERT_SQL, callRecord3List);
callRecord3List.clear();
pageNum.getAndIncrement();
log.info("插入数据完成, pageNum: {} , 耗时: {} ", pageNum.get(), stopwatch1.stop().elapsed(TimeUnit.MILLISECONDS));
}
});
if (callRecord3List.size() > 0) {
JdbcUtils.batchUpdateExecuteCallRecord(tidbRiskDataSource,INSERT_SQL, callRecord3List);
pageNum.getAndIncrement();
}
log.info("一天数据插入完成, startTime: {} , pageNum: {} , 大小: {} , 耗时: {} ",startTime, pageNum.get(), transactionLogPOMap.size(), stopwatch1222.stop().elapsed(TimeUnit.MILLISECONDS));
}else{
log.info("一天数据插入完成, startTime: {} , transactionLogPOMap size is Empty!",startTime);
}
if(executeOnce){
log.info("一天数据插入完成, newYnrTime: {} , isExecuteOnce: {} , startTime: {} , endTime: {} ,",newYnrTime, isExecuteOnce, startTime, endTime);
break;
}
}
log.info("----All清洗数据结束----");
} catch (Exception e) {
log.error("清洗数据异常", e);
}
}
}
package cn.quantgroup.report.utils;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import javax.sql.DataSource;
/**
* Created by renfeng on 2019/4/22.
*/
public enum HikDataSource {
HIK_DATA_SOURCE;
public DataSource dataSourceJdbc;
public DataSource dataSource2;
HikDataSource(){
if(dataSourceJdbc==null){
System.out.println("======创建数据库连接dataSource1======");
HikariConfig config = new HikariConfig();
// config.setJdbcUrl("jdbc:mysql://10.17.115.6:4010/rc_comservice_data_pool_v2?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
// config.setUsername("rc_comservice_data_pool_v2_w");
// config.setPassword("w9pr8IPJkLmUSBe4");
//以下是腾讯云的tidb的配置
// config.setJdbcUrl("jdbc:mysql://172.18.3.21:4010/rc_comservice_data_pool_v3?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
// config.setUsername("rc_comservice_data_pool_v3_w");
// config.setPassword("L4letZzT1LmPCxAt");
// config.setJdbcUrl("jdbc:mysql://10.17.115.6:4010/rc_comservice_data_pool_v2?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
// config.setUsername("rc_comservice_data_pool_v2_w");
// config.setPassword("w9pr8IPJkLmUSBe4");
// config.setJdbcUrl("jdbc:mysql://xyqb-app-db.quantgroups.com:6607/business_flow?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
// config.setUsername("business_flow_w");
// config.setPassword("w5XM330jD7kyR8ZH");
// config.setDriverClassName("com.mysql.jdbc.Driver");
// config.setMaximumPoolSize(50);
// config.setMinimumIdle(20);
// config.addDataSourceProperty("cachePrepStmts", "true");
// config.addDataSourceProperty("prepStmtCacheSize", "250");
// config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
// dataSource = new HikariDataSource(config);
config.setJdbcUrl("jdbc:mysql://xyqb-rule-db.quantgroups.com:6606/risk_datasource?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
config.setUsername("risk_datasource_w");
config.setPassword("50GjQLd6hUOSeTMB");
// config.setJdbcUrl("jdbc:mysql://172.30.220.9:3306/project?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
// config.setUsername("qa");
// config.setPassword("qatest");
config.setDriverClassName("com.mysql.jdbc.Driver");
config.setMaximumPoolSize(50);
config.setMinimumIdle(20);
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
dataSourceJdbc = new HikariDataSource(config);
System.out.println("创建数据库连接完毕");
}
// if(dataSource2==null){
// System.out.println("======创建数据库连接dataSource2======");
// HikariConfig config = new HikariConfig();
// config.setJdbcUrl("jdbc:mysql://172.20.6.29:4010/rc_real_time_data_pool?useUnicode=true&characterEncoding=UTF8");
// config.setDriverClassName("com.mysql.jdbc.Driver");
//// config.setUsername("rc_comservice_data_pool_v2_w");
//// config.setPassword("w9pr8IPJkLmUSBe4");
// config.setUsername("rc_real_time_data_pool_r");
// config.setPassword("0lSEnS1HtvysJbP5");
// config.setMaximumPoolSize(5);
// config.setMinimumIdle(5);
// config.addDataSourceProperty("cachePrepStmts", "true");
// config.addDataSourceProperty("prepStmtCacheSize", "250");
// config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
// dataSource2 = new HikariDataSource(config);
// }else{
// System.out.println("======无需在创建======");
// }
}
}
package cn.quantgroup.report.utils;
import cn.quantgroup.report.domain.tidbrisk.CallRecord3;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import javax.sql.DataSource;
import java.sql.*;
import java.util.List;
/**
* @Author fengjunkai
*/
@Slf4j
public class JdbcUtils {
/*
public static void prepareBatchUpdateExecuteTransactionid(String sql, List<CallRecord1> callRecord1s){
Connection conn = null;
PreparedStatement ps = null;
try {
conn = HIK_DATA_SOURCE.dataSourceJdbc.getConnection();
ps = conn.prepareStatement(sql);
conn.setAutoCommit(false);
// String sql = "INSERT INTO `call_record` (`request_url`, `transaction_id`, `uuid`, `url_type`, `code`, `created_at`, `updated_at`) VALUES (?,?,?,?,?,?,?)";
for (int i = 0; i < callRecord1s.size(); i++) {
CallRecord1 callRecord1 = callRecord1s.get(i);
ps.setString(1, callRecord1.getTransactionId());
ps.setString(2, callRecord1.getUuid());
ps.setString(3, callRecord1.getUrlType());
ps.setString(4, callRecord1.getCode());
ps.setTimestamp(5, callRecord1.getCreated_at());
ps.setTimestamp(6, callRecord1.getUpdated_at());
ps.addBatch();
}
ps.executeBatch();
conn.commit();
}catch(Exception e){
System.out.println("======执行sqlException异常======"+sql+"\r\n");
System.out.println("");
e.printStackTrace();
}finally {
close(conn, ps,null);
}
}
*/
private static void close(Connection conn, Statement st, ResultSet rs){
try {
if(rs!=null)
rs.close();
if(st!=null)
st.close();
if(conn!=null)
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
public static void batchUpdateExecuteCallRecord(DataSource dataSourceJdbc, String sql, List<CallRecord3> callRecord3List){
Connection conn = null;
PreparedStatement ps = null;
try {
conn = dataSourceJdbc.getConnection();
ps = conn.prepareStatement(sql);
conn.setAutoCommit(false);
//"INSERT INTO `call_record` (`request_url`, `channel_type`, `channel_id`, `transaction_id`, `uuid`, `url_type`, `code`, `created_at`, `updated_at`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);"
for (int i = 0; i < callRecord3List.size(); i++) {
CallRecord3 callRecord3 = callRecord3List.get(i);
ps.setString(1, callRecord3.getRequestUrl());
ps.setString(2, callRecord3.getChannelType());
ps.setString(3, callRecord3.getChannelId());
ps.setString(4, callRecord3.getTransactionId());
ps.setString(5, callRecord3.getUuid());
ps.setString(6, callRecord3.getUrlType());
ps.setInt(7, callRecord3.getCode());
ps.setTimestamp(8, callRecord3.getCreatedAt());
ps.setTimestamp(9, callRecord3.getUpdatedAt());
ps.addBatch();
}
ps.executeBatch();
conn.commit();
}catch(Exception e){
log.error("======执行batchUpdateExecuteCallRecord异常======"+sql+"\r\n",e);
log.error("方法batchUpdateExecuteCallRecord异常, DATA: {} \n", JSON.toJSON(callRecord3List));
e.printStackTrace();
}finally {
close(conn, ps,null);
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.quantgroup.report.mapper.tidbrisk.CallRecordMapper">
<!--
<resultMap id="BaseResultMap" type="cn.quantgroup.risk.datasource.domain.master.TransactionLogPO">
<result column="transaction_id" jdbcType="VARCHAR" property="transactionId" />
<result column="uuid" jdbcType="VARCHAR" property="uuid" />
<result column="url_type" jdbcType="VARCHAR" property="urlType" />
<result column="code" jdbcType="VARCHAR" property="code" />
<result column="time_created" jdbcType="TIMESTAMP" property="timeCreated" />
</resultMap>
<insert id="insert" parameterType="cn.quantgroup.risk.datasource.domain.master.TransactionLogPO">
insert into transaction_log (transaction_id, uuid, url_type, code, time_created)
values (#{transactionId,jdbcType=VARCHAR}, #{uuid,jdbcType=VARCHAR}, #{urlType,jdbcType=VARCHAR}, #{code,jdbcType=INTEGER},
#{timeCreated,jdbcType=TIMESTAMP}
)
</insert>
-->
<insert id="insertCallRecord" parameterType="cn.quantgroup.report.domain.tidbrisk.CallRecord2">
INSERT INTO call_record (channel_type,channel_id,transaction_id,uuid,url_type,`code` ,request_url,created_at, updated_at)
VALUES
( #{channelType,jdbcType=VARCHAR},
#{channelId,jdbcType=VARCHAR},
#{transactionId,jdbcType=VARCHAR},
#{uuid,jdbcType=VARCHAR},
#{urlType,jdbcType=VARCHAR},
#{code,jdbcType=INTEGER},
#{requestUrl,jdbcType=VARCHAR},
now(),
now()
)
</insert>
</mapper>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment