Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
E
enoch
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
DevOps
enoch
Commits
cbb7ee94
Commit
cbb7ee94
authored
May 22, 2019
by
Node- 门 忠鑫
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
添加consumer destroy 功能
parent
d6377de9
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
24 additions
and
5 deletions
+24
-5
agent_msg_process.go
service/agent_msg_process.go
+13
-0
kafka_agent_sarama.go
service/kafka_agent_sarama.go
+1
-0
messageHandler.go
service/messageHandler.go
+1
-0
msg_process.go
service/msg_process.go
+9
-5
No files found.
service/agent_msg_process.go
View file @
cbb7ee94
...
@@ -4,6 +4,7 @@ import (
...
@@ -4,6 +4,7 @@ import (
"encoding/json"
"encoding/json"
"fmt"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/end_points"
"git.quantgroup.cn/DevOps/enoch/service/end_points"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/client/v2"
"math/big"
"math/big"
"net"
"net"
...
@@ -26,6 +27,18 @@ func (HealthMessageHandler) MsgProcess(msg string) {
...
@@ -26,6 +27,18 @@ func (HealthMessageHandler) MsgProcess(msg string) {
buildMsg
(
chunkMsg
)
buildMsg
(
chunkMsg
)
}
}
func
(
HealthMessageHandler
)
Destroy
()
{
if
len
(
metricsPointSlice
)
>
0
{
logger
.
Info
.
Println
(
"metricsMessageHandler 提交本地缓存数据:"
,
len
(
metricsPointSlice
))
batchWrite
(
metricsPointSlice
)
}
if
len
(
healthPointSlice
)
>
0
{
logger
.
Info
.
Println
(
"HealthMessageHandler 提交本地缓存数据:"
,
len
(
healthPointSlice
))
batchWrite
(
healthPointSlice
)
}
}
func
buildHealthInfluxMsg
(
appName
string
,
ip
string
,
timestamp
time
.
Time
,
submitLimit
int
,
db
map
[
string
]
end_points
.
DBDetail
)
{
func
buildHealthInfluxMsg
(
appName
string
,
ip
string
,
timestamp
time
.
Time
,
submitLimit
int
,
db
map
[
string
]
end_points
.
DBDetail
)
{
tags
:=
make
(
map
[
string
]
string
,
)
tags
:=
make
(
map
[
string
]
string
,
)
tags
[
"sys_name"
]
=
appName
tags
[
"sys_name"
]
=
appName
...
...
service/kafka_agent_sarama.go
View file @
cbb7ee94
...
@@ -49,6 +49,7 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler
...
@@ -49,6 +49,7 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler
consumer
.
MarkOffset
(
msg
,
""
)
// mark message as processed
consumer
.
MarkOffset
(
msg
,
""
)
// mark message as processed
}
}
case
<-
signals
:
case
<-
signals
:
messageHandle
.
Destroy
()
return
return
}
}
}
}
...
...
service/messageHandler.go
View file @
cbb7ee94
...
@@ -2,4 +2,5 @@ package service
...
@@ -2,4 +2,5 @@ package service
type
MessageHandler
interface
{
type
MessageHandler
interface
{
MsgProcess
(
msg
string
)
MsgProcess
(
msg
string
)
Destroy
()
}
}
service/msg_process.go
View file @
cbb7ee94
...
@@ -30,6 +30,13 @@ func (BraveMessageHandler) MsgProcess(msg string) {
...
@@ -30,6 +30,13 @@ func (BraveMessageHandler) MsgProcess(msg string) {
//msgRedisProcess(traceMsg)
//msgRedisProcess(traceMsg)
msgInfluxProcess
(
traceMsg
)
msgInfluxProcess
(
traceMsg
)
}
}
func
(
BraveMessageHandler
)
Destroy
()
{
//系统关系时,提交所有数据
if
len
(
pointSlice
)
>
0
{
logger
.
Info
.
Println
(
"braveMessageHandler 提交本地缓存数据:"
,
len
(
pointSlice
))
batchWrite
(
pointSlice
)
}
}
var
batchSize
=
5000
var
batchSize
=
5000
var
pointSlice
=
make
([]
*
client
.
Point
,
0
,
batchSize
)
var
pointSlice
=
make
([]
*
client
.
Point
,
0
,
batchSize
)
...
@@ -97,8 +104,6 @@ func batchWrite(pointArray []*client.Point) {
...
@@ -97,8 +104,6 @@ func batchWrite(pointArray []*client.Point) {
}
}
}
}
func
httpWrite
(
pointArray
[]
*
client
.
Point
)
error
{
func
httpWrite
(
pointArray
[]
*
client
.
Point
)
error
{
c
:=
data
.
NewClient
()
c
:=
data
.
NewClient
()
defer
func
()
{
_
=
c
.
Close
()
}()
defer
func
()
{
_
=
c
.
Close
()
}()
...
@@ -130,7 +135,7 @@ func ReSubmit(data []string) error {
...
@@ -130,7 +135,7 @@ func ReSubmit(data []string) error {
if
len
(
pointSlice
)
>
1000
{
if
len
(
pointSlice
)
>
1000
{
err
:=
httpWrite
(
pointSlice
)
err
:=
httpWrite
(
pointSlice
)
if
err
!=
nil
{
if
err
!=
nil
{
return
err
return
err
}
}
logger
.
Info
.
Println
(
"缓存重新提交:1000"
)
logger
.
Info
.
Println
(
"缓存重新提交:1000"
)
pointSlice
=
make
([]
*
client
.
Point
,
0
)
pointSlice
=
make
([]
*
client
.
Point
,
0
)
...
@@ -140,7 +145,7 @@ func ReSubmit(data []string) error {
...
@@ -140,7 +145,7 @@ func ReSubmit(data []string) error {
err
:=
httpWrite
(
pointSlice
)
err
:=
httpWrite
(
pointSlice
)
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Info
.
Println
(
pointSlice
)
logger
.
Info
.
Println
(
pointSlice
)
return
err
return
err
}
}
logger
.
Info
.
Println
(
"缓存重新提交:"
,
len
(
pointSlice
))
logger
.
Info
.
Println
(
"缓存重新提交:"
,
len
(
pointSlice
))
}
}
...
@@ -200,5 +205,4 @@ func Counter(day string, fun func(sysName string, durations map[string]string))
...
@@ -200,5 +205,4 @@ func Counter(day string, fun func(sysName string, durations map[string]string))
}
}
fun
(
string
(
redisKey
.
([]
uint8
)),
reply2
)
fun
(
string
(
redisKey
.
([]
uint8
)),
reply2
)
}
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment