Commit 8dec1eea authored by 李健华's avatar 李健华

修改部分参数为配置

parent 4976870b
...@@ -31,3 +31,7 @@ build/ ...@@ -31,3 +31,7 @@ build/
### VS Code ### ### VS Code ###
.vscode/ .vscode/
#log
*.log
...@@ -18,8 +18,8 @@ import java.util.List; ...@@ -18,8 +18,8 @@ import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@RestController @RestController
@RequestMapping("/message") @RequestMapping("/v1/message")
public class IndexController { public class SendSocketMessageController {
@Resource @Resource
private CacheService cacheService; private CacheService cacheService;
...@@ -36,11 +36,11 @@ public class IndexController { ...@@ -36,11 +36,11 @@ public class IndexController {
msgAgreement.setToChannelId(channelId); msgAgreement.setToChannelId(channelId);
msgAgreement.setContent(data); msgAgreement.setContent(data);
if (ch != null) { if (ch != null) {
ch.writeAndFlush(new TextWebSocketFrame(MsgUtil.obj2Json(msgAgreement))); ch.writeAndFlush(new TextWebSocketFrame(data));
return "success"; } else {
//如果为NULL则接收消息的用户不在本服务端,需要push消息给全局
cacheService.push(msgAgreement);
} }
//如果为NULL则接收消息的用户不在本服务端,需要push消息给全局
cacheService.push(msgAgreement);
} }
return "success"; return "success";
} }
......
...@@ -14,6 +14,7 @@ import io.netty.handler.codec.string.StringEncoder; ...@@ -14,6 +14,7 @@ import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -27,19 +28,24 @@ import java.util.concurrent.TimeUnit; ...@@ -27,19 +28,24 @@ import java.util.concurrent.TimeUnit;
@RequiredArgsConstructor @RequiredArgsConstructor
public class ChannelInit extends ChannelInitializer<SocketChannel> { public class ChannelInit extends ChannelInitializer<SocketChannel> {
private Long nettyAllIdleTime;
private CacheService cacheService; private CacheService cacheService;
private WebsocketMessageHandler websocketHandler; private WebsocketMessageHandler websocketHandler;
public ChannelInit(CacheService cacheService, WebsocketMessageHandler websocketHandler) {
public ChannelInit(CacheService cacheService, WebsocketMessageHandler websocketHandler, Long nettyAllIdleTime) {
this.cacheService = cacheService; this.cacheService = cacheService;
this.websocketHandler = websocketHandler; this.websocketHandler = websocketHandler;
this.nettyAllIdleTime = nettyAllIdleTime;
} }
@Override @Override
protected void initChannel(SocketChannel channel) { protected void initChannel(SocketChannel channel) {
channel.pipeline() channel.pipeline()
// 心跳时间 // 心跳时间
.addLast("idle", new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS)) .addLast("idle", new IdleStateHandler(0, 0, nettyAllIdleTime, TimeUnit.SECONDS))
// 对http协议的支持. // 对http协议的支持.
.addLast(new HttpServerCodec()) .addLast(new HttpServerCodec())
// 对大数据流的支持 // 对大数据流的支持
......
...@@ -2,6 +2,7 @@ package com.netty.server.config; ...@@ -2,6 +2,7 @@ package com.netty.server.config;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.netty.server.model.MsgAgreement; import com.netty.server.model.MsgAgreement;
import com.netty.server.server.CacheService;
import com.netty.server.utils.CacheUtil; import com.netty.server.utils.CacheUtil;
import com.netty.server.utils.MsgUtil; import com.netty.server.utils.MsgUtil;
import io.netty.channel.Channel; import io.netty.channel.Channel;
...@@ -13,6 +14,8 @@ import org.springframework.data.redis.core.StringRedisTemplate; ...@@ -13,6 +14,8 @@ import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/** /**
* redis订阅消息处理实现类 * redis订阅消息处理实现类
...@@ -25,6 +28,10 @@ public class MsgReceiver extends MessageListenerAdapter { ...@@ -25,6 +28,10 @@ public class MsgReceiver extends MessageListenerAdapter {
@Autowired @Autowired
private StringRedisTemplate redisTemplate; private StringRedisTemplate redisTemplate;
@Resource
private CacheService cacheService;
/** /**
* 接收redis推送的消息如果当前服务连接的有此设备就推送消息 * 接收redis推送的消息如果当前服务连接的有此设备就推送消息
* *
...@@ -40,8 +47,9 @@ public class MsgReceiver extends MessageListenerAdapter { ...@@ -40,8 +47,9 @@ public class MsgReceiver extends MessageListenerAdapter {
String toChannelId = msgAgreement.getToChannelId(); String toChannelId = msgAgreement.getToChannelId();
Channel channel = CacheUtil.cacheChannel.get(toChannelId); Channel channel = CacheUtil.cacheChannel.get(toChannelId);
if (null == channel) { if (null == channel) {
cacheService.getRedisUtil().remove(toChannelId);
return; return;
} }
channel.writeAndFlush(new TextWebSocketFrame(MsgUtil.obj2Json(msgAgreement))); channel.writeAndFlush(new TextWebSocketFrame(msgAgreement.getContent()));
} }
} }
...@@ -50,23 +50,23 @@ public class MessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> ...@@ -50,23 +50,23 @@ public class MessageHandler extends SimpleChannelInboundHandler<WebSocketFrame>
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) { if (msg instanceof FullHttpRequest) {
websocketHandler.handleHttpRequest(ctx, (FullHttpRequest) msg); websocketHandler.handleHttpRequest(ctx, (FullHttpRequest) msg);
log.debug("\n"); log.info("\n");
log.debug("客户端{}握手成功!", ctx.channel().id()); log.info("客户端{}握手成功!", ctx.channel().id());
} }
super.channelRead(ctx, msg); super.channelRead(ctx, msg);
} }
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
log.debug("\n"); log.info("\n");
log.debug("channelId:" + ctx.channel().id()); log.info("channelId:" + ctx.channel().id());
websocketHandler.handleWebSocketFrame(ctx, frame); websocketHandler.handleWebSocketFrame(ctx, frame);
} }
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) { public void channelInactive(ChannelHandlerContext ctx) {
log.debug("\n"); log.info("\n");
log.debug("断开连接"); log.info("断开连接");
cacheService.getRedisUtil().remove(ctx.channel().id().toString()); cacheService.getRedisUtil().remove(ctx.channel().id().toString());
CacheUtil.cacheChannel.remove(ctx.channel().id().toString(), ctx.channel()); CacheUtil.cacheChannel.remove(ctx.channel().id().toString(), ctx.channel());
// 释放缓存 // 释放缓存
...@@ -76,24 +76,23 @@ public class MessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> ...@@ -76,24 +76,23 @@ public class MessageHandler extends SimpleChannelInboundHandler<WebSocketFrame>
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("\n"); log.info("\n");
log.debug("成功建立连接,channelId:{}", ctx.channel().id()); log.info("成功建立连接,channelId:{}", ctx.channel().id());
super.channelActive(ctx); super.channelActive(ctx);
} }
@Override @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
log.debug("心跳事件时触发"); log.info("心跳事件时触发");
//判断evt是否是IdleStateEvent(用于触发用户事件,包含读空闲/写空闲/读写空闲) //判断evt是否是IdleStateEvent(用于触发用户事件,包含读空闲/写空闲/读写空闲)
if(evt instanceof IdleStateEvent){ if(evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent = (IdleStateEvent) evt; IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if(idleStateEvent.state() == IdleState.READER_IDLE){ if(idleStateEvent.state() == IdleState.READER_IDLE){
log.debug("进入读空闲..."); log.info("进入读空闲...");
}else if(idleStateEvent.state() == IdleState.WRITER_IDLE){ }else if(idleStateEvent.state() == IdleState.WRITER_IDLE){
log.debug("进入写空闲..."); log.info("进入写空闲...");
}else if(idleStateEvent.state() == IdleState.ALL_IDLE){ }else if(idleStateEvent.state() == IdleState.ALL_IDLE){
log.debug("进入读写空闲..."); log.info("进入读写空闲...");
cacheService.getRedisUtil().remove(ctx.channel().id().toString()); cacheService.getRedisUtil().remove(ctx.channel().id().toString());
CacheUtil.cacheChannel.remove(ctx.channel().id().toString(), ctx.channel()); CacheUtil.cacheChannel.remove(ctx.channel().id().toString(), ctx.channel());
Channel channel = ctx.channel(); Channel channel = ctx.channel();
......
...@@ -6,6 +6,7 @@ import com.netty.server.model.DeviceChannelInfo; ...@@ -6,6 +6,7 @@ import com.netty.server.model.DeviceChannelInfo;
import com.netty.server.model.MsgAgreement; import com.netty.server.model.MsgAgreement;
import com.netty.server.utils.RedisUtil; import com.netty.server.utils.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -20,6 +21,7 @@ public class CacheService { ...@@ -20,6 +21,7 @@ public class CacheService {
@Autowired @Autowired
private RedisUtil redisUtil; private RedisUtil redisUtil;
/** /**
* 将通道数据广播给指定的主题 主题格式为:message_pub+节点ip+节点端口 * 将通道数据广播给指定的主题 主题格式为:message_pub+节点ip+节点端口
* @param msgAgreement * @param msgAgreement
......
...@@ -63,6 +63,12 @@ public class TcpServer implements ITcpServer { ...@@ -63,6 +63,12 @@ public class TcpServer implements ITcpServer {
@Value("${netty.useEpoll:false}") @Value("${netty.useEpoll:false}")
private Boolean useEpoll; private Boolean useEpoll;
@Value("${netty.server.redis.key}")
private String nettyServer;
@Value("${netty.all.idle.time}")
private Long nettyAllIdleTime;
@Override @Override
public void start() throws Exception { public void start() throws Exception {
...@@ -83,7 +89,7 @@ public class TcpServer implements ITcpServer { ...@@ -83,7 +89,7 @@ public class TcpServer implements ITcpServer {
.channel(serverProperties.isUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .channel(serverProperties.isUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port)) .localAddress(new InetSocketAddress(port))
// 配置 编码器、解码器、业务处理 // 配置 编码器、解码器、业务处理
.childHandler(new ChannelInit(cacheService, websocketHandler)) .childHandler(new ChannelInit(cacheService, websocketHandler, nettyAllIdleTime))
// tcp缓冲区 // tcp缓冲区
.option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_BACKLOG, 128)
// 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true // 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true
...@@ -99,7 +105,7 @@ public class TcpServer implements ITcpServer { ...@@ -99,7 +105,7 @@ public class TcpServer implements ITcpServer {
CacheUtil.executorService.submit(() -> { CacheUtil.executorService.submit(() -> {
try { try {
while (channel.isActive()) { while (channel.isActive()) {
redisTemplate.opsForValue().set("newnettyServer" + ip, JSON.toJSONString(new ServerInfo(ip, port, date)), 5 * 1000, TimeUnit.MILLISECONDS); redisTemplate.opsForValue().set(nettyServer + ip, JSON.toJSONString(new ServerInfo(ip, port, date)), 5 * 1000, TimeUnit.MILLISECONDS);
Thread.sleep(3 * 1000); Thread.sleep(3 * 1000);
} }
} catch (Exception e) { } catch (Exception e) {
......
...@@ -3,6 +3,7 @@ package com.netty.server.utils; ...@@ -3,6 +3,7 @@ package com.netty.server.utils;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.netty.server.model.DeviceChannelInfo; import com.netty.server.model.DeviceChannelInfo;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -20,27 +21,33 @@ public class RedisUtil { ...@@ -20,27 +21,33 @@ public class RedisUtil {
@Autowired @Autowired
private StringRedisTemplate redisTemplate; private StringRedisTemplate redisTemplate;
@Value("${netty.deviceIds.redis.key}")
private String nettyDeviceIds;
@Value("${netty.channelRelations.redis.key}")
private String nettyChannelRelations;
/** /**
* 向redis存入设备连接信息 * 向redis存入设备连接信息
* *
* @param deviceChannelInfo * @param deviceChannelInfo
*/ */
public void pushObj(DeviceChannelInfo deviceChannelInfo) { public void pushObj(DeviceChannelInfo deviceChannelInfo) {
redisTemplate.opsForHash().put("newdeviceIds", deviceChannelInfo.getChannelId(), JSON.toJSONString(deviceChannelInfo)); redisTemplate.opsForHash().put(nettyDeviceIds, deviceChannelInfo.getChannelId(), JSON.toJSONString(deviceChannelInfo));
} }
public void pushChannelRelation(String channelId, String relationInfo){ public void pushChannelRelation(String channelId, String relationInfo){
Object channelIdObj = redisTemplate.opsForHash().get("channelRelations", relationInfo); Object channelIdObj = redisTemplate.opsForHash().get(nettyChannelRelations, relationInfo);
String channelIds = ""; String channelIds = "";
if (null != channelIdObj) { if (null != channelIdObj) {
channelIds = channelIdObj.toString() + ","; channelIds = channelIdObj.toString() + ",";
} }
channelIds = channelIds + channelId; channelIds = channelIds + channelId;
redisTemplate.opsForHash().put("channelRelations", relationInfo, channelIds); redisTemplate.opsForHash().put(nettyChannelRelations, relationInfo, channelIds);
} }
public String getChannelRelation(String relationInfo){ public String getChannelRelation(String relationInfo){
Object channelIdObj = redisTemplate.opsForHash().get("channelRelations", relationInfo); Object channelIdObj = redisTemplate.opsForHash().get(nettyChannelRelations, relationInfo);
if (null != channelIdObj) { if (null != channelIdObj) {
return channelIdObj.toString(); return channelIdObj.toString();
} }
...@@ -53,7 +60,7 @@ public class RedisUtil { ...@@ -53,7 +60,7 @@ public class RedisUtil {
* @return * @return
*/ */
public List<DeviceChannelInfo> popList() { public List<DeviceChannelInfo> popList() {
List<Object> values = redisTemplate.opsForHash().values("newdeviceIds"); List<Object> values = redisTemplate.opsForHash().values(nettyDeviceIds);
if (null == values) { if (null == values) {
return new ArrayList<>(); return new ArrayList<>();
} }
...@@ -71,7 +78,7 @@ public class RedisUtil { ...@@ -71,7 +78,7 @@ public class RedisUtil {
* @return * @return
*/ */
public DeviceChannelInfo selectByChannel(String channelId) { public DeviceChannelInfo selectByChannel(String channelId) {
Object deviceIds = redisTemplate.opsForHash().get("newdeviceIds", channelId); Object deviceIds = redisTemplate.opsForHash().get(nettyDeviceIds, channelId);
if (deviceIds == null) { if (deviceIds == null) {
return null; return null;
} }
...@@ -88,20 +95,20 @@ public class RedisUtil { ...@@ -88,20 +95,20 @@ public class RedisUtil {
if (deviceChannelInfo != null) { if (deviceChannelInfo != null) {
removeChannelRelations(channelId, deviceChannelInfo.getRelationInfo()); removeChannelRelations(channelId, deviceChannelInfo.getRelationInfo());
} }
redisTemplate.opsForHash().delete("newdeviceIds", channelId); redisTemplate.opsForHash().delete(nettyDeviceIds, channelId);
} }
private void removeChannelRelations(String channelId, String relationInfo) { private void removeChannelRelations(String channelId, String relationInfo) {
Object channelIdObj = redisTemplate.opsForHash().get("channelRelations", relationInfo); Object channelIdObj = redisTemplate.opsForHash().get(nettyChannelRelations, relationInfo);
if (null != channelIdObj) { if (null != channelIdObj) {
String channelIds = ""; String channelIds = "";
List<String> channelIdlist = new ArrayList<>(Arrays.asList(channelIdObj.toString().split(","))); List<String> channelIdlist = new ArrayList<>(Arrays.asList(channelIdObj.toString().split(",")));
channelIdlist.remove(channelId); channelIdlist.remove(channelId);
if (channelIdlist.size() == 0) { if (channelIdlist.size() == 0) {
redisTemplate.opsForHash().delete("channelRelations", relationInfo); redisTemplate.opsForHash().delete(nettyChannelRelations, relationInfo);
} else { } else {
channelIds = String.join(",", channelIdlist); channelIds = String.join(",", channelIdlist);
redisTemplate.opsForHash().put("channelRelations", relationInfo, channelIds); redisTemplate.opsForHash().put(nettyChannelRelations, relationInfo, channelIds);
} }
} }
} }
...@@ -110,6 +117,6 @@ public class RedisUtil { ...@@ -110,6 +117,6 @@ public class RedisUtil {
* 清空设备信息 * 清空设备信息
*/ */
public void clear() { public void clear() {
redisTemplate.delete("newdeviceIds"); redisTemplate.delete(nettyDeviceIds);
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment