Commit 3ca53eff authored by liwenbin's avatar liwenbin

资产分发记录状态改为资金状态

parent b27ee4a3
......@@ -4,6 +4,7 @@ import cn.quantgroup.tech.brave.service.ITechRabbitBuilder;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Stopwatch;
import com.quantgroup.asset.distribution.enums.funding.FundingResult;
import com.quantgroup.asset.distribution.service.distribute.IAssetDistributeService;
import com.quantgroup.asset.distribution.service.funding.IAidFundRouteRecordService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
......@@ -33,6 +34,8 @@ public class ConsumerConfig implements RabbitListenerConfigurer {
@Autowired
private IAidFundRouteRecordService iAidFundRouteRecordService;
@Autowired
private IAssetDistributeService distributeService;
@Resource
private ITechRabbitBuilder techRabbitBuilder;
......@@ -74,6 +77,7 @@ public class ConsumerConfig implements RabbitListenerConfigurer {
log.info("助贷资金路由有效MQ消息接收, 消息内容 : {} ",ms);
String applyNo = jo.getJSONObject("data").getString("applyNo");
iAidFundRouteRecordService.fundingResultNotity(applyNo,FundingResult.fromCode(noticeType));
distributeService.receiveFundingResult(applyNo, FundingResult.fromCode(noticeType));
log.info("助贷资金路由有效MQ消息处理结束, bizNo : {} ,noticeType : {} , 耗时 : {} ",applyNo,noticeType,stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
}
// 采用手动应答模式, 手动确认应答更为安全稳定
......
package com.quantgroup.asset.distribution.constant;
/**
* 状态常量
* 资产分发记录状态标识
* @author liwenbin
*
*/
public class StatusConstants {
public static final Integer SUCCESS = 1;
public static final int SUCCESS = 1;
public static final Integer FAIL = 0;
public static final int FAIL = 0;
public static final int WAIT = 2;
}
......@@ -31,7 +31,10 @@ public enum QGExceptionType {
RULE_CALC_UNKNOW_ERROR(2023, "规则计算出现未知异常, expression : %s"),
RULE_CALC_UNKNOW_RESULT(2024, "规则判断出现未知结果,请联系管理人员, expression : %s"),
UNKNOW_RULE_TYPE(2025, "未知的规则类型, %s"),
NO_DISTRIBUTE_NODE(2026, "未找到分发节点, uuid : %s, assetNo %s, records : %s");
NO_DISTRIBUTE_NODE(2026, "未找到分发节点, uuid : %s, assetNo %s, records : %s"),
NOTIFY_FUND_SERVER_ERROR(2041, "通知资金系统失败, uuid : %s, bizNo : %s, assetNo : %s"),
NOT_FOUND_FUND_SERVER_RESULT_BIZNO(2042, "未找到资金结果通知订单, bizNo : %s, status : %s");
......
......@@ -69,7 +69,7 @@ public class AssetServiceImpl implements IAssetService{
// 把资产基础属性值放入data
data = addAssetAttributeToData(asset, data);
// 资产分发
assetDistributeService.distribute(assetForm, asset, data, DistributeLogoConstants.FIRST);
assetDistributeService.distribute(assetForm, asset, data);
log.info("资产分发完成, uuid : {}, bizNo : {}, assetNo : {}, bizChannel : {}, 耗时 : {}", assetForm.getUuid(),
assetForm.getBizNo(), assetForm.getAssetNo(), assetForm.getBizChannel(), stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
} catch (QGException qe) {
......
......@@ -2,6 +2,7 @@ package com.quantgroup.asset.distribution.service.distribute;
import java.util.List;
import com.quantgroup.asset.distribution.enums.funding.FundingResult;
import com.quantgroup.asset.distribution.model.entity.DistributeRecord;
import com.quantgroup.asset.distribution.service.jpa.entity.Asset;
import com.quantgroup.asset.distribution.service.jpa.entity.AssetDistributeRecord;
......@@ -19,7 +20,7 @@ public interface IAssetDistributeRecordService {
* @param distributeRecord
* @param distributeStatus
*/
public void saveDistributeRecord(Asset asset, DistributeRecord distributeRecord, Boolean distributeStatus, int ruleType);
public void saveDistributeRecord(Asset asset, DistributeRecord distributeRecord, int distributeStatus, int ruleType);
/**
* 获取资产分发记录
......@@ -27,4 +28,10 @@ public interface IAssetDistributeRecordService {
* @return
*/
public List<AssetDistributeRecord> getDistributeRecord(String assetNo);
/**
* @param bizNo
* @return
*/
public void updateAssetDistributeStatus(String bizNo, int status);
}
......@@ -2,6 +2,7 @@ package com.quantgroup.asset.distribution.service.distribute;
import java.util.Map;
import com.quantgroup.asset.distribution.enums.funding.FundingResult;
import com.quantgroup.asset.distribution.model.form.AssetForm;
import com.quantgroup.asset.distribution.service.jpa.entity.Asset;
......@@ -19,5 +20,12 @@ public interface IAssetDistributeService {
* @param startIndex 开始分发节点
* @param distributeLogo 1:初次分发或异常重试, 2:分发失败重新分发(预留)
*/
public void distribute(AssetForm assetForm, Asset asset, Map<String, Object> data, int distributeLogo);
public void distribute(AssetForm assetForm, Asset asset, Map<String, Object> data);
/**
* 接收fundingResult消息
* @param bizNo
* @param fundingResult
*/
public void receiveFundingResult(String bizNo, FundingResult fundingResult);
}
......@@ -10,6 +10,8 @@ import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON;
import com.quantgroup.asset.distribution.constant.StatusConstants;
import com.quantgroup.asset.distribution.exception.QGException;
import com.quantgroup.asset.distribution.exception.QGExceptionType;
import com.quantgroup.asset.distribution.model.entity.DistributeRecord;
import com.quantgroup.asset.distribution.service.distribute.IAssetDistributeRecordService;
import com.quantgroup.asset.distribution.service.jpa.entity.Asset;
......@@ -33,8 +35,11 @@ public class AssetDistributeRecordServiceImpl implements IAssetDistributeRecordS
@Autowired
private IRedisService<Integer> redisService;
/**
* 不能改成异步,防止save的时候record被清空
*/
@Override
public void saveDistributeRecord(Asset asset, DistributeRecord distributeRecord, Boolean distributeStatus, int ruleType) {
public void saveDistributeRecord(Asset asset, DistributeRecord distributeRecord, int distributeStatus, int ruleType) {
try {
AssetDistributeRecord assetDistributeRecord = new AssetDistributeRecord();
assetDistributeRecord.setAssetNo(asset.getAssetNo());
......@@ -45,7 +50,7 @@ public class AssetDistributeRecordServiceImpl implements IAssetDistributeRecordS
assetDistributeRecord.setBizNo(asset.getBizNo());
assetDistributeRecord.setAssetDistributeTravel(JSON.toJSONString(distributeRecord));
assetDistributeRecord.setAssetDistributeTarget(ruleType);
assetDistributeRecord.setAssetDistributeStatus(distributeStatus == true ? StatusConstants.SUCCESS : StatusConstants.FAIL);
assetDistributeRecord.setAssetDistributeStatus(distributeStatus);
assetDistributeRecord.setEnable(true);
assetDistributeRecordRepository.save(assetDistributeRecord);
// 完成一次分发,清除分发记录
......@@ -59,4 +64,16 @@ public class AssetDistributeRecordServiceImpl implements IAssetDistributeRecordS
public List<AssetDistributeRecord> getDistributeRecord(String assetNo) {
return assetDistributeRecordRepository.findByAssetNo(assetNo);
}
@Override
public void updateAssetDistributeStatus(String bizNo, int status) {
AssetDistributeRecord assetDistributeRecord = assetDistributeRecordRepository.findByBizNoOrderByCreatedAtDescLimitOne(bizNo);
if (assetDistributeRecord == null) {
log.info("资产分发记录更改状态未找到订单, bizNo : {}, status : {}", bizNo, status);
throw new QGException(QGExceptionType.NOT_FOUND_FUND_SERVER_RESULT_BIZNO, bizNo, status);
} else {
assetDistributeRecord.setAssetDistributeStatus(status);
assetDistributeRecordRepository.save(assetDistributeRecord);
}
}
}
package com.quantgroup.asset.distribution.service.distribute.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
......@@ -9,7 +8,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
......@@ -17,14 +15,16 @@ import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Sets;
import com.quantgroup.asset.distribution.constant.DistributeConstants;
import com.quantgroup.asset.distribution.constant.RedisKeyConstants;
import com.quantgroup.asset.distribution.constant.StatusConstants;
import com.quantgroup.asset.distribution.enums.funding.FundingResult;
import com.quantgroup.asset.distribution.exception.QGException;
import com.quantgroup.asset.distribution.exception.QGExceptionType;
import com.quantgroup.asset.distribution.model.entity.DistributeRecord;
import com.quantgroup.asset.distribution.model.form.AssetForm;
import com.quantgroup.asset.distribution.model.response.GlobalResponse;
import com.quantgroup.asset.distribution.service.alarm.IAlarmService;
import com.quantgroup.asset.distribution.service.distribute.IAssetDistributeRecordService;
import com.quantgroup.asset.distribution.service.distribute.IAssetDistributeRuleConfigService;
import com.quantgroup.asset.distribution.service.distribute.IAssetDistributeService;
......@@ -64,18 +64,20 @@ public class AssetDistributeServiceImpl implements IAssetDistributeService{
private INotifyService notifyService;
@Autowired
private IAssetDistributeRecordService assetDistributeRecordService;
@Autowired
private IAlarmService alarmService;
/**
* 分发
*/
@Override
public void distribute(AssetForm assetForm, Asset asset, Map<String, Object> data, int distributeLogo) {
public void distribute(AssetForm assetForm, Asset asset, Map<String, Object> data) {
if (isDistribute) {
List<AssetDistributeRuleConfig> assetDistributeRuleConfigList = assetDistributeRuleConfigService.getAllRuleConfigOrderByPriorityAsc();
// 检查规则配置
checkDistributeRuleConfig(assetDistributeRuleConfigList);
// 获取该资产所有已经尝试分发过的类型
Set<Integer> hasDistributedTypeSet = getAllDistributeRecordType(asset.getAssetNo(), distributeLogo);
Set<Integer> hasDistributedTypeSet = getAllDistributeRecordType(asset.getAssetNo());
boolean success = false;
DistributeRecord record = new DistributeRecord(data);
for (AssetDistributeRuleConfig ruleConfig : assetDistributeRuleConfigList) {
......@@ -85,20 +87,20 @@ public class AssetDistributeServiceImpl implements IAssetDistributeService{
String el = ruleConfig.getAssetDistributeRuleEl();
boolean valid = ruleService.valid(el, data);
record.addRecords(ruleConfig.getId(), el, ruleType, ruleConfig.getAssetDistributeRuleName(), valid);
// 保存节点尝试记录
saveAssetAttemptDistributeNode(asset.getAssetNo(), ruleType);
if (valid) {
boolean distributeStatus = beginDistribute(assetForm, asset, ruleType);
int distributeStatus = beginDistribute(assetForm, asset, ruleType);
assetDistributeRecordService.saveDistributeRecord(asset, record, distributeStatus, ruleType);
// 分发成功
if (distributeStatus) {
if (distributeStatus == StatusConstants.SUCCESS || distributeStatus == StatusConstants.WAIT) {
log.info("用户执行分发节点, uuid : {}, bizNo : {}, assetNo : {}, bizChannel : {}, ruleId : {}, ruleName : {}, distributeStatus : {}",
assetForm.getUuid(), assetForm.getBizNo(), assetForm.getAssetNo(), assetForm.getBizChannel(), ruleConfig.getId(), ruleConfig.getAssetDistributeRuleName(),
distributeStatus);
success = true;
break;
}
}
// 保存节点尝试记录
saveAssetAttemptDistributeNode(asset.getAssetNo(), ruleType);
if (success) { break; }
}
if (!success) {
// 如果一个分配节点都没找到,报错
......@@ -115,19 +117,19 @@ public class AssetDistributeServiceImpl implements IAssetDistributeService{
* 开始进行分发
* @param assetForm
* @param ruleType
* @return
* @return 1:成功,2:失败, 3:分配中
*/
public boolean beginDistribute(AssetForm assetForm, Asset asset, int ruleType) {
public int beginDistribute(AssetForm assetForm, Asset asset, int ruleType) {
switch (ruleType) {
case DistributeConstants.RuleType.FUND_ROUTE :
notifyService.notifyFundServer(assetForm);
return true;
return StatusConstants.WAIT;
case DistributeConstants.RuleType.AID_FUND_ROUTE:
GlobalResponse response = aidFundRouteService.aidFundRoute(assetForm, asset.getUserLoanType());
return response.getCode() == 0;
return response.getCode() == 0 ? StatusConstants.SUCCESS : StatusConstants.FAIL;
case DistributeConstants.RuleType.DIVERSION:
notifyService.notifyFundServer(assetForm);
return true;
return StatusConstants.WAIT;
default :
throw new QGException(QGExceptionType.UNKNOW_RULE_TYPE, ruleType);
}
......@@ -153,12 +155,8 @@ public class AssetDistributeServiceImpl implements IAssetDistributeService{
* @param assetNo
* @return
*/
public Set<Integer> getAllDistributeRecordType(String assetNo, int distributeLogo) {
public Set<Integer> getAllDistributeRecordType(String assetNo) {
Set<Integer> sets = new HashSet<>();
if (distributeLogo == 1) {
// 用户初次分发或者异常重试直接返回空
return sets;
}
List<Integer> list = redisService.getList(RedisKeyConstants.DISTRIBUTE_FAIL_TYPE_RECORD + assetNo);
if (list == null || list.size() == 0) {
// 如果缓存没有去查库,把库里执行过的都拿出来
......@@ -174,6 +172,11 @@ public class AssetDistributeServiceImpl implements IAssetDistributeService{
}
}
sets.addAll(list);
if (sets.size() != list.size()) {
list = list.stream().distinct().collect(Collectors.toList());
redisService.del(RedisKeyConstants.DISTRIBUTE_FAIL_TYPE_RECORD + assetNo);
redisService.setListEx(RedisKeyConstants.DISTRIBUTE_FAIL_TYPE_RECORD + assetNo, list, 3, TimeUnit.DAYS);
}
return sets;
}
......@@ -183,4 +186,24 @@ public class AssetDistributeServiceImpl implements IAssetDistributeService{
public void saveAssetAttemptDistributeNode(String assetNo, Integer ruleType) {
redisService.rightPushEx(RedisKeyConstants.DISTRIBUTE_FAIL_TYPE_RECORD + assetNo, ruleType, 3, TimeUnit.DAYS);
}
/**
* 接收fundingResult消息
*/
@Override
public void receiveFundingResult(String bizNo, FundingResult fundingResult) {
try {
// 1、更改分配记录状态
assetDistributeRecordService.updateAssetDistributeStatus(bizNo, fundingResult == FundingResult.FUAD_ASSIGN_SUCC ? StatusConstants.SUCCESS : StatusConstants.FAIL);
// 2、重新进行分发, 目前还没有,等接了助贷资金路由以后再增加
} catch (QGException qe) {
log.error("资产分发接收资金结果处理出现错误 : {}, bizNo : {}, fundingResult : {}", qe.qgExceptionType.code + "->" + qe.detail, bizNo, fundingResult.getCode()) ;
alarmService.dingtalkAlarm("Warn", "资产分发接收资金结果处理出现错误", "bizNo : " + bizNo + " , fundingResult : " + fundingResult.getCode() +
" , 错误信息 : " + qe.qgExceptionType.code + "->" + qe.detail);
} catch (Exception e) {
log.error("资产分发接收资金结果处理异常, bizNo : {}, fundingResult : {}", bizNo, fundingResult.getCode(), e);
alarmService.dingtalkAlarm("Warn", "资产分发接收资金结果处理出现异常", "bizNo : " + bizNo + " , fundingResult : " + fundingResult.getCode() +
" , 错误信息 : 未知错误");
}
}
}
......@@ -3,6 +3,7 @@ package com.quantgroup.asset.distribution.service.jpa.repository;
import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import com.quantgroup.asset.distribution.service.jpa.entity.AssetDistributeRecord;
......@@ -14,4 +15,7 @@ import com.quantgroup.asset.distribution.service.jpa.entity.AssetDistributeRecor
public interface IAssetDistributeRecordRepository extends JpaRepository<AssetDistributeRecord, Long>{
public List<AssetDistributeRecord> findByAssetNo(String assetNo);
@Query(value = "select * from asset_distribute_record where biz_no = ?1 and asset_distribute_status = 2 order by created_at desc limit 1", nativeQuery = true)
public AssetDistributeRecord findByBizNoOrderByCreatedAtDescLimitOne(String bizNo);
}
......@@ -9,6 +9,8 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON;
import com.quantgroup.asset.distribution.exception.QGException;
import com.quantgroup.asset.distribution.exception.QGExceptionType;
import com.quantgroup.asset.distribution.model.form.AssetForm;
import com.quantgroup.asset.distribution.service.alarm.IAlarmService;
import com.quantgroup.asset.distribution.service.distribute.IDistributeFailLogService;
......@@ -26,15 +28,9 @@ public class NotifyServiceImpl implements INotifyService{
@Autowired
private IHttpService httpService;
@Autowired
private IDistributeFailLogService distributeFailLogService;
@Autowired
private IAlarmService alarmService;
@Async
@Override
public void notifyFundServer(AssetForm assetForm) {
try {
if (isDebug) { return; }
log.info("通知资金系统结果开始, uuid : {}, bizNo : {}, assetNo : {}, callbackUrl : {}, assetForm : {}", assetForm.getUuid(), assetForm.getBizNo(), assetForm.getAssetNo(), assetForm.getCallbackUrl(), JSON.toJSONString(assetForm));
if (StringUtils.isEmpty(assetForm.getCallbackUrl())) {
......@@ -44,13 +40,7 @@ public class NotifyServiceImpl implements INotifyService{
Map<String, String> response = httpService.postHasResponse(assetForm.getCallbackUrl(), assetForm.transToNotifyMap());
log.info("通知资金系统结果结束, uuid : {}, bizNo : {}, assetNo : {}, callbackUrl : {}, assetForm : {}, response : {}", assetForm.getUuid(), assetForm.getBizNo(), assetForm.getAssetNo(), assetForm.getCallbackUrl(), JSON.toJSONString(assetForm), JSON.toJSONString(response));
if(response==null || response.size()==0 || !"200".equals(response.get("statusCode")) || "error".equals(response.get("response"))) {
distributeFailLogService.saveDistributeFailLog(assetForm, "通知资金系统失败.");
alarmService.dingtalkAlarm("Warn", "资产入库分发通知资金系统失败", "bizChannel : " + assetForm.getBizChannel()
+ " , bizType : " + assetForm.getBizType() + " , bizNo : " + assetForm.getBizNo()
+ " , assetNo : " + assetForm.getAssetNo() + " , uuid : " + assetForm.getUuid());
}
} catch (Exception e) {
log.info("通知资金系统结果异常, uuid : {}, bizNo : {}, assetNo : {}, callbackUrl : {}, assetForm : {}", assetForm.getUuid(), assetForm.getBizNo(), assetForm.getAssetNo(), assetForm.getCallbackUrl(), JSON.toJSONString(assetForm), e);
throw new QGException(QGExceptionType.NOTIFY_FUND_SERVER_ERROR, assetForm.getUuid(), assetForm.getBizNo(), assetForm.getAssetNo());
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment