Commit 8cb9fb73 authored by data爬虫-冯 军凯

transactionLog 数据清洗 main类修改12345678912

parent b296ffe7
...@@ -27,6 +27,7 @@ public class CleanningTransactionLogMain { ...@@ -27,6 +27,7 @@ public class CleanningTransactionLogMain {
System.out.println("读取hdfsPath完毕: "+JSON.toJSONString(hdfsArr)); System.out.println("读取hdfsPath完毕: "+JSON.toJSONString(hdfsArr));
Dataset<String> dataset = ss.read().textFile(hdfsArr); Dataset<String> dataset = ss.read().textFile(hdfsArr);
System.out.println("dataset: "+dataset.count());
dataset.repartition(4).foreachPartition(func -> { dataset.repartition(4).foreachPartition(func -> {
System.out.println("开始执行数据清洗"); System.out.println("开始执行数据清洗");
ArrayList<TransactionLog> transactionLogs = new ArrayList<>(); ArrayList<TransactionLog> transactionLogs = new ArrayList<>();
...@@ -44,12 +45,13 @@ public class CleanningTransactionLogMain { ...@@ -44,12 +45,13 @@ public class CleanningTransactionLogMain {
transactionLog.setUuid(split[1]); transactionLog.setUuid(split[1]);
transactionLog.setUrl_type(split[2]); transactionLog.setUrl_type(split[2]);
transactionLog.setUpdated_at(timestamp); transactionLog.setUpdated_at(timestamp);
transactionLogs.add(transactionLog);
if (transactionLogs.size() != 0 && transactionLogs.size() % 200 == 0) { if (transactionLogs.size() != 0 && transactionLogs.size() % 200 == 0) {
System.out.println("执行sql集合: "+transactionLogs.size()); System.out.println("执行sql集合: "+transactionLogs.size());
JdbcExecuters.prepareBatchUpdateExecuteTransactionid(sql, transactionLogs); JdbcExecuters.prepareBatchUpdateExecuteTransactionid(sql, transactionLogs);
transactionLogs.clear(); transactionLogs.clear();
} }
} catch (Exception e) { } catch (Exception e) {
System.out.println("单个数据拼装异常: "+item); System.out.println("单个数据拼装异常: "+item);
e.printStackTrace(); e.printStackTrace();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment