Commit 561ac413 authored by fengjunkai's avatar fengjunkai

Merge branch 'fileCache'

parents 53f8b9ed 893ad7e8
...@@ -18,6 +18,7 @@ require ( ...@@ -18,6 +18,7 @@ require (
github.com/onsi/gomega v1.4.3 // indirect github.com/onsi/gomega v1.4.3 // indirect
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect
github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967 github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967
github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
) )
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"flag" "flag"
"git.quantgroup.cn/DevOps/enoch/service" "git.quantgroup.cn/DevOps/enoch/service"
"git.quantgroup.cn/DevOps/enoch/service/conf" "git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/consumer"
"git.quantgroup.cn/DevOps/enoch/service/continuous_queries" "git.quantgroup.cn/DevOps/enoch/service/continuous_queries"
"git.quantgroup.cn/DevOps/enoch/service/data" "git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/file_cache" "git.quantgroup.cn/DevOps/enoch/service/file_cache"
...@@ -31,7 +32,7 @@ func main() { ...@@ -31,7 +32,7 @@ func main() {
log.Fatalln("create file error", err) log.Fatalln("create file error", err)
} }
file_cache.Load(conf.GlobalConfig.FileCachePath) file_cache.Load(conf.GlobalConfig.FileCachePath)
file_cache.RegisterJob(service.ReSubmit) file_cache.RegisterJob(consumer.ReSubmit)
go file_cache.Delete() go file_cache.Delete()
port := conf.GlobalConfig.Port port := conf.GlobalConfig.Port
...@@ -42,8 +43,8 @@ func main() { ...@@ -42,8 +43,8 @@ func main() {
//初始化redis连接池 //初始化redis连接池
data.RedisPoolInit() data.RedisPoolInit()
go service.AgentClusterConsumer(conf.HealthTopic(), service.HealthMessageHandler{}) go consumer.AgentClusterConsumer(conf.HealthTopic(), consumer.HealthMessageHandler{})
go service.AgentClusterConsumer(conf.BraveTopic(), service.BraveMessageHandler{}) go consumer.AgentClusterConsumer(conf.BraveTopic(), consumer.BraveMessageHandler{})
intPort, _ := strconv.Atoi(port) intPort, _ := strconv.Atoi(port)
if quartz { if quartz {
......
...@@ -2,6 +2,7 @@ package service ...@@ -2,6 +2,7 @@ package service
import ( import (
"fmt" "fmt"
"git.quantgroup.cn/DevOps/enoch/service/consumer"
"git.quantgroup.cn/DevOps/enoch/service/util" "git.quantgroup.cn/DevOps/enoch/service/util"
"net/http" "net/http"
"strconv" "strconv"
...@@ -16,7 +17,7 @@ func DurationInterface(w http.ResponseWriter, r *http.Request) { ...@@ -16,7 +17,7 @@ func DurationInterface(w http.ResponseWriter, r *http.Request) {
func DurationCalcAndSendEmail(day string) { func DurationCalcAndSendEmail(day string) {
info := "" info := ""
Duration(day, func(sysName string, durations map[string]string) { consumer.Duration(day, func(sysName string, durations map[string]string) {
info = info + "\n" + "系统名称 : " + strings.Split(sysName, ":")[1] + "\n" info = info + "\n" + "系统名称 : " + strings.Split(sysName, ":")[1] + "\n"
for k, v := range durations { for k, v := range durations {
i, err := strconv.Atoi(v) i, err := strconv.Atoi(v)
...@@ -38,7 +39,7 @@ func CounterInterface(w http.ResponseWriter, r *http.Request) { ...@@ -38,7 +39,7 @@ func CounterInterface(w http.ResponseWriter, r *http.Request) {
func CounterCalcAndSendEmail(day string) { func CounterCalcAndSendEmail(day string) {
info := "" info := ""
Counter(day, func(sysName string, durations map[string]string) { consumer.Counter(day, func(sysName string, durations map[string]string) {
info = info + "\n" + "系统名称 : " + strings.Split(sysName, ":")[1] + "\n" info = info + "\n" + "系统名称 : " + strings.Split(sysName, ":")[1] + "\n"
for k, v := range durations { for k, v := range durations {
info = info + k + " , 次数:" + v + "\n" info = info + k + " , 次数:" + v + "\n"
......
...@@ -49,6 +49,19 @@ func (Compare) Equal(alter string, real []string) bool { ...@@ -49,6 +49,19 @@ func (Compare) Equal(alter string, real []string) bool {
return rs return rs
} }
//限制同比
func (Compare) LimitComparedWithSame(alter string, old []string, current []string) bool {
logger.Info.Println("old:", strings.Join(old, ","), "new: ", strings.Join(current, ","))
rs := true
lastIndex := len(current) - 1
for i, r := range current {
if i != 0 || i != lastIndex {
rs = rs && limitCompareSame(alter, old[i], r)
}
}
return rs
}
/** /**
同比超过alter 同比超过alter
*/ */
...@@ -65,6 +78,15 @@ func (Compare) ComparedWithSame(alter string, old []string, current []string) bo ...@@ -65,6 +78,15 @@ func (Compare) ComparedWithSame(alter string, old []string, current []string) bo
} }
func limitCompareSame(alter string, old string, current string) bool {
cf := parseToFloat(current)
of := parseToFloat(old)
if cf < 200 && of < 200 {
return false
}
return (cf-of)/of > parseToFloat(alter)
}
func compareSame(alter string, old string, current string) bool { func compareSame(alter string, old string, current string) bool {
cf := parseToFloat(current) cf := parseToFloat(current)
of := parseToFloat(old) of := parseToFloat(old)
...@@ -106,3 +128,9 @@ func (MsgBuilder) Equal(alter string) string { ...@@ -106,3 +128,9 @@ func (MsgBuilder) Equal(alter string) string {
func (MsgBuilder) ComparedWithSame(alter string) string { func (MsgBuilder) ComparedWithSame(alter string) string {
return "同比超过" + alter return "同比超过" + alter
} }
//限制同比
func (MsgBuilder) LimitComparedWithSame(alter string) string {
return "同比超过" + alter
}
\ No newline at end of file
package service package consumer
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"git.quantgroup.cn/DevOps/enoch/service/data" "git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/file_cache" "git.quantgroup.cn/DevOps/enoch/service/end_points"
"git.quantgroup.cn/DevOps/enoch/service/log" "git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/client/v2"
...@@ -21,20 +21,23 @@ var httpMethod = map[string]string{ ...@@ -21,20 +21,23 @@ var httpMethod = map[string]string{
func (BraveMessageHandler) MsgProcess(msg string) { func (BraveMessageHandler) MsgProcess(msg string) {
traceMsg := make([]TraceMsg, 3) //[]TraceMsg{} traceMsg := make([]end_points.TraceMsg, 3) //[]TraceMsg{}
err := json.Unmarshal([]byte(msg), &traceMsg) err := json.Unmarshal([]byte(msg), &traceMsg)
if err != nil { if err != nil {
fmt.Println(err) logger.Error.Println("brave 解析msg失败:", err)
} }
//msgRedisProcess(traceMsg)
msgInfluxProcess(traceMsg) msgInfluxProcess(traceMsg)
} }
func (BraveMessageHandler) Destroy() {
if len(pointSlice) > 0 {
logger.Info.Println("braveMessageHandler 提交本地缓存数据:", len(pointSlice))
batchWrite(pointSlice)
}
}
var batchSize = 5000
var pointSlice = make([]*client.Point, 0, batchSize) var pointSlice = make([]*client.Point, 0, batchSize)
func msgInfluxProcess(traceMsgs []TraceMsg) { func msgInfluxProcess(traceMsgs []end_points.TraceMsg) {
for _, traceMsg := range traceMsgs { for _, traceMsg := range traceMsgs {
...@@ -83,84 +86,6 @@ func msgInfluxProcess(traceMsgs []TraceMsg) { ...@@ -83,84 +86,6 @@ func msgInfluxProcess(traceMsgs []TraceMsg) {
} }
func batchWrite(pointArray []*client.Point) {
if file_cache.Enabled() {
logger.Info.Println("写入缓存")
fileWrite(pointArray)
} else {
err := httpWrite(pointArray)
if err != nil {
file_cache.OpenCache()
fileWrite(pointArray)
}
logger.Info.Println("写入influx", len(pointArray))
}
}
func httpWrite(pointArray []*client.Point) error {
c := data.NewClient()
defer func() { _ = c.Close() }()
points, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: "monitor",
//Precision : "ms",
})
if err != nil {
return err
}
points.AddPoints(pointArray)
err = c.Write(points)
if err != nil {
return err
}
return nil
}
func ReSubmit(data []string) error {
pointSlice := make([]*client.Point, 0)
for _, v := range data {
cp := file_cache.CreateCachePoint(v)
point, err := client.NewPoint(cp.Name, cp.Tags, cp.Fields, cp.Time)
if err != nil {
logger.Error.Println("构造client.point异常", err)
}
pointSlice = append(pointSlice, point)
if len(pointSlice) > 1000 {
err := httpWrite(pointSlice)
if err != nil {
return err
}
logger.Info.Println("缓存重新提交:1000")
pointSlice = make([]*client.Point, 0)
}
}
if len(pointSlice) > 0 {
err := httpWrite(pointSlice)
if err != nil {
logger.Info.Println(pointSlice)
return err
}
logger.Info.Println("缓存重新提交:", len(pointSlice))
}
logger.Info.Println("重新提交")
return nil
}
func fileWrite(pointArray []*client.Point) {
for _, p := range pointArray {
if p != nil {
current := file_cache.NewPoint(p)
data, err := json.Marshal(current)
if err != nil {
fmt.Println(err)
}
file_cache.Write(string(data))
}
}
}
func Duration(day string, fun func(sysName string, durations map[string]string)) { func Duration(day string, fun func(sysName string, durations map[string]string)) {
conn := data.Pool.Get() conn := data.Pool.Get()
...@@ -200,5 +125,4 @@ func Counter(day string, fun func(sysName string, durations map[string]string)) ...@@ -200,5 +125,4 @@ func Counter(day string, fun func(sysName string, durations map[string]string))
} }
fun(string(redisKey.([]uint8)), reply2) fun(string(redisKey.([]uint8)), reply2)
} }
} }
package consumer
var batchSize = 5000
package service package consumer
import ( import (
"encoding/json" "encoding/json"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/end_points" "git.quantgroup.cn/DevOps/enoch/service/end_points"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/client/v2"
"math/big" "math/big"
"net" "net"
...@@ -21,21 +21,44 @@ func (HealthMessageHandler) MsgProcess(msg string) { ...@@ -21,21 +21,44 @@ func (HealthMessageHandler) MsgProcess(msg string) {
chunkMsg := end_points.ChunkMsg{} chunkMsg := end_points.ChunkMsg{}
err := json.Unmarshal([]byte(msg), &chunkMsg) err := json.Unmarshal([]byte(msg), &chunkMsg)
if err != nil { if err != nil {
fmt.Println(err) logger.Error.Println("healthMessageHandler解析json失败:", err)
logger.Error.Println(msg)
} }
buildMsg(chunkMsg) buildMsg(chunkMsg)
} }
func buildHealthInfluxMsg(appName string, ip string, timestamp time.Time, submitLimit int, db map[string]end_points.DBDetail) { func (HealthMessageHandler) Destroy() {
if len(metricsPointSlice) > 0 {
logger.Info.Println("metricsMessageHandler 提交本地缓存数据:", len(metricsPointSlice))
batchWrite(metricsPointSlice)
}
if len(healthPointSlice) > 0 {
logger.Info.Println("HealthMessageHandler 提交本地缓存数据:", len(healthPointSlice))
batchWrite(healthPointSlice)
}
}
func buildHealthInfluxMsg(appName string, ip string, timestamp time.Time, submitLimit int, db *[]byte) {
tags := make(map[string]string, ) tags := make(map[string]string, )
tags["sys_name"] = appName tags["sys_name"] = appName
tags["host"] = ip tags["host"] = ip
fields := make(map[string]interface{}) fields := make(map[string]interface{})
dbInfo := end_points.DBInfo{}
for k, v := range db { err := json.Unmarshal(*db, &dbInfo)
if err != nil {
dbDetails := end_points.DBDetail{}
err = json.Unmarshal(*db, &dbDetails)
if err == nil {
fields[dbDetails.Details.Database] = isOK(dbDetails.Status.Code)
}
}else {
for k, v := range dbInfo.Details {
var fieldName = v.Details.Database + "—" + k var fieldName = v.Details.Database + "—" + k
fields[fieldName] = isOK(v.Status.Code) fields[fieldName] = isOK(v.Status.Code)
} }
}
if len(healthPointSlice) >= submitLimit { if len(healthPointSlice) >= submitLimit {
go batchWrite(healthPointSlice) go batchWrite(healthPointSlice)
healthPointSlice = make([]*client.Point, 0, batchSize) healthPointSlice = make([]*client.Point, 0, batchSize)
...@@ -50,10 +73,12 @@ func buildMetricsInfluxMsg(appName string, ip string, timestamp time.Time, submi ...@@ -50,10 +73,12 @@ func buildMetricsInfluxMsg(appName string, ip string, timestamp time.Time, submi
tags["sys_name"] = appName tags["sys_name"] = appName
tags["host"] = ip tags["host"] = ip
var status = health.Status status := health.Status
fields["sever_status"] = isOK(status.Code) fields["sever_status"] = isOK(status.Code)
var diskSpace = health.Details.DiskSpace.Details redis := health.Details.Redis
fields["redis_status"] = isOK(redis.Status.Code)
diskSpace := health.Details.DiskSpace.Details
fields["disk_tol"] = diskSpace.Total fields["disk_tol"] = diskSpace.Total
fields["disk_free"] = diskSpace.Free fields["disk_free"] = diskSpace.Free
...@@ -111,7 +136,8 @@ func buildMsg(chunkMsg end_points.ChunkMsg) { ...@@ -111,7 +136,8 @@ func buildMsg(chunkMsg end_points.ChunkMsg) {
buildMetricsInfluxMsg(appName, ip, unix, sysNameCount, p.Health, p.Metrics) buildMetricsInfluxMsg(appName, ip, unix, sysNameCount, p.Health, p.Metrics)
//health_info //health_info
buildHealthInfluxMsg(appName, ip, unix, sysNameCount, p.Health.Details.Db.Details) dbByte, _ := json.Marshal(p.Health.Details.Db)
buildHealthInfluxMsg(appName, ip, unix, sysNameCount, &dbByte)
} }
} }
......
package service package consumer
import ( import (
"git.quantgroup.cn/DevOps/enoch/service/conf" "git.quantgroup.cn/DevOps/enoch/service/conf"
...@@ -7,12 +7,14 @@ import ( ...@@ -7,12 +7,14 @@ import (
"github.com/bsm/sarama-cluster" "github.com/bsm/sarama-cluster"
"os" "os"
"os/signal" "os/signal"
"sync/atomic"
"syscall"
) )
var consumerCount int32
func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler) { func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler) {
config := cluster.NewConfig() config := cluster.NewConfig()
config.Consumer.Return.Errors = true config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Group.Return.Notifications = true config.Group.Return.Notifications = true
...@@ -21,10 +23,20 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler ...@@ -21,10 +23,20 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler
if err != nil { if err != nil {
panic(err) panic(err)
} }
defer func() { _ = consumer.Close() }() atomic.AddInt32(&consumerCount, 1)
// trap SIGINT to trigger a shutdown. // trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1) signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt) signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, os.Interrupt)
defer func() {
_ = consumer.Close()
messageHandle.Destroy()
atomic.AddInt32(&consumerCount, -1)
logger.Info.Println("consumer结束")
if consumerCount == 0 {
os.Exit(0)
}
}()
// consume errors // consume errors
go func() { go func() {
...@@ -52,5 +64,4 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler ...@@ -52,5 +64,4 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler
return return
} }
} }
} }
package service package consumer
type MessageHandler interface { type MessageHandler interface {
MsgProcess(msg string) MsgProcess(msg string)
Destroy()
} }
package consumer
import (
"encoding/json"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/file_cache"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2"
)
func batchWrite(pointArray []*client.Point) {
if file_cache.Enabled() {
logger.Info.Println("写入缓存")
fileWrite(pointArray)
} else {
err := httpWrite(pointArray)
if err != nil {
file_cache.OpenCache()
fileWrite(pointArray)
}
logger.Info.Println("写入influx", len(pointArray))
}
}
func httpWrite(pointArray []*client.Point) error {
c := data.NewClient()
defer func() { _ = c.Close() }()
points, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: "monitor",
//Precision : "ms",
})
if err != nil {
return err
}
points.AddPoints(pointArray)
err = c.Write(points)
if err != nil {
return err
}
return nil
}
func ReSubmit(data []string) error {
pointSlice := make([]*client.Point, 0)
for _, v := range data {
cp := file_cache.CreateCachePoint(v)
point, err := client.NewPoint(cp.Name, cp.Tags, cp.Fields, cp.Time)
if err != nil {
logger.Error.Println("构造client.point异常", err)
}
pointSlice = append(pointSlice, point)
if len(pointSlice) > 1000 {
err := httpWrite(pointSlice)
if err != nil {
return err
}
logger.Info.Println("缓存重新提交:1000")
pointSlice = make([]*client.Point, 0)
}
}
if len(pointSlice) > 0 {
err := httpWrite(pointSlice)
if err != nil {
logger.Info.Println(pointSlice)
return err
}
logger.Info.Println("缓存重新提交:", len(pointSlice))
}
logger.Info.Println("重新提交")
return nil
}
func fileWrite(pointArray []*client.Point) {
for _, p := range pointArray {
if p != nil {
current := file_cache.NewPoint(p)
data, err := json.Marshal(current)
if err != nil {
fmt.Println(err)
}
file_cache.Write(string(data))
}
}
}
...@@ -19,7 +19,8 @@ type Health struct { ...@@ -19,7 +19,8 @@ type Health struct {
type Detail struct { type Detail struct {
DiskSpace DiskInfo `json:"diskSpace"` DiskSpace DiskInfo `json:"diskSpace"`
Redis RedisInfo `json:"redis"` Redis RedisInfo `json:"redis"`
Db DBInfo `json:"db"` Db interface{} `json:"db"`
} }
type DBInfo struct { type DBInfo struct {
......
package service package end_points
type TraceMsg struct { type TraceMsg struct {
TraceId string `json:"traceId"` TraceId string `json:"traceId"`
......
package file_cache package file_cache
import ( import (
"encoding/json"
"git.quantgroup.cn/DevOps/enoch/service/log" "git.quantgroup.cn/DevOps/enoch/service/log"
"net/http"
"strings"
"sync" "sync"
"time" "time"
) )
...@@ -22,6 +25,7 @@ func (s *switcher) turnOn() { ...@@ -22,6 +25,7 @@ func (s *switcher) turnOn() {
s.state = true s.state = true
s.origin = time.Now().Unix() s.origin = time.Now().Unix()
create() create()
senderDingDing()
} }
} }
...@@ -48,9 +52,16 @@ func (s *switcher) status() bool { ...@@ -48,9 +52,16 @@ func (s *switcher) status() bool {
} }
var cacheSwitcher *switcher var cacheSwitcher *switcher
var alterMsg string
const (
url = "https://oapi.dingtalk.com/robot/send?access_token=9ffab8e4ae5f94e0fbf84aa91c9cb474d9e3d5bd0bb3c2daffe4cdfe0c2cbbc7"
contentType = "application/json;charset=utf-8"
)
func init() { func init() {
cacheSwitcher = &switcher{} cacheSwitcher = &switcher{}
alterMsg = buildDingDingMsg()
} }
func Enabled() bool { func Enabled() bool {
...@@ -60,3 +71,33 @@ func Enabled() bool { ...@@ -60,3 +71,33 @@ func Enabled() bool {
func OpenCache() { func OpenCache() {
cacheSwitcher.turnOn() cacheSwitcher.turnOn()
} }
func senderDingDing() {
_, err := http.Post(url, contentType, strings.NewReader(alterMsg))
if err != nil {
logger.Error.Println(err)
}
}
func buildDingDingMsg() string {
msg := dingDingMsg{
MsgType: "text",
Text: text{
Content: "influxdb 写超时,已启用文件缓存",
},
}
msgStr, err := json.Marshal(msg)
if nil != err {
logger.Error.Println("无法序列化ding ding msg", err)
}
return string(msgStr)
}
type dingDingMsg struct {
MsgType string
Text text
}
type text struct {
Content string
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment