Commit 4842d2a9 authored by 李健华's avatar 李健华

Initial commit

parents
Pipeline #1769 failed with stages
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.netty</groupId>
<artifactId>server</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>websocket-server</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
<netty-all.version>4.1.75.Final</netty-all.version>
<fastjson.version>1.2.80</fastjson.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty-all.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 添加redis依赖模块 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.0.6.RELEASE</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
<resource>
<directory>${project.build.directory}/generated-resources</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<compilerArgs>
<arg>-Xlint:unchecked</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</build>
</project>
# 基于netty的websocket服务器
## 简介
- netty是jboss提供的一个java开源框架,netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可用性的网络服务器和客户端程序。
- 本项目是基于Netty搭建的基础项目模板。
## 环境
| # | 环境 | 版本 | 说明 |
| ---- | ---- | -------------- | --------------- |
| 1 | JDK | openJdk 11.0.8 | 建议JDK11及以上 |
## 项目结构
```
├──channel 管道
├──config 服务核心配置
├──handler 消息处理器
├──server 服务配置
├──store 频道存储
├──NettyServerApplication.java 主启动类
```
package com.netty.server.Controller;
import com.netty.server.handler.ChannelHandlerPool;
import com.netty.server.model.MsgAgreement;
import com.netty.server.server.CacheService;
import com.netty.server.utils.CacheUtil;
import com.netty.server.utils.MsgUtil;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
@RestController
@RequestMapping("/message")
public class IndexController {
@Resource
private CacheService cacheService;
@PostMapping("/send")
public String send2User(@RequestParam(value = "token") String token, @RequestParam(value = "data") String data) {
String channelIds = cacheService.getRedisUtil().getChannelRelation(token);
if (null == channelIds) {
return "商户" + token + "不在线!";
}
String[] channelIdlist = channelIds.split(",");
for (String channelId : channelIdlist) {
Channel ch = CacheUtil.cacheChannel.get(channelId);
MsgAgreement msgAgreement = new MsgAgreement();
msgAgreement.setToChannelId(channelId);
msgAgreement.setContent(data);
if (ch != null) {
ch.writeAndFlush(new TextWebSocketFrame(MsgUtil.obj2Json(msgAgreement)));
return "success";
}
//如果为NULL则接收消息的用户不在本服务端,需要push消息给全局
cacheService.push(msgAgreement);
}
return "success";
}
/**
* 根据用户id查找channel
*
* @param name
* @return
*/
public List<Channel> getChannelByName(String name) {
AttributeKey<String> key = AttributeKey.valueOf("user");
return ChannelHandlerPool.channelGroup.stream().filter(channel -> channel.attr(key).get().equals(name))
.collect(Collectors.toList());
}
}
package com.netty.server;
import com.netty.server.server.TcpServer;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author qiding
*/
@SpringBootApplication
@RequiredArgsConstructor
public class NettyServerApplication implements ApplicationRunner {
private final TcpServer tcpServer;
public static void main(String[] args) {
SpringApplication.run(NettyServerApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
tcpServer.start();
}
}
package com.netty.server.channel;
import com.netty.server.handler.MessageHandler;
import com.netty.server.handler.WebsocketMessageHandler;
import com.netty.server.server.CacheService;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* Netty 通道初始化
*
* @author qiding
*/
@Component
@RequiredArgsConstructor
public class ChannelInit extends ChannelInitializer<SocketChannel> {
private CacheService cacheService;
private WebsocketMessageHandler websocketHandler;
public ChannelInit(CacheService cacheService, WebsocketMessageHandler websocketHandler) {
this.cacheService = cacheService;
this.websocketHandler = websocketHandler;
}
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
// 心跳时间
.addLast("idle", new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS))
// 对http协议的支持.
.addLast(new HttpServerCodec())
// 对大数据流的支持
.addLast(new ChunkedWriteHandler())
// 聚合 Http 将多个requestLine、requestHeader、messageBody信息转化成单一的request或者response对象
.addLast(new HttpObjectAggregator(8192))
// 聚合 websocket 的数据帧,因为客户端可能分段向服务器端发送数据
.addLast(new WebSocketFrameAggregator(1024 * 62))
// 添加消息处理器
.addLast(new MessageHandler(cacheService, websocketHandler));
// // 添加消息处理器
// .addLast("messageHandler", messageHandler);
}
}
package com.netty.server.config;
import com.netty.server.model.MsgAgreement;
import com.netty.server.utils.MsgUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
/**
* 发布redis消息
*
* @author jie
*/
@Slf4j
@Component
public class MsgPub {
@Autowired
private StringRedisTemplate redisTemplate;
public void pushMessage(String topic, MsgAgreement message) {
log.info("向 "+topic+"发送消息:"+message);
redisTemplate.convertAndSend(topic, MsgUtil.obj2Json(message));
}
}
package com.netty.server.config;
import com.alibaba.fastjson.JSON;
import com.netty.server.model.MsgAgreement;
import com.netty.server.utils.CacheUtil;
import com.netty.server.utils.MsgUtil;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
/**
* redis订阅消息处理实现类
*
* @author jie
*/
@Slf4j
@Component
public class MsgReceiver extends MessageListenerAdapter {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 接收redis推送的消息如果当前服务连接的有此设备就推送消息
*
* @param message 消息
*/
@Override
public void onMessage(Message message, byte[] pattern) {
String msg = redisTemplate.getStringSerializer().deserialize(message.getBody());
String topic = redisTemplate.getStringSerializer().deserialize(message.getChannel());
log.info("来自" + topic + "的消息:" + msg);
MsgAgreement msgAgreement = JSON.parseObject(msg, MsgAgreement.class);
String toChannelId = msgAgreement.getToChannelId();
Channel channel = CacheUtil.cacheChannel.get(toChannelId);
if (null == channel) {
return;
}
channel.writeAndFlush(new TextWebSocketFrame(MsgUtil.obj2Json(msgAgreement)));
}
}
package com.netty.server.config;
import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
/**
* redisTemplate配置类
* @author jie
*/
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisMessageTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
template.setDefaultSerializer(new FastJsonRedisSerializer<>(Object.class));
return template;
}
}
package com.netty.server.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* 服务配置
*
* @author qiding
*/
@Configuration
@ConfigurationProperties(prefix = ServerProperties.PREFIX)
@Data
public class ServerProperties {
public static final String PREFIX = "netty.server";
/**
* 服务器ip
*/
private String ip;
/**
* 服务器端口
*/
private Integer port;
/**
* 传输模式linux上开启会有更高的性能
*/
private boolean useEpoll;
}
package com.netty.server.config;
import com.netty.server.utils.NetWorkUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import java.net.UnknownHostException;
/**
* 用户管道信息;记录某个用户分配到某个服务端
* @author jie
*/
@Configuration
public class SubConfig {
@Value("${netty.port:20000}")
private int port;
/**
* 接受广播消息配置 接受主题格式为:message_pub+本机ip+本程序端口
* @param connectionFactory
* @param msgAgreementListenerAdapter
* @return
* @throws UnknownHostException
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter msgAgreementListenerAdapter) throws UnknownHostException {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(msgAgreementListenerAdapter, new PatternTopic("message_pub"+ NetWorkUtils.getHost()+port));
return container;
}
}
package com.netty.server.handler;
import io.netty.channel.Channel;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
public class ChannelHandlerPool {
public ChannelHandlerPool() {
}
public static Set<Channel> channelGroup = Collections.synchronizedSet(new HashSet<>());
}
package com.netty.server.handler;
import com.netty.server.model.DeviceChannelInfo;
import com.netty.server.server.CacheService;
import com.netty.server.store.ChannelStore;
import com.netty.server.store.WebSocketSession;
import com.netty.server.utils.CacheUtil;
import com.netty.server.utils.NetWorkUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 消息处理,单例启动
*
* @author qiding
*/
@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class MessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
private WebsocketMessageHandler websocketHandler;
private CacheService cacheService;
public MessageHandler(CacheService cacheService, WebsocketMessageHandler websocketHandler) {
this.cacheService = cacheService;
this.websocketHandler = new WebsocketMessageHandler(cacheService);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
websocketHandler.handleHttpRequest(ctx, (FullHttpRequest) msg);
log.debug("\n");
log.debug("客户端{}握手成功!", ctx.channel().id());
}
super.channelRead(ctx, msg);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
log.debug("\n");
log.debug("channelId:" + ctx.channel().id());
websocketHandler.handleWebSocketFrame(ctx, frame);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.debug("\n");
log.debug("断开连接");
cacheService.getRedisUtil().remove(ctx.channel().id().toString());
CacheUtil.cacheChannel.remove(ctx.channel().id().toString(), ctx.channel());
// 释放缓存
ChannelStore.closeAndClean(ctx);
WebSocketSession.clear(ctx.channel().id());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("\n");
log.debug("成功建立连接,channelId:{}", ctx.channel().id());
super.channelActive(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
log.debug("心跳事件时触发");
//判断evt是否是IdleStateEvent(用于触发用户事件,包含读空闲/写空闲/读写空闲)
if(evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if(idleStateEvent.state() == IdleState.READER_IDLE){
log.debug("进入读空闲...");
}else if(idleStateEvent.state() == IdleState.WRITER_IDLE){
log.debug("进入写空闲...");
}else if(idleStateEvent.state() == IdleState.ALL_IDLE){
log.debug("进入读写空闲...");
cacheService.getRedisUtil().remove(ctx.channel().id().toString());
CacheUtil.cacheChannel.remove(ctx.channel().id().toString(), ctx.channel());
Channel channel = ctx.channel();
//关闭无用channel,避免浪费资源
channel.close();
}
}
}
/**
* 发现异常关闭连接打印日志
*/
// @Override
// public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// ctx.close();
// cacheService.getRedisUtil().remove(ctx.channel().id().toString());
// CacheUtil.cacheChannel.remove(ctx.channel().id().toString(), ctx.channel());
// log.error("异常信息:\r\n" + cause.getMessage());
// }
}
package com.netty.server.handler;
import com.alibaba.fastjson.JSON;
import com.netty.server.model.DeviceChannelInfo;
import com.netty.server.model.MsgAgreement;
import com.netty.server.server.CacheService;
import com.netty.server.store.WebSocketSession;
import com.netty.server.utils.CacheUtil;
import com.netty.server.utils.MsgUtil;
import com.netty.server.utils.NetWorkUtils;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
* Websocket 消息处理器
*
* @author qiding
*/
@Slf4j
@Component
public class WebsocketMessageHandler {
private final String USER = "user";
private final AttributeKey<String> key = AttributeKey.valueOf(USER);
private CacheService cacheService;
public WebsocketMessageHandler(CacheService cacheService) {
this.cacheService = cacheService;
}
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 对webSocket 首次握手进行解析
*/
public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
// 首次握手进行校验
this.isFullHttpRequest(ctx, request);
// 获取请求uri
String uri = request.uri();
ConcurrentMap<String, String> paramMap = getUrlParams(uri);
System.out.println("接收到的参数是:" + JSON.toJSONString(paramMap));
// 加校验
String relationInfo = paramMap.get("channelId") + "/" + paramMap.get("bussinessId");
online(relationInfo, ctx.channel());
// 参数分别是 (ws地址,子协议,是否扩展,最大frame长度)
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(getWebSocketLocation(request), null, true, 5 * 1024 * 1024);
WebSocketServerHandshaker handShaker = factory.newHandshaker(request);
if (handShaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
handShaker.handshake(ctx.channel(), request);
}
WebSocketSession.setChannelShaker(ctx.channel().id(), handShaker);
SocketChannel channel = (SocketChannel) ctx.channel();
//保存设备信息
DeviceChannelInfo deviceChannelInfo = DeviceChannelInfo.builder()
.channelId(channel.id().toString())
.ip(NetWorkUtils.getHost())
.port(channel.localAddress().getPort())
.linkDate(new Date())
.relationInfo(relationInfo)
.build();
cacheService.getRedisUtil().pushObj(deviceChannelInfo);
CacheUtil.cacheChannel.put(channel.id().toString(), channel);
}
/**
* 处理消息
*/
public void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// 获取webSocket 会话
WebSocketServerHandshaker handShaker = WebSocketSession.getChannelShaker(ctx.channel().id());
// 关闭
if (frame instanceof CloseWebSocketFrame) {
log.debug("收到关闭请求");
handShaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
// 握手PING/PONG
if (frame instanceof PingWebSocketFrame) {
ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
return;
}
// 文本接收和回复
if (frame instanceof TextWebSocketFrame) {
log.debug("收到消息:\n{}", ((TextWebSocketFrame) frame).text());
//判断接收消息用户是否在本服务端
Channel channel = CacheUtil.cacheChannel.get(ctx.channel().id().toString());
MsgAgreement msgAgreement = new MsgAgreement();
msgAgreement.setToChannelId(ctx.channel().id().toString());
msgAgreement.setContent(((TextWebSocketFrame) frame).text());
if (null != channel) {
channel.writeAndFlush(new TextWebSocketFrame(MsgUtil.obj2Json(msgAgreement)));
return;
}
//如果为NULL则接收消息的用户不在本服务端,需要push消息给全局
cacheService.push(msgAgreement);
return;
}
// 二进制文本
if (frame instanceof BinaryWebSocketFrame) {
ctx.writeAndFlush(frame.retain());
}
}
/**
* 根据用户id查找channel
*
* @param name
* @return
*/
public List<Channel> getChannelByName(String name) {
return ChannelHandlerPool.channelGroup.stream().filter(channel -> channel.attr(key).get().equals(name))
.collect(Collectors.toList());
}
/**
* 判断是否是正确的websocket 握手协议
*/
private void isFullHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
if (!request.decoderResult().isSuccess()) {
log.error("非webSocket请求");
this.sendResponse(ctx, request, new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.BAD_REQUEST, ctx.alloc().buffer()));
ctx.close();
return;
}
if (!HttpMethod.GET.equals(request.method())) {
log.error("非GET请求");
this.sendResponse(ctx, request, new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.FORBIDDEN, ctx.alloc().buffer()));
ctx.close();
}
}
/**
* SSL支持采用wss:
*/
private String getWebSocketLocation(FullHttpRequest request) {
return "ws://" + request.headers().get(HttpHeaderNames.HOST) + "/websocket";
}
/**
* http 握手通用响应
*/
private void sendResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse resp) {
HttpResponseStatus status = resp.status();
if (status != HttpResponseStatus.OK) {
ByteBufUtil.writeUtf8(resp.content(), status.toString());
HttpUtil.setContentLength(req, resp.content().readableBytes());
}
boolean keepAlive = HttpUtil.isKeepAlive(req) && status == HttpResponseStatus.OK;
HttpUtil.setKeepAlive(req, keepAlive);
ChannelFuture future = ctx.write(resp);
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
/**
* 上线一个用户
*
* @param channel
* @param token
*/
private void online(String token, Channel channel) {
// 保存channel通道的附带信息,以用户的uid为标识
// channel.attr(key).set(token);
// ChannelHandlerPool.channelGroup.add(channel);
cacheService.getRedisUtil().pushChannelRelation(channel.id().toString(), token);
}
private static ConcurrentMap<String, String> getUrlParams(String url) {
ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
url = url.replace("?", ";");
if (!url.contains(";")) {
return map;
}
if (url.split(";").length > 0) {
String[] arr = url.split(";")[1].split("&");
for (String s : arr) {
String key = s.split("=")[0];
String value = s.split("=")[1];
map.put(key, value);
}
return map;
} else {
return map;
}
}
}
package com.netty.server.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
/**
* 用户管道信息;记录某个用户分配到某个服务端
*
* @author jie
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DeviceChannelInfo {
/**
* 服务端IP
*/
private String ip;
/**
* 服务端port
*/
private int port;
/**
* channelId
*/
private String channelId;
/**
* 链接时间
*/
private Date linkDate;
/**
* 关联信息
*/
private String relationInfo;
}
package com.netty.server.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 消息协议
*
* @author jie
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MsgAgreement {
/**
* 发送至channelId
*/
private String toChannelId;
/**
* 消息内容
*/
private String content;
}
package com.netty.server.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
/**
* 服务端信息
*
* @author jie
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServerInfo {
/**
* IP
*/
private String ip;
/**
* 端口
*/
private int port;
/**
* 启动时间
*/
private Date openDate;
}
package com.netty.server.server;
import com.netty.server.config.MsgPub;
import com.netty.server.model.DeviceChannelInfo;
import com.netty.server.model.MsgAgreement;
import com.netty.server.utils.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author jie
*/
@Service
public class CacheService {
@Autowired
private MsgPub msgPub;
@Autowired
private RedisUtil redisUtil;
/**
* 将通道数据广播给指定的主题 主题格式为:message_pub+节点ip+节点端口
* @param msgAgreement
*/
public void push(MsgAgreement msgAgreement) {
DeviceChannelInfo deviceChannelInfo = redisUtil.selectByChannel(msgAgreement.getToChannelId());
if (deviceChannelInfo == null) {
return;
}
msgPub.pushMessage("message_pub"+deviceChannelInfo.getIp()+deviceChannelInfo.getPort(), msgAgreement);
}
public RedisUtil getRedisUtil() {
return redisUtil;
}
}
package com.netty.server.server;
import javax.annotation.PreDestroy;
/**
* @author qiding
*/
public interface ITcpServer {
/**
* 主启动程序,初始化参数
*
* @throws Exception 初始化异常
*/
void start() throws Exception;
/**
* 优雅的结束服务器
*
* @throws InterruptedException 提前中断异常
*/
@PreDestroy
void destroy() throws InterruptedException;
}
package com.netty.server.server;
import com.alibaba.fastjson.JSON;
import com.netty.server.channel.ChannelInit;
import com.netty.server.config.ServerProperties;
import com.netty.server.handler.WebsocketMessageHandler;
import com.netty.server.model.ServerInfo;
import com.netty.server.utils.CacheUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* 启动 Server
*
* @author qiding
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class TcpServer implements ITcpServer {
private final ChannelInit channelInit;
private final ServerProperties serverProperties;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private Channel channel;
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private CacheService cacheService;
@Autowired
private WebsocketMessageHandler websocketHandler;
@Override
public void start() throws Exception {
log.info("初始化 TCP server ...");
bossGroup = serverProperties.isUseEpoll() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
workerGroup = serverProperties.isUseEpoll() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
this.tcpServer();
}
/**
* 初始化
*/
private void tcpServer() {
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(serverProperties.isUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(serverProperties.getPort()))
// 配置 编码器、解码器、业务处理
.childHandler(new ChannelInit(cacheService, websocketHandler))
// tcp缓冲区
.option(ChannelOption.SO_BACKLOG, 128)
// 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true
.childOption(ChannelOption.TCP_NODELAY, false)
// 保持长连接
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(serverProperties.getPort())).syncUninterruptibly();
this.channel = channelFuture.channel();
log.info("websocket server启动成功!开始监听端口:{}", serverProperties.getPort());
String ip = InetAddress.getLocalHost().getHostAddress();
Date date = new Date();
//每3秒向注册中心注册一下自己的服务端信息 如果5秒没有注册redis便清除此服务端信息
CacheUtil.executorService.submit(() -> {
try {
while (channel.isActive()) {
redisTemplate.opsForValue().set("newnettyServer" + ip, JSON.toJSONString(new ServerInfo(ip, serverProperties.getPort(), date)), 5 * 1000, TimeUnit.MILLISECONDS);
Thread.sleep(3 * 1000);
}
} catch (Exception e) {
log.error(e.getMessage());
}
});
} catch (Exception e) {
e.printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
/**
* 销毁
*/
@PreDestroy
@Override
public void destroy() {
if (null == channel) {
return;
}
channel.close();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
package com.netty.server.store;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.StringUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 频道信息存储
* <p>
* 封装netty的频道存储,客户端id和频道双向绑定
*
* @author qiding
*/
@Slf4j
public class ChannelStore {
/**
* 频道绑定 key
*/
private final static AttributeKey<Object> CLIENT_ID = AttributeKey.valueOf("clientId");
/**
* 客户端和频道绑定
*/
private final static ConcurrentHashMap<String, ChannelId> CLIENT_CHANNEL_MAP = new ConcurrentHashMap<>(16);
/**
* 存储频道
*/
public final static ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 重入锁
*/
private static final Lock LOCK = new ReentrantLock();
/**
* 获取单机连接数量
*/
public static int getLocalConnectCount() {
return CHANNEL_GROUP.size();
}
/**
* 获取绑定的通道数量(测试用)
*/
public static int getBindCount() {
return CLIENT_CHANNEL_MAP.size();
}
/**
* 绑定频道和客户端id
*
* @param ctx 连接频道
* @param clientId 用户id
*/
public static void bind(ChannelHandlerContext ctx, String clientId) {
LOCK.lock();
try {
// 释放旧的连接
closeAndClean(clientId);
// 绑定客户端id到频道上
ctx.channel().attr(CLIENT_ID).set(clientId);
// 双向保存客户端id和频道
CLIENT_CHANNEL_MAP.put(clientId, ctx.channel().id());
// 保存频道
CHANNEL_GROUP.add(ctx.channel());
} finally {
LOCK.unlock();
}
}
/**
* 是否已登录
*/
public static boolean isAuth(ChannelHandlerContext ctx) {
return !StringUtil.isNullOrEmpty(getClientId(ctx));
}
/**
* 获取客户端id
*
* @param ctx 连接频道
*/
public static String getClientId(ChannelHandlerContext ctx) {
return ctx.channel().hasAttr(CLIENT_ID) ? (String) ctx.channel().attr(CLIENT_ID).get() : "";
}
/**
* 获取频道
*
* @param clientId 客户端id
*/
public static Channel getChannel(String clientId) {
return Optional.of(CLIENT_CHANNEL_MAP.containsKey(clientId))
.filter(Boolean::booleanValue)
.map(b -> CLIENT_CHANNEL_MAP.get(clientId))
.map(CHANNEL_GROUP::find)
.orElse(null);
}
/**
* 释放连接和资源
* CLIENT_CHANNEL_MAP 需要释放
* CHANNEL_GROUP 不需要释放,netty会自动帮我们移除
*
* @param clientId 客户端id
*/
public static void closeAndClean(String clientId) {
// 清除绑定关系
Optional.of(CLIENT_CHANNEL_MAP.containsKey(clientId))
.filter(Boolean::booleanValue)
.ifPresent(oldChannel -> CLIENT_CHANNEL_MAP.remove(clientId));
// 若存在旧连接,则关闭旧连接,相同clientId,不允许重复连接
Optional.ofNullable(getChannel(clientId))
.ifPresent(ChannelOutboundInvoker::close);
}
public static void closeAndClean(ChannelHandlerContext ctx) {
closeAndClean(getClientId(ctx));
}
}
package com.netty.server.store;
import io.netty.channel.ChannelId;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import java.util.HashMap;
/**
* 管理websocket握手会话
*
* @author qiding
*/
public class WebSocketSession {
private final static HashMap<ChannelId, WebSocketServerHandshaker> CHANNEL_SHAKER = new HashMap<>();
/**
* 添加
*/
public static void setChannelShaker(ChannelId channelId, WebSocketServerHandshaker handShaker) {
CHANNEL_SHAKER.put(channelId, handShaker);
}
/**
* 获取
*/
public static WebSocketServerHandshaker getChannelShaker(ChannelId channelId) {
return CHANNEL_SHAKER.get(channelId);
}
/**
* 释放
*/
public static void clear(ChannelId channelId) {
CHANNEL_SHAKER.remove(channelId);
}
}
package com.netty.server.utils;
import io.netty.channel.Channel;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author jie
*/
public class CacheUtil {
/**
* 线程池
*/
public static ExecutorService executorService = Executors.newFixedThreadPool(3);
/**
* 缓存channel
*/
public static Map<String, Channel> cacheChannel = new ConcurrentHashMap(new HashMap<>());
}
package com.netty.server.utils;
import com.alibaba.fastjson.JSON;
import com.netty.server.model.MsgAgreement;
/**
* 构建消息
*
* @author jie
*/
public class MsgUtil {
public static MsgAgreement buildMsg(String channelId, String content) {
return new MsgAgreement(channelId, content);
}
public static MsgAgreement json2Obj(String objJsonStr) {
objJsonStr = objJsonStr.replace("\r\n", "");
return JSON.parseObject(objJsonStr, MsgAgreement.class);
}
public static String obj2Json(MsgAgreement msgAgreement) {
return JSON.toJSONString(msgAgreement) + "\r\n";
}
}
package com.netty.server.utils;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
/**
* @author jie
*/
public class NetWorkUtils {
/**
* 获取从1100开始的没有被占用的ip
* @return
*/
public static int getPort() {
int initPort = 1100;
while (true) {
if (!isPortUsing(initPort)) {
break;
}
initPort++;
}
return initPort;
}
/**
* 检测端口是否被占用
* @param port
* @return
*/
public static boolean isPortUsing(int port) {
boolean flag = false;
try {
Socket socket = new Socket("localhost", port);
socket.close();
flag = true;
} catch (IOException e) {
}
return flag;
}
/**
* 杀掉端口进程
* @param port
*/
public static void killPort(int port){
try {
Runtime.getRuntime().exec("taskkill /F /pid " + port + "");
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 获取本机ip
* @return
* @throws UnknownHostException
*/
public static String getHost() throws UnknownHostException {
return InetAddress.getLocalHost().getHostAddress();
}
}
package com.netty.server.utils;
import com.alibaba.fastjson.JSON;
import com.netty.server.model.DeviceChannelInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* @author jie
*/
@Service
public class RedisUtil {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 向redis存入设备连接信息
*
* @param deviceChannelInfo
*/
public void pushObj(DeviceChannelInfo deviceChannelInfo) {
redisTemplate.opsForHash().put("newdeviceIds", deviceChannelInfo.getChannelId(), JSON.toJSONString(deviceChannelInfo));
}
public void pushChannelRelation(String channelId, String relationInfo){
Object channelIdObj = redisTemplate.opsForHash().get("channelRelations", relationInfo);
String channelIds = "";
if (null != channelIdObj) {
channelIds = channelIdObj.toString() + ",";
}
channelIds = channelIds + channelId;
redisTemplate.opsForHash().put("channelRelations", relationInfo, channelIds);
}
public String getChannelRelation(String relationInfo){
Object channelIdObj = redisTemplate.opsForHash().get("channelRelations", relationInfo);
if (null != channelIdObj) {
return channelIdObj.toString();
}
return null;
}
/**
* 查询redis中的设备连接信息
*
* @return
*/
public List<DeviceChannelInfo> popList() {
List<Object> values = redisTemplate.opsForHash().values("newdeviceIds");
if (null == values) {
return new ArrayList<>();
}
List<DeviceChannelInfo> deviceChannelInfoList = new ArrayList<>();
for (Object strJson : values) {
deviceChannelInfoList.add(JSON.parseObject(strJson.toString(), DeviceChannelInfo.class));
}
return deviceChannelInfoList;
}
/**
* 根据channelId查询连接信息
*
* @param channelId
* @return
*/
public DeviceChannelInfo selectByChannel(String channelId) {
Object deviceIds = redisTemplate.opsForHash().get("newdeviceIds", channelId);
if (deviceIds == null) {
return null;
}
return JSON.parseObject(deviceIds.toString(), DeviceChannelInfo.class);
}
/**
* 移除某个设备信息
*
* @param channelId
*/
public void remove(String channelId) {
DeviceChannelInfo deviceChannelInfo = selectByChannel(channelId);
if (deviceChannelInfo != null) {
removeChannelRelations(channelId, deviceChannelInfo.getRelationInfo());
}
redisTemplate.opsForHash().delete("newdeviceIds", channelId);
}
private void removeChannelRelations(String channelId, String relationInfo) {
Object channelIdObj = redisTemplate.opsForHash().get("channelRelations", relationInfo);
if (null != channelIdObj) {
String channelIds = "";
List<String> channelIdlist = new ArrayList<>(Arrays.asList(channelIdObj.toString().split(",")));
channelIdlist.remove(channelId);
if (channelIdlist.size() == 0) {
redisTemplate.opsForHash().delete("channelRelations", relationInfo);
} else {
channelIds = String.join(",", channelIdlist);
redisTemplate.opsForHash().put("channelRelations", relationInfo, channelIds);
}
}
}
/**
* 清空设备信息
*/
public void clear() {
redisTemplate.delete("newdeviceIds");
}
}
{
"properties": [
{
"name": "netty.server.host",
"type": "java.lang.String",
"description": "监听的ip."
},
{
"name": "netty.server.port",
"type": "java.lang.String",
"description": "监听的端口."
},
{
"name": "netty.server.use-epoll",
"type": "java.lang.String",
"description": "传输模式linux上开启会有更高的性能."
}
] }
\ No newline at end of file
spring:
application:
name: tcp-server
redis:
host: 172.16.4.6
port: 30835
password:
server:
port: 9999
# tcp
netty:
server:
host: 127.0.0.1
port: 20000
use-epoll: false
# 日记配置
logging:
level:
# 开启debug日记打印
com.netty: debug
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment