Commit 56c61c2e authored by 鹿朋's avatar 鹿朋

新的资金路由添加重试

parent 98d74518
...@@ -7,6 +7,7 @@ import com.quantgroup.asset.distribution.model.response.GlobalResponse; ...@@ -7,6 +7,7 @@ import com.quantgroup.asset.distribution.model.response.GlobalResponse;
import com.quantgroup.asset.distribution.service.newrule.CoreFilter; import com.quantgroup.asset.distribution.service.newrule.CoreFilter;
import com.quantgroup.asset.distribution.service.newrule.pojo.funds.FundCallBackParam; import com.quantgroup.asset.distribution.service.newrule.pojo.funds.FundCallBackParam;
import com.quantgroup.asset.distribution.service.newrule.third.AuditResponce; import com.quantgroup.asset.distribution.service.newrule.third.AuditResponce;
import com.quantgroup.asset.distribution.util.UUIDUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
...@@ -33,6 +34,7 @@ public class NewAssertController { ...@@ -33,6 +34,7 @@ public class NewAssertController {
public GlobalResponse assetsHandler(AuditResponce auditResponce) { public GlobalResponse assetsHandler(AuditResponce auditResponce) {
log.info("风控通知审核结果:{}", JSONObject.toJSONString(auditResponce)); log.info("风控通知审核结果:{}", JSONObject.toJSONString(auditResponce));
try { try {
auditResponce.setAssetNo(UUIDUtil.getAssetNo());
coreFilter.coreHandle(auditResponce); coreFilter.coreHandle(auditResponce);
} catch (Exception e) { } catch (Exception e) {
return GlobalResponse.error(e.getMessage()); return GlobalResponse.error(e.getMessage());
......
...@@ -4,6 +4,8 @@ import java.sql.Timestamp; ...@@ -4,6 +4,8 @@ import java.sql.Timestamp;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.quantgroup.asset.distribution.service.newrule.CoreFilter;
import com.quantgroup.asset.distribution.service.newrule.third.AuditResponce;
import com.quantgroup.asset.distribution.service.redis.IRedisService; import com.quantgroup.asset.distribution.service.redis.IRedisService;
import com.quantgroup.asset.distribution.util.DateUtil; import com.quantgroup.asset.distribution.util.DateUtil;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
...@@ -23,6 +25,8 @@ import com.quantgroup.asset.distribution.service.jpa.repository.IDistributeFailL ...@@ -23,6 +25,8 @@ import com.quantgroup.asset.distribution.service.jpa.repository.IDistributeFailL
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
/** /**
* 重试Controller * 重试Controller
* *
...@@ -40,14 +44,16 @@ public class RetryController { ...@@ -40,14 +44,16 @@ public class RetryController {
private IAssetService assetService; private IAssetService assetService;
@Autowired @Autowired
private IRedisService<String> redisService; private IRedisService<String> redisService;
@Resource
private CoreFilter coreFilter;
@RequestMapping("/orders") @RequestMapping("/orders")
public GlobalResponse retryOrders(String startTime, String endTime) { public GlobalResponse retryOrders(String startTime, String endTime) {
try { try {
Page<DistributeFailLog> pageableFailAuditLog = distributeFailLogRepository.findByCreatedAtBetweenAndEnable( Page<DistributeFailLog> pageableFailAuditLog = distributeFailLogRepository.findByCreatedAtBetweenAndEnableAndCallType(
Timestamp.valueOf(startTime), Timestamp.valueOf(endTime), true, new PageRequest(0, 1000)); Timestamp.valueOf(startTime), Timestamp.valueOf(endTime), true,0, new PageRequest(0, 1000));
List<DistributeFailLog> distributeFailLogList = pageableFailAuditLog.getContent(); List<DistributeFailLog> distributeFailLogList = pageableFailAuditLog.getContent();
retryDistributeFailLogList(distributeFailLogList); retryDistributeFailLogListOld(distributeFailLogList);
} catch (Exception e) { } catch (Exception e) {
log.error("资产分发失败订单重新分发异常!", e); log.error("资产分发失败订单重新分发异常!", e);
} }
...@@ -59,43 +65,88 @@ public class RetryController { ...@@ -59,43 +65,88 @@ public class RetryController {
public void repeatDBAudit() { public void repeatDBAudit() {
String lockKey = "ASSET_DISTRIBUTION:REPEAT_DB_ATTRIBUTE:@88JMS"; String lockKey = "ASSET_DISTRIBUTION:REPEAT_DB_ATTRIBUTE:@88JMS";
try { try {
log.info("数据库资产分发失败订单重新分发开始"); boolean b = redisService.setIfAbsent(lockKey, "1", 1, TimeUnit.DAYS);
boolean b = redisService.setIfAbsent(lockKey, "1", 1, TimeUnit.DAYS); if (!b) {
if (!b) { return; } return;
String startTime = DateUtil.getStrFormatDate("yyyy-MM-dd HH:mm:ss", -7); }
String endTime = DateUtil.timestamp2Str(System.currentTimeMillis() + "", "yyyy-MM-dd HH:mm:ss"); old();
int i = 1; now();
while (i > 0 && !CheckController.isOffLine()) {
Page<DistributeFailLog> pageableFailDistributeLog = distributeFailLogRepository.findByCreatedAtBetweenAndEnable(Timestamp.valueOf(startTime),
Timestamp.valueOf(endTime), true, new PageRequest(0, 50));
List<DistributeFailLog> distributeFailLogList = pageableFailDistributeLog.getContent();
retryDistributeFailLogList(distributeFailLogList);
TimeUnit.SECONDS.sleep(30);
++i;
if (distributeFailLogList == null || distributeFailLogList.size() < 50) {
i = -1;
}
}
log.info("数据库资产分发失败订单重新分发完成!");
} catch (Exception e) { } catch (Exception e) {
log.error("数据库资产分发失败订单重新分发出现异常!", e); log.error("数据库资产分发失败订单重新分发出现异常!", e);
} finally { } finally {
redisService.del(lockKey); redisService.del(lockKey);
} }
} }
@Async
public void now() throws Exception {
log.info("数据库new资产分发失败订单重新分发开始");
try {
String startTime = DateUtil.getStrFormatDate("yyyy-MM-dd HH:mm:ss", -7);
String endTime = DateUtil.timestamp2Str(System.currentTimeMillis() + "", "yyyy-MM-dd HH:mm:ss");
int i = 1;
while (i > 0 && !CheckController.isOffLine()) {
Page<DistributeFailLog> pageableFailDistributeLog = distributeFailLogRepository.findByCreatedAtBetweenAndEnableAndCallType(Timestamp.valueOf(startTime),
Timestamp.valueOf(endTime), true, 1,new PageRequest(0, 50));
List<DistributeFailLog> distributeFailLogList = pageableFailDistributeLog.getContent();
retryDistributeFailLogListNew(distributeFailLogList);
TimeUnit.SECONDS.sleep(30);
++i;
if (distributeFailLogList == null || distributeFailLogList.size() < 50) {
i = -1;
}
}
log.info("数据库new资产分发失败订单重新分发完成!");
} catch (Exception e){
throw e;
}
}
@Async
public void old() throws Exception {
log.info("数据库old资产分发失败订单重新分发开始");
try {
String startTime = DateUtil.getStrFormatDate("yyyy-MM-dd HH:mm:ss", -7);
String endTime = DateUtil.timestamp2Str(System.currentTimeMillis() + "", "yyyy-MM-dd HH:mm:ss");
int i = 1;
while (i > 0 && !CheckController.isOffLine()) {
Page<DistributeFailLog> pageableFailDistributeLog = distributeFailLogRepository.findByCreatedAtBetweenAndEnableAndCallType(Timestamp.valueOf(startTime),
Timestamp.valueOf(endTime), true, 0,new PageRequest(0, 50));
List<DistributeFailLog> distributeFailLogList = pageableFailDistributeLog.getContent();
retryDistributeFailLogListOld(distributeFailLogList);
TimeUnit.SECONDS.sleep(30);
++i;
if (distributeFailLogList == null || distributeFailLogList.size() < 50) {
i = -1;
}
}
log.info("数据库old资产分发失败订单重新分发完成!");
} catch (Exception e){
throw e;
}
}
private void retryDistributeFailLogListOld(List<DistributeFailLog> distributeFailLogList) {
if (CollectionUtils.isNotEmpty(distributeFailLogList)) {
log.info("old分发失败查询数量为:{}", distributeFailLogList.size());
distributeFailLogList.forEach(distributeFailLog -> {
AssetForm assetForm = JSON.parseObject(distributeFailLog.getContext(), AssetForm.class);
// 重新审核
assetService.assetsIn(assetForm);
distributeFailLog.setEnable(false);
distributeFailLogRepository.save(distributeFailLog);
});
}
}
private void retryDistributeFailLogList(List<DistributeFailLog> distributeFailLogList) { private void retryDistributeFailLogListNew(List<DistributeFailLog> distributeFailLogList) {
if (CollectionUtils.isNotEmpty(distributeFailLogList)) { if (CollectionUtils.isNotEmpty(distributeFailLogList)) {
log.info("分发失败查询数量为:{}", distributeFailLogList.size()); log.info("new分发失败查询数量为:{}", distributeFailLogList.size());
distributeFailLogList.forEach(distributeFailLog -> { distributeFailLogList.forEach(distributeFailLog -> {
AssetForm assetForm = JSON.parseObject(distributeFailLog.getContext(), AssetForm.class); AuditResponce assetForm = JSON.parseObject(distributeFailLog.getContext(), AuditResponce.class);
// 重新审核 // 重新审核
assetService.assetsIn(assetForm); coreFilter.coreHandle(assetForm);
distributeFailLog.setEnable(false);
distributeFailLog.setEnable(false); distributeFailLogRepository.save(distributeFailLog);
distributeFailLogRepository.save(distributeFailLog); });
}); }
} }
}
} }
package com.quantgroup.asset.distribution.service.distribute; package com.quantgroup.asset.distribution.service.distribute;
import com.quantgroup.asset.distribution.model.form.AssetForm; import com.quantgroup.asset.distribution.model.form.AssetForm;
import com.quantgroup.asset.distribution.service.newrule.third.AuditResponce;
/** /**
* 分发失败日志处理Service * 分发失败日志处理Service
...@@ -15,4 +16,10 @@ public interface IDistributeFailLogService { ...@@ -15,4 +16,10 @@ public interface IDistributeFailLogService {
* @param failReason * @param failReason
*/ */
public void saveDistributeFailLog(AssetForm assetForm, String failReason); public void saveDistributeFailLog(AssetForm assetForm, String failReason);
/**
* 保存失败订单new
* @param assetForm
* @param failReason
*/
public void saveDistributeFailLogNew(AuditResponce assetForm, String failReason);
} }
package com.quantgroup.asset.distribution.service.distribute.impl; package com.quantgroup.asset.distribution.service.distribute.impl;
import com.quantgroup.asset.distribution.service.newrule.third.AuditResponce;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -33,9 +34,29 @@ public class DistributeFailLogServiceImpl implements IDistributeFailLogService{ ...@@ -33,9 +34,29 @@ public class DistributeFailLogServiceImpl implements IDistributeFailLogService{
distributeFailLog.setContext(JSON.toJSONString(assetForm)); distributeFailLog.setContext(JSON.toJSONString(assetForm));
distributeFailLog.setFailReason(failReason); distributeFailLog.setFailReason(failReason);
distributeFailLog.setEnable(true); distributeFailLog.setEnable(true);
distributeFailLog.setCallType(0);
distributeFailLogRepository.save(distributeFailLog); distributeFailLogRepository.save(distributeFailLog);
} catch (Exception e) { } catch (Exception e) {
log.error("资产分发失败订单保存异常, asset : {}", JSON.toJSONString(assetForm), e); log.error("资产分发失败订单保存异常, asset : {}", JSON.toJSONString(assetForm), e);
} }
} }
@Async
@Override
public void saveDistributeFailLogNew(AuditResponce assetForm, String failReason) {
try {
DistributeFailLog distributeFailLog = new DistributeFailLog();
distributeFailLog.setBizChannel(assetForm.getBizChannel());
distributeFailLog.setAssetNo(assetForm.getAssetNo());
distributeFailLog.setBizNo(assetForm.getBizNo());
distributeFailLog.setBizType(Integer.parseInt(assetForm.getBizType()));
distributeFailLog.setUuid(assetForm.getUuid());
distributeFailLog.setContext(JSON.toJSONString(assetForm));
distributeFailLog.setFailReason(failReason);
distributeFailLog.setEnable(true);
distributeFailLog.setCallType(1);
distributeFailLogRepository.save(distributeFailLog);
} catch (Exception e) {
log.error("资产分发失败订单保存异常, asset : {}", JSON.toJSONString(assetForm), e);
}
}
} }
...@@ -57,7 +57,10 @@ public class DistributeFailLog implements Serializable{ ...@@ -57,7 +57,10 @@ public class DistributeFailLog implements Serializable{
@Column(name = "enable") @Column(name = "enable")
private Boolean enable; private Boolean enable;
@Column(name = "call_type")
private Integer callType = 0;
@Column(name = "created_at") @Column(name = "created_at")
private Timestamp createdAt; private Timestamp createdAt;
......
...@@ -20,8 +20,9 @@ public interface IDistributeFailLogRepository extends JpaRepository<DistributeFa ...@@ -20,8 +20,9 @@ public interface IDistributeFailLogRepository extends JpaRepository<DistributeFa
* @param start * @param start
* @param end * @param end
* @param enable * @param enable
* @param callType
* @param pageable * @param pageable
* @return * @return
*/ */
public Page<DistributeFailLog> findByCreatedAtBetweenAndEnable(Timestamp start, Timestamp end, Boolean enable,Pageable pageable); public Page<DistributeFailLog> findByCreatedAtBetweenAndEnableAndCallType(Timestamp start, Timestamp end, Boolean enable,Integer callType ,Pageable pageable);
} }
...@@ -11,6 +11,7 @@ import com.quantgroup.asset.distribution.exception.QGException; ...@@ -11,6 +11,7 @@ import com.quantgroup.asset.distribution.exception.QGException;
import com.quantgroup.asset.distribution.exception.QGExceptionType; import com.quantgroup.asset.distribution.exception.QGExceptionType;
import com.quantgroup.asset.distribution.service.alarm.IAlarmService; import com.quantgroup.asset.distribution.service.alarm.IAlarmService;
import com.quantgroup.asset.distribution.service.alarm.impl.MarkdownMessage; import com.quantgroup.asset.distribution.service.alarm.impl.MarkdownMessage;
import com.quantgroup.asset.distribution.service.distribute.IDistributeFailLogService;
import com.quantgroup.asset.distribution.service.httpclient.IHttpService; import com.quantgroup.asset.distribution.service.httpclient.IHttpService;
import com.quantgroup.asset.distribution.service.jpa.entity.*; import com.quantgroup.asset.distribution.service.jpa.entity.*;
import com.quantgroup.asset.distribution.service.jpa.repository.IChannelRuleRepository; import com.quantgroup.asset.distribution.service.jpa.repository.IChannelRuleRepository;
...@@ -30,8 +31,11 @@ import com.quantgroup.asset.distribution.util.IdCardUtil; ...@@ -30,8 +31,11 @@ import com.quantgroup.asset.distribution.util.IdCardUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
...@@ -74,53 +78,64 @@ public class CoreFilter { ...@@ -74,53 +78,64 @@ public class CoreFilter {
@Resource @Resource
private IRoutingRecordRepository routingRecordRepository; private IRoutingRecordRepository routingRecordRepository;
@Autowired
private IDistributeFailLogService distributeFailLogService;
@Async
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void coreHandle(AuditResponce auditResponce){ public void coreHandle(AuditResponce auditResponce){
List<ChannelRuleEntity> channelRuleEntityList = channelRuleRepository.getByChannelId(Long.parseLong(auditResponce.getBizChannel())); try {
if (CollectionUtils.isEmpty(channelRuleEntityList)){ List<ChannelRuleEntity> channelRuleEntityList = channelRuleRepository.getByChannelId(Long.parseLong(auditResponce.getBizChannel()));
throw new RuntimeException("渠道配置产品集不存在,运营人员配置有问题"); if (CollectionUtils.isEmpty(channelRuleEntityList)){
} throw new RuntimeException("渠道配置产品集不存在,运营人员配置有问题");
UserAssociationBean associationBean = userCenterService.getUserAssociationBean(auditResponce.getUuid());
List<FinanceProduct> financeProducts = new ArrayList<>();
List<RoutingRecordVO> routingRecordList = new ArrayList<>();
for (ChannelRuleEntity channelRuleEntity : channelRuleEntityList) {
FundProductEntity fundProduct = fundProductRepository.getByIdEquals(channelRuleEntity.getFundProductId());
if (fundProduct == null){
log.error("这种情况不可能出现,需要看下原因 {}",JSON.toJSONString(channelRuleEntity));
continue;
} }
if (currentVerify(fundProduct,auditResponce,associationBean,channelRuleEntity)){ UserAssociationBean associationBean = userCenterService.getUserAssociationBean(auditResponce.getUuid());
financeProducts.add(build(auditResponce,fundProduct,channelRuleEntity)); List<FinanceProduct> financeProducts = new ArrayList<>();
List<RoutingRecordVO> routingRecordList = new ArrayList<>();
for (ChannelRuleEntity channelRuleEntity : channelRuleEntityList) {
FundProductEntity fundProduct = fundProductRepository.getByIdEquals(channelRuleEntity.getFundProductId());
if (fundProduct == null){
log.error("这种情况不可能出现,需要看下原因 {}",JSON.toJSONString(channelRuleEntity));
continue;
}
if (currentVerify(fundProduct,auditResponce,associationBean,channelRuleEntity)){
financeProducts.add(build(auditResponce,fundProduct,channelRuleEntity));
}
routingRecordList.add(generateRouteRecord(channelRuleEntity));
} }
routingRecordList.add(generateRouteRecord(channelRuleEntity)); FilterResult result = new FilterResult();
} result.setFinanceProducts(JSON.toJSONString(financeProducts));
if (CollectionUtils.isEmpty(financeProducts)) { Map<String,String> response = httpService.postHasResponse(auditResponce.getCallbackUrl(), result.get(auditResponce));
throw new RuntimeException("筛选完成后没有可用的配置"); if(MapUtils.isEmpty(response) || !(HttpStatus.OK.value() + "").equals(response.get("statusCode"))
} || (!"success".equals(response.get("response")) && (!JSONObject.isValidObject(response.get("response"))
FilterResult result = new FilterResult(); || 0 != JSONObject.parseObject(response.get("response")).getInteger("code").intValue()))) {
result.setFinanceProducts(JSON.toJSONString(financeProducts)); MarkdownMessage markdownMessage = new MarkdownMessage();
Map<String,String> response = httpService.postHasResponse(auditResponce.getCallbackUrl(), result.get(auditResponce)); markdownMessage.setTitle("通知业务系统审核结果失败告警");
if(MapUtils.isEmpty(response) || !(HttpStatus.OK.value() + "").equals(response.get("statusCode")) markdownMessage.add(String.format(AUDIT_FAIL_TEXT,auditResponce.getUuid()));
|| (!"success".equals(response.get("response")) && (!JSONObject.isValidObject(response.get("response")) alarmService.sendMsgWithAlert(markdownMessage);
|| 0 != JSONObject.parseObject(response.get("response")).getInteger("code").intValue()))) { log.error("通知业务系统审核结果失败 {}",JSON.toJSONString(auditResponce));
MarkdownMessage markdownMessage = new MarkdownMessage(); throw new QGException(QGExceptionType.NOTIFY_BIZ_ERROR);
markdownMessage.setTitle("通知业务系统审核结果失败告警"); } else {
markdownMessage.add(String.format(AUDIT_FAIL_TEXT,auditResponce.getUuid())); RoutingRecordEntity routingRecordEntity = new RoutingRecordEntity();
alarmService.sendMsgWithAlert(markdownMessage); routingRecordEntity.setCreditNo(auditResponce.getBizNo());
log.error("通知业务系统审核结果失败 {}",JSON.toJSONString(auditResponce)); routingRecordEntity.setPhone(associationBean.getPhoneNo());
throw new QGException(QGExceptionType.NOTIFY_BIZ_ERROR); routingRecordEntity.setUuid(auditResponce.getUuid());
} else { if (CollectionUtils.isEmpty(financeProducts)){
RoutingRecordEntity routingRecordEntity = new RoutingRecordEntity(); routingRecordEntity.setStatus(new Byte("0"));
routingRecordEntity.setCreditNo(auditResponce.getBizNo()); }else {
routingRecordEntity.setPhone(associationBean.getPhoneNo()); routingRecordEntity.setStatus(new Byte("1"));
routingRecordEntity.setUuid(auditResponce.getUuid()); routingRecordEntity.setRoutedResult(JSON.toJSONString(routingRecordList));
if (CollectionUtils.isEmpty(financeProducts)){ }
routingRecordEntity.setStatus(new Byte("0")); routingRecordRepository.save(routingRecordEntity);
}else {
routingRecordEntity.setStatus(new Byte("1"));
routingRecordEntity.setRoutedResult(JSON.toJSONString(routingRecordList));
} }
routingRecordRepository.save(routingRecordEntity); } catch (Exception e) {
log.error("资金路由分发出现异常, uuid : {}, bizChannel : {}, bizType : {}, bizNo : {} ", auditResponce.getUuid(),
auditResponce.getBizChannel(), auditResponce.getBizType());
distributeFailLogService.saveDistributeFailLogNew(auditResponce, ExceptionUtils.getMessage(e));
alarmService.dingtalkAlarm("Warn", "资金路由分发出现异常", "bizChannel : " + auditResponce.getBizChannel()
+ " , bizType : " + auditResponce.getBizType() + " , bizNo : " + auditResponce.getBizNo()
+ " , uuid : " + auditResponce.getUuid()
+ " , 错误信息 : "+ ExceptionUtils.getMessage(e));
} }
} }
......
...@@ -124,4 +124,6 @@ public class AuditResponce { ...@@ -124,4 +124,6 @@ public class AuditResponce {
private String userTag; private String userTag;
private String assetNo;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment