Commit 46955237 authored by 李健华's avatar 李健华

Merge branch 'feature/bussiness-msg-push-20220914' into 'master'

增加请求stms 验证token并获取对应的供应商code

See merge request head_group/websocketserver!1
parents 240d7578 abb334a0
......@@ -35,3 +35,5 @@ build/
#log
*.log
*.gz
config-cache/*
......@@ -4,7 +4,6 @@ import com.netty.server.handler.ChannelHandlerPool;
import com.netty.server.model.MsgAgreement;
import com.netty.server.server.CacheService;
import com.netty.server.utils.CacheUtil;
import com.netty.server.utils.MsgUtil;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
......@@ -24,10 +23,10 @@ public class SendSocketMessageController {
private CacheService cacheService;
@PostMapping("/send")
public String send2User(@RequestParam(value = "token") String token, @RequestParam(value = "data") String data) {
String channelIds = cacheService.getRedisUtil().getChannelRelation(token);
public String send2User(@RequestParam(value = "relationKey") String relationKey, @RequestParam(value = "data") String data) {
String channelIds = cacheService.getRedisUtil().getChannelRelation(relationKey);
if (null == channelIds) {
return "商户" + token + "不在线!";
return "商户" + relationKey + "不在线!";
}
String[] channelIdlist = channelIds.split(",");
for (String channelId : channelIdlist) {
......
......@@ -9,11 +9,15 @@ import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.ComponentScan;
/**
* @author qiding
*/
@ComponentScan(basePackages = {"com.netty.server.*"})
@EnableFeignClients(basePackages = {"com.netty.*"})
@SpringBootApplication
@RequiredArgsConstructor
@EnableApolloConfig
......@@ -22,6 +26,8 @@ public class NettyServerApplication implements ApplicationRunner {
private final TcpServer tcpServer;
public static void main(String[] args) {
SpringApplication.run(NettyServerApplication.class, args);
}
......
......@@ -3,18 +3,15 @@ package com.netty.server.channel;
import com.netty.server.handler.MessageHandler;
import com.netty.server.handler.WebsocketMessageHandler;
import com.netty.server.server.CacheService;
import com.netty.server.server.IStmsServer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
......@@ -34,11 +31,14 @@ public class ChannelInit extends ChannelInitializer<SocketChannel> {
private CacheService cacheService;
private WebsocketMessageHandler websocketHandler;
private IStmsServer stmsServer;
public ChannelInit(CacheService cacheService, WebsocketMessageHandler websocketHandler, Long nettyAllIdleTime) {
public ChannelInit(CacheService cacheService, WebsocketMessageHandler websocketHandler, Long nettyAllIdleTime, IStmsServer stmsServer) {
this.cacheService = cacheService;
this.websocketHandler = websocketHandler;
this.nettyAllIdleTime = nettyAllIdleTime;
this.stmsServer = stmsServer;
}
@Override
......@@ -55,7 +55,8 @@ public class ChannelInit extends ChannelInitializer<SocketChannel> {
// 聚合 websocket 的数据帧,因为客户端可能分段向服务器端发送数据
.addLast(new WebSocketFrameAggregator(1024 * 62))
// 添加消息处理器
.addLast(new MessageHandler(cacheService, websocketHandler));
.addLast(new MessageHandler(cacheService, websocketHandler, stmsServer));
// // 添加消息处理器
// .addLast("messageHandler", messageHandler);
}
......
package com.netty.server.config;
import com.google.common.collect.Lists;
import feign.Contract;
import feign.Logger;
import feign.codec.Decoder;
import feign.codec.Encoder;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.http.HttpMessageConverters;
import org.springframework.cloud.openfeign.support.SpringDecoder;
import org.springframework.cloud.openfeign.support.SpringEncoder;
import org.springframework.cloud.openfeign.support.SpringMvcContract;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.HttpMessageConverter;
import javax.annotation.PostConstruct;
import java.util.List;
@Configuration
public class FeignConfig {
// @Bean
// public Encoder encoder(){
// return new MyEncoder();
// }
List<HttpMessageConverter<?>> converters = Lists.newArrayList();
@PostConstruct
public void init(){
MessageConverter wxConverter = new MessageConverter();
HttpMessageConverters httpMessageConverters = new HttpMessageConverters(wxConverter);
this.converters = httpMessageConverters.getConverters();
}
@Bean
Logger.Level feignLevel() {
return Logger.Level.FULL;
}
@Bean
public Contract contract() {
return new SpringMvcContract();
}
@Bean
public Encoder springEncoder(@Autowired ObjectFactory<HttpMessageConverters> messageConverters) {
return new SpringEncoder(messageConverters);
}
/**
* 注入新的Decoder Feign将自动 替换解决微信返回参数为[text/plain] 无法转化为json
* 微信虽然接口返回为JSON格式数据但却将数据表示为了[text/plain]导致Feign没有采用JSON解析器来解析,从而无法将响应数据转化为对应的POJO对象;
*/
@Bean
public Decoder feignDecoder() {
MessageConverter wxConverter = new MessageConverter();
ObjectFactory<HttpMessageConverters> objectFactory = () -> new HttpMessageConverters(wxConverter);
return new SpringDecoder(objectFactory);
}
// @Bean
// public Decoder feignDecoder() {
//
// /**
// * 改动原因: 压测时,feign 会有性能问题
// *
// * addDefaultConverters 为true, feign 执行decoder 每次都会重新初始化 默认的 decoder
// * 所以:提前把 自定义的convert 和 默认的convert 处理好后,不再处理默认添加
// *
// * https://blog.csdn.net/weixin_35762553/article/details/112087522
// */
// ObjectFactory objectFactory = () -> new HttpMessageConverters(false, converters);
// return new ResponseEntityDecoder(new SpringDecoder(objectFactory));
// }
//
}
\ No newline at end of file
package com.netty.server.config;
import org.springframework.http.MediaType;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import java.util.ArrayList;
import java.util.List;
/**
* 自定义解析器,支持微信返回text/plain数据
*
* @author axq
*/
public class MessageConverter extends MappingJackson2HttpMessageConverter {
public MessageConverter() {
List<MediaType> mediaTypes = new ArrayList<>();
mediaTypes.add(MediaType.TEXT_PLAIN);
mediaTypes.add(MediaType.APPLICATION_JSON);
mediaTypes.add(MediaType.APPLICATION_JSON_UTF8);
mediaTypes.add(MediaType.APPLICATION_FORM_URLENCODED);
setSupportedMediaTypes(mediaTypes);
}
}
package com.netty.server.config;
import com.alibaba.fastjson.JSONObject;
import feign.RequestTemplate;
import feign.codec.EncodeException;
import feign.codec.Encoder;
import org.springframework.http.MediaType;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.lang.reflect.Type;
import java.net.URLEncoder;
public class MyEncoder implements Encoder {
@Override
public void encode(Object o, Type type, RequestTemplate rt) throws EncodeException {
StringBuffer sb = new StringBuffer();
try {
Class clazz = Thread.currentThread().getContextClassLoader().loadClass(type.getTypeName());
Field[] fields =clazz.getDeclaredFields();
String oStr = JSONObject.toJSONString(o);
for(Field field : fields){
if(sb.length() > 0){
sb.append("&");
}
field.setAccessible(true);
Object fieldValue = field.get(JSONObject.parseObject(oStr,clazz));
if(fieldValue != null){
sb.append(URLEncoder.encode(field.getName(),"UTF-8"))
.append("=")
.append(URLEncoder.encode(field.get(JSONObject.parseObject(oStr,clazz)).toString()));
}
}
} catch (ClassNotFoundException e) {
e.printStackTrace();
}catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
rt.header("Content-Type", MediaType.APPLICATION_FORM_URLENCODED_VALUE);
rt.body(sb.toString());
}
}
\ No newline at end of file
package com.netty.server.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;
/**
*/
@Configuration
public class ResTemplateConfig {
@Autowired
private RestTemplateBuilder builder;
@Bean
public RestTemplate restTemplate() {
return builder.build();
}
}
package com.netty.server.handler;
import com.netty.server.model.DeviceChannelInfo;
import com.netty.server.server.CacheService;
import com.netty.server.server.IStmsServer;
import com.netty.server.store.ChannelStore;
import com.netty.server.store.WebSocketSession;
import com.netty.server.utils.CacheUtil;
import com.netty.server.utils.NetWorkUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 消息处理,单例启动
......@@ -41,9 +32,10 @@ public class MessageHandler extends SimpleChannelInboundHandler<WebSocketFrame>
private WebsocketMessageHandler websocketHandler;
private CacheService cacheService;
public MessageHandler(CacheService cacheService, WebsocketMessageHandler websocketHandler) {
public MessageHandler(CacheService cacheService, WebsocketMessageHandler websocketHandler, IStmsServer stmsServer) {
this.cacheService = cacheService;
this.websocketHandler = new WebsocketMessageHandler(cacheService);
this.websocketHandler = new WebsocketMessageHandler(cacheService, stmsServer);
}
@Override
......
......@@ -4,9 +4,10 @@ import com.alibaba.fastjson.JSON;
import com.netty.server.model.DeviceChannelInfo;
import com.netty.server.model.MsgAgreement;
import com.netty.server.server.CacheService;
import com.netty.server.server.IStmsServer;
import com.netty.server.store.WebSocketSession;
import com.netty.server.utils.CacheUtil;
import com.netty.server.utils.MsgUtil;
import com.netty.server.utils.Md5Utils;
import com.netty.server.utils.NetWorkUtils;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel;
......@@ -22,8 +23,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
......@@ -42,9 +41,11 @@ public class WebsocketMessageHandler {
private final String USER = "user";
private final AttributeKey<String> key = AttributeKey.valueOf(USER);
private CacheService cacheService;
private IStmsServer stmsServer;
public WebsocketMessageHandler(CacheService cacheService) {
public WebsocketMessageHandler(CacheService cacheService, IStmsServer stmsServer) {
this.cacheService = cacheService;
this.stmsServer = stmsServer;
}
@Autowired
......@@ -60,11 +61,20 @@ public class WebsocketMessageHandler {
String uri = request.uri();
ConcurrentMap<String, String> paramMap = getUrlParams(uri);
System.out.println("接收到的参数是:" + JSON.toJSONString(paramMap));
// this.sendResponse(ctx, request, new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.FORBIDDEN, ctx.alloc().buffer()));
// ctx.close();
// 加校验
String relationInfo = paramMap.get("channelId") + "/" + paramMap.get("bussinessId");
online(relationInfo, ctx.channel());
if (paramMap.getOrDefault("token", "").equals("") || paramMap.getOrDefault("channelId", "").equals("")) {
this.sendResponse(ctx, request, new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.BAD_REQUEST, ctx.alloc().buffer()));
ctx.close();
}
// String supplierCode = stmsServer.getStmsTokenInfo(paramMap.get("token"));
String supplierCode = "100";
if (null == supplierCode) {
this.sendResponse(ctx, request, new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.FORBIDDEN, ctx.alloc().buffer()));
ctx.close();
}
String mdString = paramMap.get("channelId") + "-" + supplierCode;
String relationKey = Md5Utils.MD5Encode(mdString);
online(relationKey, ctx.channel());
// 参数分别是 (ws地址,子协议,是否扩展,最大frame长度)
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(getWebSocketLocation(request), null, true, 5 * 1024 * 1024);
WebSocketServerHandshaker handShaker = factory.newHandshaker(request);
......@@ -81,7 +91,7 @@ public class WebsocketMessageHandler {
.ip(NetWorkUtils.getHost())
.port(channel.localAddress().getPort())
.linkDate(new Date())
.relationInfo(relationInfo)
.relationInfo(relationKey)
.build();
cacheService.getRedisUtil().pushObj(deviceChannelInfo);
CacheUtil.cacheChannel.put(channel.id().toString(), channel);
......@@ -106,14 +116,19 @@ public class WebsocketMessageHandler {
}
// 文本接收和回复
if (frame instanceof TextWebSocketFrame) {
log.debug("收到消息:\n{}", ((TextWebSocketFrame) frame).text());
String msgContent = ((TextWebSocketFrame) frame).text();
if ("HeartBeat".equals(msgContent)) {
log.debug("心跳消息");
return;
}
log.debug("收到消息:\n{}", msgContent);
//判断接收消息用户是否在本服务端
Channel channel = CacheUtil.cacheChannel.get(ctx.channel().id().toString());
MsgAgreement msgAgreement = new MsgAgreement();
msgAgreement.setToChannelId(ctx.channel().id().toString());
msgAgreement.setContent(((TextWebSocketFrame) frame).text());
msgAgreement.setContent(msgContent);
if (null != channel) {
channel.writeAndFlush(new TextWebSocketFrame(MsgUtil.obj2Json(msgAgreement)));
channel.writeAndFlush(new TextWebSocketFrame(msgContent));
return;
}
//如果为NULL则接收消息的用户不在本服务端,需要push消息给全局
......@@ -183,13 +198,14 @@ public class WebsocketMessageHandler {
* 上线一个用户
*
* @param channel
* @param token
* @param relationKey
*/
private void online(String token, Channel channel) {
private void online(String relationKey, Channel channel) {
// 保存channel通道的附带信息,以用户的uid为标识
// channel.attr(key).set(token);
// ChannelHandlerPool.channelGroup.add(channel);
cacheService.getRedisUtil().pushChannelRelation(channel.id().toString(), token);
System.out.println(relationKey);
cacheService.getRedisUtil().pushChannelRelation(channel.id().toString(), relationKey);
}
private static ConcurrentMap<String, String> getUrlParams(String url) {
......
package com.netty.server.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* stms 返回值
*
* @author jie
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class StmsRe {
private Integer code;
private String msg;
private String detail;
private Object data;
}
package com.netty.server.response;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.MDC;
/**
* API 调用返回的结果对象
*
* @author rong yang
*/
@SuppressWarnings("serial")
@Data
public class Result<T> implements java.io.Serializable {
/**
* 0成功,永远默认成功
*/
private String code = "0000";
/**
* 业务错误码
*/
private String businessCode = ResultCode.SUCCESS.getCode();
/**
* 消息
**/
private String msg;
/**
* 详细消息
**/
private String detail;
/**
* 数据
**/
private T data;
private Boolean success;
private String traceId;
/**
* @param resultCode
*/
protected void setResultCode(ResultCode resultCode) {
this.businessCode = resultCode.getCode();
this.msg = resultCode.getMsg();
}
public Result() {
this.traceId = getTraceId();
}
public Result(String code, String businessCode, String msg, String detail, T data, boolean success) {
this.code = code;
this.businessCode = businessCode;
this.msg = msg;
this.detail = detail;
this.data = data;
this.success = success;
}
public static Result success() {
Result result = new Result();
result.setResultCode(ResultCode.SUCCESS);
result.setData(new Object());
return result;
}
public static Result supplyChainSuccess() {
Result result = new Result();
result.setCode(ResultCode.SUCCESS.getCode());
result.setBusinessCode(ResultCode.SUCCESS.getCode());
result.setSuccess(null);
return result;
}
public Result<T> buildSuccess(T data) {
this.setResultCode(ResultCode.SUCCESS);
this.setData(data);
return this;
}
public static <T> Result<T> success(T data) {
Result result = new Result();
result.setResultCode(ResultCode.SUCCESS);
result.setData(data);
return result;
}
public static Result failure() {
Result result = new Result();
result.setResultCode(ResultCode.FAILURE);
return result;
}
public static Result failure(String msg) {
Result result = new Result();
result.setBusinessCode(ResultCode.FAILURE.getCode());
result.setMsg(msg);
return result;
}
public static Result failure(ResultCode resultCode) {
Result result = new Result();
result.setResultCode(resultCode);
return result;
}
public static Result failure(ResultCode resultCode, String detail) {
Result result = new Result();
result.setResultCode(resultCode);
result.setDetail(detail);
return result;
}
public boolean isSuccess() {
return ("0000".equals(code) || "0".equals(code));
}
public static String getTraceId(){
String traceId = MDC.get("X-B3-TraceId");
String spanId = MDC.get("X-B3-SpanId");
if (StringUtils.isNotEmpty(traceId) && StringUtils.isNotEmpty(spanId)) {
StringBuffer sb = new StringBuffer()
.append("[").append(traceId).append("-").append(spanId).append("]");
return sb.toString();
}
return "";
}
}
package com.netty.server.response;
import lombok.extern.slf4j.Slf4j;
/**
* API调用结果状态码定义
*
* @author rong yang
*/
@Slf4j
public enum ResultCode {
/**
* 响应[消息中心]专用
*/
SUCCESS("0000", "成功"),
FAILURE("0001", "失败"),
;
private String code;
private String msg;
private ResultCode(String code, String msg) {
this.code = code;
this.msg = msg;
}
public String getCode() {
return code;
}
public String getMsg() {
return msg;
}
}
package com.netty.server.server;
/**
* smts 接口
*
* @author qiding
*/
public interface IStmsServer {
String getStmsTokenInfo(String token);
}
package com.netty.server.server;
import feign.Headers;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import com.netty.server.response.Result;
@FeignClient(name = "stmsRemoteService", url = "${stms.http}")
public interface StmsRemoteService {
@Headers("Content-Type:application/json")
@PostMapping(value = "/v2/oauth/currentuserinfo")
Result<String> getCurrentuserinfo(@RequestHeader("Access-Token") String accessToken);
}
......@@ -57,6 +57,9 @@ public class TcpServer implements ITcpServer {
@Autowired
private WebsocketMessageHandler websocketHandler;
@Autowired
private IStmsServer stmsServer;
@Value("${netty.port}")
private int port;
......@@ -89,7 +92,7 @@ public class TcpServer implements ITcpServer {
.channel(serverProperties.isUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
// 配置 编码器、解码器、业务处理
.childHandler(new ChannelInit(cacheService, websocketHandler, nettyAllIdleTime))
.childHandler(new ChannelInit(cacheService, websocketHandler, nettyAllIdleTime, stmsServer))
// tcp缓冲区
.option(ChannelOption.SO_BACKLOG, 128)
// 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true
......
package com.netty.server.server.impl;
import com.alibaba.fastjson.JSONObject;
import com.netty.server.response.Result;
import com.netty.server.server.IStmsServer;
import com.netty.server.server.StmsRemoteService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
@Slf4j
@Component
public class StmsServerImpl implements IStmsServer {
@Value("${stms.http}")
private String stms;
@Resource
private RestTemplate restTemplate;
@Resource
private StmsRemoteService stmsRemoteService;
@Override
public String getStmsTokenInfo(String token) {
try {
Result<String> result = stmsRemoteService.getCurrentuserinfo(token);
if (result.isSuccess()) {
JSONObject jsonData = JSONObject.parseObject(result.getData().toString());
return jsonData.getString("supplierCode");
}
} catch (Exception e) {
log.error("Exception--{}", e.getMessage());
}
return null;
}
}
package com.netty.server.utils;
import lombok.extern.slf4j.Slf4j;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
/**
* Created by tums on 2015/11/30.
*/
@Slf4j
public final class Md5Utils {
public static String build(String content) {
MessageDigest messageDigest;
try {
messageDigest = MessageDigest
.getInstance("md5");
} catch (NoSuchAlgorithmException e) {
log.error(e.getMessage(), e);
return null;
}
messageDigest.update(content.getBytes());
byte[] domain = messageDigest.digest();
StringBuilder md5StrBuff = new StringBuilder();
// converting domain to String
for (int i = 0; i < domain.length; i++) {
if (Integer.toHexString(0xFF & domain[i]).length() == 1) {
md5StrBuff.append("0").append(
Integer.toHexString(0xFF & domain[i]));
} else {
md5StrBuff.append(Integer.toHexString(0xFF & domain[i]));
}
}
return md5StrBuff.toString();
}
public static String MD5Encode(String origin) {
String ret = null;
try {
ret = new String(origin);
MessageDigest md = MessageDigest.getInstance("MD5");
ret = byteArrayToHexString(md.digest(ret.getBytes()));
} catch (Exception e) {
}
return ret;
}
private final static String[] hexDigits = { "0", "1", "2", "3", "4", "5",
"6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };
private static String byteArrayToHexString(byte[] bytes) {
StringBuffer sb = new StringBuffer();
for (byte b : bytes) {
sb.append(byteToHexString(b));
}
return sb.toString();
}
private static String byteToHexString(byte b) {
int n = b;
if (n < 0)
n = 256 + n;
int d1 = n / 16;
int d2 = n % 16;
return hexDigits[d1] + hexDigits[d2];
}
}
......@@ -36,18 +36,18 @@ public class RedisUtil {
redisTemplate.opsForHash().put(nettyDeviceIds, deviceChannelInfo.getChannelId(), JSON.toJSONString(deviceChannelInfo));
}
public void pushChannelRelation(String channelId, String relationInfo){
Object channelIdObj = redisTemplate.opsForHash().get(nettyChannelRelations, relationInfo);
public void pushChannelRelation(String channelId, String relationKey){
Object channelIdObj = redisTemplate.opsForHash().get(nettyChannelRelations, relationKey);
String channelIds = "";
if (null != channelIdObj) {
channelIds = channelIdObj.toString() + ",";
}
channelIds = channelIds + channelId;
redisTemplate.opsForHash().put(nettyChannelRelations, relationInfo, channelIds);
redisTemplate.opsForHash().put(nettyChannelRelations, relationKey, channelIds);
}
public String getChannelRelation(String relationInfo){
Object channelIdObj = redisTemplate.opsForHash().get(nettyChannelRelations, relationInfo);
public String getChannelRelation(String relationKey){
Object channelIdObj = redisTemplate.opsForHash().get(nettyChannelRelations, relationKey);
if (null != channelIdObj) {
return channelIdObj.toString();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment