Commit 8c4020b0 authored by data爬虫-冯 军凯's avatar data爬虫-冯 军凯

申请单数据清洗test

parent 654608c7
......@@ -12,6 +12,7 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
/**
* @Author fengjunkai
......@@ -85,23 +86,28 @@ public class ApplyListExportToBizFlowMain {
System.out.println("创建三张表视图完成");
SQLContext sqlContext = ss.sqlContext();
// String sql = "INSERT INTO `apply_list` (`channel_group_no`, `channel_id`, `product_type`, `uuid`, `apply_no`, `apply_status`, `audit_result`, `apply_submit_time`) VALUES (?,?,?,?,?,?,?,?)";
String sql = "INSERT INTO `apply_list` (`apply_no`) VALUES (?)";
String sql = "INSERT INTO `apply_list` (`channel_group_no`, `channel_id`, `product_type`, `uuid`, `apply_no`, `apply_status`, `audit_result`, `apply_submit_time`) VALUES (?,?,?,?,?,?,?,?)";
// String sql = "INSERT INTO `apply_list` (`apply_no`) VALUES (?)";
Dataset<Row> dataset = sqlContext.sql("select a.product_type,a.channel,a.order_no,a.receive_at,a.credit_status,b.apply_time,b.apply_status,b.risk_notify_time,c.uuid from quotaCreditDataSetMapView a left join applyQuotaRecordDataSetMapView b on a.order_no = b.order_no left join userInfoListDataSetMapView c on b.user_id = c.id");
System.out.println("dataset" + dataset.count());
LongAccumulator longAccumulator = ss.sparkContext().longAccumulator();
LongAccumulator channelLongAccumulator = ss.sparkContext().longAccumulator();
LongAccumulator productTypeLongAccumulator = ss.sparkContext().longAccumulator();
LongAccumulator uuidLongAccumulator = ss.sparkContext().longAccumulator();
LongAccumulator orderNoLongAccumulator = ss.sparkContext().longAccumulator();
LongAccumulator applyStatusLongAccumulator = ss.sparkContext().longAccumulator();
dataset.foreachPartition(line -> {
List<ApplyListRow> applyListRows = new ArrayList<>(1000);
line.forEachRemaining(result -> {
try {
ApplyListRow applyListRow = new ApplyListRow();
// ApplyListRow applyListRow = new ApplyListRow();
// applyListRow.setChannelGroupNo(result.getAs("channel"));
// applyListRow.setChannelId(result.getAs("channel"));
// applyListRow.setProductType(result.getAs("product_type"));
// applyListRow.setUuid(result.getAs("uuid"));
applyListRow.setApplyNo(result.getAs("order_no"));
//
// applyListRow.setApplyNo(result.getAs("order_no"));
////
// String credit_status = result.getAs("credit_status");
// applyListRow.setApplyStatus(getApplyStatus(credit_status, result.getAs("apply_status")));
//
......@@ -113,23 +119,54 @@ public class ApplyListExportToBizFlowMain {
// SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Date parse = dateFormat.parse(str);
// applyListRow.setApplySubmitTime(receive_at.compareTo(new Timestamp(parse.getTime())) == 0 ? null : receive_at);
applyListRows.add(applyListRow);
longAccumulator.add(1);
if (applyListRows.size() != 0 && applyListRows.size() % 200 == 0) {
JdbcExecuters.prepareBatchUpdateExecuteApply(sql, applyListRows);
applyListRows.clear();
// applyListRows.add(applyListRow);
//
// longAccumulator.add(1);
//
// if (applyListRows.size() != 0 && applyListRows.size() % 200 == 0) {
// JdbcExecuters.prepareBatchUpdateExecuteApply(sql, applyListRows);
// applyListRows.clear();
// }
Object channel = result.getAs("channel");
if(Objects.isNull(channel)){
System.out.println("channel 为空--->"+result.toString());
channelLongAccumulator.add(1);
}
Object product_type = result.getAs("product_type");
if(Objects.isNull(product_type)){
System.out.println("product_type 为空--->"+result.toString());
productTypeLongAccumulator.add(1);
}
Object uuid = result.getAs("uuid");
if(Objects.isNull(uuid)){
System.out.println("uuid 为空--->"+result.toString());
uuidLongAccumulator.add(1);
}
Object order_no = result.getAs("order_no");
if(Objects.isNull(order_no)){
System.out.println("order_no 为空--->"+result.toString());
orderNoLongAccumulator.add(1);
}
Object apply_status = result.getAs("apply_status");
if(Objects.isNull(apply_status)){
System.out.println("apply_status 为空--->"+result.toString());
applyStatusLongAccumulator.add(1);
}
} catch (Exception e) {
System.out.println("申请单清洗异常" + result.toString() + "----" + JSON.toJSONString(applyListRows));
e.printStackTrace();
}
});
JdbcExecuters.prepareBatchUpdateExecuteApply(sql, applyListRows);
// JdbcExecuters.prepareBatchUpdateExecuteApply(sql, applyListRows);
});
System.out.println("longAccumulator大小:"+longAccumulator.count());
System.out.println("longAccumulator 大小:"+longAccumulator.count());
System.out.println("channelLongAccumulator 大小:"+channelLongAccumulator.count());
System.out.println("productTypeLongAccumulator 大小:"+productTypeLongAccumulator.count());
System.out.println("uuidLongAccumulator 大小:"+uuidLongAccumulator.count());
System.out.println("orderNoLongAccumulator 大小:"+orderNoLongAccumulator.count());
System.out.println("applyStatusLongAccumulator 大小:"+applyStatusLongAccumulator.count());
ss.stop();
System.out.println("完事");
......
......@@ -127,15 +127,15 @@ public class JdbcExecuters {
conn.setAutoCommit(false);
for (int i = 0; i < applyListRows.size(); i++) {
ApplyListRow applyListRow = applyListRows.get(i);
ps.setString(1, applyListRow.getApplyNo());
// ps.setString(1, applyListRow.getChannelGroupNo());
// ps.setString(2, applyListRow.getChannelId());
// ps.setString(3, applyListRow.getProductType());
// ps.setString(4, applyListRow.getUuid());
// ps.setString(5, applyListRow.getApplyNo());
// ps.setString(6, applyListRow.getApplyStatus());
// ps.setString(7, applyListRow.getAuditResult());
// ps.setTimestamp(8, applyListRow.getApplySubmitTime()==null?null:applyListRow.getApplySubmitTime());
// ps.setString(1, applyListRow.getApplyNo());
ps.setString(1, applyListRow.getChannelGroupNo());
ps.setString(2, applyListRow.getChannelId());
ps.setString(3, applyListRow.getProductType());
ps.setString(4, applyListRow.getUuid());
ps.setString(5, applyListRow.getApplyNo());
ps.setString(6, applyListRow.getApplyStatus());
ps.setString(7, applyListRow.getAuditResult());
ps.setTimestamp(8, applyListRow.getApplySubmitTime()==null?null:applyListRow.getApplySubmitTime());
ps.addBatch();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment