Commit 8f5b161c authored by Node- 门 忠鑫's avatar Node- 门 忠鑫

优化代码

parent 644c5350
...@@ -26,7 +26,7 @@ func (BraveMessageHandler) MsgProcess(msg string) { ...@@ -26,7 +26,7 @@ func (BraveMessageHandler) MsgProcess(msg string) {
if err != nil { if err != nil {
logger.Error.Println("brave 解析msg失败:", err) logger.Error.Println("brave 解析msg失败:", err)
} }
logger.Info.Println("解析brave消息")
msgInfluxProcess(traceMsg) msgInfluxProcess(traceMsg)
} }
func (BraveMessageHandler) Destroy() { func (BraveMessageHandler) Destroy() {
......
...@@ -30,6 +30,7 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler ...@@ -30,6 +30,7 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler
defer func() { defer func() {
_ = consumer.Close() _ = consumer.Close()
messageHandle.Destroy()
atomic.AddInt32(&consumerCount, -1) atomic.AddInt32(&consumerCount, -1)
logger.Info.Println("consumer结束") logger.Info.Println("consumer结束")
if consumerCount == 0 { if consumerCount == 0 {
...@@ -60,7 +61,6 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler ...@@ -60,7 +61,6 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler
consumer.MarkOffset(msg, "") // mark message as processed consumer.MarkOffset(msg, "") // mark message as processed
} }
case <-signals: case <-signals:
messageHandle.Destroy()
return return
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment