Commit 66f9fd42 authored by Node- 门 忠鑫's avatar Node- 门 忠鑫

调整consumer结构

parent cbb7ee94
...@@ -18,6 +18,7 @@ require ( ...@@ -18,6 +18,7 @@ require (
github.com/onsi/gomega v1.4.3 // indirect github.com/onsi/gomega v1.4.3 // indirect
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect
github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967 github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967
github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
) )
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"flag" "flag"
"git.quantgroup.cn/DevOps/enoch/service" "git.quantgroup.cn/DevOps/enoch/service"
"git.quantgroup.cn/DevOps/enoch/service/conf" "git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/consumer"
"git.quantgroup.cn/DevOps/enoch/service/continuous_queries" "git.quantgroup.cn/DevOps/enoch/service/continuous_queries"
"git.quantgroup.cn/DevOps/enoch/service/data" "git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/file_cache" "git.quantgroup.cn/DevOps/enoch/service/file_cache"
...@@ -31,7 +32,7 @@ func main() { ...@@ -31,7 +32,7 @@ func main() {
log.Fatalln("create file error", err) log.Fatalln("create file error", err)
} }
file_cache.Load(conf.GlobalConfig.FileCachePath) file_cache.Load(conf.GlobalConfig.FileCachePath)
file_cache.RegisterJob(service.ReSubmit) file_cache.RegisterJob(consumer.ReSubmit)
go file_cache.Delete() go file_cache.Delete()
port := conf.GlobalConfig.Port port := conf.GlobalConfig.Port
...@@ -42,8 +43,8 @@ func main() { ...@@ -42,8 +43,8 @@ func main() {
//初始化redis连接池 //初始化redis连接池
data.RedisPoolInit() data.RedisPoolInit()
go service.AgentClusterConsumer(conf.HealthTopic(), service.HealthMessageHandler{}) go consumer.AgentClusterConsumer(conf.HealthTopic(), consumer.HealthMessageHandler{})
go service.AgentClusterConsumer(conf.BraveTopic(), service.BraveMessageHandler{}) go consumer.AgentClusterConsumer(conf.BraveTopic(), consumer.BraveMessageHandler{})
intPort, _ := strconv.Atoi(port) intPort, _ := strconv.Atoi(port)
if quartz { if quartz {
......
...@@ -2,6 +2,7 @@ package service ...@@ -2,6 +2,7 @@ package service
import ( import (
"fmt" "fmt"
"git.quantgroup.cn/DevOps/enoch/service/consumer"
"git.quantgroup.cn/DevOps/enoch/service/util" "git.quantgroup.cn/DevOps/enoch/service/util"
"net/http" "net/http"
"strconv" "strconv"
...@@ -16,7 +17,7 @@ func DurationInterface(w http.ResponseWriter, r *http.Request) { ...@@ -16,7 +17,7 @@ func DurationInterface(w http.ResponseWriter, r *http.Request) {
func DurationCalcAndSendEmail(day string) { func DurationCalcAndSendEmail(day string) {
info := "" info := ""
Duration(day, func(sysName string, durations map[string]string) { consumer.Duration(day, func(sysName string, durations map[string]string) {
info = info + "\n" + "系统名称 : " + strings.Split(sysName, ":")[1] + "\n" info = info + "\n" + "系统名称 : " + strings.Split(sysName, ":")[1] + "\n"
for k, v := range durations { for k, v := range durations {
i, err := strconv.Atoi(v) i, err := strconv.Atoi(v)
...@@ -38,7 +39,7 @@ func CounterInterface(w http.ResponseWriter, r *http.Request) { ...@@ -38,7 +39,7 @@ func CounterInterface(w http.ResponseWriter, r *http.Request) {
func CounterCalcAndSendEmail(day string) { func CounterCalcAndSendEmail(day string) {
info := "" info := ""
Counter(day, func(sysName string, durations map[string]string) { consumer.Counter(day, func(sysName string, durations map[string]string) {
info = info + "\n" + "系统名称 : " + strings.Split(sysName, ":")[1] + "\n" info = info + "\n" + "系统名称 : " + strings.Split(sysName, ":")[1] + "\n"
for k, v := range durations { for k, v := range durations {
info = info + k + " , 次数:" + v + "\n" info = info + k + " , 次数:" + v + "\n"
......
package service package consumer
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"git.quantgroup.cn/DevOps/enoch/service/data" "git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/file_cache"
"git.quantgroup.cn/DevOps/enoch/service/log" "git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/client/v2"
...@@ -24,21 +23,18 @@ func (BraveMessageHandler) MsgProcess(msg string) { ...@@ -24,21 +23,18 @@ func (BraveMessageHandler) MsgProcess(msg string) {
traceMsg := make([]TraceMsg, 3) //[]TraceMsg{} traceMsg := make([]TraceMsg, 3) //[]TraceMsg{}
err := json.Unmarshal([]byte(msg), &traceMsg) err := json.Unmarshal([]byte(msg), &traceMsg)
if err != nil { if err != nil {
fmt.Println(err) logger.Error.Println("brave 解析msg失败:", err)
} }
//msgRedisProcess(traceMsg)
msgInfluxProcess(traceMsg) msgInfluxProcess(traceMsg)
} }
func (BraveMessageHandler) Destroy() { func (BraveMessageHandler) Destroy() {
//系统关系时,提交所有数据
if len(pointSlice) > 0 { if len(pointSlice) > 0 {
logger.Info.Println("braveMessageHandler 提交本地缓存数据:", len(pointSlice)) logger.Info.Println("braveMessageHandler 提交本地缓存数据:", len(pointSlice))
batchWrite(pointSlice) batchWrite(pointSlice)
} }
} }
var batchSize = 5000
var pointSlice = make([]*client.Point, 0, batchSize) var pointSlice = make([]*client.Point, 0, batchSize)
func msgInfluxProcess(traceMsgs []TraceMsg) { func msgInfluxProcess(traceMsgs []TraceMsg) {
...@@ -90,82 +86,6 @@ func msgInfluxProcess(traceMsgs []TraceMsg) { ...@@ -90,82 +86,6 @@ func msgInfluxProcess(traceMsgs []TraceMsg) {
} }
func batchWrite(pointArray []*client.Point) {
if file_cache.Enabled() {
logger.Info.Println("写入缓存")
fileWrite(pointArray)
} else {
err := httpWrite(pointArray)
if err != nil {
file_cache.OpenCache()
fileWrite(pointArray)
}
logger.Info.Println("写入influx", len(pointArray))
}
}
func httpWrite(pointArray []*client.Point) error {
c := data.NewClient()
defer func() { _ = c.Close() }()
points, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: "monitor",
//Precision : "ms",
})
if err != nil {
return err
}
points.AddPoints(pointArray)
err = c.Write(points)
if err != nil {
return err
}
return nil
}
func ReSubmit(data []string) error {
pointSlice := make([]*client.Point, 0)
for _, v := range data {
cp := file_cache.CreateCachePoint(v)
point, err := client.NewPoint(cp.Name, cp.Tags, cp.Fields, cp.Time)
if err != nil {
logger.Error.Println("构造client.point异常", err)
}
pointSlice = append(pointSlice, point)
if len(pointSlice) > 1000 {
err := httpWrite(pointSlice)
if err != nil {
return err
}
logger.Info.Println("缓存重新提交:1000")
pointSlice = make([]*client.Point, 0)
}
}
if len(pointSlice) > 0 {
err := httpWrite(pointSlice)
if err != nil {
logger.Info.Println(pointSlice)
return err
}
logger.Info.Println("缓存重新提交:", len(pointSlice))
}
logger.Info.Println("重新提交")
return nil
}
func fileWrite(pointArray []*client.Point) {
for _, p := range pointArray {
if p != nil {
current := file_cache.NewPoint(p)
data, err := json.Marshal(current)
if err != nil {
fmt.Println(err)
}
file_cache.Write(string(data))
}
}
}
func Duration(day string, fun func(sysName string, durations map[string]string)) { func Duration(day string, fun func(sysName string, durations map[string]string)) {
conn := data.Pool.Get() conn := data.Pool.Get()
......
package consumer
var batchSize = 5000
package service package consumer
import ( import (
"encoding/json" "encoding/json"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/end_points" "git.quantgroup.cn/DevOps/enoch/service/end_points"
"git.quantgroup.cn/DevOps/enoch/service/log" "git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/client/v2"
...@@ -22,7 +21,8 @@ func (HealthMessageHandler) MsgProcess(msg string) { ...@@ -22,7 +21,8 @@ func (HealthMessageHandler) MsgProcess(msg string) {
chunkMsg := end_points.ChunkMsg{} chunkMsg := end_points.ChunkMsg{}
err := json.Unmarshal([]byte(msg), &chunkMsg) err := json.Unmarshal([]byte(msg), &chunkMsg)
if err != nil { if err != nil {
fmt.Println(err) logger.Error.Println("healthMessageHandler解析json失败:",err)
logger.Error.Println([]byte(msg))
} }
buildMsg(chunkMsg) buildMsg(chunkMsg)
} }
......
package service package consumer
import ( import (
"git.quantgroup.cn/DevOps/enoch/service/conf" "git.quantgroup.cn/DevOps/enoch/service/conf"
...@@ -9,7 +9,6 @@ import ( ...@@ -9,7 +9,6 @@ import (
"os/signal" "os/signal"
) )
func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler) { func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler) {
config := cluster.NewConfig() config := cluster.NewConfig()
...@@ -21,11 +20,14 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler ...@@ -21,11 +20,14 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler
if err != nil { if err != nil {
panic(err) panic(err)
} }
defer func() { _ = consumer.Close() }()
// trap SIGINT to trigger a shutdown. // trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1) signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt) signal.Notify(signals, os.Interrupt)
defer func() {
_ = consumer.Close()
}()
// consume errors // consume errors
go func() { go func() {
for err := range consumer.Errors() { for err := range consumer.Errors() {
......
package service package consumer
type MessageHandler interface { type MessageHandler interface {
MsgProcess(msg string) MsgProcess(msg string)
......
package service package consumer
type TraceMsg struct { type TraceMsg struct {
TraceId string `json:"traceId"` TraceId string `json:"traceId"`
......
package consumer
import (
"encoding/json"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/file_cache"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2"
)
func batchWrite(pointArray []*client.Point) {
if file_cache.Enabled() {
logger.Info.Println("写入缓存")
fileWrite(pointArray)
} else {
err := httpWrite(pointArray)
if err != nil {
file_cache.OpenCache()
fileWrite(pointArray)
}
logger.Info.Println("写入influx", len(pointArray))
}
}
func httpWrite(pointArray []*client.Point) error {
c := data.NewClient()
defer func() { _ = c.Close() }()
points, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: "monitor",
//Precision : "ms",
})
if err != nil {
return err
}
points.AddPoints(pointArray)
err = c.Write(points)
if err != nil {
return err
}
return nil
}
func ReSubmit(data []string) error {
pointSlice := make([]*client.Point, 0)
for _, v := range data {
cp := file_cache.CreateCachePoint(v)
point, err := client.NewPoint(cp.Name, cp.Tags, cp.Fields, cp.Time)
if err != nil {
logger.Error.Println("构造client.point异常", err)
}
pointSlice = append(pointSlice, point)
if len(pointSlice) > 1000 {
err := httpWrite(pointSlice)
if err != nil {
return err
}
logger.Info.Println("缓存重新提交:1000")
pointSlice = make([]*client.Point, 0)
}
}
if len(pointSlice) > 0 {
err := httpWrite(pointSlice)
if err != nil {
logger.Info.Println(pointSlice)
return err
}
logger.Info.Println("缓存重新提交:", len(pointSlice))
}
logger.Info.Println("重新提交")
return nil
}
func fileWrite(pointArray []*client.Point) {
for _, p := range pointArray {
if p != nil {
current := file_cache.NewPoint(p)
data, err := json.Marshal(current)
if err != nil {
fmt.Println(err)
}
file_cache.Write(string(data))
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment