Commit 65db26d3 authored by 于桐's avatar 于桐

用户分群数据上报

parent fc3ffd1a
Pipeline #945 failed with stages
...@@ -344,7 +344,7 @@ ...@@ -344,7 +344,7 @@
<dependency> <dependency>
<groupId>cn.qg.ec.data-stream-sdk</groupId> <groupId>cn.qg.ec.data-stream-sdk</groupId>
<artifactId>data-stream-sdk</artifactId> <artifactId>data-stream-sdk</artifactId>
<version>1.0.9-SNAPSHOT</version> <version>1.1.12-SNAPSHOT</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
......
package cn.quantgroup.xyqb.event;
import java.io.UnsupportedEncodingException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import cn.qg.ec.kinesis.EnvironmentConfig;
import cn.qg.ec.kinesis.KinesisProducerClient;
import cn.qg.ec.model.user.base.UserBaseDetailEvent;
import cn.quantgroup.tech.util.TechEnvironment;
import cn.quantgroup.xyqb.entity.User;
import cn.quantgroup.xyqb.exception.PushUserToLkbException;
import cn.quantgroup.xyqb.model.UserRegisterParam;
import cn.quantgroup.xyqb.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
/**
* 注册成功之后,用户分群数据上报
* http://confluence.quantgroup.cn/pages/viewpage.action?pageId=34832018
* http://confluence.quantgroup.cn/pages/viewpage.action?pageId=41784208
* http://confluence.quantgroup.cn/pages/viewpage.action?pageId=34818640
* @author yutong
*/
@Slf4j
@Component
public class KinesisRegisteredEventListener implements ApplicationListener<RegisterEvent> {
@Override
public void onApplicationEvent(RegisterEvent event) {
UserRegisterParam userRegisterParam = event.getUserRegisterParam();
User user = userRegisterParam.getUser();
log.info("[KinesisRegistered] 用户分群数据上报准备, userId:{}, registeredFrom:{}", user.getId(), user.getRegisteredFrom());
sendRecord(user);
}
private void sendRecord(User user) {
// EnvironmentConfig.DEV 环境变量配置
KinesisProducerClient kinesisProducerClient = new KinesisProducerClient(
TechEnvironment.isPro() ? EnvironmentConfig.PROD : EnvironmentConfig.DEV);
// 用户登录事件发送
// UserBaseDetailEvent regEvent = UserBaseDetailEvent
// .builder()
// .channel(1L)
// .hashPhoneNo("*********")
// .businessEventBaseInfo(BusinessEventBaseInfo
// .builder()
// .channel("***")
// .deviceId("***")
// .ip("***")
// .userUuid("***")
// .build())
// .subEventType(UserBaseDetailEvent.SubEventType.REGISTER)
// .build();
UserBaseDetailEvent regEvent = UserBaseDetailEvent
.builder()
.userId(String.valueOf(user.getId()))
.userUuid(user.getUuid())
.subEventType(UserBaseDetailEvent.SubEventType.REGISTER)
.build();
regEvent.setRegisterChannel(user.getRegisteredFrom().intValue());
regEvent.setRegisterTime(user.getCreatedAt().getTime());
try {
ListenableFuture<UserRecordResult> listenableFuture = kinesisProducerClient.SendRecord(regEvent);
Futures.addCallback(listenableFuture, Callback);
log.info("[KinesisRegistered] 用户分群数据上报发送, regEvent:{}", JsonUtil.toJson(regEvent));
} catch (UnsupportedEncodingException | JsonProcessingException e) {
log.error("[KinesisRegistered]用户分群数据上报出错, e:{}", e);
throw new PushUserToLkbException("用户分群数据上报出错");
}
}
// 异步响应结果 如果发送失败,发送方需要进行重试发送
FutureCallback<UserRecordResult> Callback = new FutureCallback<UserRecordResult>() {
@Override
public void onFailure(Throwable t) {
/* Analyze and respond to the failure */
log.error(t.getMessage(), t);
log.error("[KinesisRegistered] 用户分群数据上报失败, Throwable:{}", ExceptionUtils.getStackTrace(t));
}
@Override
public void onSuccess(UserRecordResult result) {
log.info("[KinesisRegistered] 用户分群数据上报成功, result:{}", result.toString());
}
};
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment