Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Q
qg-push
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
head_group
qg-push
Commits
abb334a0
Commit
abb334a0
authored
Sep 14, 2022
by
李健华
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
增加请求stms 验证token并获取对应的供应商code
parent
240d7578
Changes
19
Hide whitespace changes
Inline
Side-by-side
Showing
19 changed files
with
571 additions
and
43 deletions
+571
-43
.gitignore
.gitignore
+2
-0
SendSocketMessageController.java
.../netty/server/Controller/SendSocketMessageController.java
+3
-4
NettyServerApplication.java
src/main/java/com/netty/server/NettyServerApplication.java
+6
-0
ChannelInit.java
src/main/java/com/netty/server/channel/ChannelInit.java
+7
-6
FeignConfig.java
src/main/java/com/netty/server/config/FeignConfig.java
+79
-0
MessageConverter.java
src/main/java/com/netty/server/config/MessageConverter.java
+23
-0
MyEncoder.java
src/main/java/com/netty/server/config/MyEncoder.java
+44
-0
ResTemplateConfig.java
src/main/java/com/netty/server/config/ResTemplateConfig.java
+23
-0
MessageHandler.java
src/main/java/com/netty/server/handler/MessageHandler.java
+4
-12
WebsocketMessageHandler.java
...ava/com/netty/server/handler/WebsocketMessageHandler.java
+31
-15
StmsRe.java
src/main/java/com/netty/server/model/StmsRe.java
+23
-0
Result.java
src/main/java/com/netty/server/response/Result.java
+135
-0
ResultCode.java
src/main/java/com/netty/server/response/ResultCode.java
+42
-0
IStmsServer.java
src/main/java/com/netty/server/server/IStmsServer.java
+14
-0
StmsRemoteService.java
src/main/java/com/netty/server/server/StmsRemoteService.java
+14
-0
TcpServer.java
src/main/java/com/netty/server/server/TcpServer.java
+4
-1
StmsServerImpl.java
...ain/java/com/netty/server/server/impl/StmsServerImpl.java
+42
-0
Md5Utils.java
src/main/java/com/netty/server/utils/Md5Utils.java
+70
-0
RedisUtil.java
src/main/java/com/netty/server/utils/RedisUtil.java
+5
-5
No files found.
.gitignore
View file @
abb334a0
...
@@ -35,3 +35,5 @@ build/
...
@@ -35,3 +35,5 @@ build/
#log
#log
*.log
*.log
*.gz
config-cache/*
src/main/java/com/netty/server/Controller/SendSocketMessageController.java
View file @
abb334a0
...
@@ -4,7 +4,6 @@ import com.netty.server.handler.ChannelHandlerPool;
...
@@ -4,7 +4,6 @@ import com.netty.server.handler.ChannelHandlerPool;
import
com.netty.server.model.MsgAgreement
;
import
com.netty.server.model.MsgAgreement
;
import
com.netty.server.server.CacheService
;
import
com.netty.server.server.CacheService
;
import
com.netty.server.utils.CacheUtil
;
import
com.netty.server.utils.CacheUtil
;
import
com.netty.server.utils.MsgUtil
;
import
io.netty.channel.Channel
;
import
io.netty.channel.Channel
;
import
io.netty.handler.codec.http.websocketx.TextWebSocketFrame
;
import
io.netty.handler.codec.http.websocketx.TextWebSocketFrame
;
import
io.netty.util.AttributeKey
;
import
io.netty.util.AttributeKey
;
...
@@ -24,10 +23,10 @@ public class SendSocketMessageController {
...
@@ -24,10 +23,10 @@ public class SendSocketMessageController {
private
CacheService
cacheService
;
private
CacheService
cacheService
;
@PostMapping
(
"/send"
)
@PostMapping
(
"/send"
)
public
String
send2User
(
@RequestParam
(
value
=
"
token"
)
String
token
,
@RequestParam
(
value
=
"data"
)
String
data
)
{
public
String
send2User
(
@RequestParam
(
value
=
"
relationKey"
)
String
relationKey
,
@RequestParam
(
value
=
"data"
)
String
data
)
{
String
channelIds
=
cacheService
.
getRedisUtil
().
getChannelRelation
(
token
);
String
channelIds
=
cacheService
.
getRedisUtil
().
getChannelRelation
(
relationKey
);
if
(
null
==
channelIds
)
{
if
(
null
==
channelIds
)
{
return
"商户"
+
token
+
"不在线!"
;
return
"商户"
+
relationKey
+
"不在线!"
;
}
}
String
[]
channelIdlist
=
channelIds
.
split
(
","
);
String
[]
channelIdlist
=
channelIds
.
split
(
","
);
for
(
String
channelId
:
channelIdlist
)
{
for
(
String
channelId
:
channelIdlist
)
{
...
...
src/main/java/com/netty/server/NettyServerApplication.java
View file @
abb334a0
...
@@ -9,11 +9,15 @@ import org.springframework.boot.ApplicationRunner;
...
@@ -9,11 +9,15 @@ import org.springframework.boot.ApplicationRunner;
import
org.springframework.boot.SpringApplication
;
import
org.springframework.boot.SpringApplication
;
import
org.springframework.boot.autoconfigure.SpringBootApplication
;
import
org.springframework.boot.autoconfigure.SpringBootApplication
;
import
org.springframework.cloud.client.discovery.EnableDiscoveryClient
;
import
org.springframework.cloud.client.discovery.EnableDiscoveryClient
;
import
org.springframework.cloud.openfeign.EnableFeignClients
;
import
org.springframework.context.annotation.ComponentScan
;
/**
/**
* @author qiding
* @author qiding
*/
*/
@ComponentScan
(
basePackages
=
{
"com.netty.server.*"
})
@EnableFeignClients
(
basePackages
=
{
"com.netty.*"
})
@SpringBootApplication
@SpringBootApplication
@RequiredArgsConstructor
@RequiredArgsConstructor
@EnableApolloConfig
@EnableApolloConfig
...
@@ -22,6 +26,8 @@ public class NettyServerApplication implements ApplicationRunner {
...
@@ -22,6 +26,8 @@ public class NettyServerApplication implements ApplicationRunner {
private
final
TcpServer
tcpServer
;
private
final
TcpServer
tcpServer
;
public
static
void
main
(
String
[]
args
)
{
public
static
void
main
(
String
[]
args
)
{
SpringApplication
.
run
(
NettyServerApplication
.
class
,
args
);
SpringApplication
.
run
(
NettyServerApplication
.
class
,
args
);
}
}
...
...
src/main/java/com/netty/server/channel/ChannelInit.java
View file @
abb334a0
...
@@ -3,18 +3,15 @@ package com.netty.server.channel;
...
@@ -3,18 +3,15 @@ package com.netty.server.channel;
import
com.netty.server.handler.MessageHandler
;
import
com.netty.server.handler.MessageHandler
;
import
com.netty.server.handler.WebsocketMessageHandler
;
import
com.netty.server.handler.WebsocketMessageHandler
;
import
com.netty.server.server.CacheService
;
import
com.netty.server.server.CacheService
;
import
com.netty.server.server.IStmsServer
;
import
io.netty.channel.ChannelInitializer
;
import
io.netty.channel.ChannelInitializer
;
import
io.netty.channel.socket.SocketChannel
;
import
io.netty.channel.socket.SocketChannel
;
import
io.netty.handler.codec.http.HttpObjectAggregator
;
import
io.netty.handler.codec.http.HttpObjectAggregator
;
import
io.netty.handler.codec.http.HttpServerCodec
;
import
io.netty.handler.codec.http.HttpServerCodec
;
import
io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator
;
import
io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator
;
import
io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler
;
import
io.netty.handler.codec.string.StringDecoder
;
import
io.netty.handler.codec.string.StringEncoder
;
import
io.netty.handler.stream.ChunkedWriteHandler
;
import
io.netty.handler.stream.ChunkedWriteHandler
;
import
io.netty.handler.timeout.IdleStateHandler
;
import
io.netty.handler.timeout.IdleStateHandler
;
import
lombok.RequiredArgsConstructor
;
import
lombok.RequiredArgsConstructor
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeUnit
;
...
@@ -34,11 +31,14 @@ public class ChannelInit extends ChannelInitializer<SocketChannel> {
...
@@ -34,11 +31,14 @@ public class ChannelInit extends ChannelInitializer<SocketChannel> {
private
CacheService
cacheService
;
private
CacheService
cacheService
;
private
WebsocketMessageHandler
websocketHandler
;
private
WebsocketMessageHandler
websocketHandler
;
private
IStmsServer
stmsServer
;
public
ChannelInit
(
CacheService
cacheService
,
WebsocketMessageHandler
websocketHandler
,
Long
nettyAllIdleTime
)
{
public
ChannelInit
(
CacheService
cacheService
,
WebsocketMessageHandler
websocketHandler
,
Long
nettyAllIdleTime
,
IStmsServer
stmsServer
)
{
this
.
cacheService
=
cacheService
;
this
.
cacheService
=
cacheService
;
this
.
websocketHandler
=
websocketHandler
;
this
.
websocketHandler
=
websocketHandler
;
this
.
nettyAllIdleTime
=
nettyAllIdleTime
;
this
.
nettyAllIdleTime
=
nettyAllIdleTime
;
this
.
stmsServer
=
stmsServer
;
}
}
@Override
@Override
...
@@ -55,7 +55,8 @@ public class ChannelInit extends ChannelInitializer<SocketChannel> {
...
@@ -55,7 +55,8 @@ public class ChannelInit extends ChannelInitializer<SocketChannel> {
// 聚合 websocket 的数据帧,因为客户端可能分段向服务器端发送数据
// 聚合 websocket 的数据帧,因为客户端可能分段向服务器端发送数据
.
addLast
(
new
WebSocketFrameAggregator
(
1024
*
62
))
.
addLast
(
new
WebSocketFrameAggregator
(
1024
*
62
))
// 添加消息处理器
// 添加消息处理器
.
addLast
(
new
MessageHandler
(
cacheService
,
websocketHandler
));
.
addLast
(
new
MessageHandler
(
cacheService
,
websocketHandler
,
stmsServer
));
// // 添加消息处理器
// // 添加消息处理器
// .addLast("messageHandler", messageHandler);
// .addLast("messageHandler", messageHandler);
}
}
...
...
src/main/java/com/netty/server/config/FeignConfig.java
0 → 100644
View file @
abb334a0
package
com
.
netty
.
server
.
config
;
import
com.google.common.collect.Lists
;
import
feign.Contract
;
import
feign.Logger
;
import
feign.codec.Decoder
;
import
feign.codec.Encoder
;
import
org.springframework.beans.factory.ObjectFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.autoconfigure.http.HttpMessageConverters
;
import
org.springframework.cloud.openfeign.support.SpringDecoder
;
import
org.springframework.cloud.openfeign.support.SpringEncoder
;
import
org.springframework.cloud.openfeign.support.SpringMvcContract
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.http.converter.HttpMessageConverter
;
import
javax.annotation.PostConstruct
;
import
java.util.List
;
@Configuration
public
class
FeignConfig
{
// @Bean
// public Encoder encoder(){
// return new MyEncoder();
// }
List
<
HttpMessageConverter
<?>>
converters
=
Lists
.
newArrayList
();
@PostConstruct
public
void
init
(){
MessageConverter
wxConverter
=
new
MessageConverter
();
HttpMessageConverters
httpMessageConverters
=
new
HttpMessageConverters
(
wxConverter
);
this
.
converters
=
httpMessageConverters
.
getConverters
();
}
@Bean
Logger
.
Level
feignLevel
()
{
return
Logger
.
Level
.
FULL
;
}
@Bean
public
Contract
contract
()
{
return
new
SpringMvcContract
();
}
@Bean
public
Encoder
springEncoder
(
@Autowired
ObjectFactory
<
HttpMessageConverters
>
messageConverters
)
{
return
new
SpringEncoder
(
messageConverters
);
}
/**
* 注入新的Decoder Feign将自动 替换解决微信返回参数为[text/plain] 无法转化为json
* 微信虽然接口返回为JSON格式数据但却将数据表示为了[text/plain]导致Feign没有采用JSON解析器来解析,从而无法将响应数据转化为对应的POJO对象;
*/
@Bean
public
Decoder
feignDecoder
()
{
MessageConverter
wxConverter
=
new
MessageConverter
();
ObjectFactory
<
HttpMessageConverters
>
objectFactory
=
()
->
new
HttpMessageConverters
(
wxConverter
);
return
new
SpringDecoder
(
objectFactory
);
}
// @Bean
// public Decoder feignDecoder() {
//
// /**
// * 改动原因: 压测时,feign 会有性能问题
// *
// * addDefaultConverters 为true, feign 执行decoder 每次都会重新初始化 默认的 decoder
// * 所以:提前把 自定义的convert 和 默认的convert 处理好后,不再处理默认添加
// *
// * https://blog.csdn.net/weixin_35762553/article/details/112087522
// */
// ObjectFactory objectFactory = () -> new HttpMessageConverters(false, converters);
// return new ResponseEntityDecoder(new SpringDecoder(objectFactory));
// }
//
}
\ No newline at end of file
src/main/java/com/netty/server/config/MessageConverter.java
0 → 100644
View file @
abb334a0
package
com
.
netty
.
server
.
config
;
import
org.springframework.http.MediaType
;
import
org.springframework.http.converter.json.MappingJackson2HttpMessageConverter
;
import
java.util.ArrayList
;
import
java.util.List
;
/**
* 自定义解析器,支持微信返回text/plain数据
*
* @author axq
*/
public
class
MessageConverter
extends
MappingJackson2HttpMessageConverter
{
public
MessageConverter
()
{
List
<
MediaType
>
mediaTypes
=
new
ArrayList
<>();
mediaTypes
.
add
(
MediaType
.
TEXT_PLAIN
);
mediaTypes
.
add
(
MediaType
.
APPLICATION_JSON
);
mediaTypes
.
add
(
MediaType
.
APPLICATION_JSON_UTF8
);
mediaTypes
.
add
(
MediaType
.
APPLICATION_FORM_URLENCODED
);
setSupportedMediaTypes
(
mediaTypes
);
}
}
src/main/java/com/netty/server/config/MyEncoder.java
0 → 100644
View file @
abb334a0
package
com
.
netty
.
server
.
config
;
import
com.alibaba.fastjson.JSONObject
;
import
feign.RequestTemplate
;
import
feign.codec.EncodeException
;
import
feign.codec.Encoder
;
import
org.springframework.http.MediaType
;
import
java.io.UnsupportedEncodingException
;
import
java.lang.reflect.Field
;
import
java.lang.reflect.Type
;
import
java.net.URLEncoder
;
public
class
MyEncoder
implements
Encoder
{
@Override
public
void
encode
(
Object
o
,
Type
type
,
RequestTemplate
rt
)
throws
EncodeException
{
StringBuffer
sb
=
new
StringBuffer
();
try
{
Class
clazz
=
Thread
.
currentThread
().
getContextClassLoader
().
loadClass
(
type
.
getTypeName
());
Field
[]
fields
=
clazz
.
getDeclaredFields
();
String
oStr
=
JSONObject
.
toJSONString
(
o
);
for
(
Field
field
:
fields
){
if
(
sb
.
length
()
>
0
){
sb
.
append
(
"&"
);
}
field
.
setAccessible
(
true
);
Object
fieldValue
=
field
.
get
(
JSONObject
.
parseObject
(
oStr
,
clazz
));
if
(
fieldValue
!=
null
){
sb
.
append
(
URLEncoder
.
encode
(
field
.
getName
(),
"UTF-8"
))
.
append
(
"="
)
.
append
(
URLEncoder
.
encode
(
field
.
get
(
JSONObject
.
parseObject
(
oStr
,
clazz
)).
toString
()));
}
}
}
catch
(
ClassNotFoundException
e
)
{
e
.
printStackTrace
();
}
catch
(
UnsupportedEncodingException
e
)
{
e
.
printStackTrace
();
}
catch
(
IllegalAccessException
e
)
{
e
.
printStackTrace
();
}
rt
.
header
(
"Content-Type"
,
MediaType
.
APPLICATION_FORM_URLENCODED_VALUE
);
rt
.
body
(
sb
.
toString
());
}
}
\ No newline at end of file
src/main/java/com/netty/server/config/ResTemplateConfig.java
0 → 100644
View file @
abb334a0
package
com
.
netty
.
server
.
config
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.web.client.RestTemplateBuilder
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.web.client.RestTemplate
;
/**
*/
@Configuration
public
class
ResTemplateConfig
{
@Autowired
private
RestTemplateBuilder
builder
;
@Bean
public
RestTemplate
restTemplate
()
{
return
builder
.
build
();
}
}
src/main/java/com/netty/server/handler/MessageHandler.java
View file @
abb334a0
package
com
.
netty
.
server
.
handler
;
package
com
.
netty
.
server
.
handler
;
import
com.netty.server.model.DeviceChannelInfo
;
import
com.netty.server.server.CacheService
;
import
com.netty.server.server.CacheService
;
import
com.netty.server.server.IStmsServer
;
import
com.netty.server.store.ChannelStore
;
import
com.netty.server.store.ChannelStore
;
import
com.netty.server.store.WebSocketSession
;
import
com.netty.server.store.WebSocketSession
;
import
com.netty.server.utils.CacheUtil
;
import
com.netty.server.utils.CacheUtil
;
import
com.netty.server.utils.NetWorkUtils
;
import
io.netty.channel.Channel
;
import
io.netty.channel.Channel
;
import
io.netty.channel.ChannelHandler
;
import
io.netty.channel.ChannelHandler
;
import
io.netty.channel.ChannelHandlerContext
;
import
io.netty.channel.ChannelHandlerContext
;
import
io.netty.channel.SimpleChannelInboundHandler
;
import
io.netty.channel.SimpleChannelInboundHandler
;
import
io.netty.channel.socket.SocketChannel
;
import
io.netty.handler.codec.http.FullHttpRequest
;
import
io.netty.handler.codec.http.FullHttpRequest
;
import
io.netty.handler.codec.http.websocketx.TextWebSocketFrame
;
import
io.netty.handler.codec.http.websocketx.WebSocketFrame
;
import
io.netty.handler.codec.http.websocketx.WebSocketFrame
;
import
io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker
;
import
io.netty.handler.timeout.IdleState
;
import
io.netty.handler.timeout.IdleState
;
import
io.netty.handler.timeout.IdleStateEvent
;
import
io.netty.handler.timeout.IdleStateEvent
;
import
lombok.RequiredArgsConstructor
;
import
lombok.RequiredArgsConstructor
;
import
lombok.extern.slf4j.Slf4j
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
import
java.util.Collection
;
import
java.util.Date
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.ConcurrentMap
;
/**
/**
* 消息处理,单例启动
* 消息处理,单例启动
...
@@ -41,9 +32,10 @@ public class MessageHandler extends SimpleChannelInboundHandler<WebSocketFrame>
...
@@ -41,9 +32,10 @@ public class MessageHandler extends SimpleChannelInboundHandler<WebSocketFrame>
private
WebsocketMessageHandler
websocketHandler
;
private
WebsocketMessageHandler
websocketHandler
;
private
CacheService
cacheService
;
private
CacheService
cacheService
;
public
MessageHandler
(
CacheService
cacheService
,
WebsocketMessageHandler
websocketHandler
)
{
public
MessageHandler
(
CacheService
cacheService
,
WebsocketMessageHandler
websocketHandler
,
IStmsServer
stmsServer
)
{
this
.
cacheService
=
cacheService
;
this
.
cacheService
=
cacheService
;
this
.
websocketHandler
=
new
WebsocketMessageHandler
(
cacheService
);
this
.
websocketHandler
=
new
WebsocketMessageHandler
(
cacheService
,
stmsServer
);
}
}
@Override
@Override
...
...
src/main/java/com/netty/server/handler/WebsocketMessageHandler.java
View file @
abb334a0
...
@@ -4,9 +4,10 @@ import com.alibaba.fastjson.JSON;
...
@@ -4,9 +4,10 @@ import com.alibaba.fastjson.JSON;
import
com.netty.server.model.DeviceChannelInfo
;
import
com.netty.server.model.DeviceChannelInfo
;
import
com.netty.server.model.MsgAgreement
;
import
com.netty.server.model.MsgAgreement
;
import
com.netty.server.server.CacheService
;
import
com.netty.server.server.CacheService
;
import
com.netty.server.server.IStmsServer
;
import
com.netty.server.store.WebSocketSession
;
import
com.netty.server.store.WebSocketSession
;
import
com.netty.server.utils.CacheUtil
;
import
com.netty.server.utils.CacheUtil
;
import
com.netty.server.utils.M
sgUtil
;
import
com.netty.server.utils.M
d5Utils
;
import
com.netty.server.utils.NetWorkUtils
;
import
com.netty.server.utils.NetWorkUtils
;
import
io.netty.buffer.ByteBufUtil
;
import
io.netty.buffer.ByteBufUtil
;
import
io.netty.channel.Channel
;
import
io.netty.channel.Channel
;
...
@@ -22,8 +23,6 @@ import org.springframework.beans.factory.annotation.Autowired;
...
@@ -22,8 +23,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
import
javax.annotation.Resource
;
import
java.util.Collection
;
import
java.util.Date
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.List
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.ConcurrentHashMap
;
...
@@ -42,9 +41,11 @@ public class WebsocketMessageHandler {
...
@@ -42,9 +41,11 @@ public class WebsocketMessageHandler {
private
final
String
USER
=
"user"
;
private
final
String
USER
=
"user"
;
private
final
AttributeKey
<
String
>
key
=
AttributeKey
.
valueOf
(
USER
);
private
final
AttributeKey
<
String
>
key
=
AttributeKey
.
valueOf
(
USER
);
private
CacheService
cacheService
;
private
CacheService
cacheService
;
private
IStmsServer
stmsServer
;
public
WebsocketMessageHandler
(
CacheService
cacheService
)
{
public
WebsocketMessageHandler
(
CacheService
cacheService
,
IStmsServer
stmsServer
)
{
this
.
cacheService
=
cacheService
;
this
.
cacheService
=
cacheService
;
this
.
stmsServer
=
stmsServer
;
}
}
@Autowired
@Autowired
...
@@ -60,11 +61,20 @@ public class WebsocketMessageHandler {
...
@@ -60,11 +61,20 @@ public class WebsocketMessageHandler {
String
uri
=
request
.
uri
();
String
uri
=
request
.
uri
();
ConcurrentMap
<
String
,
String
>
paramMap
=
getUrlParams
(
uri
);
ConcurrentMap
<
String
,
String
>
paramMap
=
getUrlParams
(
uri
);
System
.
out
.
println
(
"接收到的参数是:"
+
JSON
.
toJSONString
(
paramMap
));
System
.
out
.
println
(
"接收到的参数是:"
+
JSON
.
toJSONString
(
paramMap
));
// this.sendResponse(ctx, request, new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.FORBIDDEN, ctx.alloc().buffer()));
// ctx.close();
// 加校验
// 加校验
String
relationInfo
=
paramMap
.
get
(
"channelId"
)
+
"/"
+
paramMap
.
get
(
"bussinessId"
);
if
(
paramMap
.
getOrDefault
(
"token"
,
""
).
equals
(
""
)
||
paramMap
.
getOrDefault
(
"channelId"
,
""
).
equals
(
""
))
{
online
(
relationInfo
,
ctx
.
channel
());
this
.
sendResponse
(
ctx
,
request
,
new
DefaultFullHttpResponse
(
request
.
protocolVersion
(),
HttpResponseStatus
.
BAD_REQUEST
,
ctx
.
alloc
().
buffer
()));
ctx
.
close
();
}
// String supplierCode = stmsServer.getStmsTokenInfo(paramMap.get("token"));
String
supplierCode
=
"100"
;
if
(
null
==
supplierCode
)
{
this
.
sendResponse
(
ctx
,
request
,
new
DefaultFullHttpResponse
(
request
.
protocolVersion
(),
HttpResponseStatus
.
FORBIDDEN
,
ctx
.
alloc
().
buffer
()));
ctx
.
close
();
}
String
mdString
=
paramMap
.
get
(
"channelId"
)
+
"-"
+
supplierCode
;
String
relationKey
=
Md5Utils
.
MD5Encode
(
mdString
);
online
(
relationKey
,
ctx
.
channel
());
// 参数分别是 (ws地址,子协议,是否扩展,最大frame长度)
// 参数分别是 (ws地址,子协议,是否扩展,最大frame长度)
WebSocketServerHandshakerFactory
factory
=
new
WebSocketServerHandshakerFactory
(
getWebSocketLocation
(
request
),
null
,
true
,
5
*
1024
*
1024
);
WebSocketServerHandshakerFactory
factory
=
new
WebSocketServerHandshakerFactory
(
getWebSocketLocation
(
request
),
null
,
true
,
5
*
1024
*
1024
);
WebSocketServerHandshaker
handShaker
=
factory
.
newHandshaker
(
request
);
WebSocketServerHandshaker
handShaker
=
factory
.
newHandshaker
(
request
);
...
@@ -81,7 +91,7 @@ public class WebsocketMessageHandler {
...
@@ -81,7 +91,7 @@ public class WebsocketMessageHandler {
.
ip
(
NetWorkUtils
.
getHost
())
.
ip
(
NetWorkUtils
.
getHost
())
.
port
(
channel
.
localAddress
().
getPort
())
.
port
(
channel
.
localAddress
().
getPort
())
.
linkDate
(
new
Date
())
.
linkDate
(
new
Date
())
.
relationInfo
(
relation
Info
)
.
relationInfo
(
relation
Key
)
.
build
();
.
build
();
cacheService
.
getRedisUtil
().
pushObj
(
deviceChannelInfo
);
cacheService
.
getRedisUtil
().
pushObj
(
deviceChannelInfo
);
CacheUtil
.
cacheChannel
.
put
(
channel
.
id
().
toString
(),
channel
);
CacheUtil
.
cacheChannel
.
put
(
channel
.
id
().
toString
(),
channel
);
...
@@ -106,14 +116,19 @@ public class WebsocketMessageHandler {
...
@@ -106,14 +116,19 @@ public class WebsocketMessageHandler {
}
}
// 文本接收和回复
// 文本接收和回复
if
(
frame
instanceof
TextWebSocketFrame
)
{
if
(
frame
instanceof
TextWebSocketFrame
)
{
log
.
debug
(
"收到消息:\n{}"
,
((
TextWebSocketFrame
)
frame
).
text
());
String
msgContent
=
((
TextWebSocketFrame
)
frame
).
text
();
if
(
"HeartBeat"
.
equals
(
msgContent
))
{
log
.
debug
(
"心跳消息"
);
return
;
}
log
.
debug
(
"收到消息:\n{}"
,
msgContent
);
//判断接收消息用户是否在本服务端
//判断接收消息用户是否在本服务端
Channel
channel
=
CacheUtil
.
cacheChannel
.
get
(
ctx
.
channel
().
id
().
toString
());
Channel
channel
=
CacheUtil
.
cacheChannel
.
get
(
ctx
.
channel
().
id
().
toString
());
MsgAgreement
msgAgreement
=
new
MsgAgreement
();
MsgAgreement
msgAgreement
=
new
MsgAgreement
();
msgAgreement
.
setToChannelId
(
ctx
.
channel
().
id
().
toString
());
msgAgreement
.
setToChannelId
(
ctx
.
channel
().
id
().
toString
());
msgAgreement
.
setContent
(
((
TextWebSocketFrame
)
frame
).
text
()
);
msgAgreement
.
setContent
(
msgContent
);
if
(
null
!=
channel
)
{
if
(
null
!=
channel
)
{
channel
.
writeAndFlush
(
new
TextWebSocketFrame
(
MsgUtil
.
obj2Json
(
msgAgreement
)
));
channel
.
writeAndFlush
(
new
TextWebSocketFrame
(
msgContent
));
return
;
return
;
}
}
//如果为NULL则接收消息的用户不在本服务端,需要push消息给全局
//如果为NULL则接收消息的用户不在本服务端,需要push消息给全局
...
@@ -183,13 +198,14 @@ public class WebsocketMessageHandler {
...
@@ -183,13 +198,14 @@ public class WebsocketMessageHandler {
* 上线一个用户
* 上线一个用户
*
*
* @param channel
* @param channel
* @param
token
* @param
relationKey
*/
*/
private
void
online
(
String
token
,
Channel
channel
)
{
private
void
online
(
String
relationKey
,
Channel
channel
)
{
// 保存channel通道的附带信息,以用户的uid为标识
// 保存channel通道的附带信息,以用户的uid为标识
// channel.attr(key).set(token);
// channel.attr(key).set(token);
// ChannelHandlerPool.channelGroup.add(channel);
// ChannelHandlerPool.channelGroup.add(channel);
cacheService
.
getRedisUtil
().
pushChannelRelation
(
channel
.
id
().
toString
(),
token
);
System
.
out
.
println
(
relationKey
);
cacheService
.
getRedisUtil
().
pushChannelRelation
(
channel
.
id
().
toString
(),
relationKey
);
}
}
private
static
ConcurrentMap
<
String
,
String
>
getUrlParams
(
String
url
)
{
private
static
ConcurrentMap
<
String
,
String
>
getUrlParams
(
String
url
)
{
...
...
src/main/java/com/netty/server/model/StmsRe.java
0 → 100644
View file @
abb334a0
package
com
.
netty
.
server
.
model
;
import
lombok.AllArgsConstructor
;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
/**
* stms 返回值
*
* @author jie
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public
class
StmsRe
{
private
Integer
code
;
private
String
msg
;
private
String
detail
;
private
Object
data
;
}
src/main/java/com/netty/server/response/Result.java
0 → 100644
View file @
abb334a0
package
com
.
netty
.
server
.
response
;
import
lombok.Data
;
import
org.apache.commons.lang3.StringUtils
;
import
org.slf4j.MDC
;
/**
* API 调用返回的结果对象
*
* @author rong yang
*/
@SuppressWarnings
(
"serial"
)
@Data
public
class
Result
<
T
>
implements
java
.
io
.
Serializable
{
/**
* 0成功,永远默认成功
*/
private
String
code
=
"0000"
;
/**
* 业务错误码
*/
private
String
businessCode
=
ResultCode
.
SUCCESS
.
getCode
();
/**
* 消息
**/
private
String
msg
;
/**
* 详细消息
**/
private
String
detail
;
/**
* 数据
**/
private
T
data
;
private
Boolean
success
;
private
String
traceId
;
/**
* @param resultCode
*/
protected
void
setResultCode
(
ResultCode
resultCode
)
{
this
.
businessCode
=
resultCode
.
getCode
();
this
.
msg
=
resultCode
.
getMsg
();
}
public
Result
()
{
this
.
traceId
=
getTraceId
();
}
public
Result
(
String
code
,
String
businessCode
,
String
msg
,
String
detail
,
T
data
,
boolean
success
)
{
this
.
code
=
code
;
this
.
businessCode
=
businessCode
;
this
.
msg
=
msg
;
this
.
detail
=
detail
;
this
.
data
=
data
;
this
.
success
=
success
;
}
public
static
Result
success
()
{
Result
result
=
new
Result
();
result
.
setResultCode
(
ResultCode
.
SUCCESS
);
result
.
setData
(
new
Object
());
return
result
;
}
public
static
Result
supplyChainSuccess
()
{
Result
result
=
new
Result
();
result
.
setCode
(
ResultCode
.
SUCCESS
.
getCode
());
result
.
setBusinessCode
(
ResultCode
.
SUCCESS
.
getCode
());
result
.
setSuccess
(
null
);
return
result
;
}
public
Result
<
T
>
buildSuccess
(
T
data
)
{
this
.
setResultCode
(
ResultCode
.
SUCCESS
);
this
.
setData
(
data
);
return
this
;
}
public
static
<
T
>
Result
<
T
>
success
(
T
data
)
{
Result
result
=
new
Result
();
result
.
setResultCode
(
ResultCode
.
SUCCESS
);
result
.
setData
(
data
);
return
result
;
}
public
static
Result
failure
()
{
Result
result
=
new
Result
();
result
.
setResultCode
(
ResultCode
.
FAILURE
);
return
result
;
}
public
static
Result
failure
(
String
msg
)
{
Result
result
=
new
Result
();
result
.
setBusinessCode
(
ResultCode
.
FAILURE
.
getCode
());
result
.
setMsg
(
msg
);
return
result
;
}
public
static
Result
failure
(
ResultCode
resultCode
)
{
Result
result
=
new
Result
();
result
.
setResultCode
(
resultCode
);
return
result
;
}
public
static
Result
failure
(
ResultCode
resultCode
,
String
detail
)
{
Result
result
=
new
Result
();
result
.
setResultCode
(
resultCode
);
result
.
setDetail
(
detail
);
return
result
;
}
public
boolean
isSuccess
()
{
return
(
"0000"
.
equals
(
code
)
||
"0"
.
equals
(
code
));
}
public
static
String
getTraceId
(){
String
traceId
=
MDC
.
get
(
"X-B3-TraceId"
);
String
spanId
=
MDC
.
get
(
"X-B3-SpanId"
);
if
(
StringUtils
.
isNotEmpty
(
traceId
)
&&
StringUtils
.
isNotEmpty
(
spanId
))
{
StringBuffer
sb
=
new
StringBuffer
()
.
append
(
"["
).
append
(
traceId
).
append
(
"-"
).
append
(
spanId
).
append
(
"]"
);
return
sb
.
toString
();
}
return
""
;
}
}
src/main/java/com/netty/server/response/ResultCode.java
0 → 100644
View file @
abb334a0
package
com
.
netty
.
server
.
response
;
import
lombok.extern.slf4j.Slf4j
;
/**
* API调用结果状态码定义
*
* @author rong yang
*/
@Slf4j
public
enum
ResultCode
{
/**
* 响应[消息中心]专用
*/
SUCCESS
(
"0000"
,
"成功"
),
FAILURE
(
"0001"
,
"失败"
),
;
private
String
code
;
private
String
msg
;
private
ResultCode
(
String
code
,
String
msg
)
{
this
.
code
=
code
;
this
.
msg
=
msg
;
}
public
String
getCode
()
{
return
code
;
}
public
String
getMsg
()
{
return
msg
;
}
}
src/main/java/com/netty/server/server/IStmsServer.java
0 → 100644
View file @
abb334a0
package
com
.
netty
.
server
.
server
;
/**
* smts 接口
*
* @author qiding
*/
public
interface
IStmsServer
{
String
getStmsTokenInfo
(
String
token
);
}
src/main/java/com/netty/server/server/StmsRemoteService.java
0 → 100644
View file @
abb334a0
package
com
.
netty
.
server
.
server
;
import
feign.Headers
;
import
org.springframework.cloud.openfeign.FeignClient
;
import
org.springframework.web.bind.annotation.PostMapping
;
import
org.springframework.web.bind.annotation.RequestHeader
;
import
com.netty.server.response.Result
;
@FeignClient
(
name
=
"stmsRemoteService"
,
url
=
"${stms.http}"
)
public
interface
StmsRemoteService
{
@Headers
(
"Content-Type:application/json"
)
@PostMapping
(
value
=
"/v2/oauth/currentuserinfo"
)
Result
<
String
>
getCurrentuserinfo
(
@RequestHeader
(
"Access-Token"
)
String
accessToken
);
}
src/main/java/com/netty/server/server/TcpServer.java
View file @
abb334a0
...
@@ -57,6 +57,9 @@ public class TcpServer implements ITcpServer {
...
@@ -57,6 +57,9 @@ public class TcpServer implements ITcpServer {
@Autowired
@Autowired
private
WebsocketMessageHandler
websocketHandler
;
private
WebsocketMessageHandler
websocketHandler
;
@Autowired
private
IStmsServer
stmsServer
;
@Value
(
"${netty.port}"
)
@Value
(
"${netty.port}"
)
private
int
port
;
private
int
port
;
...
@@ -89,7 +92,7 @@ public class TcpServer implements ITcpServer {
...
@@ -89,7 +92,7 @@ public class TcpServer implements ITcpServer {
.
channel
(
serverProperties
.
isUseEpoll
()
?
EpollServerSocketChannel
.
class
:
NioServerSocketChannel
.
class
)
.
channel
(
serverProperties
.
isUseEpoll
()
?
EpollServerSocketChannel
.
class
:
NioServerSocketChannel
.
class
)
.
localAddress
(
new
InetSocketAddress
(
port
))
.
localAddress
(
new
InetSocketAddress
(
port
))
// 配置 编码器、解码器、业务处理
// 配置 编码器、解码器、业务处理
.
childHandler
(
new
ChannelInit
(
cacheService
,
websocketHandler
,
nettyAllIdleTime
))
.
childHandler
(
new
ChannelInit
(
cacheService
,
websocketHandler
,
nettyAllIdleTime
,
stmsServer
))
// tcp缓冲区
// tcp缓冲区
.
option
(
ChannelOption
.
SO_BACKLOG
,
128
)
.
option
(
ChannelOption
.
SO_BACKLOG
,
128
)
// 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true
// 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true
...
...
src/main/java/com/netty/server/server/impl/StmsServerImpl.java
0 → 100644
View file @
abb334a0
package
com
.
netty
.
server
.
server
.
impl
;
import
com.alibaba.fastjson.JSONObject
;
import
com.netty.server.response.Result
;
import
com.netty.server.server.IStmsServer
;
import
com.netty.server.server.StmsRemoteService
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.stereotype.Component
;
import
org.springframework.web.client.RestTemplate
;
import
javax.annotation.Resource
;
@Slf4j
@Component
public
class
StmsServerImpl
implements
IStmsServer
{
@Value
(
"${stms.http}"
)
private
String
stms
;
@Resource
private
RestTemplate
restTemplate
;
@Resource
private
StmsRemoteService
stmsRemoteService
;
@Override
public
String
getStmsTokenInfo
(
String
token
)
{
try
{
Result
<
String
>
result
=
stmsRemoteService
.
getCurrentuserinfo
(
token
);
if
(
result
.
isSuccess
())
{
JSONObject
jsonData
=
JSONObject
.
parseObject
(
result
.
getData
().
toString
());
return
jsonData
.
getString
(
"supplierCode"
);
}
}
catch
(
Exception
e
)
{
log
.
error
(
"Exception--{}"
,
e
.
getMessage
());
}
return
null
;
}
}
src/main/java/com/netty/server/utils/Md5Utils.java
0 → 100644
View file @
abb334a0
package
com
.
netty
.
server
.
utils
;
import
lombok.extern.slf4j.Slf4j
;
import
java.security.MessageDigest
;
import
java.security.NoSuchAlgorithmException
;
/**
* Created by tums on 2015/11/30.
*/
@Slf4j
public
final
class
Md5Utils
{
public
static
String
build
(
String
content
)
{
MessageDigest
messageDigest
;
try
{
messageDigest
=
MessageDigest
.
getInstance
(
"md5"
);
}
catch
(
NoSuchAlgorithmException
e
)
{
log
.
error
(
e
.
getMessage
(),
e
);
return
null
;
}
messageDigest
.
update
(
content
.
getBytes
());
byte
[]
domain
=
messageDigest
.
digest
();
StringBuilder
md5StrBuff
=
new
StringBuilder
();
// converting domain to String
for
(
int
i
=
0
;
i
<
domain
.
length
;
i
++)
{
if
(
Integer
.
toHexString
(
0xFF
&
domain
[
i
]).
length
()
==
1
)
{
md5StrBuff
.
append
(
"0"
).
append
(
Integer
.
toHexString
(
0xFF
&
domain
[
i
]));
}
else
{
md5StrBuff
.
append
(
Integer
.
toHexString
(
0xFF
&
domain
[
i
]));
}
}
return
md5StrBuff
.
toString
();
}
public
static
String
MD5Encode
(
String
origin
)
{
String
ret
=
null
;
try
{
ret
=
new
String
(
origin
);
MessageDigest
md
=
MessageDigest
.
getInstance
(
"MD5"
);
ret
=
byteArrayToHexString
(
md
.
digest
(
ret
.
getBytes
()));
}
catch
(
Exception
e
)
{
}
return
ret
;
}
private
final
static
String
[]
hexDigits
=
{
"0"
,
"1"
,
"2"
,
"3"
,
"4"
,
"5"
,
"6"
,
"7"
,
"8"
,
"9"
,
"a"
,
"b"
,
"c"
,
"d"
,
"e"
,
"f"
};
private
static
String
byteArrayToHexString
(
byte
[]
bytes
)
{
StringBuffer
sb
=
new
StringBuffer
();
for
(
byte
b
:
bytes
)
{
sb
.
append
(
byteToHexString
(
b
));
}
return
sb
.
toString
();
}
private
static
String
byteToHexString
(
byte
b
)
{
int
n
=
b
;
if
(
n
<
0
)
n
=
256
+
n
;
int
d1
=
n
/
16
;
int
d2
=
n
%
16
;
return
hexDigits
[
d1
]
+
hexDigits
[
d2
];
}
}
src/main/java/com/netty/server/utils/RedisUtil.java
View file @
abb334a0
...
@@ -36,18 +36,18 @@ public class RedisUtil {
...
@@ -36,18 +36,18 @@ public class RedisUtil {
redisTemplate
.
opsForHash
().
put
(
nettyDeviceIds
,
deviceChannelInfo
.
getChannelId
(),
JSON
.
toJSONString
(
deviceChannelInfo
));
redisTemplate
.
opsForHash
().
put
(
nettyDeviceIds
,
deviceChannelInfo
.
getChannelId
(),
JSON
.
toJSONString
(
deviceChannelInfo
));
}
}
public
void
pushChannelRelation
(
String
channelId
,
String
relation
Info
){
public
void
pushChannelRelation
(
String
channelId
,
String
relation
Key
){
Object
channelIdObj
=
redisTemplate
.
opsForHash
().
get
(
nettyChannelRelations
,
relation
Info
);
Object
channelIdObj
=
redisTemplate
.
opsForHash
().
get
(
nettyChannelRelations
,
relation
Key
);
String
channelIds
=
""
;
String
channelIds
=
""
;
if
(
null
!=
channelIdObj
)
{
if
(
null
!=
channelIdObj
)
{
channelIds
=
channelIdObj
.
toString
()
+
","
;
channelIds
=
channelIdObj
.
toString
()
+
","
;
}
}
channelIds
=
channelIds
+
channelId
;
channelIds
=
channelIds
+
channelId
;
redisTemplate
.
opsForHash
().
put
(
nettyChannelRelations
,
relation
Info
,
channelIds
);
redisTemplate
.
opsForHash
().
put
(
nettyChannelRelations
,
relation
Key
,
channelIds
);
}
}
public
String
getChannelRelation
(
String
relation
Info
){
public
String
getChannelRelation
(
String
relation
Key
){
Object
channelIdObj
=
redisTemplate
.
opsForHash
().
get
(
nettyChannelRelations
,
relation
Info
);
Object
channelIdObj
=
redisTemplate
.
opsForHash
().
get
(
nettyChannelRelations
,
relation
Key
);
if
(
null
!=
channelIdObj
)
{
if
(
null
!=
channelIdObj
)
{
return
channelIdObj
.
toString
();
return
channelIdObj
.
toString
();
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment