Commit 85d47da0 authored by Node- 门 忠鑫's avatar Node- 门 忠鑫

# 解析健康信息

parent 3451cc63
...@@ -11,6 +11,8 @@ import ( ...@@ -11,6 +11,8 @@ import (
) )
var sysNameIndex = make(map[int64]bool) var sysNameIndex = make(map[int64]bool)
var metricsPointSlice = make([]*client.Point, 0, batchSize)
var healthPointSlice = make([]*client.Point, 0, batchSize)
func AgentMsgProcess(msg string) { func AgentMsgProcess(msg string) {
chunkMsg := end_points.ChunkMsg{} chunkMsg := end_points.ChunkMsg{}
...@@ -18,67 +20,95 @@ func AgentMsgProcess(msg string) { ...@@ -18,67 +20,95 @@ func AgentMsgProcess(msg string) {
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }
buildMetricsInfluxMsg(chunkMsg) buildMsg(chunkMsg)
} }
func buildMetricsInfluxMsg(chunkMsg end_points.ChunkMsg) { func buildHealthInfluxMsg(appName string, ip string, timestamp time.Time, submitLimit int, db map[string]end_points.DBDetail) {
tags := make(map[string]string, )
tags["sys_name"] = appName
tags["host"] = ip
fields := make(map[string]interface{})
for k, v := range db {
var fieldName = v.Details.Database + "—" + k
fields[fieldName] = isOK(v.Status.Code)
}
if len(healthPointSlice) >= submitLimit {
go batchWrite(healthPointSlice)
healthPointSlice = make([]*client.Point, 0, batchSize)
}
point, _ := client.NewPoint("health_info", tags, fields, timestamp)
println(point)
healthPointSlice = append(healthPointSlice, point)
}
func buildMetricsInfluxMsg(appName string, ip string, timestamp time.Time, submitLimit int, health end_points.Health, metrics end_points.MetricsInfo) {
tags := make(map[string]string, )
fields := make(map[string]interface{})
tags["sys_name"] = appName
tags["host"] = ip
var status = health.Status
fields["sever_status"] = isOK(status.Code)
var diskSpace = health.Details.DiskSpace.Details
fields["disk_tol"] = diskSpace.Total
fields["disk_free"] = diskSpace.Free
fields["disk_threshold"] = diskSpace.Threshold
fields["mem_tol"] = metrics.Mem
fields["mem_free"] = metrics.MemFree
fields["heap_tol"] = metrics.Heap
fields["heap_init"] = metrics.HeapInit
fields["heap_used"] = metrics.HeapUsed
fields["nonheap_tol"] = metrics.Nonheap
fields["nonheap_init"] = metrics.NonheapInit
fields["nonheap_used"] = metrics.NonheapUsed
fields["nonheap_commit"] = metrics.NonheapCommitted
fields["thread_tol"] = metrics.ThreadsTotalStarted
fields["thread_peak"] = metrics.ThreadsPeak
fields["thread_daemon"] = metrics.ThreadsDaemon
fields["class_tol"] = metrics.Classes
fields["class_loaded"] = metrics.ClassesLoaded
fields["class_unloaded"] = metrics.ClassesUnloaded
fields["gc_parnew_count"] = metrics.GcParnewCount
fields["gc_parnew_time"] = metrics.GcParnewTime
fields["gc_concurrent_mark_sweep"] = metrics.GcConcurrentmarksweepCount
fields["gc_concurrent_mark_time"] = metrics.GcConcurrentmarksweepTime
fields["uptime"] = metrics.Uptime
fields["instance_uptime"] = metrics.InstanceUptime
fields["system_load_average"] = metrics.SystemloadAverage
if len(metricsPointSlice) >= submitLimit {
go batchWrite(metricsPointSlice)
metricsPointSlice = make([]*client.Point, 0, batchSize)
}
point, _ := client.NewPoint("machine_info", tags, fields, timestamp)
metricsPointSlice = append(metricsPointSlice, point)
}
func buildMsg(chunkMsg end_points.ChunkMsg) {
var ip = inetAtoN(chunkMsg.Ip) var ip = inetAtoN(chunkMsg.Ip)
sysNameIndex[ip] = true sysNameIndex[ip] = true
var sysNameCount = len(sysNameIndex) var sysNameCount = len(sysNameIndex)
for _, p := range chunkMsg.EndPoints { for _, p := range chunkMsg.EndPoints {
tags := make(map[string]string, )
tags["sys_name"] = chunkMsg.AppName var appName = chunkMsg.AppName
tags["host"] = chunkMsg.Ip var ip = chunkMsg.Ip
fields := make(map[string]interface{})
var metrics = p.Metrics
var health = p.Health
var status = health.Status
if "UP" == status.Code {
fields["sever_status"] = 1
}
var diskSpace = health.Details.DiskSpace.Details
fields["disk_tol"] = diskSpace.Total
fields["disk_free"] = diskSpace.Free
fields["disk_threshold"] = diskSpace.Threshold
fields["mem_tol"] = metrics.Mem
fields["mem_free"] = metrics.MemFree
fields["heap_tol"] = metrics.Heap
fields["heap_init"] = metrics.HeapInit
fields["heap_used"] = metrics.HeapUsed
fields["nonheap_tol"] = metrics.Nonheap
fields["nonheap_init"] = metrics.NonheapInit
fields["nonheap_used"] = metrics.NonheapUsed
fields["nonheap_commit"] = metrics.NonheapCommitted
fields["thread_tol"] = metrics.ThreadsTotalStarted
fields["thread_peak"] = metrics.ThreadsPeak
fields["thread_daemon"] = metrics.ThreadsDaemon
fields["class_tol"] = metrics.Classes
fields["class_loaded"] = metrics.ClassesLoaded
fields["class_unloaded"] = metrics.ClassesUnloaded
fields["gc_parnew_count"] = metrics.GcParnewCount
fields["gc_parnew_time"] = metrics.GcParnewTime
fields["gc_concurrent_mark_sweep"] = metrics.GcConcurrentmarksweepCount
fields["gc_concurrent_mark_time"] = metrics.GcConcurrentmarksweepTime
fields["uptime"] = metrics.Uptime
fields["instance_uptime"] = metrics.InstanceUptime
fields["system_load_average"] = metrics.SystemloadAverage
unix := time.Unix(0, p.Timestamp*1000000) unix := time.Unix(0, p.Timestamp*1000000)
if len(pointSlice) >= sysNameCount { //metricsInfo
go batchWrite(pointSlice) buildMetricsInfluxMsg(appName, ip, unix, sysNameCount, p.Health, p.Metrics)
pointSlice = make([]*client.Point, 0, batchSize)
} //health_info
point, _ := client.NewPoint("machine_info", tags, fields, unix) buildHealthInfluxMsg(appName, ip, unix, sysNameCount, p.Health.Details.Db.Details)
pointSlice = append(pointSlice, point)
} }
} }
...@@ -87,3 +117,10 @@ func inetAtoN(ip string) int64 { ...@@ -87,3 +117,10 @@ func inetAtoN(ip string) int64 {
ret.SetBytes(net.ParseIP(ip).To4()) ret.SetBytes(net.ParseIP(ip).To4())
return ret.Int64() return ret.Int64()
} }
func isOK(code string) int {
if "UP" == code {
return 1
}
return 0
}
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment