Commit 40519f9f authored by xiaoguang.xu's avatar xiaoguang.xu

consumer 去掉重复代码.

parent 85d47da0
...@@ -34,8 +34,8 @@ func main() { ...@@ -34,8 +34,8 @@ func main() {
//初始化redis连接池 //初始化redis连接池
conf.RedisPoolInit() conf.RedisPoolInit()
go service.AgentClusterConsumer() go service.AgentClusterConsumer(conf.HealthTopic(), service.HealthMessageHandler{})
go service.ClusterConsumer() go service.AgentClusterConsumer(conf.BraveTopic(), service.BraveMessageHandler{})
intPort, _ := strconv.Atoi(port) intPort, _ := strconv.Atoi(port)
if quartz { if quartz {
......
...@@ -14,7 +14,10 @@ var sysNameIndex = make(map[int64]bool) ...@@ -14,7 +14,10 @@ var sysNameIndex = make(map[int64]bool)
var metricsPointSlice = make([]*client.Point, 0, batchSize) var metricsPointSlice = make([]*client.Point, 0, batchSize)
var healthPointSlice = make([]*client.Point, 0, batchSize) var healthPointSlice = make([]*client.Point, 0, batchSize)
func AgentMsgProcess(msg string) { type HealthMessageHandler struct {
}
func (HealthMessageHandler) MsgProcess(msg string) {
chunkMsg := end_points.ChunkMsg{} chunkMsg := end_points.ChunkMsg{}
err := json.Unmarshal([]byte(msg), &chunkMsg) err := json.Unmarshal([]byte(msg), &chunkMsg)
if err != nil { if err != nil {
...@@ -38,7 +41,6 @@ func buildHealthInfluxMsg(appName string, ip string, timestamp time.Time, submit ...@@ -38,7 +41,6 @@ func buildHealthInfluxMsg(appName string, ip string, timestamp time.Time, submit
healthPointSlice = make([]*client.Point, 0, batchSize) healthPointSlice = make([]*client.Point, 0, batchSize)
} }
point, _ := client.NewPoint("health_info", tags, fields, timestamp) point, _ := client.NewPoint("health_info", tags, fields, timestamp)
println(point)
healthPointSlice = append(healthPointSlice, point) healthPointSlice = append(healthPointSlice, point)
} }
......
package conf
var (
	// Former development brokers, kept for reference:
	//agentBrokers = []string{"192.168.4.100:15091", "192.168.4.100:15092", "192.168.4.100:15093"}
	agentBrokers    = []string{"172.30.12.19:9092", "172.30.12.20:9092", "172.30.12.21:9092"} // production kafka brokers
	agentKafkaTopic = "quantGroup.tech.enoch.pro"                                             // agent health-metrics topic
	agentKafkaGroup = "quantGroup-enoch-agent"                                                // consumer group id
)
// KafkaConf bundles the connection settings for one Kafka subscription:
// broker addresses, the topic to consume, and the consumer group id.
type KafkaConf struct {
	Broker []string // broker host:port addresses
	Topic  string   // topic name to subscribe to
	Group  string   // consumer group identifier
}
// HealthTopic returns the Kafka settings for the agent health-metrics
// topic consumed by the enoch agent consumer group.
// Keyed fields (instead of the original positional literal) keep this
// correct if KafkaConf's field order ever changes.
// NOTE(review): broker addresses are hard-coded — confirm whether they
// should come from external configuration.
func HealthTopic() KafkaConf {
	return KafkaConf{
		Broker: []string{"172.30.12.19:9092", "172.30.12.20:9092", "172.30.12.21:9092"},
		Topic:  "quantGroup.tech.enoch.pro",
		Group:  "quantGroup-enoch-agent",
	}
}
// BraveTopic returns the Kafka settings for the brave tracing topic
// consumed by the enoch-group consumer group.
// Keyed fields (instead of the original positional literal) keep this
// correct if KafkaConf's field order ever changes.
// NOTE(review): broker addresses are hard-coded — confirm whether they
// should come from external configuration.
func BraveTopic() KafkaConf {
	return KafkaConf{
		Broker: []string{"172.30.12.19:9092", "172.30.12.20:9092", "172.30.12.21:9092"},
		Topic:  "quantGroup.tech.brave.pro",
		Group:  "enoch-group",
	}
}
...@@ -43,8 +43,3 @@ func RedisPoolInit() { ...@@ -43,8 +43,3 @@ func RedisPoolInit() {
flag.Parse() flag.Parse()
Pool = newPool(*redisServer, *redisPassword) Pool = newPool(*redisServer, *redisPassword)
} }
// TempNewPool returns a redis pool pointed at a hard-coded user-center
// redis host with an empty password.
// NOTE(review): looks like a temporary/debug helper — confirm it is not
// used in production paths before keeping it.
func TempNewPool() *redis.Pool {
	return newPool("usercenter-redis1.quantgroups.com:6379", "")
}
\ No newline at end of file
package service package service
import ( import (
"context" "git.quantgroup.cn/DevOps/enoch/service/conf"
"fmt"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster" "github.com/bsm/sarama-cluster"
"log" "log"
"os" "os"
"os/signal" "os/signal"
"time"
) )
var (
	// Former development brokers, kept for reference:
	//agentBrokers = []string{"192.168.4.100:15091", "192.168.4.100:15092", "192.168.4.100:15093"}
	agentBrokers    = []string{"172.30.12.19:9092", "172.30.12.20:9092", "172.30.12.21:9092"} // production kafka brokers
	agentKafkaTopic = "quantGroup.tech.enoch.pro"                                             // agent health-metrics topic
	agentKafkaGroup = "quantGroup-enoch-agent"                                                // consumer group id
)
type enochAgentConsumerGroupHandler struct{}
func (enochAgentConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler) {
func (enochAgentConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
// ConsumeClaim drains the claimed partition: each record's raw payload is
// handed to AgentMsgProcess, then the message is marked as consumed so the
// group can commit its offset.
func (h enochAgentConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		AgentMsgProcess(string(msg.Value))
		sess.MarkMessage(msg, "")
	}
	return nil
}
func AgentClusterConsumer() {
config := cluster.NewConfig() config := cluster.NewConfig()
config.Consumer.Return.Errors = true config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Group.Return.Notifications = true config.Group.Return.Notifications = true
consumer, err := cluster.NewConsumer(agentBrokers, agentKafkaGroup, []string{agentKafkaTopic}, config) consumer, err := cluster.NewConsumer(kafkaConf.Broker, kafkaConf.Group, []string{kafkaConf.Topic}, config)
if err != nil { if err != nil {
panic(err) panic(err)
} }
...@@ -65,8 +45,7 @@ func AgentClusterConsumer() { ...@@ -65,8 +45,7 @@ func AgentClusterConsumer() {
select { select {
case msg, ok := <-consumer.Messages(): case msg, ok := <-consumer.Messages():
if ok { if ok {
//fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) messageHandle.MsgProcess(string(msg.Value))
AgentMsgProcess(string(msg.Value))
consumer.MarkOffset(msg, "") // mark message as processed consumer.MarkOffset(msg, "") // mark message as processed
} }
case <-signals: case <-signals:
...@@ -75,43 +54,3 @@ func AgentClusterConsumer() { ...@@ -75,43 +54,3 @@ func AgentClusterConsumer() {
} }
} }
// AgentConsumer consumes the agent topic with a plain sarama consumer
// group, dispatching every record through enochAgentConsumerGroupHandler.
// It blocks forever, re-entering Consume after each rebalance or error;
// on client/group construction failure it logs and returns.
func AgentConsumer() {
	config := sarama.NewConfig()
	config.Version = sarama.V1_1_0_0
	// NOTE(review): 100 * time.Second is an unusually long retry backoff —
	// confirm 100 * time.Millisecond was not intended.
	config.Consumer.Retry.Backoff = 100 * time.Second
	config.Consumer.Return.Errors = false
	client, err := sarama.NewClient(agentBrokers, config)
	if err != nil {
		// Println (not Printf) so the message ends with a newline,
		// matching the sibling log calls; also fixes the "encoh" typo.
		fmt.Println("kafka enoch.dev消费者获取失败")
		return
	}
	defer func() { _ = client.Close() }()
	consumerGroup, err := sarama.NewConsumerGroupFromClient(agentKafkaGroup, client)
	if err != nil {
		fmt.Println("kafka 创建分区消费者失败")
		return
	}
	defer func() { _ = consumerGroup.Close() }()
	// Track errors delivered on the group's error channel.
	go func() {
		for err := range consumerGroup.Errors() {
			fmt.Println("消费出错了", err)
		}
	}()
	ctx := context.Background()
	handler := enochAgentConsumerGroupHandler{}
	// Consume returns on every rebalance; loop to rejoin the group.
	for {
		err = consumerGroup.Consume(ctx, []string{agentKafkaTopic}, handler)
		if err != nil {
			fmt.Println(err)
		}
	}
}
package service
import (
"context"
"fmt"
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"log"
"os"
"os/signal"
"time"
)
var (
	brokers    = []string{"172.30.12.19:9092", "172.30.12.20:9092", "172.30.12.21:9092"} // production kafka brokers
	kafkaTopic = "quantGroup.tech.brave.pro"                                             // brave trace topic
	kafkaGroup = "enoch-group"                                                           // consumer group id
)
// exampleConsumerGroupHandler adapts the brave-topic pipeline to sarama's
// ConsumerGroupHandler interface; Setup and Cleanup are no-ops.
type exampleConsumerGroupHandler struct{}

// Setup runs before consuming starts; nothing to initialize.
func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

// Cleanup runs after consuming stops; nothing to tear down.
func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim drains a partition claim, handing each record's payload to
// MsgProcess and then marking the record as consumed.
func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	records := claim.Messages()
	for record := range records {
		payload := string(record.Value)
		MsgProcess(payload)
		sess.MarkMessage(record, "")
	}
	return nil
}
// ClusterConsumer runs a blocking sarama-cluster consumer for the brave
// trace topic: every payload is forwarded to MsgProcess and its offset
// committed; errors and rebalance notifications are logged from helper
// goroutines; the loop exits on SIGINT.
func ClusterConsumer() {
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	// Start from the oldest available offset when the group has none yet.
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Group.Return.Notifications = true
	consumer, err := cluster.NewConsumer(brokers, kafkaGroup, []string{kafkaTopic}, config)
	if err != nil {
		panic(err)
	}
	defer func() { _ = consumer.Close() }()
	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	// consume errors (goroutine ends when consumer.Close drains the channel)
	go func() {
		for err := range consumer.Errors() {
			log.Printf("Error: %s\n", err.Error())
		}
	}()
	// consume notifications (partition rebalances)
	go func() {
		for ntf := range consumer.Notifications() {
			log.Printf("Rebalanced: %+v\n", ntf)
		}
	}()
	// consume messages, watch signals
	for {
		select {
		case msg, ok := <-consumer.Messages():
			if ok {
				//fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
				MsgProcess(string(msg.Value))
				consumer.MarkOffset(msg, "") // mark message as processed
			}
		case <-signals:
			return
		}
	}
}
// Consumer consumes the brave topic with a plain sarama consumer group,
// dispatching records through exampleConsumerGroupHandler. It blocks
// forever, re-entering Consume after each rebalance or error; on
// client/group construction failure it logs and returns.
func Consumer() {
	config := sarama.NewConfig()
	config.Version = sarama.V1_1_0_0
	// NOTE(review): 100 * time.Second is an unusually long retry backoff —
	// confirm 100 * time.Millisecond was not intended.
	config.Consumer.Retry.Backoff = 100 * time.Second
	config.Consumer.Return.Errors = false
	client, err := sarama.NewClient(brokers, config)
	if err != nil {
		// Println (not Printf) so the message ends with a newline,
		// matching the sibling log calls.
		fmt.Println("kafka 消费者获取失败")
		return
	}
	defer func() { _ = client.Close() }()
	consumerGroup, err := sarama.NewConsumerGroupFromClient(kafkaGroup, client)
	if err != nil {
		fmt.Println("kafka 创建分区消费者失败")
		return
	}
	defer func() { _ = consumerGroup.Close() }()
	// Track errors delivered on the group's error channel.
	go func() {
		for err := range consumerGroup.Errors() {
			fmt.Println("消费出错了", err)
		}
	}()
	ctx := context.Background()
	handler := exampleConsumerGroupHandler{}
	// Consume returns on every rebalance; loop to rejoin the group.
	for {
		err = consumerGroup.Consume(ctx, []string{kafkaTopic}, handler)
		if err != nil {
			fmt.Println(err)
		}
	}
}
package service
// MessageHandler is implemented by types that know how to process one raw
// Kafka message payload (e.g. HealthMessageHandler, BraveMessageHandler).
type MessageHandler interface {
	// MsgProcess handles a single message payload, already decoded to a string.
	MsgProcess(msg string)
}
...@@ -7,15 +7,17 @@ import ( ...@@ -7,15 +7,17 @@ import (
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/client/v2"
"log" "log"
"strconv"
"time" "time"
) )
type BraveMessageHandler struct {
}
var httpMethod = map[string]string{ var httpMethod = map[string]string{
"get": "", "GET": "", "post": "", "options": "", "put": "", "delete": "", "head": "", "get": "", "GET": "", "post": "", "options": "", "put": "", "delete": "", "head": "",
} }
func MsgProcess(msg string) { func (BraveMessageHandler) MsgProcess(msg string) {
traceMsg := make([]TraceMsg, 3) //[]TraceMsg{} traceMsg := make([]TraceMsg, 3) //[]TraceMsg{}
err := json.Unmarshal([]byte(msg), &traceMsg) err := json.Unmarshal([]byte(msg), &traceMsg)
...@@ -100,75 +102,6 @@ func batchWrite(pointArray []*client.Point) { ...@@ -100,75 +102,6 @@ func batchWrite(pointArray []*client.Point) {
} }
} }
// msgRedisProcess updates per-service redis statistics for a batch of
// trace messages: a daily per-endpoint request counter and a daily
// per-endpoint max-duration ranking, both stored as sorted sets.
func msgRedisProcess(traceMsg []TraceMsg) {
	conn := conf.Pool.Get()
	defer func() { _ = conn.Close() }()
	for _, v := range traceMsg {
		if v.Kind != "SERVER" {
			// NOTE(review): break abandons the rest of the batch at the
			// first non-SERVER span — confirm `continue` wasn't intended.
			break
		}
		// Resolve the endpoint path; when the span name is just an HTTP
		// verb, build "<METHOD> <PATH>" from the tags instead.
		path := v.Name
		if _, ok := httpMethod[path]; ok {
			path = v.Tags.HttpMethod + " " + v.Tags.HttpPath
		}
		var counterKey = keyGenerator("counter", v.LocalEndpoint.ServiceName)
		var field = path
		// 1. Per-endpoint request counter (daily sorted set).
		_, err := conn.Do("zincrby", counterKey, 1, field)
		if err != nil {
			fmt.Println("执行出错: ", err)
		}
		// 2. Per-endpoint slowest-request ranking (daily sorted set).
		var durationKey = keyGenerator("duration", v.LocalEndpoint.ServiceName)
		var member = path
		//v.TraceId
		casScoreAdd(conn, durationKey, member, v.Duration)
	}
}
// keyGenerator builds a date-scoped redis key of the form
// "<prefix>:<info>:<yyyymmdd>" using today's local date.
func keyGenerator(prefix string, info string) string {
	today := time.Now().Format("20060102")
	return fmt.Sprintf("%s:%s:%s", prefix, info, today)
}
// casScoreAdd raises member's score in the sorted set durationKey to
// duration, but only when the currently stored score is lower — an
// optimistic "store the maximum" update built on WATCH/MULTI/EXEC.
// Fix: on a failed EXEC the original recursed with the previously read
// score (oldDuration) instead of the target duration, so the retry could
// never win the comparison and the update was silently lost.
// NOTE(review): the score is read before WATCH is issued, so the
// compare step itself is outside the guarded transaction — confirm a
// rare lost update is acceptable here.
func casScoreAdd(conn redis.Conn, durationKey string, member string, duration int) {
	durationInfo, err := conn.Do("zscore", durationKey, member)
	if durationInfo != nil {
		// zscore returns the score as a byte slice; parse it as an int.
		s := string(durationInfo.([]uint8))
		i, err := strconv.Atoi(s)
		if err != nil {
			fmt.Println("转换durationInfo 出错", err)
		}
		if i >= duration {
			// Stored score is already at least as large; nothing to do.
			return
		}
	}
	// Queue the guarded write; Send errors surface via the Do("exec") below.
	err = conn.Send("watch", durationKey)
	err = conn.Send("multi")
	err = conn.Send("zadd", durationKey, duration, member)
	reply, err := conn.Do("exec")
	if err != nil {
		fmt.Println("执行出错 : ", err)
	}
	if reply != nil {
		// Non-nil reply: the transaction committed.
		return
	}
	// EXEC returned nil: a concurrent write invalidated the WATCH — retry
	// with the same target duration.
	casScoreAdd(conn, durationKey, member, duration)
}
func Duration(day string, fun func(sysName string, durations map[string]string)) { func Duration(day string, fun func(sysName string, durations map[string]string)) {
conn := conf.Pool.Get() conn := conf.Pool.Get()
......
package service
import (
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/conf"
)
// Test is a manual smoke check of the shared redis pool: it runs a single
// HINCRBY against a throwaway key and prints any error.
// NOTE(review): this looks like scratch/debug code, not an automated test —
// confirm it can be removed.
func Test() {
	conn := conf.Pool.Get()
	defer conn.Close()
	_, err := conn.Do("hincrby", "key", "field", 1)
	if err != nil {
		fmt.Println(err)
	}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment