Commit ae0a0612 authored by Node- 门 忠鑫's avatar Node- 门 忠鑫

# 添加报警服务

parent 1b6f4710
...@@ -3,13 +3,12 @@ package main ...@@ -3,13 +3,12 @@ package main
import ( import (
"flag" "flag"
"git.quantgroup.cn/DevOps/enoch/service" "git.quantgroup.cn/DevOps/enoch/service"
"git.quantgroup.cn/DevOps/enoch/service/alarm"
"git.quantgroup.cn/DevOps/enoch/service/conf" "git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/data" "git.quantgroup.cn/DevOps/enoch/service/log"
"git.quantgroup.cn/DevOps/enoch/service/job"
"log" "log"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"os"
"strconv" "strconv"
) )
...@@ -25,27 +24,23 @@ func main() { ...@@ -25,27 +24,23 @@ func main() {
flag.Parse() flag.Parse()
conf.Load(denv, didc) conf.Load(denv, didc)
file, err := os.OpenFile("quantgroup.log", os.O_RDWR|os.O_CREATE, 0666) logger.Info.Println(conf.GlobalConfig.AppName + "项目启动, port:" + port + ",环境:" + conf.GlobalConfig.Env)
defer func() { _ = file.Close() }() defer logger.Info.Println("项目结束")
if err != nil {
log.Fatalln("create file error", err)
}
logger := log.New(file, "[Info]", log.LstdFlags|log.Llongfile)
logger.Println(conf.GlobalConfig.AppName + "项目启动, port:" + port + ",环境:" + conf.GlobalConfig.Env)
defer logger.Println("项目结束")
//初始化redis连接池 //初始化redis连接池
data.RedisPoolInit() //data.RedisPoolInit()
//初始化alarm
alarm.Load()
//初始化kafka 消费者
go service.AgentClusterConsumer(conf.HealthTopic(), service.HealthMessageHandler{}) go service.AgentClusterConsumer(conf.HealthTopic(), service.HealthMessageHandler{})
go service.AgentClusterConsumer(conf.BraveTopic(), service.BraveMessageHandler{}) //go service.AgentClusterConsumer(conf.BraveTopic(), service.BraveMessageHandler{})
intPort, _ := strconv.Atoi(port) intPort, _ := strconv.Atoi(port)
if quartz { if quartz {
log.Println("启动定时任务") log.Println("启动定时任务")
job.AutoEmailPerformInfo() //job.AutoEmailPerformInfo()
} }
go func() { go func() {
...@@ -55,9 +50,9 @@ func main() { ...@@ -55,9 +50,9 @@ func main() {
http.HandleFunc("/duration", service.DurationInterface) http.HandleFunc("/duration", service.DurationInterface)
http.HandleFunc("/counter", service.CounterInterface) http.HandleFunc("/counter", service.CounterInterface)
err = http.ListenAndServe(":"+port, nil) err := http.ListenAndServe(":"+port, nil)
if err != nil { if err != nil {
log.Fatalln("服务启动失败", err) logger.Error.Fatalln("服务启动失败", err)
} }
} }
...@@ -87,6 +87,7 @@ func buildMetricsInfluxMsg(appName string, ip string, timestamp time.Time, submi ...@@ -87,6 +87,7 @@ func buildMetricsInfluxMsg(appName string, ip string, timestamp time.Time, submi
fields["uptime"] = metrics.Uptime fields["uptime"] = metrics.Uptime
fields["instance_uptime"] = metrics.InstanceUptime fields["instance_uptime"] = metrics.InstanceUptime
fields["system_load_average"] = metrics.SystemloadAverage fields["system_load_average"] = metrics.SystemloadAverage
fields["processors"] = metrics.Processors
if len(metricsPointSlice) >= submitLimit { if len(metricsPointSlice) >= submitLimit {
go batchWrite(metricsPointSlice) go batchWrite(metricsPointSlice)
......
package alarm
import (
"git.quantgroup.cn/DevOps/enoch/service/util"
"net/http"
"strings"
)
func Sender(target []string, triggerStrategy string) {
}
/**
加载发送策略
*/
func LoadSendStrategy() {
}
func senderMail(info string, receiver string) {
util.SendEmail("接口耗时情况", info, receiver)
}
func senderDingDing(info string, receiver string) {
http.Post(receiver, "application/json", )
}
/**
判断是否符合报警的时间间隔
*/
func isExpired() {
}
/**
持久化报警
*/
func persistence() {
}
package alarm
const (
MONITOR = "monitor"
SYSNAME_SQL = "show tag values from machine_info with key = sys_name"
HOST_SQL = "show tag values from machine_info with key= host where sys_name = "
)
package alarm
import (
"bytes"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2"
)
func Query(sql string, db string) []client.Result {
con := data.NewClient()
q := client.Query{Command: sql, Database: db}
res, err := con.Query(q)
if nil != err {
logger.Error.Println("influxdb client init error", err)
}
if nil != res.Error() {
logger.Error.Println("query error", db, sql, res.Error())
}
return res.Results
}
func QueryMonitor(sql string) []client.Result {
return Query(sql, MONITOR)
}
func QuerySysName() []string {
res := QueryMonitor(SYSNAME_SQL)
var values = res[0].Series[0].Values
sysName := make([]string, 0)
for _, v := range values {
sysName = append(sysName, v[1].(string))
}
return sysName
}
func QueryHost(sysName []string) map[string][]string {
var sqlBuff bytes.Buffer
for _, name := range sysName {
sqlBuff.WriteString(HOST_SQL)
sqlBuff.WriteString("'")
sqlBuff.WriteString(name)
sqlBuff.WriteString("';")
}
res := QueryMonitor(sqlBuff.String())
nodeTree := make(map[string][]string)
for index, result := range res {
ipList := make([]string, 0)
values := result.Series[0].Values
for _, v := range values {
ipList = append(ipList, v[1].(string))
}
nodeTree[sysName[index]] = ipList
}
return nodeTree
}
package alarm
import (
"encoding/json"
"git.quantgroup.cn/DevOps/enoch/service/log"
"io/ioutil"
)
func Load() {
data,err := ioutil.ReadFile("service/conf/dev/tsconfig.json")
if err != nil {
logger.Error.Fatal("未找打配置文件")
}
strategies := make([]Strategy, 0)
err = json.Unmarshal(data, &strategies)
if err != nil {
logger.Error.Fatal("策略文件格式错误:", err)
}
if !CheckArray(strategies) {
logger.Error.Fatal("策略文件未通过校验")
}
sql := BuildSql(strategies)
logger.Info.Println(sql)
result := QueryMonitor(sql)
DealResult(result, strategies)
}
package alarm
type NodeTree struct {
AppName string
}
package alarm
import (
"git.quantgroup.cn/DevOps/enoch/service/log"
"strconv"
)
type Operator interface {
Less(alter string, real string)
Grater(alter string, real string)
Equal(alter string, real string)
Between(floor string, ceil string, real string)
ComparedWithSame(old string, current string, alter string)
}
type Compare struct {
}
/**
real < alter
*/
func (Compare) Less(alter string, real string) bool {
return real < alter
}
/**
real > alter
*/
func (Compare) Greater(alter string, real string) bool {
return real > alter
}
/**
real = alter
*/
func (Compare) Equal(alter string, real string) bool {
return real == alter
}
func (Compare) Between(floor string, ceil string, real string) bool {
return floor < real && ceil > real
}
/**
同比超过alter
*/
func (Compare) ComparedWithSame(alter string, old string, current string) bool {
cf := parseToFloat(current)
of := parseToFloat(old)
return (cf - of)/of > parseToFloat(alter)
}
func parseToFloat(value string) float64 {
rs, err := strconv.ParseFloat(value, 64)
if nil != err {
logger.Error.Println(err)
}
return rs
}
package alarm
import (
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2"
"reflect"
"strings"
)
func BuildSql(strategies []Strategy) string {
var sqlBuff strings.Builder
for _, strategy := range strategies {
sqlBuff.WriteString(strategy.Sql)
}
return sqlBuff.String()
}
var operators = reflect.ValueOf(Compare{})
func needIgnore(ignoreTag map[string]bool, tags []string) bool {
for _, k := range tags {
if ignoreTag[k] {
return true
}
}
return false
}
func getAlterValue(personalAlterValue map[string][]string, uniqueTag string, tags []string, alterValue []string) []string {
value := personalAlterValue[uniqueTag]
if nil != value {
return value
}
tagLen := len(tags)
for i := 1; i < tagLen; i++ {
if nil != personalAlterValue[tags[i]] {
return personalAlterValue[tags[i]]
}
}
return alterValue
}
func getTagValues(keys []string, tag map[string]string) []string {
values := make([]string, len(keys))
for i, k := range keys {
values[i] = tag[k]
}
return values
}
func DealResult(res []client.Result, strategies []Strategy) {
var resIndex = 0
for _, strategy := range strategies {
logger.Info.Println("-------", strategy.Name, resIndex, "---------")
operator := strategy.Operator
method := operators.MethodByName(operator)
alterValue := strategy.AlterValue
alterValueLen := len(alterValue)
ignoreTag := strategy.IgnoreTag
tags := strategy.Tag
personalAlterValue := strategy.PersonalAlterValue
if strategy.SqlLen == 1 { //单sql
result := res[resIndex]
for _, series := range result.Series {
uniqueTag := series.Tags[tags[0]]
tagValues := getTagValues(tags, series.Tags)
if needIgnore(ignoreTag, tags) {
continue
}
value := series.Values[0][1]
if nil == value { //todo 空值报警
logger.Warning.Println(strategy.Name, ":", uniqueTag, ":", "空值")
}
currentAlterValue := getAlterValue(personalAlterValue, uniqueTag, tagValues, alterValue)
params := make([]reflect.Value, alterValueLen+1)
for j, arg := range currentAlterValue {
params[j] = reflect.ValueOf(arg)
}
params[alterValueLen] = reflect.ValueOf(reflect.ValueOf(value).String())
logger.Info.Println(uniqueTag, params)
rs := method.Call(params)
logger.Info.Println(uniqueTag, ": real:", value, ": check", rs[0])
}
} else {
params := make(map[string][]reflect.Value)
for i := 0; i < strategy.SqlLen; i++ {
resIndex += i
for _, series := range res[resIndex].Series {
if needIgnore(ignoreTag, tags) {
continue
}
uniqueTag := series.Tags[tags[0]]
tagValues := getTagValues(tags, series.Tags)
value := series.Values[0][1]
if nil == value { //todo 空值报警
params[uniqueTag] = nil
logger.Warning.Println(strategy.Name, ":", uniqueTag, ":", "空值")
}
if nil == params[uniqueTag] {
currentParams := make([]reflect.Value, strategy.SqlLen+alterValueLen)
currentAlterValue := getAlterValue(personalAlterValue, uniqueTag, tagValues, alterValue)
for k, arg := range currentAlterValue {
currentParams[k] = reflect.ValueOf(arg)
}
params[uniqueTag] = currentParams
}
params[uniqueTag][alterValueLen+i] = reflect.ValueOf(reflect.ValueOf(value).String())
}
}
for k, v := range params {
if nil == v {
continue
}
logger.Info.Println(k, v)
rs := method.Call(v)
//todo 结果报警
logger.Info.Println(k, ": real:", v, ": check", rs[0])
}
}
//循环结果集
resIndex += 1
}
logger.Info.Fatal("结束")
}
func CheckArray(strategies []Strategy) bool {
for _, strategy := range strategies {
if !Check(strategy) {
return false
}
}
return true
}
func Check(strategy Strategy) bool {
if "" == strategy.Name {
return false
}
if "" == strategy.Name {
return false
}
if strategy.SqlLen != strings.Count(strategy.Sql, ";") {
return false
}
if len(strategy.Tag) < 1 {
return false
}
length := len(strategy.AlterValue)
for _, v := range strategy.PersonalAlterValue {
if len(v) != length {
return false
}
}
return true
}
package alarm
type StrategyType int8
const (
SERVICE StrategyType = 1 //监控服务的策略
MACHINE StrategyType = 2 //监控机器的策略
COMMON StrategyType = 3 //通用策略
)
type Strategy struct {
Name string `json:"name"`
Sql string `json:"sql"`
SqlLen int `json:"sql_len"`
Type StrategyType `json:"type"`
Operator string `json:"operator"`
AlterValue []string `json:"alter_value"`
Tag []string `json:"tag"`
IgnoreTag map[string]bool `json:"ignore_tag"`
PersonalAlterValue map[string][]string `json:"personal_alter_value"`
}
[
{
"name": "apdex",
"sql": "select (mean(sat)+mean(tol))/mean(ct_all) as cu from apdex where time > now() - 3m group by sys_name fill(0);",
"sql_len": 1,
"type": 2,
"operator": "Less",
"alter_value": [
"0.8"
],
"tag": ["sys_name"],
"ignore_tag": {
"msg": true
},
"personal_alter_value": {
"feature": [
"0.7"
],
"xyqb": [
"1"
]
}
},
{
"name": "访问量同比",
"sql": "select count(traceId) from trace_info where time > now() - 1d5m and time < now() - 1d group by sys_name fill(0);select count(traceId) from trace_info where time > now() - 5m group by sys_name fill(1);",
"sql_len": 2,
"type": 2,
"operator": "ComparedWithSame",
"alter_value": [
"0.1"
],
"tag": ["sys_name"],
"ignore_tag": {
"msg": true
},
"personal_alter_value": {
"feature": [
"0.7"
],
"xyqb": [
"1"
]
}
},
{
"name": "cpu负载",
"sql": "select mean(system_load_average)/mean(processors) from machine_info where time > now() - 1m group by sys_name, host fill(1);",
"sql_len": 1,
"type": 1,
"operator": "Greater",
"alter_value": [
"0.9"
],
"tag": ["host", "sys_name"],
"personal_alter_value": {
"feature": [
"0.7"
],
"xyqb": [
"1"
]
}
},
{
"name": "内存负载",
"sql": "select mean(mem_free)/mean(mem_tol) from machine_info where time > now() - 2m group by sys_name,host fill(0);",
"sql_len": 1,
"type": 1,
"operator": "Less",
"alter_value": [
"0.2"
],
"tag": ["host", "sys_name"],
"personal_alter_value": {
"feature": [
"0.7"
],
"xyqb": [
"1"
]
}
},
{
"name": "硬盘负载",
"sql": "select last(disk_free)/last(disk_tol) from machine_info where time > now() - 2m group by sys_name,host fill(0);",
"sql_len": 1,
"type": 1,
"operator": "Less",
"alter_value": [
"0.2"
],
"tag": ["host", "sys_name"],
"personal_alter_value": {
"feature": [
"0.7"
],
"xyqb": [
"1"
]
}
}
]
...@@ -2,7 +2,7 @@ package conf ...@@ -2,7 +2,7 @@ package conf
import ( import (
"encoding/json" "encoding/json"
"fmt" "git.quantgroup.cn/DevOps/enoch/service/log"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
) )
...@@ -16,7 +16,8 @@ func Load(denv string, didc string) { ...@@ -16,7 +16,8 @@ func Load(denv string, didc string) {
bytes, err := ioutil.ReadAll(resp.Body) bytes, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(bytes, &GlobalConfig) err = json.Unmarshal(bytes, &GlobalConfig)
GlobalConfig.Env = denv GlobalConfig.Env = denv
GlobalConfig.InfluxDb.Host = "http://172.20.6.33"
if nil != err { if nil != err {
fmt.Println(err) logger.Error.Println(err)
} }
} }
...@@ -2,8 +2,8 @@ package data ...@@ -2,8 +2,8 @@ package data
import ( import (
"git.quantgroup.cn/DevOps/enoch/service/conf" "git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/client/v2"
"log"
) )
func NewClient() client.Client { func NewClient() client.Client {
...@@ -13,7 +13,7 @@ func NewClient() client.Client { ...@@ -13,7 +13,7 @@ func NewClient() client.Client {
con, err := client.NewHTTPClient(httpConfig) con, err := client.NewHTTPClient(httpConfig)
if err != nil { if err != nil {
log.Fatal(err) logger.Error.Println(err.Error())
} }
return con return con
......
...@@ -2,9 +2,9 @@ package service ...@@ -2,9 +2,9 @@ package service
import ( import (
"git.quantgroup.cn/DevOps/enoch/service/conf" "git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster" "github.com/bsm/sarama-cluster"
"log"
"os" "os"
"os/signal" "os/signal"
) )
...@@ -29,14 +29,14 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler ...@@ -29,14 +29,14 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler
// consume errors // consume errors
go func() { go func() {
for err := range consumer.Errors() { for err := range consumer.Errors() {
log.Printf("Error: %s\n", err.Error()) logger.Error.Println("consume error:", err.Error())
} }
}() }()
// consume notifications // consume notifications
go func() { go func() {
for ntf := range consumer.Notifications() { for ntf := range consumer.Notifications() {
log.Printf("Rebalanced: %+v\n", ntf) logger.Info.Printf("Rebalanced: %+v\n", ntf)
} }
}() }()
......
package logger
import (
"io"
"log"
"os"
)
var (
Info *log.Logger
Warning *log.Logger
Error * log.Logger
)
func init(){
file, err := os.OpenFile("quantgroup.log", os.O_RDWR|os.O_CREATE, 0666)
if err!=nil{
log.Fatalln("打开日志文件失败:",err)
}
Info = log.New(io.MultiWriter(os.Stderr,file),"Info:",log.Ldate | log.Ltime | log.Lshortfile)
Warning = log.New(io.MultiWriter(os.Stderr,file),"Warning:",log.Ldate | log.Ltime | log.Lshortfile)
Error = log.New(io.MultiWriter(os.Stderr,file),"Error:",log.Ldate | log.Ltime | log.Lshortfile)
}
\ No newline at end of file
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"git.quantgroup.cn/DevOps/enoch/service/data" "git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/client/v2"
"log" "log"
...@@ -95,9 +96,9 @@ func batchWrite(pointArray []*client.Point) { ...@@ -95,9 +96,9 @@ func batchWrite(pointArray []*client.Point) {
points.AddPoints(pointArray) points.AddPoints(pointArray)
err = c.Write(points) err = c.Write(points)
fmt.Println("写入数据", len(pointArray)) logger.Info.Println("写入数据", len(pointArray))
if err != nil { if err != nil {
log.Fatal(err) logger.Error.Fatal(err)
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment