Commit c2caca2c authored by Node- 门 忠鑫's avatar Node- 门 忠鑫

调整consumer结构

parent 66f9fd42
...@@ -22,7 +22,7 @@ func (HealthMessageHandler) MsgProcess(msg string) { ...@@ -22,7 +22,7 @@ func (HealthMessageHandler) MsgProcess(msg string) {
err := json.Unmarshal([]byte(msg), &chunkMsg) err := json.Unmarshal([]byte(msg), &chunkMsg)
if err != nil { if err != nil {
logger.Error.Println("healthMessageHandler解析json失败:",err) logger.Error.Println("healthMessageHandler解析json失败:",err)
logger.Error.Println([]byte(msg)) logger.Error.Println(msg)
} }
buildMsg(chunkMsg) buildMsg(chunkMsg)
} }
......
...@@ -7,11 +7,14 @@ import ( ...@@ -7,11 +7,14 @@ import (
"github.com/bsm/sarama-cluster" "github.com/bsm/sarama-cluster"
"os" "os"
"os/signal" "os/signal"
"sync/atomic"
"syscall"
) )
var consumerCount int32
func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler) { func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler) {
config := cluster.NewConfig() config := cluster.NewConfig()
config.Consumer.Return.Errors = true config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Group.Return.Notifications = true config.Group.Return.Notifications = true
...@@ -20,12 +23,18 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler ...@@ -20,12 +23,18 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler
if err != nil { if err != nil {
panic(err) panic(err)
} }
atomic.AddInt32(&consumerCount, 1)
// trap SIGINT to trigger a shutdown. // trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1) signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt) signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, os.Interrupt)
defer func() { defer func() {
_ = consumer.Close() _ = consumer.Close()
atomic.AddInt32(&consumerCount, -1)
logger.Info.Println("consumer结束")
if consumerCount == 0 {
os.Exit(0)
}
}() }()
// consume errors // consume errors
...@@ -55,5 +64,4 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler ...@@ -55,5 +64,4 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler
return return
} }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment