Commit a630c16e authored by Node- 门 忠鑫's avatar Node- 门 忠鑫

# 添加机器信息

parent 10c844eb
...@@ -32,7 +32,8 @@ func main() { ...@@ -32,7 +32,8 @@ func main() {
//初始化redis连接池 //初始化redis连接池
conf.RedisPoolInit() conf.RedisPoolInit()
go service.Consumer() //go service.Consumer()
go service.AgentConsumer()
if quartz { if quartz {
log.Println("启动定时任务") log.Println("启动定时任务")
......
package service
import (
"encoding/json"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/end_points"
"github.com/influxdata/influxdb/client/v2"
"time"
)
func AgentMsgProcess(msg string) {
chunkMsg := enn_points.ChunkMsg{}
err := json.Unmarshal([]byte(msg), &chunkMsg)
if err != nil {
fmt.Println(err)
}
buildMetricsInfluxMsg(chunkMsg)
}
func buildMetricsInfluxMsg(chunkMsg enn_points.ChunkMsg) {
for _, p := range chunkMsg.EndPoints {
tags := make(map[string]string, )
tags["sys_name"] = chunkMsg.AppName
tags["host"] = chunkMsg.Ip
fields := make(map[string]interface{})
var metrics = p.Metrics
var health = p.Health
var status = health.Status
if "UP" == status.Code {
fields["sever_status"] = 1
}
var diskSpace = health.Details.DiskSpace.Details
fields["disk_tol"] = diskSpace.Total
fields["disk_free"] = diskSpace.Free
fields["disk_threshold"] = diskSpace.Threshold
fields["mem_tol"] = metrics.Mem
fields["mem_free"] = metrics.MemFree
fields["heap_tol"] = metrics.Heap
fields["heap_init"] = metrics.HeapInit
fields["heap_used"] = metrics.HeapUsed
fields["nonheap_tol"] = metrics.Nonheap
fields["nonheap_init"] = metrics.NonheapInit
fields["nonheap_used"] = metrics.NonheapUsed
fields["nonheap_commit"] = metrics.NonheapCommitted
fields["thread_tol"] = metrics.ThreadsTotalStarted
fields["thread_peak"] = metrics.ThreadsPeak
fields["thread_daemon"] = metrics.ThreadsDaemon
fields["class_tol"] = metrics.Classes
fields["class_loaded"] = metrics.ClassesLoaded
fields["class_unloaded"] = metrics.ClassesUnloaded
fields["gc_parnew_count"] = metrics.GcParnewCount
fields["gc_parnew_time"] = metrics.GcParnewTime
fields["gc_concurrent_mark_sweep"] = metrics.GcConcurrentmarksweepCount
fields["gc_concurrent_mark_time"] = metrics.GcConcurrentmarksweepTime
fields["processors"] = metrics.Processors
unix := time.Unix(0, p.Timestamp)
if (len(pointSlice) == batchSize) {
go batchWrite(pointSlice)
pointSlice = make([]*client.Point, 0, batchSize)
}
point, _ := client.NewPoint("machine_info", tags, fields, unix)
pointSlice = append(pointSlice, point)
}
}
package enn_points
type ChunkMsg struct {
AppName string `json:"appName"`
Ip string `json:"ip"`
EndPoints []EndPoint `json:"end_points"`
}
type EndPoint struct {
Health Health `json:"health"`
Metrics MetricsInfo `json:"metrics"`
Timestamp int64 `json:"timestamp"`
}
type Health struct {
Status Status `json:"status"`
Details Detail `json:"details"`
}
type Detail struct {
DiskSpace DiskInfo `json:"disk_space"`
Redis RedisInfo `json:"redis"`
Db interface{} `json:"db"`
}
/**
硬盘信息
*/
type DiskInfo struct {
Details DiskDetail `json:"details"`
Status Status `json:"status"`
}
/**
硬盘详情
*/
type DiskDetail struct {
Total int64 `json:"total"`
Free int64 `json:"free"`
Threshold int64 `json:"threshold"`
}
/**
redis
*/
type RedisInfo struct {
Status Status `json:"status"`
}
type Status struct {
Code string `json:"code"`
Description string `json:"description"`
}
type MetricsInfo struct {
Mem int `json:"mem"`
MemFree int `json:"mem.free"`
Processors int `json:"processors"`
InstanceUptime int `json:"instance.uptime"`
Uptime int `json:"uptime"`
SystemloadAverage float64 `json:"systemload.average"`
HeapCommitted int `json:"heap.committed"`
HeapInit int `json:"heap.init"`
HeapUsed int `json:"heap.used"`
Heap int `json:"heap"`
NonheapCommitted int `json:"nonheap.committed"`
NonheapInit int `json:"nonheap.init"`
NonheapUsed int `json:"nonheap.used"`
Nonheap int `json:"nonheap"`
ThreadsPeak int `json:"threads.peak"`
ThreadsDaemon int `json:"threads.daemon"`
ThreadsTotalStarted int `json:"threads.totalStarted"`
Threads int `json:"threads"`
Classes int `json:"classes"`
ClassesLoaded int `json:"classes.loaded"`
ClassesUnloaded int `json:"classes.unloaded"`
GcParnewCount int `json:"gc.parnew.count"`
GcParnewTime int `json:"gc.parnew.time"`
GcConcurrentmarksweepCount int `json:"gc.concurrentmarksweep.count"`
GcConcurrentmarksweepTime int `json:"gc.concurrentmarksweep.time"`
CounterStatus200AddReduceApply int `json:"counter.status.200.addReduceApply"`
CounterStatus200OwnerList int `json:"counter.status.200.ownerList"`
CounterStatus200ShowReport int `json:"counter.status.200.showReport"`
GaugeResponseShowReduceApplyList int `json:"gauge.response.showReduceApplyList"`
CounterStatus200ShowOwnerReduceApplyList int `json:"counter.status.200.showOwnerReduceApplyList"`
GaugeResponseQueryPreciseCallRecordList int `json:"gauge.response.queryPreciseCallRecordList"`
GaugeResponseQueryUserPhoneRemarkList int `json:"gauge.response.queryUserPhoneRemarkList"`
CounterStatus200FindCreditReport int `json:"counter.status.200.findCreditReport"`
GaugeResponseAddUserPhoneRemark int `json:"gauge.response.addUserPhoneRemark"`
CounterStatus200DisperseCase int `json:"counter.status.200.disperseCase"`
CounterStatus200QueryDispatcherParam int `json:"counter.status.200.queryDispatcherParam"`
CounterStatus200CheckReduceApply int `json:"counter.status.200.checkReduceApply"`
GaugeResponseQueryRepayOrderList int `json:"gauge.response.queryRepayOrderList"`
GaugeResponseAddReduceApply int `json:"gauge.response.addReduceApply"`
GaugeResponseOwnerList int `json:"gauge.response.ownerList"`
GaugeResponsePhoneBook int `json:"gauge.response.phoneBook"`
CounterStatus200ViewCaseDetail int `json:"counter.status.200.viewCaseDetail"`
GaugeResponseUserInfo int `json:"gauge.response.userInfo"`
GaugeResponseFindAlipayInfo int `json:"gauge.response.findAlipayInfo"`
CounterStatus200MenuTree int `json:"counter.status.200.menuTree"`
GaugeResponseCheckReduceApply int `json:"gauge.response.checkReduceApply"`
CounterStatus200QueryPreciseCallRecordList int `json:"counter.status.200.queryPreciseCallRecordList"`
CounterStatus200PhoneBook int `json:"counter.status.200.phoneBook"`
GaugeResponseAutoAssignReceiveRelation int `json:"gauge.response.autoAssign.receiveRelation"`
CounterStatus200SaveCall int `json:"counter.status.200.saveCall"`
CounterStatus200QueryRepayOrderList int `json:"counter.status.200.queryRepayOrderList"`
CounterStatus200AutoAssignReceiveRelation int `json:"counter.status.200.autoAssign.receiveRelation"`
CounterStatus200RemoveCallRecord int `json:"counter.status.200.removeCallRecord"`
GaugeResponseOrganizationList int `json:"gauge.response.organizationList"`
CounterStatus200QueryCasePackPage int `json:"counter.status.200.queryCasePackPage"`
GaugeResponseShowOwnerReduceApplyList int `json:"gauge.response.showOwnerReduceApplyList"`
GaugeResponseHandleCollectionReduce int `json:"gauge.response.handleCollectionReduce"`
CounterStatus200Logout int `json:"counter.status.200.logout"`
CounterStatus200QueryCasePage int `json:"counter.status.200.queryCasePage"`
GaugeResponseSelectList int `json:"gauge.response.selectList"`
CounterStatus200HandleCollectionReduce int `json:"counter.status.200.handleCollectionReduce"`
CounterStatus200FindAlipayInfo int `json:"counter.status.200.findAlipayInfo"`
GaugeResponseFundingList int `json:"gauge.response.fundingList"`
GaugeResponseTechHealthCheck int `json:"gauge.response.tech.health.check"`
CounterStatus200Login int `json:"counter.status.200.login"`
CounterStatus200HaveUnCheckedApply int `json:"counter.status.200.haveUnCheckedApply"`
GaugeResponseSaveCall int `json:"gauge.response.saveCall"`
GaugeResponseQueryDispatcherParam int `json:"gauge.response.queryDispatcherParam"`
CounterStatus200OrganizationList int `json:"counter.status.200.organizationList"`
GaugeResponseQueryOrganizationList int `json:"gauge.response.queryOrganizationList"`
CounterStatus200ShowReduceApplyList int `json:"counter.status.200.showReduceApplyList"`
CounterStatus200QueryDisperseParam int `json:"counter.status.200.queryDisperseParam"`
CounterStatus200AgingList int `json:"counter.status.200.agingList"`
CounterStatus200UserInfo int `json:"counter.status.200.userInfo"`
GaugeResponseQueryUserPage int `json:"gauge.response.queryUserPage"`
GaugeResponseDisperseCase int `json:"gauge.response.disperseCase"`
CounterStatus200SelectList int `json:"counter.status.200.selectList"`
GaugeResponsePackageStatus int `json:"gauge.response.packageStatus"`
CounterStatus200AddUserPhoneRemark int `json:"counter.status.200.addUserPhoneRemark"`
GaugeResponseShowReport int `json:"gauge.response.showReport"`
GaugeResponseQueryCasePage int `json:"gauge.response.queryCasePage"`
CounterStatus200QueryUserPhoneRemarkList int `json:"counter.status.200.queryUserPhoneRemarkList"`
CounterStatus200TechHealthCheck int `json:"counter.status.200.tech.health.check"`
GaugeResponseLogin int `json:"gauge.response.login"`
CounterStatus200RoleList int `json:"counter.status.200.roleList"`
GaugeResponseLogout int `json:"gauge.response.logout"`
GaugeResponseAgingList int `json:"gauge.response.agingList"`
GaugeResponseRoleList int `json:"gauge.response.roleList"`
GaugeResponseViewCaseDetail int `json:"gauge.response.viewCaseDetail"`
GaugeResponseMenuTree int `json:"gauge.response.menuTree"`
CounterStatus200QueryUserPage int `json:"counter.status.200.queryUserPage"`
GaugeResponseFindCreditReport int `json:"gauge.response.findCreditReport"`
GaugeResponseHaveUnCheckedApply int `json:"gauge.response.haveUnCheckedApply"`
GaugeResponseQueryDisperseParam int `json:"gauge.response.queryDisperseParam"`
GaugeResponseQueryCasePackPage int `json:"gauge.response.queryCasePackPage"`
CounterStatus200FundingList int `json:"counter.status.200.fundingList"`
CounterStatus200PackageStatus int `json:"counter.status.200.packageStatus"`
GaugeResponseRemoveCallRecord int `json:"gauge.response.removeCallRecord"`
CounterStatus200QueryOrganizationList int `json:"counter.status.200.queryOrganizationList"`
HttpsessionsMax int `json:"httpsessions.max"`
HttpsessionsActive int `json:"httpsessions.active"`
}
\ No newline at end of file
package service
import (
"context"
"fmt"
"github.com/Shopify/sarama"
"time"
)
var (
//agentBrokers = []string{"192.168.4.100:15091", "192.168.4.100:15092", "192.168.4.100:15093"}
agentBrokers = []string{"172.30.12.19:9092", "172.30.12.20:9092", "172.30.12.21:9092"}
agentKafkaTopic = "quantGroup.tech.enoch.pro"
agentKafkaGroup = "quantGroup-enoch-agent"
)
type enochAgentConsumerGroupHandler struct{}
func (enochAgentConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (enochAgentConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h enochAgentConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
AgentMsgProcess(string(msg.Value))
sess.MarkMessage(msg, "")
}
return nil
}
func AgentConsumer() {
config := sarama.NewConfig()
config.Version = sarama.V1_1_0_0
config.Consumer.Retry.Backoff = 100 * time.Second
config.Consumer.Return.Errors = false
client, err := sarama.NewClient(agentBrokers, config)
if err != nil {
fmt.Printf("kafka encoh.dev消费者获取失败")
return
}
defer func() { _ = client.Close() }()
consumerGroup, err := sarama.NewConsumerGroupFromClient(agentKafkaGroup, client)
if err != nil {
fmt.Println("kafka 创建分区消费者失败")
return
}
defer func() { _ = consumerGroup.Close() }()
// Track errors
go func() {
for err := range consumerGroup.Errors() {
fmt.Println("消费出错了", err)
}
}()
ctx := context.Background()
handler := enochAgentConsumerGroupHandler{}
for {
err = consumerGroup.Consume(ctx, []string{agentKafkaTopic}, handler)
if err != nil {
fmt.Println(err)
}
}
}
...@@ -55,7 +55,6 @@ func Consumer() { ...@@ -55,7 +55,6 @@ func Consumer() {
}() }()
ctx := context.Background() ctx := context.Background()
handler := exampleConsumerGroupHandler{} handler := exampleConsumerGroupHandler{}
for { for {
err = consumerGroup.Consume(ctx, []string{kafkaTopic}, handler) err = consumerGroup.Consume(ctx, []string{kafkaTopic}, handler)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment