Commit 43a9d969 authored by xiaoguang.xu's avatar xiaoguang.xu

更换为UDP, 批量1000个写入一次

parent e9abe7b8
...@@ -7,11 +7,11 @@ import ( ...@@ -7,11 +7,11 @@ import (
func NewClient() client.Client { func NewClient() client.Client {
conf := client.HTTPConfig{ conf := client.UDPConfig{
Addr: "http://172.20.6.29:8086", Addr: "172.20.6.29:8089",
} }
con, err := client.NewHTTPClient(conf) con, err := client.NewUDPClient(conf)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
......
...@@ -26,16 +26,9 @@ func MsgProcess(msg string) { ...@@ -26,16 +26,9 @@ func MsgProcess(msg string) {
msgInfluxProcess(traceMsg) msgInfluxProcess(traceMsg)
} }
func msgInfluxProcess(traceMsgs []TraceMsg) { var pointSlice = make([]*client.Point, 0, 1000)
c := conf.NewClient()
defer func() { _ = c.Close() }()
points, err := client.NewBatchPoints(client.BatchPointsConfig{ func msgInfluxProcess(traceMsgs []TraceMsg) {
Database: "monitor",
})
if err != nil {
log.Fatal(err)
}
for _, traceMsg := range traceMsgs { for _, traceMsg := range traceMsgs {
if traceMsg.Kind != "SERVER" { if traceMsg.Kind != "SERVER" {
...@@ -47,21 +40,18 @@ func msgInfluxProcess(traceMsgs []TraceMsg) { ...@@ -47,21 +40,18 @@ func msgInfluxProcess(traceMsgs []TraceMsg) {
path = traceMsg.Tags.HttpMethod + " " + traceMsg.Tags.HttpPath path = traceMsg.Tags.HttpMethod + " " + traceMsg.Tags.HttpPath
} }
//fmt.Println("搞起" ,traceMsg.TraceId)
sysName := traceMsg.LocalEndpoint.ServiceName sysName := traceMsg.LocalEndpoint.ServiceName
fields := make(map[string]interface{}) fields := make(map[string]interface{})
fields["duration"] = traceMsg.Duration / 1000 fields["duration"] = traceMsg.Duration / 1000
fields["traceId"] = traceMsg.TraceId fields["traceId"] = traceMsg.TraceId
tags := make(map[string]string,) tags := make(map[string]string, )
tags["sys_name"] = sysName tags["sys_name"] = sysName
tags["path"] = path tags["path"] = path
tags["host"] = traceMsg.LocalEndpoint.Ipv4 tags["host"] = traceMsg.LocalEndpoint.Ipv4
tags["kind"] = traceMsg.Kind tags["kind"] = traceMsg.Kind
tags["duration_tag"] = strconv.Itoa(traceMsg.Duration) tags["duration_tag"] = strconv.Itoa(traceMsg.Duration)
bytes, err := json.Marshal(traceMsg) bytes, err := json.Marshal(traceMsg)
msg := string(bytes) msg := string(bytes)
...@@ -74,9 +64,14 @@ func msgInfluxProcess(traceMsgs []TraceMsg) { ...@@ -74,9 +64,14 @@ func msgInfluxProcess(traceMsgs []TraceMsg) {
unix := time.Unix(0, traceMsg.Timestamp*1000) unix := time.Unix(0, traceMsg.Timestamp*1000)
if len(pointSlice) == 1000 {
go batchWrite(pointSlice)
pointSlice = make([]*client.Point, 0)
}
point, _ := client.NewPoint("trace_info", tags, fields, unix) point, _ := client.NewPoint("trace_info", tags, fields, unix)
points.AddPoint(point) pointSlice = append(pointSlice, point)
err = c.Write(points)
if err != nil { if err != nil {
log.Fatal(err, msg) log.Fatal(err, msg)
} }
...@@ -84,6 +79,25 @@ func msgInfluxProcess(traceMsgs []TraceMsg) { ...@@ -84,6 +79,25 @@ func msgInfluxProcess(traceMsgs []TraceMsg) {
} }
func batchWrite(pointArray []*client.Point) {
c := conf.NewClient()
defer func() { _ = c.Close() }()
points, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: "monitor",
})
if err != nil {
log.Fatal(err)
}
points.AddPoints(pointArray)
err = c.Write(points)
if err != nil {
log.Fatal(err)
}
}
func msgRedisProcess(traceMsg []TraceMsg) { func msgRedisProcess(traceMsg []TraceMsg) {
conn := conf.Pool.Get() conn := conf.Pool.Get()
defer func() { _ = conn.Close() }() defer func() { _ = conn.Close() }()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment