Commit 407160dd authored by data爬虫-冯 军凯

申请单数据清洗test

parent 51a262c0
<component name="libraryTable">
<!-- Library entry for the Maven artifact org.slf4j:slf4j-log4j12:1.7.16. It lists the binary, Javadoc and sources jars, each resolved under $USER_HOME$/maven/repository. -->
<library name="Maven: org.slf4j:slf4j-log4j12:1.7.16">
<CLASSES>
<root url="jar://$USER_HOME$/maven/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$USER_HOME$/maven/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$USER_HOME$/maven/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
......@@ -16,7 +16,8 @@
<!--<version>unicom-sms-detail-info</version>-->
<!--<version>unicom-call-detail-info</version>-->
<!--<version>2-telecom-detail-info</version>-->
<version>1.0</version>
<!--<version>1.0</version>-->
<version>apply-list-info</version>
<!--<version>telecom-sms-detail-info</version>-->
<!--<version>mobile-sms-detail-info</version>-->
<!--<version>unicom-sms-detail-info</version>-->
......@@ -172,7 +173,7 @@
<classpathScope>compile</classpathScope>
<!--<mainClass>cn.quantgroup.dbc.spark.DbcSparkMain</mainClass>-->
<!--<mainClass>cn.quantgroup.dbc.spark.PhonePaymentInfoMain</mainClass>-->
<mainClass>cn.quantgroup.dbc.spark.SparkTest</mainClass>
<mainClass>cn.quantgroup.dbc.spark.applyList.ApplyListExportToBizFlowMain</mainClass>
</configuration>
</plugin>
......
package cn.quantgroup.dbc.spark.applyList;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @Author fengjunkai
* @Date 2019-09-03 14:56
*/
public class ApplyListExportToBizFlowMain {
public static void main(String[] args) {
Map<String, String> userInfoMap = new HashMap<>();
Map<String, QuotaCredit> quotaCreditMap = new HashMap<>(); //orderNo--quotaCredit
Map<String, ApplyQuotaRecord> applyQuotaRecordMap = new HashMap<>(); //orderNo--applyQuotaRecord
List<ApplyListRow> applyListRows = new ArrayList<>();
SparkSession ss = SparkSession.builder().appName("DBC").getOrCreate();
System.out.println("开始加载数据...");
Dataset<String> userInfoListDataSet = ss.read().textFile("hdfs:///app/user/data/user_data/feng.ren/applyList/userInfoNew/part-m-00000");
System.out.println("加载用户信息完毕");
// Dataset<String> quotaCreditDataSet = ss.read().textFile("hdfs:///app/user/data/user_data/feng.ren/applyList/userInfo");
// System.out.println("加载申请单完毕");
// Dataset<String> applyQuotaRecordDataSet = ss.read().textFile("hdfs:///app/user/data/user_data/feng.ren/applyList/userInfo");
// System.out.println("加载提交过申请单完毕");
//
// userInfoListDataSet.foreach(o -> {
// String userInfoTmp[] = o.split("\t");
// userInfoMap.put(userInfoTmp[0], userInfoTmp[1]);
// });
// System.out.println("用户信息转换Map完成");
//
// quotaCreditDataSet.foreach(quotaCredit -> {
// String quotaCreditTmp[] = quotaCredit.split("\t");
// QuotaCredit quotaCreditRow = new QuotaCredit();
// quotaCreditRow.setUser_id(quotaCreditTmp[1]);
// quotaCreditRow.setCustomer_id(quotaCreditTmp[2]);
// quotaCreditRow.setProduct_id(Long.valueOf(quotaCreditTmp[3]));
// quotaCreditRow.setProduct_type(Integer.valueOf(quotaCreditTmp[4]));
// quotaCreditRow.setChannel(Long.valueOf(quotaCreditTmp[6]));
// quotaCreditRow.setOrder_no(quotaCreditTmp[7]);
// quotaCreditRow.setReceive_at(Timestamp.valueOf(quotaCreditTmp[11]));
// quotaCreditRow.setCredit_status(Integer.valueOf(quotaCreditTmp[12]));
// quotaCreditMap.put(quotaCreditTmp[7], quotaCreditRow);
// });
// System.out.println("用户申请单转换Bean完成");
//
// applyQuotaRecordDataSet.foreach(applyQuotaRecord -> {
// String applyQuotaRecordTmp[] = applyQuotaRecord.split("\t");
// ApplyQuotaRecord applyQuotaRecordRow = new ApplyQuotaRecord();
// applyQuotaRecordRow.setUser_id(applyQuotaRecordTmp[1]);
// applyQuotaRecordRow.setUuid(applyQuotaRecordTmp[3]);
// applyQuotaRecordRow.setApply_time(Timestamp.valueOf(applyQuotaRecordTmp[4]));
// applyQuotaRecordRow.setApply_from(Long.valueOf(applyQuotaRecordTmp[5]));
// applyQuotaRecordRow.setProduct_id(Long.valueOf(applyQuotaRecordTmp[6]));
// applyQuotaRecordRow.setProduct_type(Integer.valueOf(applyQuotaRecordTmp[7]));
// applyQuotaRecordRow.setApply_status(Integer.valueOf(applyQuotaRecordTmp[8]));
// applyQuotaRecordRow.setRisk_notify_time(Timestamp.valueOf(applyQuotaRecordTmp[11]));
// applyQuotaRecordRow.setOrder_no(applyQuotaRecordTmp[12]);
// applyQuotaRecordRow.setApply_type(Boolean.valueOf(applyQuotaRecordTmp[13]));
// applyQuotaRecordMap.put(applyQuotaRecordTmp[12], applyQuotaRecordRow);
// });
// System.out.println("用户提交过的申请单转换Bean完成");
// String sql = "INSERT INTO `apply_list` (`channel_group_no`, `channel_id`, `product_type`, `uuid`, `apply_no`, `apply_status`, `audit_result`, `audit_valid_time`, `assets_result`, `assets_finish_time`, `apply_submit_time`, `enable`, `created_at`, `updated_at`) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?);";
// quotaCreditMap.forEach((orderNo, quotaCredit) -> {
//
// String userId = quotaCredit.getUser_id();
// ApplyQuotaRecord quotaRecord = applyQuotaRecordMap.get(orderNo);
// ApplyListRow applyListRow = new ApplyListRow();
// applyListRow.setChannelGroupNo(String.valueOf(quotaCredit.getChannel()));
// applyListRow.setChannelId(String.valueOf(quotaCredit.getChannel()));
// applyListRow.setProductType(quotaCredit.getProduct_type());
// applyListRow.setUuid(userInfoMap.get(userId));
// applyListRow.setApplyNo(orderNo);
// Integer credit_status = quotaCredit.getCredit_status();
// applyListRow.setApplyStatus(getApplyStatus(credit_status, quotaRecord));
// Integer apply_status = quotaRecord.getApply_status();
// applyListRow.setAuditResult(0 == apply_status ? null : 1 == apply_status ? 1 : 0);
// applyListRow.setAuditValidTime((1==apply_status||2==apply_status)?quotaRecord.getRisk_notify_time():null);
//
//// applyListRow.setAssetsFinishTime();
//// applyListRow.setAssetsResult();
//
// applyListRow.setApplySubmitTime((4 == credit_status || 3 == credit_status || 2 == credit_status || 0==credit_status) ? null : quotaCredit.getReceive_at());
// applyListRows.add(applyListRow);
//
// if(applyListRows.size()!=0 && applyListRows.size()%1000==0){
// JdbcExecuters.prepareBatchUpdateExecuteApply(sql, applyListRows);
// applyListRows.clear();
// }else{
// JdbcExecuters.prepareBatchUpdateExecuteApply(sql, applyListRows);
// }
// });
}
public static Integer getApplyStatus(Integer creditStatus, ApplyQuotaRecord quotaRecord) {
if (2 == creditStatus || 3 == creditStatus || 4 == creditStatus) {
return 1;
} else if (1 == creditStatus) {
return 2;
} else if (0 == quotaRecord.getApply_status()) {
return 3;
} else {
return 1;
}
}
}
package cn.quantgroup.dbc.spark.applyList;
import java.sql.Timestamp;
/**
 * Mutable value holder for one apply-list row.
 *
 * <p>Every field is a nullable reference type and stays {@code null} until its setter is
 * called. The accessors perform no validation or copying.
 *
 * @author fengjunkai (created 2019-09-04 11:00)
 */
public class ApplyListRow {

    private Long id;
    private String channelGroupNo;
    private String channelId;
    private Integer productType;
    private String uuid;
    private String applyNo;
    private Integer applyStatus;
    private Integer auditResult;
    private Timestamp auditValidTime;
    private Integer assetsResult;
    private Timestamp assetsFinishTime;
    private Timestamp applySubmitTime;
    private Integer enable;
    private Timestamp createdAt;
    private Timestamp updatedAt;

    /** @return the row id, or {@code null} if unset */
    public Long getId() {
        return this.id;
    }

    /** @param id the row id */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the channel group number, or {@code null} if unset */
    public String getChannelGroupNo() {
        return this.channelGroupNo;
    }

    /** @param channelGroupNo the channel group number */
    public void setChannelGroupNo(String channelGroupNo) {
        this.channelGroupNo = channelGroupNo;
    }

    /** @return the channel id, or {@code null} if unset */
    public String getChannelId() {
        return this.channelId;
    }

    /** @param channelId the channel id */
    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    /** @return the product type code, or {@code null} if unset */
    public Integer getProductType() {
        return this.productType;
    }

    /** @param productType the product type code */
    public void setProductType(Integer productType) {
        this.productType = productType;
    }

    /** @return the uuid, or {@code null} if unset */
    public String getUuid() {
        return this.uuid;
    }

    /** @param uuid the uuid */
    public void setUuid(String uuid) {
        this.uuid = uuid;
    }

    /** @return the apply number, or {@code null} if unset */
    public String getApplyNo() {
        return this.applyNo;
    }

    /** @param applyNo the apply number */
    public void setApplyNo(String applyNo) {
        this.applyNo = applyNo;
    }

    /** @return the apply status code, or {@code null} if unset */
    public Integer getApplyStatus() {
        return this.applyStatus;
    }

    /** @param applyStatus the apply status code */
    public void setApplyStatus(Integer applyStatus) {
        this.applyStatus = applyStatus;
    }

    /** @return the audit result code, or {@code null} if unset */
    public Integer getAuditResult() {
        return this.auditResult;
    }

    /** @param auditResult the audit result code */
    public void setAuditResult(Integer auditResult) {
        this.auditResult = auditResult;
    }

    /** @return the audit-valid timestamp, or {@code null} if unset */
    public Timestamp getAuditValidTime() {
        return this.auditValidTime;
    }

    /** @param auditValidTime the audit-valid timestamp */
    public void setAuditValidTime(Timestamp auditValidTime) {
        this.auditValidTime = auditValidTime;
    }

    /** @return the assets result code, or {@code null} if unset */
    public Integer getAssetsResult() {
        return this.assetsResult;
    }

    /** @param assetsResult the assets result code */
    public void setAssetsResult(Integer assetsResult) {
        this.assetsResult = assetsResult;
    }

    /** @return the assets-finish timestamp, or {@code null} if unset */
    public Timestamp getAssetsFinishTime() {
        return this.assetsFinishTime;
    }

    /** @param assetsFinishTime the assets-finish timestamp */
    public void setAssetsFinishTime(Timestamp assetsFinishTime) {
        this.assetsFinishTime = assetsFinishTime;
    }

    /** @return the apply-submit timestamp, or {@code null} if unset */
    public Timestamp getApplySubmitTime() {
        return this.applySubmitTime;
    }

    /** @param applySubmitTime the apply-submit timestamp */
    public void setApplySubmitTime(Timestamp applySubmitTime) {
        this.applySubmitTime = applySubmitTime;
    }

    /** @return the enable flag value, or {@code null} if unset */
    public Integer getEnable() {
        return this.enable;
    }

    /** @param enable the enable flag value */
    public void setEnable(Integer enable) {
        this.enable = enable;
    }

    /** @return the creation timestamp, or {@code null} if unset */
    public Timestamp getCreatedAt() {
        return this.createdAt;
    }

    /** @param createdAt the creation timestamp */
    public void setCreatedAt(Timestamp createdAt) {
        this.createdAt = createdAt;
    }

    /** @return the last-update timestamp, or {@code null} if unset */
    public Timestamp getUpdatedAt() {
        return this.updatedAt;
    }

    /** @param updatedAt the last-update timestamp */
    public void setUpdatedAt(Timestamp updatedAt) {
        this.updatedAt = updatedAt;
    }
}
package cn.quantgroup.dbc.spark.applyList;
import java.sql.Timestamp;
/**
* @Author fengjunkai
* @Date 2019-09-04 10:23
*/
public class ApplyQuotaRecord {
private long id;
private String user_id;
private String customer_id;
private String uuid;
private Timestamp apply_time;
private long apply_from;
private long product_id;
private Integer product_type;
private Integer apply_status;
private Timestamp updated_at;
private Timestamp created_at;
private Timestamp risk_notify_time;
private String order_no;
private boolean apply_type;
private String remark;
private Timestamp fund_result_at;
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getUser_id() {
return user_id;
}
public void setUser_id(String user_id) {
this.user_id = user_id;
}
public String getCustomer_id() {
return customer_id;
}
public void setCustomer_id(String customer_id) {
this.customer_id = customer_id;
}
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
public Timestamp getApply_time() {
return apply_time;
}
public void setApply_time(Timestamp apply_time) {
this.apply_time = apply_time;
}
public long getApply_from() {
return apply_from;
}
public void setApply_from(long apply_from) {
this.apply_from = apply_from;
}
public long getProduct_id() {
return product_id;
}
public void setProduct_id(long product_id) {
this.product_id = product_id;
}
public Integer getProduct_type() {
return product_type;
}
public void setProduct_type(Integer product_type) {
this.product_type = product_type;
}
public Integer getApply_status() {
return apply_status;
}
public void setApply_status(Integer apply_status) {
this.apply_status = apply_status;
}
public Timestamp getUpdated_at() {
return updated_at;
}
public void setUpdated_at(Timestamp updated_at) {
this.updated_at = updated_at;
}
public Timestamp getCreated_at() {
return created_at;
}
public void setCreated_at(Timestamp created_at) {
this.created_at = created_at;
}
public Timestamp getRisk_notify_time() {
return risk_notify_time;
}
public void setRisk_notify_time(Timestamp risk_notify_time) {
this.risk_notify_time = risk_notify_time;
}
public String getOrder_no() {
return order_no;
}
public void setOrder_no(String order_no) {
this.order_no = order_no;
}
public boolean isApply_type() {
return apply_type;
}
public void setApply_type(boolean apply_type) {
this.apply_type = apply_type;
}
public String getRemark() {
return remark;
}
public void setRemark(String remark) {
this.remark = remark;
}
public Timestamp getFund_result_at() {
return fund_result_at;
}
public void setFund_result_at(Timestamp fund_result_at) {
this.fund_result_at = fund_result_at;
}
}
package cn.quantgroup.dbc.spark.applyList;
import java.sql.Timestamp;
/**
* @Author fengjunkai
* @Date 2019-09-04 10:23
*/
public class QuotaCredit {
private long id;
private String user_id;
private String customer_id;
private Long product_id;
private Integer product_type;
private Boolean credit_source;
private Long channel;
private String order_no;
private String remark;
private String app_channel;
private String loan_vest_info;
private Timestamp receive_at;
private Integer credit_status;
private Timestamp created_at;
private Timestamp updated_at;
private Boolean is_active;
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getUser_id() {
return user_id;
}
public void setUser_id(String user_id) {
this.user_id = user_id;
}
public String getCustomer_id() {
return customer_id;
}
public void setCustomer_id(String customer_id) {
this.customer_id = customer_id;
}
public Long getProduct_id() {
return product_id;
}
public void setProduct_id(Long product_id) {
this.product_id = product_id;
}
public Integer getProduct_type() {
return product_type;
}
public void setProduct_type(Integer product_type) {
this.product_type = product_type;
}
public Boolean getCredit_source() {
return credit_source;
}
public void setCredit_source(Boolean credit_source) {
this.credit_source = credit_source;
}
public Long getChannel() {
return channel;
}
public void setChannel(Long channel) {
this.channel = channel;
}
public String getOrder_no() {
return order_no;
}
public void setOrder_no(String order_no) {
this.order_no = order_no;
}
public String getRemark() {
return remark;
}
public void setRemark(String remark) {
this.remark = remark;
}
public String getApp_channel() {
return app_channel;
}
public void setApp_channel(String app_channel) {
this.app_channel = app_channel;
}
public String getLoan_vest_info() {
return loan_vest_info;
}
public void setLoan_vest_info(String loan_vest_info) {
this.loan_vest_info = loan_vest_info;
}
public Timestamp getReceive_at() {
return receive_at;
}
public void setReceive_at(Timestamp receive_at) {
this.receive_at = receive_at;
}
public Integer getCredit_status() {
return credit_status;
}
public void setCredit_status(Integer credit_status) {
this.credit_status = credit_status;
}
public Timestamp getCreated_at() {
return created_at;
}
public void setCreated_at(Timestamp created_at) {
this.created_at = created_at;
}
public Timestamp getUpdated_at() {
return updated_at;
}
public void setUpdated_at(Timestamp updated_at) {
this.updated_at = updated_at;
}
public Boolean getIs_active() {
return is_active;
}
public void setIs_active(Boolean is_active) {
this.is_active = is_active;
}
}
package cn.quantgroup.dbc.spark.applyList;
/**
 * Mutable value holder pairing an id with a uuid.
 *
 * <p>Both fields are reference types and stay {@code null} until set.
 *
 * @author fengjunkai (created 2019-09-04 10:23)
 */
public class UserInfoDataRow {

    private Long id;
    private String uuid;

    /** @return the id, or {@code null} if unset */
    public Long getId() {
        return this.id;
    }

    /** @param id the id */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the uuid, or {@code null} if unset */
    public String getUuid() {
        return this.uuid;
    }

    /** @param uuid the uuid */
    public void setUuid(String uuid) {
        this.uuid = uuid;
    }
}
......@@ -25,9 +25,12 @@ public enum HikDataSource {
// config.setPassword("w9pr8IPJkLmUSBe4");
//以下是腾讯云的tidb的配置
config.setJdbcUrl("jdbc:mysql://172.18.3.21:4010/rc_comservice_data_pool_v3?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
config.setUsername("rc_comservice_data_pool_v3_w");
config.setPassword("L4letZzT1LmPCxAt");
// config.setJdbcUrl("jdbc:mysql://172.18.3.21:4010/rc_comservice_data_pool_v3?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
// config.setUsername("rc_comservice_data_pool_v3_w");
// config.setPassword("L4letZzT1LmPCxAt");
config.setJdbcUrl("jdbc:mysql://10.17.115.6:4010/rc_comservice_data_pool_v2?useUnicode=true&characterEncoding=UTF8&rewriteBatchedStatements=true");
config.setUsername("rc_comservice_data_pool_v2_w");
config.setPassword("w9pr8IPJkLmUSBe4");
config.setDriverClassName("com.mysql.jdbc.Driver");
config.setMaximumPoolSize(50);
config.setMinimumIdle(20);
......
package cn.quantgroup.dbc.utils;
import org.apache.commons.lang3.StringUtils;
import cn.quantgroup.dbc.spark.applyList.ApplyListRow;
import java.sql.*;
import java.util.ArrayList;
......@@ -118,6 +118,64 @@ public class JdbcExecuters {
}
/**
 * Batch-inserts apply-list rows through the supplied INSERT statement.
 *
 * <p>The statement must expose 14 {@code ?} placeholders, bound in this order:
 * channel_group_no, channel_id, product_type, uuid, apply_no, apply_status, audit_result,
 * audit_valid_time, assets_result, assets_finish_time, apply_submit_time, enable, created_at,
 * updated_at. {@code enable} is always bound as {@code true}, and both timestamps are bound
 * to the current time; the row's own enable/createdAt/updatedAt values are not read.
 *
 * <p>Rows are flushed and committed every 1000 entries, with a final flush for the remainder.
 * On any failure the error is printed, the uncommitted batch is rolled back, and the method
 * returns normally; batches committed earlier stay committed.
 *
 * @param sql           INSERT statement with 14 placeholders in the order listed above
 * @param applyListRows rows to insert; {@code null} or empty makes the call a no-op
 */
public static void prepareBatchUpdateExecuteApply(String sql, List<ApplyListRow> applyListRows){
    if (applyListRows == null || applyListRows.isEmpty()) {
        // Nothing to write: avoid borrowing a pooled connection for an empty batch.
        return;
    }
    final int batchSize = 1000;
    Connection conn = null;
    PreparedStatement ps = null;
    try {
        conn = HIK_DATA_SOURCE.dataSource.getConnection();
        conn.setAutoCommit(false);
        ps = conn.prepareStatement(sql);
        int pending = 0;
        for (ApplyListRow applyListRow : applyListRows) {
            Timestamp now = new Timestamp(System.currentTimeMillis());
            ps.setString(1, applyListRow.getChannelGroupNo());
            ps.setString(2, applyListRow.getChannelId());
            // Integer fields may be null. setInt would unbox and throw NullPointerException,
            // so bind them with an explicit SQL type, which maps null to SQL NULL.
            ps.setObject(3, applyListRow.getProductType(), Types.INTEGER);
            ps.setString(4, applyListRow.getUuid());
            ps.setString(5, applyListRow.getApplyNo());
            ps.setObject(6, applyListRow.getApplyStatus(), Types.INTEGER);
            ps.setObject(7, applyListRow.getAuditResult(), Types.INTEGER);
            ps.setTimestamp(8, applyListRow.getAuditValidTime());
            ps.setObject(9, applyListRow.getAssetsResult(), Types.INTEGER);
            ps.setTimestamp(10, applyListRow.getAssetsFinishTime());
            // apply_submit_time was previously skipped, which shifted the last three
            // parameters one slot early and left the 14th placeholder unbound.
            ps.setTimestamp(11, applyListRow.getApplySubmitTime());
            ps.setBoolean(12, true);
            ps.setTimestamp(13, now);
            ps.setTimestamp(14, now);
            ps.addBatch();
            pending++;
            if (pending == batchSize) {
                ps.executeBatch();
                conn.commit();
                ps.clearBatch();
                pending = 0;
            }
        }
        if (pending > 0) {
            ps.executeBatch();
            conn.commit();
        }
    } catch (Exception e) {
        System.out.println("======执行sqlException异常======"+sql+"\r\n");
        e.printStackTrace();
        // Discard the partially executed batch so the connection is not handed back
        // with an open transaction.
        if (conn != null) {
            try {
                conn.rollback();
            } catch (SQLException rollbackFailure) {
                rollbackFailure.printStackTrace();
            }
        }
    } finally {
        close(conn, ps, null);
    }
}
/**
* 批量插入
* @param sql
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment