Commit c8f76dc1 authored by data爬虫-冯 军凯's avatar data爬虫-冯 军凯

transactionLog 数据清洗

parent 78a25d8e
Pipeline #411 canceled with stages
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
<!--<version>unicom-call-detail-info</version>--> <!--<version>unicom-call-detail-info</version>-->
<!--<version>2-telecom-detail-info</version>--> <!--<version>2-telecom-detail-info</version>-->
<!--<version>1.0</version>--> <!--<version>1.0</version>-->
<version>apply-list-info</version> <version>transactionLog</version>
<!--<version>telecom-sms-detail-info</version>--> <!--<version>telecom-sms-detail-info</version>-->
<!--<version>mobile-sms-detail-info</version>--> <!--<version>mobile-sms-detail-info</version>-->
<!--<version>unicom-sms-detail-info</version>--> <!--<version>unicom-sms-detail-info</version>-->
......
package cn.quantgroup.dbc.spark.transactionlog;
import cn.quantgroup.dbc.utils.JdbcExecuters;
import cn.quantgroup.dbc.utils.PagingUtil;
import com.alibaba.fastjson.JSON;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* @Author fengjunkai
*/
public class CleanningTransactionLogMain {

    /** Number of parsed rows accumulated before a batch insert is flushed. */
    private static final int BATCH_SIZE = 200;

    /**
     * Cleans transactionLog dumps stored on HDFS and bulk-inserts them into the
     * {@code call_record} MySQL table via {@link JdbcExecuters}.
     *
     * <p>Expected arguments:
     * <ul>
     *   <li>args[0] — unused here (presumably consumed by the launcher — TODO confirm)</li>
     *   <li>args[1] — page size passed to {@link PagingUtil#paging}</li>
     *   <li>args[2..] — HDFS partition ids substituted into the path template</li>
     * </ul>
     *
     * <p>NOTE(review): {@code Dataset.foreach} runs on Spark executors; the captured
     * {@code transactionLogs} ArrayList is serialized into each task, so in cluster
     * mode every task accumulates into its own private copy and the driver-side list
     * flushed after the loop stays empty. This only behaves as intended in local
     * mode — confirm the deployment mode, or migrate to {@code foreachPartition}.
     */
    public static void main(String[] args) {
        SparkSession ss = SparkSession.builder().appName("DBC").getOrCreate();
        System.out.println("开始加载数据");

        // Build the list of HDFS part files from the partition ids in args[2..].
        String hdfsPath = "hdfs:///app/user/data/user_data/feng.ren/transactionLog20191226/id=%s/part-m-00000";
        List<String> argsArray = Arrays.asList(args).subList(2, args.length);
        List<String> textFileList = new ArrayList<>(argsArray.size());
        for (String partitionId : argsArray) {
            textFileList.add(String.format(hdfsPath, partitionId));
        }
        for (String path : textFileList) {
            System.out.println("读取hdfs地址文件: " + path);
        }

        Map<String, String[]> textFileMap = PagingUtil.paging(textFileList, Integer.valueOf(args[1]));
        String sql = "INSERT INTO `call_record` (`request_url`, `transaction_id`, `uuid`, `url_type`, `code`, `created_at`, `updated_at`) VALUES (?,?,?,?,?,?,?)";
        textFileMap.forEach((page, paths) -> {
            System.out.println("======" + page + "读取的hdfs======" + JSON.toJSONString(paths));
            Dataset<String> dataset = ss.read().textFile(paths);
            List<TransactionLog> transactionLogs = new ArrayList<>();
            dataset.foreach(line -> {
                try {
                    transactionLogs.add(parseLine(line));
                    // BUG FIX: the original added the record only in an else-branch,
                    // silently dropping every record that arrived exactly on a batch
                    // boundary, and cleared full batches after merely printing them
                    // (the insert was commented out) so batched rows were never
                    // written. Add first, then flush the full batch for real.
                    if (transactionLogs.size() % BATCH_SIZE == 0) {
                        System.out.println("执行sql集合: " + JSON.toJSONString(transactionLogs));
                        JdbcExecuters.prepareBatchUpdateExecuteTransactionid(sql, transactionLogs);
                        transactionLogs.clear();
                    }
                } catch (Exception e) {
                    System.out.println("单个数据拼装异常: " + line);
                    e.printStackTrace();
                }
            });
            // Flush the final partial batch (< BATCH_SIZE rows).
            JdbcExecuters.prepareBatchUpdateExecuteTransactionid(sql, transactionLogs);
        });
        ss.stop();
        System.out.println("完事");
    }

    /**
     * Parses one tab-separated dump line into a {@link TransactionLog}.
     * Expected columns: transaction_id, uuid, url_type, code, "yyyy-MM-dd HH:mm:ss" timestamp.
     *
     * @throws Exception on malformed lines (missing columns or unparsable date)
     */
    private static TransactionLog parseLine(String line) throws Exception {
        String[] split = line.split("\t");
        TransactionLog log = new TransactionLog();
        log.setTransaction_id(split[0]);
        log.setUuid(split[1]);
        log.setUrl_type(split[2]);
        log.setCode(split[3]);
        // SimpleDateFormat is not thread-safe; a fresh instance per line is the
        // safe choice since this runs inside a Spark task.
        Timestamp ts = new Timestamp(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(split[4]).getTime());
        log.setCreated_at(ts);
        log.setUpdated_at(ts);
        return log;
    }
}
package cn.quantgroup.dbc.spark.transactionlog;
import cn.quantgroup.dbc.utils.JdbcExecuters;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
/**
* @Author fengjunkai
*/
/**
 * One cleaned row of the transactionLog dump, mapped onto the
 * {@code call_record} table's columns.
 *
 * @Author fengjunkai
 */
public class TransactionLog {

    private String transaction_id;
    private String uuid;
    private String url_type;
    private String code;
    private Timestamp created_at;
    private Timestamp updated_at;

    public String getTransaction_id() { return transaction_id; }

    public void setTransaction_id(String transaction_id) { this.transaction_id = transaction_id; }

    public String getUuid() { return uuid; }

    public void setUuid(String uuid) { this.uuid = uuid; }

    public String getUrl_type() { return url_type; }

    public void setUrl_type(String url_type) { this.url_type = url_type; }

    public String getCode() { return code; }

    public void setCode(String code) { this.code = code; }

    public Timestamp getCreated_at() { return created_at; }

    public void setCreated_at(Timestamp created_at) { this.created_at = created_at; }

    public Timestamp getUpdated_at() { return updated_at; }

    public void setUpdated_at(Timestamp updated_at) { this.updated_at = updated_at; }

    /** Ad-hoc smoke test: parses one hard-coded sample line and inserts it. */
    public static void main(String[] args) {
        try {
            final String sample = "5a3b9545edaf4d0a968b3725229758b4\t6352705c-8f53-4470-9db5-cc26f38d1ec8\tBaiRongSpecialList_c\t1002\t2019-01-08 09:58:35.0";
            final String[] cols = sample.split("\t");
            final SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            final Timestamp when = new Timestamp(fmt.parse(cols[4]).getTime());

            final TransactionLog row = new TransactionLog();
            row.setTransaction_id(cols[0]);
            row.setUuid(cols[1]);
            row.setUrl_type(cols[2]);
            row.setCode(cols[3]);
            row.setCreated_at(when);
            row.setUpdated_at(when);

            final List<TransactionLog> batch = new ArrayList<>();
            batch.add(row);
            final String sql = "INSERT INTO `call_record` (`request_url`, `transaction_id`, `uuid`, `url_type`, `code`, `created_at`, `updated_at`) VALUES (?,?,?,?,?,?,?)";
            JdbcExecuters.prepareBatchUpdateExecuteTransactionid(sql, batch);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
...@@ -31,9 +31,19 @@ public enum HikDataSource { ...@@ -31,9 +31,19 @@ public enum HikDataSource {
// config.setJdbcUrl("jdbc:mysql://10.17.115.6:4010/rc_comservice_data_pool_v2?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true"); // config.setJdbcUrl("jdbc:mysql://10.17.115.6:4010/rc_comservice_data_pool_v2?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
// config.setUsername("rc_comservice_data_pool_v2_w"); // config.setUsername("rc_comservice_data_pool_v2_w");
// config.setPassword("w9pr8IPJkLmUSBe4"); // config.setPassword("w9pr8IPJkLmUSBe4");
config.setJdbcUrl("jdbc:mysql://xyqb-app-db.quantgroups.com:6607/business_flow?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true"); // config.setJdbcUrl("jdbc:mysql://xyqb-app-db.quantgroups.com:6607/business_flow?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
config.setUsername("business_flow_w"); // config.setUsername("business_flow_w");
config.setPassword("w5XM330jD7kyR8ZH"); // config.setPassword("w5XM330jD7kyR8ZH");
// config.setDriverClassName("com.mysql.jdbc.Driver");
// config.setMaximumPoolSize(50);
// config.setMinimumIdle(20);
// config.addDataSourceProperty("cachePrepStmts", "true");
// config.addDataSourceProperty("prepStmtCacheSize", "250");
// config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
// dataSource = new HikariDataSource(config);
config.setJdbcUrl("jdbc:mysql://172.30.220.9:3306/project?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
config.setUsername("qa");
config.setPassword("qatest");
config.setDriverClassName("com.mysql.jdbc.Driver"); config.setDriverClassName("com.mysql.jdbc.Driver");
config.setMaximumPoolSize(50); config.setMaximumPoolSize(50);
config.setMinimumIdle(20); config.setMinimumIdle(20);
......
package cn.quantgroup.dbc.utils; package cn.quantgroup.dbc.utils;
import cn.quantgroup.dbc.spark.applyList.ApplyListRow; import cn.quantgroup.dbc.spark.applyList.ApplyListRow;
import cn.quantgroup.dbc.spark.transactionlog.TransactionLog;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.sql.*; import java.sql.*;
...@@ -160,6 +161,35 @@ public class JdbcExecuters { ...@@ -160,6 +161,35 @@ public class JdbcExecuters {
} }
} }
/**
 * Batch-inserts the given transaction logs in a single transaction using the
 * pooled {@code HIK_DATA_SOURCE} connection.
 *
 * <p>Column 1 ({@code request_url}) is always bound to the empty string — the
 * dump does not carry that field.
 *
 * <p>Failures are logged and swallowed (best-effort, matching this class's
 * other executors), but the partial batch is now rolled back and auto-commit
 * restored before the connection is returned to the pool.
 *
 * @param sql             parameterized INSERT with 7 placeholders
 * @param transactionLogs rows to insert; null/empty is a no-op
 */
public static void prepareBatchUpdateExecuteTransactionid(String sql, List<TransactionLog> transactionLogs) {
    // Guard: don't borrow a pooled connection for an empty batch.
    if (transactionLogs == null || transactionLogs.isEmpty()) {
        return;
    }
    Connection conn = null;
    PreparedStatement ps = null;
    try {
        conn = HIK_DATA_SOURCE.dataSource.getConnection();
        // Disable auto-commit first so the whole batch is one transaction.
        conn.setAutoCommit(false);
        ps = conn.prepareStatement(sql);
        for (TransactionLog transactionLog : transactionLogs) {
            ps.setString(1, "");
            ps.setString(2, transactionLog.getTransaction_id());
            ps.setString(3, transactionLog.getUuid());
            ps.setString(4, transactionLog.getUrl_type());
            ps.setString(5, transactionLog.getCode());
            ps.setTimestamp(6, transactionLog.getCreated_at());
            ps.setTimestamp(7, transactionLog.getUpdated_at());
            ps.addBatch();
        }
        ps.executeBatch();
        conn.commit();
    } catch (Exception e) {
        System.out.println("======执行sqlException异常======" + sql + "\r\n");
        System.out.println("");
        e.printStackTrace();
        // BUG FIX: the original left the failed transaction pending on a
        // connection with auto-commit off. Roll it back explicitly.
        if (conn != null) {
            try {
                conn.rollback();
            } catch (SQLException rollbackEx) {
                rollbackEx.printStackTrace();
            }
        }
    } finally {
        // Restore auto-commit before the pool reuses this connection.
        // NOTE(review): HikariCP may reset this on return — confirm pool config.
        if (conn != null) {
            try {
                conn.setAutoCommit(true);
            } catch (SQLException ignored) {
                // best-effort cleanup only
            }
        }
        close(conn, ps, null);
    }
}
/** /**
* 批量插入 * 批量插入
* @param sql * @param sql
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment