Commit 312b4af9 authored by data爬虫-冯 军凯's avatar data爬虫-冯 军凯

申请单数据清洗test

parent a0d30d04
...@@ -95,13 +95,13 @@ public class ApplyListExportToBizFlowMain { ...@@ -95,13 +95,13 @@ public class ApplyListExportToBizFlowMain {
SQLContext sqlContext = ss.sqlContext(); SQLContext sqlContext = ss.sqlContext();
// String sql = "INSERT INTO `apply_list` (`channel_group_no`, `channel_id`, `product_type`, `uuid`, `apply_no`, `apply_status`, `audit_result`, `apply_submit_time`) VALUES (?,?,?,?,?,?,?,?)"; // String sql = "INSERT INTO `apply_list` (`channel_group_no`, `channel_id`, `product_type`, `uuid`, `apply_no`, `apply_status`, `audit_result`, `apply_submit_time`) VALUES (?,?,?,?,?,?,?,?)";
// String sql = "INSERT IGNORE INTO `apply_list` (`channel_group_no`, `channel_id`, `product_type`, `uuid`, `apply_no`, `apply_status`, `audit_result`, `apply_submit_time`, `audit_valid_time`, `assets_result`, `assets_finish_time`, created_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)"; String sql = "INSERT IGNORE INTO `apply_list` (`channel_group_no`, `channel_id`, `product_type`, `uuid`, `apply_no`, `apply_status`, `audit_result`, `apply_submit_time`, `audit_valid_time`, `assets_result`, `assets_finish_time`, created_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)";
String sql = "INSERT IGNORE INTO `apply_list_test` (`channel_group_no`, `channel_id`, `product_type`, `uuid`, `apply_no`, `apply_status`, `audit_result`, `apply_submit_time`, `audit_valid_time`, `assets_result`, `assets_finish_time`, created_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)"; // String sql = "INSERT IGNORE INTO `apply_list_test` (`channel_group_no`, `channel_id`, `product_type`, `uuid`, `apply_no`, `apply_status`, `audit_result`, `apply_submit_time`, `audit_valid_time`, `assets_result`, `assets_finish_time`, created_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)";
Dataset<Row> dataset = sqlContext.sql("select a.product_type,a.channel,a.order_no,a.receive_at,a.credit_status,b.apply_time,a.created_at, b.apply_status,b.risk_notify_time,b.fund_result_at, c.uuid from quotaCreditDataSetMapView a left join applyQuotaRecordDataSetMapView b on a.order_no = b.order_no left join userInfoListDataSetMapView c on a.user_id = c.id"); Dataset<Row> dataset = sqlContext.sql("select a.product_type,a.channel,a.order_no,a.receive_at,a.credit_status,b.apply_time,a.created_at, b.apply_status,b.risk_notify_time,b.fund_result_at, c.uuid from quotaCreditDataSetMapView a left join applyQuotaRecordDataSetMapView b on a.order_no = b.order_no left join userInfoListDataSetMapView c on a.user_id = c.id");
System.out.println("dataset" + dataset.count()); System.out.println("dataset" + dataset.count());
dataset.repartition(2).foreachPartition(line -> { // dataset.repartition(2).foreachPartition(line -> {
// dataset.foreachPartition(line -> { dataset.foreachPartition(line -> {
List<ApplyListRow> applyListRows = new ArrayList<>(1000); List<ApplyListRow> applyListRows = new ArrayList<>(1000);
line.forEachRemaining(result -> { line.forEachRemaining(result -> {
try { try {
...@@ -110,14 +110,15 @@ public class ApplyListExportToBizFlowMain { ...@@ -110,14 +110,15 @@ public class ApplyListExportToBizFlowMain {
applyListRow.setChannelId(result.getAs("channel")); applyListRow.setChannelId(result.getAs("channel"));
applyListRow.setProductType(result.getAs("product_type")); applyListRow.setProductType(result.getAs("product_type"));
applyListRow.setUuid(result.getAs("uuid")); applyListRow.setUuid(result.getAs("uuid"));
applyListRow.setApplyNo(result.getAs("order_no")); String orderNo = result.getAs("order_no");
applyListRow.setApplyNo(orderNo);
String credit_status = result.getAs("credit_status"); String credit_status = result.getAs("credit_status");
String apply_status = result.getAs("apply_status"); String apply_status = result.getAs("apply_status");
applyListRow.setApplyStatus(getApplyStatus(credit_status, apply_status)); applyListRow.setApplyStatus(getApplyStatus(credit_status, apply_status));
// applyListRow.setAuditResult("0".equals(apply_status) ? null : "1".equals(apply_status) ? "0" : "1"); // applyListRow.setAuditResult("0".equals(apply_status) ? null : "1".equals(apply_status) ? "0" : "1");
applyListRow.setAuditResult(getAuditResult(apply_status)); applyListRow.setAuditResult(getAuditResult(apply_status, orderNo));
Timestamp receive_at = result.getAs("receive_at"); Timestamp receive_at = result.getAs("receive_at");
String str = "2000-01-01 00:00:00"; String str = "2000-01-01 00:00:00";
...@@ -126,11 +127,11 @@ public class ApplyListExportToBizFlowMain { ...@@ -126,11 +127,11 @@ public class ApplyListExportToBizFlowMain {
applyListRow.setApplySubmitTime(receive_at.compareTo(new Timestamp(parse.getTime())) == 0 ? null : receive_at); applyListRow.setApplySubmitTime(receive_at.compareTo(new Timestamp(parse.getTime())) == 0 ? null : receive_at);
Timestamp risk_notify_time = result.getAs("risk_notify_time"); Timestamp risk_notify_time = result.getAs("risk_notify_time");
applyListRow.setAuditValidTime(getAuditValidTime(risk_notify_time, parse)); applyListRow.setAuditValidTime(getAuditValidTime(risk_notify_time, parse, apply_status, orderNo));
Timestamp fund_result_at = result.getAs("fund_result_at"); Timestamp fund_result_at = result.getAs("fund_result_at");
applyListRow.setAssetsResult(getAssetsResult(apply_status, fund_result_at)); applyListRow.setAssetsResult(getAssetsResult(apply_status, fund_result_at));
applyListRow.setAssetsFinishTime(getAssetsFinishTime(fund_result_at, apply_status)); applyListRow.setAssetsFinishTime(getAssetsFinishTime(fund_result_at, apply_status, risk_notify_time, orderNo));
Timestamp applyNoCreatedAt = result.getAs("created_at"); Timestamp applyNoCreatedAt = result.getAs("created_at");
applyListRow.setCreatedAt(applyNoCreatedAt); applyListRow.setCreatedAt(applyNoCreatedAt);
...@@ -156,67 +157,76 @@ public class ApplyListExportToBizFlowMain { ...@@ -156,67 +157,76 @@ public class ApplyListExportToBizFlowMain {
} }
public static String getAuditResult(String apply_status){ public static String getAuditResult(String apply_status, String orderNo){ //已提交申请表
// "0".equals(apply_status) ? null : "1".equals(apply_status) ? "0" : "1" if(StringUtils.isBlank(apply_status) || "0".equals(apply_status)){ //未提交申请的 直接返回null //0发起申请风控审核中
if(StringUtils.isBlank(apply_status)){
return null; return null;
} }
if("0".equals(apply_status)){
return null; if("1".equals(apply_status)){ // 1 审核拒绝
return "0"; //biz 拒绝
}
if("2".equals(apply_status) || "3".equals(apply_status)){ // 2风控审核通过 3 资方拒绝
return "1"; //biz 风控审核一定是通过
} }
if("1".equals(apply_status)){ System.out.println("getAuditResult出现未知结果"+orderNo);
return "0"; return null; //未知情况为空
} }
return "1";
public static Timestamp getAssetsFinishTime(Timestamp fund_result_at, String apply_status, Timestamp risk_notify_time, String orderNo) {
if(StringUtils.isBlank(apply_status)){ //apply_status = null 一定是未提交申请单
return null;
} }
public static Timestamp getAssetsFinishTime(Timestamp fund_result_at, String apply_status) { if(fund_result_at!=null || "3".equals(apply_status)){ //fund_result_at 不为空 即 apply_status = 3 一定不为空
if ("3".equals(apply_status)) {
return fund_result_at; return fund_result_at;
} }
if("1".equals(apply_status)){ //说明该申请单提交成功授信了 1-风控审核拒绝
return risk_notify_time; //返回风控审核通知时间
}
System.out.println("getAssetsFinishTime出现未知结果"+orderNo);
return null; return null;
} }
public static String getAssetsResult(String apply_status, Timestamp fund_result_at) { public static String getAssetsResult(String apply_status, Timestamp fund_result_at) { // fund_result_at为null 且 apply_status为2 并不清楚资方是否审核通过还是拒绝
if (fund_result_at == null && "2".equals(apply_status)) { if("3".equals(apply_status)){ //资方审核拒绝 只有这个时候才知道资产分发状态是成功还是失败
return "1"; return "0"; //biz 0 失败
} }
if(StringUtils.isBlank(apply_status)){
return null; return null;
} }
return "0";
}
public static Timestamp getAuditValidTime(Timestamp riskNotifyTime, Date parse) { public static Timestamp getAuditValidTime(Timestamp riskNotifyTime, Date parse, String apply_status, String orderNo) {
if (riskNotifyTime==null || new Timestamp(parse.getTime()).compareTo(riskNotifyTime) == 0) { if (riskNotifyTime==null || new Timestamp(parse.getTime()).compareTo(riskNotifyTime) == 0) { // null 未提交申请 风控没有通知审核结果比如apply_status为3的时候 默认"2000-01-01 00:00:00" 都返回null
return null; return null;
} }
LocalDateTime riskNotifyTimeLocaDateTime = riskNotifyTime.toLocalDateTime().plusDays(30); if(StringUtils.equalsAny(apply_status, "1", "2", "3")){ //1 拒绝 2 通过 3 资方拒绝 这个时候一定会有风控审核完成通知时间
LocalDateTime riskNotifyTimeLocaDateTime = riskNotifyTime.toLocalDateTime().plusDays(30); //已提交 且风控通知审核结果
return Timestamp.valueOf(riskNotifyTimeLocaDateTime); return Timestamp.valueOf(riskNotifyTimeLocaDateTime);
} }
System.out.println("获取审核有效期字段值出现未知结果"+orderNo);
return null; //未知情况返回null;
}
public static String getApplyStatus(String creditStatus, String applyStatus) { public static String getApplyStatus(String creditStatus, String applyStatus) {
if (StringUtils.isNotBlank(applyStatus)) {
if ("0".equals(applyStatus)) { if(StringUtils.isNotBlank(applyStatus)){ //已提交的申请
return "3"; if("0".equals(applyStatus)){ //发起授信
} else if ("1".equals(applyStatus) || "2".equals(applyStatus)) { return "3"; //biz 授信审核中
return "4"; }else if(StringUtils.equalsAny(applyStatus, "1", "2", "3")){ //拒绝 通过 资方拒绝
} else if ("3".equals(applyStatus)) { return "5"; //biz 资产分发完成
return "5"; }
} }
}/* else { if("1".equals(creditStatus)){ //未提交的申请 授信成功
if ("0".equals(creditStatus) || "2".equals(creditStatus) || "3".equals(creditStatus) || "4".equals(creditStatus)) { return "2"; //biz 已填写为授信
return "1"; }else{ // 0发起授信 2提交申请失败 3授信处理中
} else if ("1".equals(creditStatus)) { return "1"; // biz 创建申请单
return "2"; }
}
}*/
if ("1".equals(creditStatus)) {
return "2";
}
return "1";
} }
} }
...@@ -28,12 +28,12 @@ public enum HikDataSource { ...@@ -28,12 +28,12 @@ public enum HikDataSource {
// config.setJdbcUrl("jdbc:mysql://172.18.3.21:4010/rc_comservice_data_pool_v3?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true"); // config.setJdbcUrl("jdbc:mysql://172.18.3.21:4010/rc_comservice_data_pool_v3?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
// config.setUsername("rc_comservice_data_pool_v3_w"); // config.setUsername("rc_comservice_data_pool_v3_w");
// config.setPassword("L4letZzT1LmPCxAt"); // config.setPassword("L4letZzT1LmPCxAt");
// config.setJdbcUrl("jdbc:mysql://10.17.115.6:4010/rc_comservice_data_pool_v2?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true"); config.setJdbcUrl("jdbc:mysql://10.17.115.6:4010/rc_comservice_data_pool_v2?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
// config.setUsername("rc_comservice_data_pool_v2_w"); config.setUsername("rc_comservice_data_pool_v2_w");
// config.setPassword("w9pr8IPJkLmUSBe4"); config.setPassword("w9pr8IPJkLmUSBe4");
config.setJdbcUrl("jdbc:mysql://xyqb-app-db.quantgroups.com:6607/business_flow?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true"); // config.setJdbcUrl("jdbc:mysql://xyqb-app-db.quantgroups.com:6607/business_flow?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
config.setUsername("business_flow_w"); // config.setUsername("business_flow_w");
config.setPassword("w5XM330jD7kyR8ZH"); // config.setPassword("w5XM330jD7kyR8ZH");
config.setDriverClassName("com.mysql.jdbc.Driver"); config.setDriverClassName("com.mysql.jdbc.Driver");
config.setMaximumPoolSize(50); config.setMaximumPoolSize(50);
config.setMinimumIdle(20); config.setMinimumIdle(20);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment