Commit 7d8f118e authored by data爬虫-冯 军凯's avatar data爬虫-冯 军凯

transactionLog 数据清洗 main类修改12

parent 903bc8db
......@@ -21,52 +21,42 @@ public class CleanningTransactionLogMain {
public static void main(String[] args) {
SparkSession ss = SparkSession.builder().appName("DBC").getOrCreate();
System.out.println("开始加载数据");
List<String> textFileList = new ArrayList<>();
String hdfsPath = "hdfs:///app/user/data/user_data/feng.ren/transactionLog20191226/id=%s/part-m-00000";
List<String> argsArray = Arrays.asList(args).subList(2, args.length);
for (int i = 0; i < argsArray.size(); i++) {
textFileList.add(String.format(hdfsPath, argsArray.get(i)));
String hdfsPath = "hdfs:///app/user/data/user_data/feng.ren/transactionLog20191226/id=";
String[] hdfsArr = new String[args.length];
for (int i = 0; i < args.length; i++) {
hdfsArr[i] = hdfsPath + args[i];
}
for(int i=0;i<textFileList.size();i++){
System.out.println("读取hdfs地址文件: "+ textFileList.get(i));
}
System.out.println("读取hdfsPath完毕: "+JSON.toJSONString(hdfsArr));
Dataset<String> dataset = ss.read().textFile(hdfsArr);
dataset.repartition(4).foreachPartition(func -> {
Map<String, String[]> textFileMap = PagingUtil.paging(textFileList, Integer.valueOf(args[1]));
ArrayList<TransactionLog> transactionLogs = new ArrayList<>();
String sql = "INSERT INTO `call_record` (`request_url`, `transaction_id`, `uuid`, `url_type`, `code`, `created_at`, `updated_at`) VALUES (?,?,?,?,?,?,?)";
func.forEachRemaining(item -> {
try {
String[] split = item.split("\t");
TransactionLog transactionLog = new TransactionLog();
transactionLog.setCode(split[3]);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Timestamp timestamp = new Timestamp(simpleDateFormat.parse(split[4]).getTime());
transactionLog.setCreated_at(timestamp);
transactionLog.setTransaction_id(split[0]);
transactionLog.setUuid(split[1]);
transactionLog.setUrl_type(split[2]);
transactionLog.setUpdated_at(timestamp);
textFileMap.forEach((k, v) -> {
System.out.println("======" + k + "读取的hdfs======" + JSON.toJSONString(v));
// Dataset<String> dataset = ss.read().textFile(v);
//
// List<TransactionLog> transactionLogs = new ArrayList<>();
// String sql = "INSERT INTO `call_record` (`request_url`, `transaction_id`, `uuid`, `url_type`, `code`, `created_at`, `updated_at`) VALUES (?,?,?,?,?,?,?)";
// dataset.foreach(o -> {
// try {
// String[] split = o.split("\t");
// TransactionLog transactionLog = new TransactionLog();
// transactionLog.setCode(split[3]);
// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Timestamp timestamp = new Timestamp(simpleDateFormat.parse(split[4]).getTime());
// transactionLog.setCreated_at(timestamp);
// transactionLog.setTransaction_id(split[0]);
// transactionLog.setUuid(split[1]);
// transactionLog.setUrl_type(split[2]);
// transactionLog.setUpdated_at(timestamp);
//
//
// if (transactionLogs.size() != 0 && transactionLogs.size() % 200 == 0) {
//// JdbcExecuters.prepareBatchUpdateExecuteTransactionid(sql, transactionLogs);
// System.out.println("执行sql集合: "+JSON.toJSONString(transactionLogs));
// transactionLogs.clear();
// }else{
// transactionLogs.add(transactionLog);
// }
// } catch (Exception e) {
// System.out.println("单个数据拼装异常: "+o);
// e.printStackTrace();
// }
// });
if (transactionLogs.size() != 0 && transactionLogs.size() % 200 == 0) {
// JdbcExecuters.prepareBatchUpdateExecuteTransactionid(sql, transactionLogs);
System.out.println("执行sql集合: "+JSON.toJSONString(transactionLogs));
transactionLogs.clear();
}
} catch (Exception e) {
System.out.println("单个数据拼装异常: "+item);
e.printStackTrace();
}
});
// JdbcExecuters.prepareBatchUpdateExecuteTransactionid(sql, transactionLogs);
});
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment