Commit 24636ce9 authored by data爬虫-冯 军凯's avatar data爬虫-冯 军凯

transactionLog 数据清洗 main类修改123456789121

parent 8cb9fb73
...@@ -27,12 +27,12 @@ public class CleanningTransactionLogMain { ...@@ -27,12 +27,12 @@ public class CleanningTransactionLogMain {
System.out.println("读取hdfsPath完毕: "+JSON.toJSONString(hdfsArr)); System.out.println("读取hdfsPath完毕: "+JSON.toJSONString(hdfsArr));
Dataset<String> dataset = ss.read().textFile(hdfsArr); Dataset<String> dataset = ss.read().textFile(hdfsArr);
System.out.println("dataset: "+dataset.count()); System.out.println("dataset"+dataset.count());
dataset.repartition(4).foreachPartition(func -> { dataset.repartition(4).foreachPartition(line -> {
System.out.println("开始执行数据清洗"); System.out.println("开始执行数据清洗");
ArrayList<TransactionLog> transactionLogs = new ArrayList<>(); ArrayList<TransactionLog> transactionLogs = new ArrayList<>(500);
String sql = "INSERT INTO `call_record` (`request_url`, `transaction_id`, `uuid`, `url_type`, `code`, `created_at`, `updated_at`) VALUES (?,?,?,?,?,?,?)"; String sql = "INSERT INTO `call_record` (`request_url`, `transaction_id`, `uuid`, `url_type`, `code`, `created_at`, `updated_at`) VALUES (?,?,?,?,?,?,?)";
func.forEachRemaining(item -> { line.forEachRemaining(item -> {
// System.out.println("单条数据清洗"); // System.out.println("单条数据清洗");
try { try {
String[] split = item.split("\t"); String[] split = item.split("\t");
...@@ -46,6 +46,7 @@ public class CleanningTransactionLogMain { ...@@ -46,6 +46,7 @@ public class CleanningTransactionLogMain {
transactionLog.setUrl_type(split[2]); transactionLog.setUrl_type(split[2]);
transactionLog.setUpdated_at(timestamp); transactionLog.setUpdated_at(timestamp);
transactionLogs.add(transactionLog); transactionLogs.add(transactionLog);
System.out.println("调用日志大小 "+transactionLogs.size());
if (transactionLogs.size() != 0 && transactionLogs.size() % 200 == 0) { if (transactionLogs.size() != 0 && transactionLogs.size() % 200 == 0) {
System.out.println("执行sql集合: "+transactionLogs.size()); System.out.println("执行sql集合: "+transactionLogs.size());
JdbcExecuters.prepareBatchUpdateExecuteTransactionid(sql, transactionLogs); JdbcExecuters.prepareBatchUpdateExecuteTransactionid(sql, transactionLogs);
...@@ -57,6 +58,7 @@ public class CleanningTransactionLogMain { ...@@ -57,6 +58,7 @@ public class CleanningTransactionLogMain {
e.printStackTrace(); e.printStackTrace();
} }
}); });
System.out.println("调用日志大小 "+transactionLogs.size());
JdbcExecuters.prepareBatchUpdateExecuteTransactionid(sql, transactionLogs); JdbcExecuters.prepareBatchUpdateExecuteTransactionid(sql, transactionLogs);
}); });
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment