Commit bc4f9ffb authored by 郝彦辉's avatar 郝彦辉

黑灰名单结果表、明细表增加数据源2020.04.17

parent 1ea85ec0
...@@ -133,4 +133,20 @@ public class BlackListToolsManagerController { ...@@ -133,4 +133,20 @@ public class BlackListToolsManagerController {
} }
} }
@RequestMapping("/importBlackGrey")
public GlobalResponse importBlackGrey(String privateKey,String txtTableFile){
if(StringUtils.isNotEmpty(privateKey) && "aHVhdDI0V0JvT3hwTkJwQmk1d2Q4dz09".equals(privateKey)){
if(StringUtils.isBlank(txtTableFile)){
return GlobalResponse.error("importBlackGrey参数为空!");
}
String result= iBlackListToolsManagerService.importBlackGrey(txtTableFile);
return GlobalResponse.success(result);
}else{
return GlobalResponse.error("importBlackGrey私钥为空或不正确,请联系管理员!");
}
}
} }
package cn.quantgroup.qgblservice.job.blimport;
import cn.quantgroup.qgblservice.constant.Constant;
import cn.quantgroup.qgblservice.constant.ConstantBlackGrey;
import cn.quantgroup.qgblservice.service.IBlackGreyListService;
import cn.quantgroup.qgblservice.service.IBlackListManagerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
/**
* -----------------------------------------------------------------------------<br>
* 类描述: 黑灰名单明天T+1定时任务 <br>
* 创建人: yanhui.Hao <br>
* 创建时间: 2020.04.16 10:22 <br>
* 公司: 北京众信利民信息技术有限公司 <br>
* -----------------------------------------------------------------------------
*/
@Slf4j
@Component
public class BlackGreyListJob {
@Autowired
private RedisTemplate<String,String> redisTemplate;
@Autowired
private IBlackGreyListService blackGreyListService;
private Boolean increment(String key){
Long increment = redisTemplate.opsForValue().increment(key, 1);
return increment <= 1;
}
private void setRedisValStr(String key, long timeout) {
try {
redisTemplate.opsForValue().set(key, "true", timeout, TimeUnit.MINUTES);
} catch (Exception e) {
log.error("Redis中set值异常, key: {} ", key, e);
}
}
private boolean getRedisValStr(String key) {
String value = null;
try {
value = redisTemplate.opsForValue().get(key);
} catch (Exception e) {
log.error("Redis中get值异常, key: {} ", key, e);
}
if (StringUtils.isNotEmpty(value) && "true".equals(value)) {
return true;
} else {
return false;
}
}
/**
* -----------------------------------------------------------------------------<br>
* 描 述: 每天现金贷/vcc新增逾期15天黑名单 <br>
* 创建人: yanhui.Hao <br>
* 创建时间: 2020.04.16 14:15 <br>
* 最后修改人: <br>
* 最后修改时间: 2020.04.16 14:15 <br>
* 入参说明: <br>
* 出参说明: <br>
* -----------------------------------------------------------------------------
*/
//@Scheduled(cron = "0 30 1 * * ?") //每天凌晨1点30
public void blackListImportJob() {
try {
if(increment(ConstantBlackGrey.REDIS_KEY.JOB_ADD_BLACK_LIST_INCREMENT_KEY)){
redisTemplate.expire(ConstantBlackGrey.REDIS_KEY.JOB_ADD_BLACK_LIST_INCREMENT_KEY, 10, TimeUnit.SECONDS);
//yyyy-MM-dd
String todayNyr = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);
String xianJinDai = null;
try {
xianJinDai = blackGreyListService.importXianJinDaiBlackGreyList();
}catch (Exception e){
log.error(todayNyr+", 每天执行新增现金分期15+逾期黑名单异常", e);
}
String vcc = null;
try {
vcc = blackGreyListService.importVccBlackGreyList();
}catch (Exception e){
log.error(todayNyr+", 每天执行新增Vcc15+逾期黑名单异常", e);
}
if(StringUtils.isNotEmpty(xianJinDai) && StringUtils.isNotEmpty(vcc)){
setRedisValStr(ConstantBlackGrey.REDIS_KEY.SIGN_IS_RUN_BLACK_TO_GREY_KEY, 15);
}
}
} catch (Exception e) {
log.error("每天执行新增15+逾期黑名单异常", e);
}
}
/**
* -----------------------------------------------------------------------------<br>
* 描 述: 每天将逾期已还清黑名单>>转灰名单 <br>
* 创建人: yanhui.Hao <br>
* 创建时间: 2020.04.16 14:16 <br>
* 最后修改人: <br>
* 最后修改时间: 2020.04.16 14:16 <br>
* 入参说明: <br>
* 出参说明: <br>
* -----------------------------------------------------------------------------
*/
//@Scheduled(cron = "0 */10 * * * ?") //每天10分钟扫描一次
public void blackToGreyListJob() {
try {
if(increment(ConstantBlackGrey.REDIS_KEY.JOB_REMOVE_BLACK_TO_GREY_INCREMENT_KEY)){
redisTemplate.expire(ConstantBlackGrey.REDIS_KEY.JOB_REMOVE_BLACK_TO_GREY_INCREMENT_KEY, 10, TimeUnit.SECONDS);
//判断是否有执行
if (getRedisValStr(ConstantBlackGrey.REDIS_KEY.SIGN_IS_RUN_BLACK_TO_GREY_KEY)) {
//yyyy-MM-dd
String todayNyr = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);
String removeBlackToGrey = null;
try {
//删掉执行标记
redisTemplate.delete(ConstantBlackGrey.REDIS_KEY.SIGN_IS_RUN_BLACK_TO_GREY_KEY);
removeBlackToGrey = blackGreyListService.removeBlackToGreyList();
}catch (Exception e){
log.error(todayNyr+", 每天执行将逾期已还清黑名单转灰名单异常", e);
}
if(StringUtils.isNotEmpty(removeBlackToGrey)){
setRedisValStr(ConstantBlackGrey.REDIS_KEY.SIGN_IS_RUN_UPDATE_BLACK_OVERDUEDAY_KEY, 15);
}
}
}
} catch (Exception e) {
log.error("每天执行将逾期已还清黑名单转灰名单异常", e);
}
}
/**
* -----------------------------------------------------------------------------<br>
* 描 述: 每天更新黑名单>>最大逾期天数和累计逾期天数 <br>
* 创建人: yanhui.Hao <br>
* 创建时间: 2020.04.16 14:16 <br>
* 最后修改人: <br>
* 最后修改时间: 2020.04.16 14:16 <br>
* 入参说明: <br>
* 出参说明: <br>
* -----------------------------------------------------------------------------
*/
//@Scheduled(cron = "0 */10 * * * ?") //每天10分钟扫描一次
public void updateBlackListOverdueDayJob() {
try {
if(increment(ConstantBlackGrey.REDIS_KEY.JOB_UPDATE_BLACK_OVERDUEDAY_INCREMENT_KEY)){
redisTemplate.expire(ConstantBlackGrey.REDIS_KEY.JOB_UPDATE_BLACK_OVERDUEDAY_INCREMENT_KEY, 10, TimeUnit.SECONDS);
//判断是否有执行
if (getRedisValStr(ConstantBlackGrey.REDIS_KEY.SIGN_IS_RUN_UPDATE_BLACK_OVERDUEDAY_KEY)) {
//yyyy-MM-dd
String todayNyr = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);
String updateBLOverdueDay = null;
try {
//删掉执行标记
redisTemplate.delete(ConstantBlackGrey.REDIS_KEY.SIGN_IS_RUN_UPDATE_BLACK_OVERDUEDAY_KEY);
updateBLOverdueDay = blackGreyListService.updateBlackListOverdueDay();
}catch (Exception e){
log.error(todayNyr+", 每天执行更新黑名单最大逾期天数和累计逾期天数异常", e);
}
}
}
} catch (Exception e) {
log.error("每天执行更新黑名单最大逾期天数和累计逾期天数异常", e);
}
}
}
...@@ -28,5 +28,9 @@ public class BlackGreyListQueryVo implements Serializable { ...@@ -28,5 +28,9 @@ public class BlackGreyListQueryVo implements Serializable {
private String blackType; private String blackType;
private String type;
private Integer status;
} }
...@@ -20,7 +20,15 @@ public interface BlackGreyListMapper { ...@@ -20,7 +20,15 @@ public interface BlackGreyListMapper {
BlackGreyListResult findBGLResultByRid(String rId); BlackGreyListResult findBGLResultByRid(String rId);
List<BlackGreyListResult> findBlackGreyListResult(BlackGreyListQueryVo queryParams); List<BlackGreyListResult> findBlackGreyListResultByParam(BlackGreyListQueryVo queryParams);
//通过三要素查询
List<BlackGreyListResult> findBlackGreyListResultBy3YS(BlackGreyListQueryVo queryParams);
List<BlackGreyListDetails> findBlackGreyListDetails(BlackGreyListQueryVo queryParams);
int insertBlackGreyListResult(BlackGreyListResult result); int insertBlackGreyListResult(BlackGreyListResult result);
......
...@@ -64,6 +64,68 @@ ...@@ -64,6 +64,68 @@
</trim> </trim>
</select> </select>
<select id="findBlackGreyListResultBy3YS" parameterType="cn.quantgroup.qgblservice.repository.mybatis.entity.blacklist.BlackGreyListQueryVo" resultMap="BlackGreyListResultMap">
select * from black_grey_list_result
where
<choose>
<when test = "phoneNo == null">
phone_no is NULL
</when>
<otherwise>
phone_no = #{phoneNo,jdbcType=VARCHAR}
</otherwise>
</choose>
<choose>
<when test = "name == null">
and name is NULL
</when>
<otherwise>
and name = #{name,jdbcType=VARCHAR}
</otherwise>
</choose>
<choose>
<when test = "idNo == null">
and id_no is NULL
</when>
<otherwise>
and id_no = #{idNo,jdbcType=VARCHAR}
</otherwise>
</choose>
</select>
<select id="findBlackGreyListDetails" parameterType="cn.quantgroup.qgblservice.repository.mybatis.entity.blacklist.BlackGreyListQueryVo" resultMap="BlackGreyListDetailsMap">
select * from black_grey_list_details
<trim prefix="where" prefixOverrides="and">
<if test="rId != null">
and r_id = #{rId,jdbcType=VARCHAR}
</if>
<if test="phoneNo != null">
and phone_no = #{phoneNo,jdbcType=VARCHAR}
</if>
<if test="name != null">
and name = #{name,jdbcType=VARCHAR}
</if>
<if test="idNo != null">
and id_no = #{idNo,jdbcType=VARCHAR}
</if>
<if test="type != null">
and type = #{type,jdbcType=VARCHAR}
</if>
<if test="status != null">
and type = #{status,jdbcType=INTEGER}
</if>
</trim>
</select>
<insert id="insertBlackGreyListResult" <insert id="insertBlackGreyListResult"
parameterType="cn.quantgroup.qgblservice.repository.mybatis.entity.blacklist.BlackGreyListResult"> parameterType="cn.quantgroup.qgblservice.repository.mybatis.entity.blacklist.BlackGreyListResult">
......
...@@ -17,18 +17,22 @@ import java.util.List; ...@@ -17,18 +17,22 @@ import java.util.List;
public interface IBlackGreyListService { public interface IBlackGreyListService {
/** //往black_grey_list_result表及black_grey_list_details表插入
* -----------------------------------------------------------------------------<br> int saveBlackGreyList(TmpBlackGreyList blackGreyObj);
* 描述: 往black_grey_list_result表及black_grey_list_details表插入<br>
* 作者:yanhui.Hao <br> //往black_grey_list_result表及black_grey_list_details表插入
* 时间:2020.04.02 <br> int saveBlackGreyListByJdbc(List<TmpBlackGreyList> tmpQueryList) throws SQLException;
* 授权: (C) Copyright (c) 2017 <br>
* 公司: 北京众信利民信息技术有限公司 <br> //现金分期,每日逾期15+用户 每日执行导入新的黑名单表
* ----------------------------------------------------------------------------- String importXianJinDaiBlackGreyList();
*/
//vcc逾期,每日逾期15+用户 每日执行导入新的黑名单表
String importVccBlackGreyList();
int saveBlackRreyList(TmpBlackGreyList blackGreyObj); //逾期已还清黑名单>>转灰名单
String removeBlackToGreyList();
int saveBlackRreyListByJdbc(List<TmpBlackGreyList> tmpQueryList) throws SQLException; //更新黑名单>>逾期天数
String updateBlackListOverdueDay();
} }
...@@ -46,4 +46,6 @@ public interface IBlackListToolsManagerService { ...@@ -46,4 +46,6 @@ public interface IBlackListToolsManagerService {
*/ */
public void cleanTableData(String privateKey, String operatType); public void cleanTableData(String privateKey, String operatType);
public String importBlackGrey(String txtTableFile);
} }
package cn.quantgroup.qgblservice.service.impl; package cn.quantgroup.qgblservice.service.impl;
import cn.quantgroup.qgblservice.constant.Constant; import cn.quantgroup.qgblservice.constant.Constant;
import cn.quantgroup.qgblservice.repository.mybatis.entity.blacklist.BlackGreyListDetails;
import cn.quantgroup.qgblservice.repository.mybatis.entity.blacklist.BlackGreyListQueryVo;
import cn.quantgroup.qgblservice.repository.mybatis.entity.blacklist.BlackListChannelExpireConfigVo0; import cn.quantgroup.qgblservice.repository.mybatis.entity.blacklist.BlackListChannelExpireConfigVo0;
import cn.quantgroup.qgblservice.repository.mybatis.entity.tidb.BlackListQueryTidbVo0; import cn.quantgroup.qgblservice.repository.mybatis.entity.tidb.BlackListQueryTidbVo0;
import cn.quantgroup.qgblservice.repository.mybatis.entity.tidb.TmpBlackGreyList; import cn.quantgroup.qgblservice.repository.mybatis.entity.tidb.TmpBlackGreyList;
import cn.quantgroup.qgblservice.repository.mybatis.entity.tidb.TmpBlackGreyListRowMapper; import cn.quantgroup.qgblservice.repository.mybatis.entity.tidb.TmpBlackGreyListRowMapper;
import cn.quantgroup.qgblservice.repository.mybatis.entity.xyqbuser.XyqbUser; import cn.quantgroup.qgblservice.repository.mybatis.entity.xyqbuser.XyqbUser;
import cn.quantgroup.qgblservice.repository.mybatis.mapper.blacklist.BlackGreyListMapper;
import cn.quantgroup.qgblservice.response.GlobalResponse; import cn.quantgroup.qgblservice.response.GlobalResponse;
import cn.quantgroup.qgblservice.service.IBlackGreyListService; import cn.quantgroup.qgblservice.service.IBlackGreyListService;
import cn.quantgroup.qgblservice.service.IBlackListToolsManagerService; import cn.quantgroup.qgblservice.service.IBlackListToolsManagerService;
...@@ -73,6 +76,9 @@ public class BlackListToolsManagerServiceImpl implements IBlackListToolsManagerS ...@@ -73,6 +76,9 @@ public class BlackListToolsManagerServiceImpl implements IBlackListToolsManagerS
@Autowired @Autowired
private IBlackGreyListService blackGreyListService; private IBlackGreyListService blackGreyListService;
@Autowired
private BlackGreyListMapper blackGreyListMapper;
@PostConstruct @PostConstruct
public void initChannelBlackListExpireConfig() { public void initChannelBlackListExpireConfig() {
List<BlackListChannelExpireConfigVo0> queryBlackListChannelExpireConfigVo0List = blackListJdbcTemplate.query(Constant.SQL.BLACK_LIST_NEW_QUERY_CHANNEL_BLACK_LIST_EXPIRE_CONFIG_SQL, new BeanPropertyRowMapper<>(BlackListChannelExpireConfigVo0.class)); List<BlackListChannelExpireConfigVo0> queryBlackListChannelExpireConfigVo0List = blackListJdbcTemplate.query(Constant.SQL.BLACK_LIST_NEW_QUERY_CHANNEL_BLACK_LIST_EXPIRE_CONFIG_SQL, new BeanPropertyRowMapper<>(BlackListChannelExpireConfigVo0.class));
...@@ -1246,29 +1252,39 @@ public class BlackListToolsManagerServiceImpl implements IBlackListToolsManagerS ...@@ -1246,29 +1252,39 @@ public class BlackListToolsManagerServiceImpl implements IBlackListToolsManagerS
" WHERE id > ? AND id<=?;"; " WHERE id > ? AND id<=?;";
int maxId = 0; int maxId = 0;
int BEGIN_ID = 0, startId = 0, endId = 0;
try{ try{
String value = stringRedisTemplate.opsForValue().get("UPDATE_HUITOHEI_MAXID_TYPE1"); String value_maxId = stringRedisTemplate.opsForValue().get("UPDATE_HUITOHEI_MAXID_TYPE1");
if(StringUtils.isNotEmpty(value)){ if(StringUtils.isNotEmpty(value_maxId)){
maxId = Integer.parseInt(value); maxId = Integer.parseInt(value_maxId);
}
String value_beginId = stringRedisTemplate.opsForValue().get("UPDATE_HUITOHEI_MAXID_BEGINID");
if(StringUtils.isNotEmpty(value_beginId)){
BEGIN_ID = Integer.parseInt(value_beginId);
} }
}catch (Exception e){ }catch (Exception e){
log.error("获取redis里值异常, key: {} ", "UPDATE_HUITOHEI_MAXID_TYPE1", e); log.error("获取redis里值异常, key: {} ", "UPDATE_HUITOHEI_MAXID_TYPE1", e);
} }
log.info("方法tmpBlackGreyToProduct()查询缓存, maxId : {} ", maxId); log.info("方法tmpBlackGreyToProduct()查询缓存, maxId : {} , BEGIN_ID : {} , startId : {} , endId: {} ", maxId, BEGIN_ID, startId, endId);
if(maxId<=0){ if(maxId<=0){
return; return;
} }
int totalQueryCount=0, totalOkCount = 0; int totalQueryCount=0, totalOkCount = 0;
int totalPage = maxId/LIMIT_1000; int totalPage = (maxId-BEGIN_ID) /LIMIT_1000;
if((maxId%LIMIT_1000)!=0){ if(( (maxId-BEGIN_ID)%LIMIT_1000)!=0){
totalPage+=1; totalPage+=1;
} }
int startId = 0,endId = 0;
for (int page = 1; page <= totalPage; page++) { for (int page = 1; page <= totalPage; page++) {
startId = (page-1) * LIMIT_1000; //startId = (page-1) * LIMIT_1000;
endId = page * LIMIT_1000; //endId = page * LIMIT_1000;
startId = BEGIN_ID + (page-1) * LIMIT_1000;
endId = startId + LIMIT_1000;
long thisPageStart= System.currentTimeMillis(); long thisPageStart= System.currentTimeMillis();
List<TmpBlackGreyList> tmpQueryList = null; List<TmpBlackGreyList> tmpQueryList = null;
...@@ -1286,7 +1302,7 @@ public class BlackListToolsManagerServiceImpl implements IBlackListToolsManagerS ...@@ -1286,7 +1302,7 @@ public class BlackListToolsManagerServiceImpl implements IBlackListToolsManagerS
thisPageOk++; thisPageOk++;
totalOkCount++; totalOkCount++;
}*/ }*/
int count = blackGreyListService.saveBlackRreyListByJdbc(tmpQueryList); int count = blackGreyListService.saveBlackGreyListByJdbc(tmpQueryList);
thisPageOk+=count; thisPageOk+=count;
totalOkCount+=count; totalOkCount+=count;
} }
...@@ -1300,5 +1316,101 @@ public class BlackListToolsManagerServiceImpl implements IBlackListToolsManagerS ...@@ -1300,5 +1316,101 @@ public class BlackListToolsManagerServiceImpl implements IBlackListToolsManagerS
log.info("\n>>>>>>方法tmpBlackGreyToProduct()处理 Method End, totalQueryCount: {} , totalOkCount: {} , total cost: {} <<<<<<", totalQueryCount, totalOkCount, (System.currentTimeMillis()-totalStar)+".ms"); log.info("\n>>>>>>方法tmpBlackGreyToProduct()处理 Method End, totalQueryCount: {} , totalOkCount: {} , total cost: {} <<<<<<", totalQueryCount, totalOkCount, (System.currentTimeMillis()-totalStar)+".ms");
} }
@Override
public String importBlackGrey(String txtTableFile) {
List<String> contextList = ReadOrWriteTxt.readTxtList(txtTableFile);
if(contextList==null || contextList.size()==0){
return txtTableFile+",读取参数文件为空!";
}
int details_haveCount = 0;
int saveOkCount = 0;
List<TmpBlackGreyList> blackGreyList = new ArrayList<TmpBlackGreyList>();
TmpBlackGreyList blackGreyObj = null;
for (String context: contextList) {
//uuid|name|phone_no|id_no|black_type|type|join_black_reason|max_overdue_days|total_overdue_days|remark|created_at|updated_at
if(context.length()<11){
log.warn("该行参数有误, 跳过: {} ", context);
continue;
}
String[] array = context.trim().split("[|]");
if(array.length<12){
log.warn("该行参数格式有误, 跳过: {} ", context);
continue;
}
blackGreyObj = new TmpBlackGreyList();
if(StringUtils.isNotEmpty(array[0].trim())){
blackGreyObj.setUuid(array[0].trim());
}
if(StringUtils.isNotEmpty(array[1].trim())){
blackGreyObj.setName(array[1].trim());
}
if(StringUtils.isNotEmpty(array[2].trim())){
blackGreyObj.setPhoneNo(array[2].trim());
}
if(StringUtils.isNotEmpty(array[3].trim())){
blackGreyObj.setIdNo(array[3].trim());
}
if(StringUtils.isNotEmpty(array[4].trim())){
blackGreyObj.setBlackType(array[4].trim());
}
if(StringUtils.isNotEmpty(array[5].trim())){
blackGreyObj.setType(array[5].trim());
}
if(StringUtils.isNotEmpty(array[6].trim())){
blackGreyObj.setJoinBlackReason(array[6].trim());
}
if(StringUtils.isNotEmpty(array[7].trim())){
blackGreyObj.setMaxOverdueDays(array[7].trim());
}
if(StringUtils.isNotEmpty(array[8].trim())){
blackGreyObj.setTotalOverdueDays(array[8].trim());
}
if(StringUtils.isNotEmpty(array[9].trim())){
blackGreyObj.setRemark(array[9].trim());
}
if(StringUtils.isNotEmpty(array[10].trim())){
Timestamp createdAt = new Timestamp(System.currentTimeMillis());
try {
createdAt = Timestamp.valueOf(array[10].trim());//yyyy-mm-dd hh:mm:ss
} catch (Exception e) {
e.printStackTrace();
}
blackGreyObj.setCreatedAt(createdAt);
}
if(StringUtils.isNotEmpty(array[11].trim())){
Timestamp updatedAt = new Timestamp(System.currentTimeMillis());
try {
updatedAt = Timestamp.valueOf(array[11].trim());//yyyy-mm-dd hh:mm:ss
} catch (Exception e) {
e.printStackTrace();
}
blackGreyObj.setCreatedAt(updatedAt);
blackGreyObj.setUpdatedAt(updatedAt);
}
BlackGreyListQueryVo queryResultParam = BlackGreyListQueryVo.builder().name(blackGreyObj.getName()).idNo(blackGreyObj.getIdNo()).phoneNo(blackGreyObj.getPhoneNo())
.type(blackGreyObj.getType()).status(0).build();
List<BlackGreyListDetails> detailsList = blackGreyListMapper.findBlackGreyListDetails(queryResultParam);
if(detailsList!=null && detailsList.size()>0){
details_haveCount++;
}else {
blackGreyList.add(blackGreyObj);
}
}
if(blackGreyList!=null && blackGreyList.size()> 0){
try {
saveOkCount = blackGreyListService.saveBlackGreyListByJdbc(blackGreyList);
} catch (SQLException e) {
e.printStackTrace();
}
}
String resMsgTemp = "文件:%s 总共 %d条, 明细表已存在 %d条, 插入成功 %d条;";
return String.format(resMsgTemp, txtTableFile, contextList.size(), details_haveCount, saveOkCount);
}
} }
package cn.quantgroup.qgblservice.service.impl;
import cn.quantgroup.qgblservice.constant.ConstantBlackGrey;
import cn.quantgroup.qgblservice.repository.mybatis.entity.blacklist.BlackGreyListDetails;
import cn.quantgroup.qgblservice.repository.mybatis.entity.blacklist.BlackGreyListQueryVo;
import cn.quantgroup.qgblservice.repository.mybatis.entity.blacklist.BlackGreyListResult;
import cn.quantgroup.qgblservice.repository.mybatis.mapper.blacklist.BlackGreyListMapper;
import cn.quantgroup.qgblservice.utils.parallel.ParallelComputing;
import cn.quantgroup.qgblservice.utils.parallel.ParallelComputingProcess;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import sun.misc.BASE64Decoder;
import java.io.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* -----------------------------------------------------------------------------<br>
* 类描述: 逾期已还清黑名单>>转灰名单-并行框架 <br>
* 创建人: yanhui.Hao <br>
* 创建时间: 2020.04.16 21:12 <br>
* 公司: 北京众信利民信息技术有限公司 <br>
* -----------------------------------------------------------------------------
*/
@Slf4j
@Service
public class BlackToGreyListParallel implements ParallelComputingProcess<BlackGreyListResult, Set<String>> {
private final String log_inf = "BlackToGreyListParallel";
private final String sepa = java.io.File.separator; //System.getProperty("file.separator");
private final DateFormat df = new SimpleDateFormat(ConstantBlackGrey.DAYE_FORMAT.YYYYMMDD);
private final String saveLogPath = "/home/logs/";
//并行处理框架
private ParallelComputing<BlackGreyListResult, Set<String>> p = new ParallelComputing<BlackGreyListResult, Set<String>>("BlackToGreyListParallel");
/**
* 当前服务器CPU个数
*/
private static int SYS_CPU_COUNT = Runtime.getRuntime().availableProcessors();//4
//private static int SYS_CPU_COUNT = 6;
@Autowired
private BlackGreyListMapper blackGreyListMapper;
public void runMain(List<BlackGreyListResult> tmpQueryList) {
long startProcess = System.currentTimeMillis();
int listSize = tmpQueryList.size();
log.info(log_inf+"-runMain开始执行, listSize: {} , SYS_CPU_COUNT: {} ", listSize, SYS_CPU_COUNT);
try {
//并行框架---执行
Set<String> optSet = Collections.synchronizedSet(new HashSet<>());
p.processForThread(tmpQueryList, this, SYS_CPU_COUNT, optSet);
//并行框架执行结果
if (optSet.size() > 0) {//并行框架执行异常
log.error("runMain() ERROR! >> optSet size:{} , Json:{} ", optSet.size(), JSONObject.toJSONString(optSet));
return;
} else {
//执行成功,覆盖执行时间
log.info(log_inf + " >> to process() is OK!");
}
log.info(log_inf +"-runMain执行结束, cost:" + (System.currentTimeMillis() - startProcess) + "ms.");
} catch (Exception e) {
log.error(log_inf +"-runMain执行异常.", e);
}
}
@Override
public void process(int threadId, List<BlackGreyListResult> blackResultList, Set<String> optSet) {
AtomicInteger ok_count = new AtomicInteger();
AtomicInteger err_count = new AtomicInteger();
int list_size = blackResultList.size();
//yyyyMMdd
Calendar calendar = Calendar.getInstance();
String dateName = df.format(calendar.getTime());
int index = 0;
for (BlackGreyListResult blackResult : blackResultList) {
log.info("threadId_{}: 开始执行list_size: {} , this: {} ", threadId , list_size, (++index));
try {
BlackGreyListQueryVo queryParam = BlackGreyListQueryVo.builder().rId(blackResult.getRId()).build();
List<BlackGreyListDetails> detailsList = blackGreyListMapper.findBlackGreyListDetails(queryParam);
if (detailsList!=null && detailsList.size()>0) {
for(BlackGreyListDetails details : detailsList){
//现金贷
if("1".equals(details.getType()) && details.getStatus()==0){
/*#现金分期在逾转已还 (条件为step1的表内join_black_reason=1的用户,判断其是否还处于在逾状态,如果in_overdue=0,则更新step1的表的black_type为2并更新join_black_reason为6,否则不做修改)
use xyqb;
select
count(distinct a.user_id) in_overdue
from repayment_plan a
left join loan_application_manifest_history b
on a.loan_application_history_id=b.loan_application_history_id
left join user c on a.user_id=c.id
where c.uuid='d9867db1-cb8e-4a94-adfc-d3f80079c8d0'
and b.transaction_status in (2,5)
and a.repayment_status not in (3,4) and a.deadline<=CURDATE();*/
}
//白条
else if("2".equals(details.getType()) && details.getStatus()==0){
/*#白条在逾转已还 (条件为step1的表内join_black_reason=3的用户,判断其是否还处于在逾状态,如果in_overdue=0,则更新step1的表的black_type为2并更新join_black_reason为6,否则不做修改)
use xyqb;
select
count(distinct a.user_id) in_overdue
from baitiao_repayment_plan a
left join baitiao_order b on a.order_id=b.id
left join user c on a.user_id=c.id
where c.uuid='e501ab61-ae1b-4098-a322-fc59b17b1109'
and b.status=1
and a.repayment_status not in (3,4) and a.deadline<CURDATE();*/
}
//Vcc
else if("10".equals(details.getType()) && details.getStatus()==0){
/* #vcc在逾转已还 (条件为step1的表内join_black_reason=2的用户,判断其是否还处于在逾状态,如果in_overdue=0,则更新step1的表的black_type为2并更新join_black_reason为6,否则不做修改)
use xyqb_user;
select id user_id from user where uuid='24fdc7b7-b0b7-46e9-8045-f79ad8eaeb1b';
use acsdb;
select
count(distinct a.user_id) in_overdue
from acs_plan a
left join acs_trans b on a.trans_id = b.id
where a.plan_status ='Overdue'
and b.trans_type in ('Shop','Loan')
and b.trans_status = 'Complete'
and a.user_id='58308876';*/
}else{
err_count.getAndIncrement();
optSet.add("otherType:"+blackResult.getRId());
writeLogByName(saveLogPath + dateName + sepa + "error.log", "black_list_result_not_find_details>>"+blackResult.getRId());
}
}
ok_count.getAndIncrement();
}else {
err_count.getAndIncrement();
optSet.add("notFind:"+blackResult.getRId());
writeLogByName(saveLogPath + dateName + sepa + "error.log", "black_list_result_not_find_details>>"+blackResult.getRId());
}
} catch (Exception e) {
optSet.add("ERR:" + blackResult.getRId());
log.error(log_inf + "process() threadId:" + threadId + ",ERROR!", e);
}
}
log.info(log_inf + "处理结束 CPU_{} , 成功条数:{} , 失败:{} , this_list_size: {} ", threadId, ok_count.get(), err_count.get(), list_size);
}
//-------------------------------------------------------------------------
private static void writeLogByName(String fileName, String msg) {
try {
FileUtils.write(new File(fileName), msg + "\r\n", "UTF-8", true);
} catch (IOException e) {
e.printStackTrace();
System.err.println("writeLog Error," + msg + "," + e.toString());
}
}
}
package cn.quantgroup.qgblservice.utils; package cn.quantgroup.qgblservice.utils;
import lombok.extern.slf4j.Slf4j;
import java.io.*; import java.io.*;
import java.util.ArrayList;
import java.util.List; import java.util.List;
/** /**
...@@ -12,6 +15,7 @@ import java.util.List; ...@@ -12,6 +15,7 @@ import java.util.List;
* 公司: 北京众信利民信息技术有限公司 <br> * 公司: 北京众信利民信息技术有限公司 <br>
* ----------------------------------------------------------------------------- * -----------------------------------------------------------------------------
*/ */
@Slf4j
public class ReadOrWriteTxt { public class ReadOrWriteTxt {
/**传入txt路径读取txt文件 /**传入txt路径读取txt文件
* @param txtPath * @param txtPath
...@@ -79,7 +83,62 @@ public class ReadOrWriteTxt { ...@@ -79,7 +83,62 @@ public class ReadOrWriteTxt {
} }
} }
public static List<String> readTxtList(String txtPath) {
File file = new File(txtPath);
if(file.isFile() && file.exists()){
List<String> lineList = new ArrayList<String>();
try {
FileInputStream fileInputStream = new FileInputStream(file);
InputStreamReader inputStreamReader = new InputStreamReader(fileInputStream, "GBK");
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
StringBuffer sb = new StringBuffer();
String text = null;
while((text = bufferedReader.readLine()) != null){
lineList.add(text);
}
return lineList;
} catch (Exception e) {
e.printStackTrace();
}
}
return null;
}
public static List<String> readResourceFilList(String fileName) {
InputStream isr = ReadOrWriteTxt.class.getResourceAsStream(fileName);
BufferedReader br = new BufferedReader(new InputStreamReader(isr));
List<String> lineList = new ArrayList<String>();
try {
boolean inKey = false;
/*for (String line = br.readLine(); line != null; line = br.readLine()) {
line = line.trim();
if (!inKey) {
if (line.contains("---BEGIN---")) {
inKey = true;
}
continue;
} else {
if (line.contains("---END---")) {
inKey = false;
break;
}
lineList.add(line);
}
}*/
for (String line = br.readLine(); line != null; line = br.readLine()) {
line = line.trim();
lineList.add(line);
}
br.close();
isr.close();
} catch (IOException e) {
e.printStackTrace();
log.error("读取:" + fileName + "异常", e);
}
return lineList;
}
public static void main(String[] args) { public static void main(String[] args) {
//String filePath = "D:\\JavaTeam\\shmf.txt"; //String filePath = "D:\\JavaTeam\\shmf.txt";
......
package cn.quantgroup.qgblservice.utils.parallel;
import org.apache.log4j.Logger;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* -----------------------------------------------------------------------------<br>
* 描述: <h1>并行计算框架</h1> <h2>框架描述:</h2> 该并行计算框架主要将较大的List切分为多份,并行处理。<br>
* 请运行当前类中的Main方法来学习该类的用法及作用。</br>
* ParallelComputing<String, String> p = new ParallelComputing<String, String>("TestMyService");</br>
* boolean boo1 = p.processForDataShard(list, new ParallelComputingProcessImpl(), 3000, "外部数据");</br>
* boolean boo2 = p.processForThread(list, new ParallelComputingProcessImpl(), Runtime.getRuntime().availableProcessors(), "外部数据");</br>
* 参数: ListType 需并行处理的List泛型
* ParameterType 业务处理时传入的参数类型
* 作者:yanhui.Hao <br>
* 时间:2020.01.15 <br>
* 授权: (C) Copyright (c) 2017 <br>
* 公司: 北京众信利民信息技术有限公司 <br>
* -----------------------------------------------------------------------------
*/
public class ParallelComputing<ListType, ParameterType> {
private static final Logger logger = Logger.getLogger(ParallelComputing.class);
/** 线程池门面类 */
private ExecutorService pool;
/** 业务名称 */
private String serviceName = "DefaultService";
/** 默认构造,默认的ServiceName,默认使用newCachedThreadPool(可重用并在60秒内自动伸缩的线程池)来管理 */
public ParallelComputing() {
pool = newCachedThreadPool(serviceName);
}
/** 指定serviceName,使用newCachedThreadPool(可重用并在60秒内自动伸缩的线程池)来管理 */
public ParallelComputing(String serviceName) {
this.serviceName = serviceName;
pool = newCachedThreadPool(serviceName);
}
/** 指定serviceName,newFixedThreadPool(可重用固定线程数的线程池) ,指定固定线程池数量 */
public ParallelComputing(String serviceName, int nThreads) {
this.serviceName = serviceName;
pool = newFixedThreadPool(nThreads, serviceName);
}
/** 默认serviceName,newFixedThreadPool(可重用固定线程数的线程池) ,指定固定线程池数量 */
public ParallelComputing(int nThreads) {
pool = newFixedThreadPool(nThreads, serviceName);
}
/** 创建一个可重用固定线程数的线程池 */
private ExecutorService newFixedThreadPool(int num, final String serviceName) {
ExecutorService pool = Executors.newFixedThreadPool(num, new ThreadFactory() {
AtomicInteger atomicInteger = new AtomicInteger();// 原子记数
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(serviceName + "-" + atomicInteger.getAndIncrement());
return t;
}
});
return pool;
}
/** 创建一个可重用并在60秒内自动伸缩的线程池 */
private ExecutorService newCachedThreadPool(final String serviceName) {
ExecutorService pool = Executors.newCachedThreadPool(new ThreadFactory() {
AtomicInteger atomicInteger = new AtomicInteger();// 原子记数
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(serviceName + "-" + atomicInteger.getAndIncrement());
return t;
}
});
return pool;
}
/** 关闭线程池,释放资源 */
public void colse() {
pool.shutdownNow();
// 关闭线程池
pool.shutdown();
}
/**
* 并行执行的任务-按数据list长度切分任务数
*
* @param list
* 需并行处理的数据
* @param process
* 业务实现类,需实现ParallelComputingProcess接口。
* @param dataShardSize
* 每个线程处理条数,小于2000按2000处理
*/
public boolean processForDataShard(List<ListType> list, ParallelComputingProcess<ListType, ParameterType> process, int dataShardSize, ParameterType obj) {
if (list == null || list.size() < 1 || process == null) {
return false;
}
int minShardSize = 2000;// 每个线程最小处理条数
dataShardSize = dataShardSize < minShardSize ? minShardSize : dataShardSize;
long startTime = System.currentTimeMillis();
// 将list分块后丢给多个线程,并行处理
int listSize = list.size();// 总数
int threadNum = listSize / dataShardSize;// 启动线程数
int dataMon = listSize % dataShardSize;// 数据余数
int threadFullNum = threadNum + (dataMon > 0 ? 1 : 0);// 总共需要启动线程数
final CountDownLatch latch = new CountDownLatch(threadFullNum);// 同步辅助类
final ParallelComputingProcess<ListType, ParameterType> processShard = process;
final ParameterType object = obj;
if (threadNum > 0) {
for (int i = 0; i < threadNum; i++) {
int fromIndex = i * dataShardSize;
int toIndex = ((i + 1) * dataShardSize);
final List<ListType> listShard = list.subList(fromIndex, toIndex);
final int threadId = i;
Runnable t = new Runnable() {
@Override
public void run() {
job(threadId, listShard, processShard, object);
latch.countDown();// 计数减一
}
};
pool.execute(t);
}
}
if (dataMon > 0) {
int fromIndex = listSize - dataMon;
int toIndex = listSize;
final List<ListType> listShard = list.subList(fromIndex, toIndex);
final int threadId = threadNum;
Runnable t = new Runnable() {
@Override
public void run() {
job(threadId, listShard, processShard, object);
latch.countDown();// 计数减一
}
};
pool.execute(t);
}
logger.info(serviceName + "-processForDataShard START (thread count:" + threadFullNum + ",Sharding size:" + dataShardSize + ",total count:" + listSize + ")");
try {
latch.await();// 等待子线程结束
} catch (InterruptedException e) {
logger.error(serviceName + "-processForDataShard ERROR!", e);
e.printStackTrace();
return false;
}
logger.info(serviceName + "-processForDataShard FINISH (thread count:" + threadFullNum + ",Sharding size:" + dataShardSize + ",total count:" + listSize + ") use time " + (System.currentTimeMillis() - startTime) + "ms");
return true;
}
/**
* 并行执行的任务-按设置任务数,如list长度%任务数>0,则会启动任务数+1个任务去处理
*
* @param list
* 需并行处理的数据
* @param process
* 业务实现类
* @param threadNum
* 启动任务处理数,推荐获取CPU数(Runtime.getRuntime().availableProcessors())
* (如果任务数小于等于0,则单线程处理,如果任务数大于80,则只启动80个线程处理,如果数据小于2000条,则单线程处理)
*/
public boolean processForThread(List<ListType> list, ParallelComputingProcess<ListType, ParameterType> process, int threadNum, ParameterType obj) {
if (list == null || list.size() < 1 || process == null) {
return false;
}
long startTime = System.currentTimeMillis();
int listSize = list.size();// 总数
threadNum = threadNum <= 0 ? 1 : threadNum;// 如果设置的启动线程数小于等于0,则单线程处理
threadNum = threadNum > 80 ? 80 : threadNum;// 限制线程最大启动数为80或81(余数线程)
threadNum = listSize < 2000 ? 1 : threadNum;// 如果数据小于2000条,则单线程处理。
int everyListNum = listSize / threadNum;// 每个线程处理数
int datadMon = listSize % threadNum;// 数据余数
int threadFullNum = threadNum + (datadMon > 0 ? 1 : 0);// 总共需要启动线程数
final CountDownLatch latch = new CountDownLatch(threadFullNum);// 同步辅助类
final ParallelComputingProcess<ListType, ParameterType> processShard = process;
final ParameterType object = obj;
for (int i = 0; i < threadNum; i++) {
int fromIndex = i * everyListNum;
int toIndex = ((i + 1) * everyListNum);
final List<ListType> listShard = list.subList(fromIndex, toIndex);
final int threadId = i;
Runnable t = new Runnable() {
@Override
public void run() {
job(threadId, listShard, processShard, object);
latch.countDown();// 计数减一
}
};
pool.execute(t);
}
if (datadMon > 0) {
int fromIndex = listSize - datadMon;
int toIndex = listSize;
final List<ListType> listShard = list.subList(fromIndex, toIndex);
final int threadId = threadNum;
Runnable t = new Runnable() {
@Override
public void run() {
job(threadId, listShard, processShard, object);
latch.countDown();// 计数减一
}
};
pool.execute(t);
}
logger.info(serviceName + "-processForThread START (thread count:" + threadFullNum + ",Sharding size:" + everyListNum + ",total count:" + listSize + ")");
try {
latch.await();// 等待子线程结束
} catch (InterruptedException e) {
logger.error(serviceName + "-processForThread ERROR!", e);
e.printStackTrace();
return false;
}
logger.info(serviceName + "-processForThread FINISH (thread count:" + threadFullNum + ",Sharding size:" + everyListNum + ",total count:" + listSize + ") use time " + (System.currentTimeMillis() - startTime) + "ms");
return true;
}
/** 并行执行任务 */
private void job(int threadId, List<ListType> list, ParallelComputingProcess<ListType, ParameterType> process, ParameterType obj) {
process.process(threadId, list, obj);
}
}
package cn.quantgroup.qgblservice.utils.parallel;
import java.util.List;
/**
* -----------------------------------------------------------------------------<br>
* 描述: 并行处理-业务执行接口 <br>
* 参数: ListType 需并行处理的List中的泛型 <br>
* ParameterType 业务处理时传入的参数类型
* 作者:yanhui.Hao <br>
* 时间:2020.01.15 <br>
* 授权: (C) Copyright (c) 2017 <br>
* 公司: 北京众信利民信息技术有限公司 <br>
* -----------------------------------------------------------------------------
*/
public interface ParallelComputingProcess<ListType, ParameterType> {
/***
* 具体的业务实现在该方法中编写
* @param threadId 线程ID
* @param list 切分后的小List集合
* @param obj 外部传入数据,为业务实现提供数据支持
*/
public void process(int threadId, List<ListType> list, ParameterType obj);
}
package cn.quantgroup.qgblservice.utils.parallel;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* -----------------------------------------------------------------------------<br>
* 描述: 小例子-应用业务类 <br>
* 作者:yanhui.Hao <br>
* 时间:2020.01.15 <br>
* 授权: (C) Copyright (c) 2017 <br>
* 公司: 北京众信利民信息技术有限公司 <br>
* -----------------------------------------------------------------------------
*/
public class ParallelDemo implements ParallelComputingProcess<Map<String,Object>,Boolean>{
private Logger logger = Logger.getLogger(ParallelDemo.class);
private final String log_inf = "ParallelTest";
//并行处理框架
private ParallelComputing<Map<String, Object>, Boolean> p = new ParallelComputing<Map<String,Object>,Boolean>("MyParallelTestName");
/** 当前服务器CPU个数 */
public static int SYS_CPU_COUNT = Runtime.getRuntime().availableProcessors();
public void runMain(){
//result 业务待处理数据
List<Map<String, Object>> result = new ArrayList<>();
Boolean optSecond = true;
p.processForThread(result, this, SYS_CPU_COUNT, optSecond);
//并行框架执行结果
long startProcess = System.currentTimeMillis();
if(!optSecond){//并行框架执行异常
logger.error("runMain() ERROR! >> process().");//加监控用
return;
}else{
//执行成功,覆盖执行时间
logger.info(log_inf+" >> to process() is OK, To write redis Utc is OK!");
}
logger.info(log_inf+" >> to process() End, cost:"+(System.currentTimeMillis()-startProcess)+"ms.");
}
@Override
public void process(int threadId, List<Map<String, Object>> list, Boolean optSecond) {
try {
int count1 = 0;
for (Map<String, Object> map : list) {
StringBuilder key1 = new StringBuilder();
key1.append(map.get("key1").toString());
StringBuilder value1 = new StringBuilder();
value1.append(map.get("value1").toString());
logger.debug("map:"+key1+"===="+value1);
count1++;
}
logger.info(log_inf+"key is vid CPU_"+threadId+">总条数:"+count1);
} catch (Exception e) {
optSecond = false;
logger.error(log_inf+"process() threadId:" + threadId + ",ERROR!", e);
}
}
}
package cn.quantgroup.qgblservice.utils.parallel.impl;
import cn.quantgroup.qgblservice.utils.parallel.ParallelComputingProcess;
import java.util.List;
/**
* -----------------------------------------------------------------------------<br>
* 描述: 小例子-应用业务类 <br>
* 作者:yanhui.Hao <br>
* 时间:2020.01.15 <br>
* 授权: (C) Copyright (c) 2017 <br>
* 公司: 北京众信利民信息技术有限公司 <br>
* -----------------------------------------------------------------------------
*/
public class ParallelComputingProcessImpl implements ParallelComputingProcess<String, String> {
@Override
public void process(int threadId, List<String> list, String obj) {
System.out.println(Thread.currentThread().getThreadGroup().getName() + "-" + Thread.currentThread().getName() + "-threadId[" + threadId + "],obj>>>" + obj + "----test>>" + list);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment