Commit 022284df authored by data爬虫-冯 军凯

申请单数据清洗test

parent dedaaf8b
...@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSON; ...@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.apache.spark.util.LongAccumulator;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
...@@ -88,6 +89,7 @@ public class ApplyListExportToBizFlowMain { ...@@ -88,6 +89,7 @@ public class ApplyListExportToBizFlowMain {
Dataset<Row> dataset = sqlContext.sql("select a.product_type,a.channel,a.order_no,a.receive_at,a.credit_status,b.apply_time,b.apply_status,b.risk_notify_time,c.uuid from quotaCreditDataSetMapView a left join applyQuotaRecordDataSetMapView b on a.order_no = b.order_no left join userInfoListDataSetMapView c on b.user_id = c.id"); Dataset<Row> dataset = sqlContext.sql("select a.product_type,a.channel,a.order_no,a.receive_at,a.credit_status,b.apply_time,b.apply_status,b.risk_notify_time,c.uuid from quotaCreditDataSetMapView a left join applyQuotaRecordDataSetMapView b on a.order_no = b.order_no left join userInfoListDataSetMapView c on b.user_id = c.id");
System.out.println("dataset" + dataset.count()); System.out.println("dataset" + dataset.count());
LongAccumulator longAccumulator = ss.sparkContext().longAccumulator();
dataset.foreachPartition(line -> { dataset.foreachPartition(line -> {
List<ApplyListRow> applyListRows = new ArrayList<>(1000); List<ApplyListRow> applyListRows = new ArrayList<>(1000);
line.forEachRemaining(result -> { line.forEachRemaining(result -> {
...@@ -112,19 +114,21 @@ public class ApplyListExportToBizFlowMain { ...@@ -112,19 +114,21 @@ public class ApplyListExportToBizFlowMain {
applyListRow.setApplySubmitTime(receive_at.compareTo(new Timestamp(parse.getTime())) == 0 ? null : receive_at); applyListRow.setApplySubmitTime(receive_at.compareTo(new Timestamp(parse.getTime())) == 0 ? null : receive_at);
applyListRows.add(applyListRow); applyListRows.add(applyListRow);
longAccumulator.add(1);
if (applyListRows.size() != 0 && applyListRows.size() % 200 == 0) { // if (applyListRows.size() != 0 && applyListRows.size() % 200 == 0) {
JdbcExecuters.prepareBatchUpdateExecuteApply(sql, applyListRows); // JdbcExecuters.prepareBatchUpdateExecuteApply(sql, applyListRows);
applyListRows.clear(); // applyListRows.clear();
} // }
} catch (Exception e) { } catch (Exception e) {
System.out.println("申请单清洗异常" + result.toString() + "----" + JSON.toJSONString(applyListRows)); System.out.println("申请单清洗异常" + result.toString() + "----" + JSON.toJSONString(applyListRows));
e.printStackTrace(); e.printStackTrace();
} }
}); });
JdbcExecuters.prepareBatchUpdateExecuteApply(sql, applyListRows); // JdbcExecuters.prepareBatchUpdateExecuteApply(sql, applyListRows);
}); });
System.out.println("longAccumulator大小:"+longAccumulator.count());
ss.stop(); ss.stop();
System.out.println("完事"); System.out.println("完事");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment