Commit 7d96165c authored by Node- 门 忠鑫

Merge branch 'alter' into 'master'

Alter



See merge request !1
parents 0f6fc03b 35c9f6c5
...@@ -17,6 +17,7 @@ require ( ...@@ -17,6 +17,7 @@ require (
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect
github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967 github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967
github.com/timest/env v0.0.0-20180717050204-5fce78d35255 github.com/timest/env v0.0.0-20180717050204-5fce78d35255
golang.org/x/text v0.3.0
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
) )
...@@ -48,6 +48,8 @@ func main() { ...@@ -48,6 +48,8 @@ func main() {
job.AutoEmailPerformInfo() job.AutoEmailPerformInfo()
} }
job.AutoAlarm()
go func() { go func() {
http.ListenAndServe("0.0.0.0:"+strconv.Itoa(intPort+1), nil) http.ListenAndServe("0.0.0.0:"+strconv.Itoa(intPort+1), nil)
}() }()
......
...@@ -87,6 +87,7 @@ func buildMetricsInfluxMsg(appName string, ip string, timestamp time.Time, submi ...@@ -87,6 +87,7 @@ func buildMetricsInfluxMsg(appName string, ip string, timestamp time.Time, submi
fields["uptime"] = metrics.Uptime fields["uptime"] = metrics.Uptime
fields["instance_uptime"] = metrics.InstanceUptime fields["instance_uptime"] = metrics.InstanceUptime
fields["system_load_average"] = metrics.SystemloadAverage fields["system_load_average"] = metrics.SystemloadAverage
fields["processors"] = metrics.Processors
if len(metricsPointSlice) >= submitLimit { if len(metricsPointSlice) >= submitLimit {
go batchWrite(metricsPointSlice) go batchWrite(metricsPointSlice)
......
package alarm
// InfluxDB identifiers and statements shared by the query helpers of this package.
const (
// MONITOR is the database name QueryMonitor passes to Query.
MONITOR = "monitor"
// SYSNAME_SQL lists the values of the sys_name tag key in machine_info.
SYSNAME_SQL = "show tag values from machine_info with key = sys_name"
// HOST_SQL is a statement prefix: QueryHost appends a quoted sys_name and ";" to it.
HOST_SQL = "show tag values from machine_info with key= host where sys_name = "
)
package alarm
import (
"bytes"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2"
)
// Query runs sql against the InfluxDB database db and returns the raw results.
//
// Errors are logged, not returned. When no client is available or the request
// itself fails, nil is returned. When the server answers with an error embedded
// in the response, the error is logged and whatever results came back are
// returned unchanged.
func Query(sql string, db string) []client.Result {
	con := data.NewClient()
	if nil == con {
		logger.Error.Println("influxdb client unavailable", db, sql)
		return nil
	}
	// A fresh client is created per call, so release it when done.
	defer con.Close()

	q := client.Query{Command: sql, Database: db}
	res, err := con.Query(q)
	if nil != err {
		logger.Error.Println("influxdb client init error", err)
		// res carries no usable data on a failed request; touching it would
		// dereference nil.
		return nil
	}
	if nil == res {
		return nil
	}
	if nil != res.Error() {
		logger.Error.Println("query error", db, sql, res.Error())
	}
	return res.Results
}
// QueryMonitor is a convenience wrapper that runs sql through Query against
// the MONITOR database.
func QueryMonitor(sql string) []client.Result {
	results := Query(sql, MONITOR)
	return results
}
// QuerySysName returns the values in the second column of the first series
// produced by SYSNAME_SQL.
//
// An empty (non-nil) slice is returned when the query produced no result or no
// series. Rows that are too short or whose value is not a string are skipped.
func QuerySysName() []string {
	sysName := make([]string, 0)
	res := QueryMonitor(SYSNAME_SQL)
	if len(res) == 0 || len(res[0].Series) == 0 {
		logger.Error.Println("query returned no series:", SYSNAME_SQL)
		return sysName
	}
	for _, v := range res[0].Series[0].Values {
		if len(v) < 2 {
			continue
		}
		name, ok := v[1].(string)
		if !ok {
			continue
		}
		sysName = append(sysName, name)
	}
	return sysName
}
// QueryHost builds one HOST_SQL statement per entry of sysName, runs them as a
// single multi-statement query and returns, for every sys name, the values in
// the second column of its first series.
//
// Results are matched to sysName by position. A sys name whose statement
// returned no series maps to an empty list. Results beyond len(sysName) are
// ignored. An empty sysName yields an empty map without querying.
func QueryHost(sysName []string) map[string][]string {
	nodeTree := make(map[string][]string)
	if len(sysName) == 0 {
		return nodeTree
	}

	var sqlBuff bytes.Buffer
	for _, name := range sysName {
		sqlBuff.WriteString(HOST_SQL)
		sqlBuff.WriteString("'")
		sqlBuff.WriteString(name)
		sqlBuff.WriteString("';")
	}

	res := QueryMonitor(sqlBuff.String())
	for index, result := range res {
		if index >= len(sysName) {
			break
		}
		ipList := make([]string, 0)
		if len(result.Series) > 0 {
			for _, v := range result.Series[0].Values {
				if len(v) < 2 {
					continue
				}
				if ip, ok := v[1].(string); ok {
					ipList = append(ipList, ip)
				}
			}
		}
		nodeTree[sysName[index]] = ipList
	}
	return nodeTree
}
package alarm
import (
"encoding/json"
"git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/log"
"io/ioutil"
)
// Load reads the strategy file named by conf.GlobalConfig.StrategyConfPath,
// decodes it as a JSON array of Strategy, validates it with CheckArray, runs
// the combined SQL through QueryMonitor and hands the results to DealResult.
//
// A read, decode or validation failure is reported through
// logger.Error.Fatal, which terminates the process after logging.
func Load() {
	data, err := ioutil.ReadFile(conf.GlobalConfig.StrategyConfPath)
	if err != nil {
		// Include the underlying error: the file may exist but be unreadable.
		logger.Error.Fatal("未找到配置文件:", err)
	}
	strategies := make([]Strategy, 0)
	err = json.Unmarshal(data, &strategies)
	if err != nil {
		logger.Error.Fatal("策略文件格式错误:", err)
	}
	if !CheckArray(strategies) {
		logger.Error.Fatal("策略文件未通过校验")
	}
	sql := BuildSql(strategies)
	result := QueryMonitor(sql)
	DealResult(result, strategies)
}
package alarm
import (
"encoding/json"
"git.quantgroup.cn/DevOps/enoch/service/log"
"git.quantgroup.cn/DevOps/enoch/service/util"
"net/http"
"strings"
"sync"
"time"
)
// SenderWatcher maps an alarm key to the Unix time in seconds that isExpired
// last stored for it.
var SenderWatcher sync.Map
// Sender delivers one alarm through the channel selected by notice.Sender,
// unless an alarm with the same throttle key was sent less than
// notice.Interval seconds ago.
//
// target holds the tag values identifying the service/host, title is the
// strategy name and info the message text. The text that is sent is the
// target key followed by info.
func Sender(target []string, title string, info string, notice Notice) {
	uniqueKey := buildUniqueKey(target)
	// Throttle per strategy and per target. Keying on the target alone would
	// let one strategy silence a different strategy firing on the same host.
	if !isExpired(title+"|"+uniqueKey, notice.Interval) {
		return
	}
	info = uniqueKey + info
	switch notice.Sender {
	case DING_DING:
		senderDingDing(title, info, notice.Receiver)
	case MAIL:
		senderMail(title, info, notice.Receiver)
	default:
		logger.Error.Println("策略配置错误,未匹配到对应的Sender")
	}
}
// senderMail passes title and info to util.SendEmail once per receiver, in
// slice order.
func senderMail(title string, info string, receiver []string) {
	for i := range receiver {
		util.SendEmail(title, info, receiver[i])
	}
}
// senderDingDing POSTs the message built by buildDingDingMsg to every URL in
// receiver.
//
// Each request is limited to 10 seconds so an unresponsive endpoint cannot
// stall the caller. Transport errors and non-200 answers are logged, and
// delivery continues with the next receiver.
func senderDingDing(title string, info string, receiver []string) {
	payload := string(buildDingDingMsg(title, info))
	httpClient := &http.Client{Timeout: 10 * time.Second}
	for _, r := range receiver {
		resp, err := httpClient.Post(r, "application/json;charset=utf-8", strings.NewReader(payload))
		if err != nil {
			logger.Error.Println(err)
			continue
		}
		if resp.StatusCode != http.StatusOK {
			logger.Error.Println("ding ding webhook returned status", resp.Status)
		}
		// The body must be closed or the underlying connection leaks.
		_ = resp.Body.Close()
	}
}
func buildDingDingMsg(title string, info string) []byte {
msg := DinDingMsg{
MsgType: "link",
Link: Link{
Title: title,
Text: info,
MessageUrl: "http://172.20.6.33:3000/d/y1Ju2slik/apdexlist?refresh=1m&orgId=1",
},
}
msgStr, err := json.Marshal(msg)
if nil != err {
logger.Error.Println("无法序列化ding ding msg", err)
}
return msgStr
}
// isExpired reports whether an alarm for key may be sent now.
//
// The first call for a key records the current Unix time and returns true.
// Later calls return true, and refresh the recorded time, only when at least
// interval seconds have passed since the recorded time.
func isExpired(key string, interval int64) bool {
	current := time.Now().Unix()
	previous, loaded := SenderWatcher.LoadOrStore(key, current)
	logger.Info.Println("---------时间间隔:old:", previous, ",new:", current)
	if !loaded {
		// Nothing was recorded before; LoadOrStore has just stored current.
		return true
	}
	if current-previous.(int64) < interval {
		return false
	}
	SenderWatcher.Store(key, current)
	return true
}
// buildUniqueKey joins the entries of target in reverse order, writing a ":"
// after every entry (including the last one).
func buildUniqueKey(target []string) string {
	var key strings.Builder
	for remaining := len(target); remaining > 0; remaining-- {
		key.WriteString(target[remaining-1])
		key.WriteString(":")
	}
	return key.String()
}
// persistence is meant to persist alarms; the body is currently empty.
func persistence() {
}
package alarm
import (
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/log"
"strconv"
)
// Compare groups the threshold operators. Its exported methods are looked up
// by name, so every method name is a valid strategy operator.
type Compare struct {
}

// compareAlterValues orders a against b and returns -1, 0 or 1.
//
// When both strings parse as (non-NaN) floats they are ordered numerically, so
// "9" sorts before "10". Otherwise the plain string ordering is used, which
// keeps non-numeric values comparable.
func compareAlterValues(a string, b string) int {
	af, aErr := strconv.ParseFloat(a, 64)
	bf, bErr := strconv.ParseFloat(b, 64)
	// x != x is true only for NaN, which has no numeric ordering.
	if aErr != nil || bErr != nil || af != af || bf != bf {
		switch {
		case a < b:
			return -1
		case a > b:
			return 1
		}
		return 0
	}
	switch {
	case af < bf:
		return -1
	case af > bf:
		return 1
	}
	return 0
}

// Less reports whether real < alter.
func (Compare) Less(alter string, real string) bool {
	return compareAlterValues(real, alter) < 0
}

// Greater reports whether real > alter.
func (Compare) Greater(alter string, real string) bool {
	return compareAlterValues(real, alter) > 0
}

// Equal reports whether real equals alter; numeric strings are compared by
// value, so "1" equals "1.0".
func (Compare) Equal(alter string, real string) bool {
	return compareAlterValues(real, alter) == 0
}

// Between reports whether real lies strictly between floor and ceil.
func (Compare) Between(floor string, ceil string, real string) bool {
	return compareAlterValues(floor, real) < 0 && compareAlterValues(ceil, real) > 0
}
// ComparedWithSame reports whether the relative change from old to current,
// (current-old)/old, is greater than alter. All three arguments are converted
// with parseToFloat. When old converts to 0 the ratio follows floating-point
// division rules (±Inf or NaN).
func (Compare) ComparedWithSame(alter string, old string, current string) bool {
	now := parseToFloat(current)
	before := parseToFloat(old)
	growth := (now - before) / before
	threshold := parseToFloat(alter)
	return growth > threshold
}
// parseToFloat converts value with strconv.ParseFloat (64-bit). A conversion
// error is logged and the value ParseFloat produced alongside it is returned.
func parseToFloat(value string) float64 {
	parsed, parseErr := strconv.ParseFloat(value, 64)
	if parseErr == nil {
		return parsed
	}
	logger.Error.Println(parseErr)
	return parsed
}
// MsgBuilder builds the alarm text for each operator. Its methods mirror the
// names and parameters of the Compare methods so the same operator name and
// arguments can be used for both.
type MsgBuilder struct {
}

// Less describes a current value that fell below the threshold alter.
func (MsgBuilder) Less(alter string, real string) string {
	return fmt.Sprintf(" 当前值:%.3f, 低于阈值:%s", parseToFloat(real), alter)
}

// Greater describes a current value that rose above the threshold alter.
func (MsgBuilder) Greater(alter string, real string) string {
	return fmt.Sprintf(" 当前值:%.3f, 高于阈值:%s", parseToFloat(real), alter)
}

// Equal describes a current value that equals the threshold alter. The
// threshold is printed in full, like in the other messages.
func (MsgBuilder) Equal(alter string, real string) string {
	return fmt.Sprintf(" 当前值:%.3f, 等于阈值:%s", parseToFloat(real), alter)
}

// Between describes a current value that lies between floor and ceil; the
// bounds are printed lower bound first.
func (MsgBuilder) Between(floor string, ceil string, real string) string {
	return fmt.Sprintf(" 当前值:%.3f, 介于阈值:%s 和 %s 之间", parseToFloat(real), floor, ceil)
}

// ComparedWithSame describes a period-over-period change that exceeded alter.
// old and current are accepted only to match the operator's parameters.
func (MsgBuilder) ComparedWithSame(alter string, old string, current string) string {
	return "同比超过" + alter
}
package alarm
import (
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2"
"reflect"
"strings"
)
// operators and msgBuilder are reflection handles on Compare and MsgBuilder;
// DealResult resolves a strategy's operator name on both with MethodByName.
var operators = reflect.ValueOf(Compare{})
var msgBuilder = reflect.ValueOf(MsgBuilder{})
// DealResult walks the strategies in order, pairs each one with its share of
// res and sends an alarm for every series whose value trips the strategy's
// operator.
//
// res must hold one client.Result per SQL statement, in the order the
// statements were concatenated: a strategy with SqlLen == n owns the next n
// results. Strategies that cannot be evaluated (unknown operator, operator
// arity that does not match the thresholds, no tag configured) are logged and
// skipped instead of panicking inside reflect. Processing stops when res is
// shorter than the strategies require.
func DealResult(res []client.Result, strategies []Strategy) {
	resIndex := 0
	for _, strategy := range strategies {
		logger.Info.Println("-------", strategy.Name, resIndex, "---------")
		if strategy.SqlLen < 1 {
			// The strategy owns no result, so there is nothing to step over.
			logger.Error.Println("strategy skipped, sql_len must be at least 1:", strategy.Name)
			continue
		}

		// Always step over exactly the results this strategy owns, even when
		// it is skipped below, so later strategies stay aligned with res.
		first := resIndex
		resIndex += strategy.SqlLen
		if resIndex > len(res) {
			logger.Error.Println("result set is shorter than the strategies require, stopped at:", strategy.Name)
			return
		}
		if len(strategy.Tag) < 1 {
			logger.Error.Println("strategy skipped, no tag configured:", strategy.Name)
			continue
		}

		operatorMethod := operators.MethodByName(strategy.Operator)
		buildMsgMethod := msgBuilder.MethodByName(strategy.Operator)
		if !operatorMethod.IsValid() || !buildMsgMethod.IsValid() {
			logger.Error.Println("strategy skipped, unknown operator:", strategy.Name, strategy.Operator)
			continue
		}
		// Both methods receive the thresholds followed by one value per SQL.
		paramLen := len(strategy.AlterValue) + strategy.SqlLen
		if operatorMethod.Type().NumIn() != paramLen || buildMsgMethod.Type().NumIn() != paramLen {
			logger.Error.Println("strategy skipped, operator does not take", paramLen, "arguments:", strategy.Name, strategy.Operator)
			continue
		}

		if strategy.SqlLen == 1 {
			alarmOnSingleResult(res[first], strategy, operatorMethod, buildMsgMethod)
		} else {
			alarmOnMultiResult(res[first:resIndex], strategy, operatorMethod, buildMsgMethod)
		}
	}
}

// firstSeriesValue returns column 1 of the first row of values, or nil when
// that cell does not exist.
func firstSeriesValue(values [][]interface{}) interface{} {
	if len(values) == 0 || len(values[0]) < 2 {
		return nil
	}
	return values[0][1]
}

// alarmOnSingleResult evaluates a single-SQL strategy: every series of result
// is compared on its own against the thresholds that apply to it.
func alarmOnSingleResult(result client.Result, strategy Strategy, operatorMethod reflect.Value, buildMsgMethod reflect.Value) {
	alterValueLen := len(strategy.AlterValue)
	for _, series := range result.Series {
		uniqueTag := series.Tags[strategy.Tag[0]]
		tagValues := getTagValues(strategy.Tag, series.Tags)
		if needIgnore(strategy.IgnoreTag, tagValues) {
			continue
		}
		value := firstSeriesValue(series.Values)
		if nil == value {
			if strategy.NoDataAlter {
				Sender(tagValues, strategy.Name, "no data", strategy.Notice)
			}
			// There is nothing to compare against the thresholds.
			continue
		}
		currentAlterValue := getAlterValue(strategy.PersonalAlterValue, uniqueTag, tagValues, strategy.AlterValue)
		if len(currentAlterValue) != alterValueLen {
			logger.Error.Println("threshold count mismatch, series skipped:", strategy.Name, uniqueTag)
			continue
		}
		params := make([]reflect.Value, alterValueLen+1)
		for j, arg := range currentAlterValue {
			params[j] = reflect.ValueOf(arg)
		}
		// Value.String yields the text of string-kinded values; presumably
		// the client delivers numbers as json.Number — TODO confirm.
		params[alterValueLen] = reflect.ValueOf(reflect.ValueOf(value).String())
		logger.Info.Println(uniqueTag, ":", params)
		if operatorMethod.Call(params)[0].Bool() {
			Sender(tagValues, strategy.Name, buildMsgMethod.Call(params)[0].String(), strategy.Notice)
		}
	}
}

// alarmOnMultiResult evaluates a multi-SQL strategy: the values of the same
// unique tag are collected across all results (one per SQL, in order) and
// compared together in a single operator call.
func alarmOnMultiResult(results []client.Result, strategy Strategy, operatorMethod reflect.Value, buildMsgMethod reflect.Value) {
	alterValueLen := len(strategy.AlterValue)
	params := make(map[string][]reflect.Value)
	tagValueMap := make(map[string][]string)
	for i, result := range results {
		for _, series := range result.Series {
			uniqueTag := series.Tags[strategy.Tag[0]]
			tagValues := getTagValues(strategy.Tag, series.Tags)
			if needIgnore(strategy.IgnoreTag, tagValues) {
				continue
			}
			tagValueMap[uniqueTag] = tagValues
			if nil == params[uniqueTag] {
				currentAlterValue := getAlterValue(strategy.PersonalAlterValue, uniqueTag, tagValues, strategy.AlterValue)
				if len(currentAlterValue) != alterValueLen {
					logger.Error.Println("threshold count mismatch, series skipped:", strategy.Name, uniqueTag)
					continue
				}
				currentParams := make([]reflect.Value, len(results)+alterValueLen)
				for k, arg := range currentAlterValue {
					currentParams[k] = reflect.ValueOf(arg)
				}
				params[uniqueTag] = currentParams
			}
			value := firstSeriesValue(series.Values)
			if nil == value {
				// Leave the slot unset; incomplete entries are filtered out
				// by the hasNilValue check before the operator is called.
				continue
			}
			params[uniqueTag][alterValueLen+i] = reflect.ValueOf(reflect.ValueOf(value).String())
		}
	}
	for k, v := range params {
		if nil == v || hasNilValue(v) {
			continue
		}
		logger.Info.Println(k, ":", v)
		if operatorMethod.Call(v)[0].Bool() {
			Sender(tagValueMap[k], strategy.Name, buildMsgMethod.Call(v)[0].String(), strategy.Notice)
		}
	}
}
// CheckArray reports whether every strategy passes Check. It stops at the
// first strategy that fails; an empty slice passes.
func CheckArray(strategies []Strategy) bool {
	for i := range strategies {
		if Check(strategies[i]) {
			continue
		}
		return false
	}
	return true
}
// Check validates one strategy and reports whether it is usable.
//
// A strategy is rejected when its Name, Sql or Operator is empty, when SqlLen
// is below 1 or differs from the number of ";" in Sql, when no Tag is
// configured, or when any PersonalAlterValue entry has a different length
// than AlterValue.
func Check(strategy Strategy) bool {
	if "" == strategy.Name {
		return false
	}
	if "" == strategy.Sql {
		return false
	}
	if "" == strategy.Operator {
		return false
	}
	if strategy.SqlLen < 1 || strategy.SqlLen != strings.Count(strategy.Sql, ";") {
		return false
	}
	if len(strategy.Tag) < 1 {
		return false
	}
	length := len(strategy.AlterValue)
	for _, v := range strategy.PersonalAlterValue {
		if len(v) != length {
			return false
		}
	}
	return true
}
// BuildSql concatenates the Sql field of every strategy, in order and without
// adding any separator.
func BuildSql(strategies []Strategy) string {
	joined := strings.Builder{}
	for i := range strategies {
		joined.WriteString(strategies[i].Sql)
	}
	return joined.String()
}
// hasNilValue reports whether any element of params is the zero
// reflect.Value, i.e. a slot that was never filled. An empty slice has none.
func hasNilValue(params []reflect.Value) bool {
	for _, c := range params {
		if !c.IsValid() {
			return true
		}
	}
	return false
}
// needIgnore reports whether at least one entry of tags is mapped to true in
// ignoreTag.
func needIgnore(ignoreTag map[string]bool, tags []string) bool {
	for i := 0; i < len(tags); i++ {
		if ignored := ignoreTag[tags[i]]; ignored {
			return true
		}
	}
	return false
}
// getAlterValue picks the thresholds that apply to one series.
//
// The entry of personalAlterValue keyed by uniqueTag wins. Otherwise the
// entries keyed by tags[1], tags[2], ... are tried in order (tags[0] is not
// consulted). When none exists, alterValue is returned.
func getAlterValue(personalAlterValue map[string][]string, uniqueTag string, tags []string, alterValue []string) []string {
	if own := personalAlterValue[uniqueTag]; own != nil {
		return own
	}
	if len(tags) > 1 {
		for _, tag := range tags[1:] {
			if inherited := personalAlterValue[tag]; inherited != nil {
				return inherited
			}
		}
	}
	return alterValue
}
// getTagValues looks up every key of keys in tag and returns the values in
// the same order; a key missing from tag contributes an empty string.
func getTagValues(keys []string, tag map[string]string) []string {
	values := make([]string, 0, len(keys))
	for _, k := range keys {
		values = append(values, tag[k])
	}
	return values
}
package alarm
// StrategyType is the numeric code of a strategy category.
type StrategyType int8
// NoticeSender is the numeric code of a notification channel.
type NoticeSender int8
// Strategy categories.
const (
SERVICE StrategyType = 1 // strategy that monitors services
MACHINE StrategyType = 2 // strategy that monitors machines
COMMON StrategyType = 3 // general-purpose strategy
)
// Notification channels.
const (
DING_DING NoticeSender = 1 // DingDing
MAIL NoticeSender = 2 // e-mail
)
// Strategy is one alarm rule as decoded from the strategy JSON file.
type Strategy struct {
Name string `json:"name"` // strategy name
Sql string `json:"sql"`// SQL text; statements are separated by ';'
SqlLen int `json:"sql_len"` // number of SQL statements contained in Sql
Type StrategyType `json:"type"` // strategy category
Operator string `json:"operator"` // how the result is compared
AlterValue []string `json:"alter_value"` // trigger thresholds
Tag []string `json:"tag"` // tag names from the SQL to display; put the finest-grained tag name at index 0 (sys_name > host)
IgnoreTag map[string]bool `json:"ignore_tag"` // services/hosts to ignore
PersonalAlterValue map[string][]string `json:"personal_alter_value"` // per-service/per-host threshold overrides
NoDataAlter bool `json:"no_data_alter"` // alert when the value is empty
Notice Notice `json:"notice"` // notification settings
}
// Notice describes how, how often and to whom an alarm is delivered.
type Notice struct {
Sender NoticeSender `json:"sender"` // delivery channel: dingDing(1) / mail(2)
Interval int64 `json:"interval"` // interval between identical alarms (same service/host, same strategy), in seconds
Receiver []string // receivers: webhook URLs for dingDing, mailboxes for mail
}
// DinDingMsg is the JSON body that buildDingDingMsg marshals for the webhook.
type DinDingMsg struct {
MsgType string `json:"msgtype"`
Link Link `json:"link"`
}
// Link is the "link" payload embedded in DinDingMsg.
type Link struct {
Title string `json:"title"`
Text string `json:"text"`
MessageUrl string `json:"messageUrl"`
}
...@@ -9,6 +9,7 @@ type Config struct { ...@@ -9,6 +9,7 @@ type Config struct {
Redis Redis `json:"redis"` Redis Redis `json:"redis"`
Kafka Kafka `json:"kafka"` Kafka Kafka `json:"kafka"`
InfluxDb InfluxDb `json:"influx_db"` InfluxDb InfluxDb `json:"influx_db"`
StrategyConfPath string `json:"strategy_conf_path"`
} }
type Redis struct { type Redis struct {
......
...@@ -2,7 +2,7 @@ package conf ...@@ -2,7 +2,7 @@ package conf
import ( import (
"encoding/json" "encoding/json"
"fmt" "git.quantgroup.cn/DevOps/enoch/service/log"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
) )
...@@ -16,7 +16,8 @@ func Load(denv string, didc string) { ...@@ -16,7 +16,8 @@ func Load(denv string, didc string) {
bytes, err := ioutil.ReadAll(resp.Body) bytes, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(bytes, &GlobalConfig) err = json.Unmarshal(bytes, &GlobalConfig)
GlobalConfig.Env = denv GlobalConfig.Env = denv
GlobalConfig.InfluxDb.Host = "http://172.20.6.33"
if nil != err { if nil != err {
fmt.Println(err) logger.Error.Println(err)
} }
} }
...@@ -2,8 +2,8 @@ package data ...@@ -2,8 +2,8 @@ package data
import ( import (
"git.quantgroup.cn/DevOps/enoch/service/conf" "git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/client/v2"
"log"
) )
func NewClient() client.Client { func NewClient() client.Client {
...@@ -13,7 +13,7 @@ func NewClient() client.Client { ...@@ -13,7 +13,7 @@ func NewClient() client.Client {
con, err := client.NewHTTPClient(httpConfig) con, err := client.NewHTTPClient(httpConfig)
if err != nil { if err != nil {
log.Fatal(err) logger.Error.Println(err.Error())
} }
return con return con
......
package job
import (
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/alarm"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/robfig/cron"
"net"
"os"
"time"
)
// AutoAlarm schedules the alarm job, alarm.Load, to run once every minute.
//
// The job is only scheduled when checkIp accepts 172.30.12.22 for this
// machine. If the job cannot be registered, the error is logged and the
// scheduler is not started.
func AutoAlarm() {
	if !checkIp("172.30.12.22") {
		return
	}
	c := cron.New()
	err := c.AddFunc("@every 1m", func() {
		logger.Info.Println("开始执行定时任务", time.Now().Minute())
		alarm.Load()
	})
	if err != nil {
		logger.Error.Println("register alarm job failed:", err)
		return
	}
	c.Start()
}
// checkIp reports whether ip is one of this machine's non-loopback IPv4
// interface addresses.
//
// Every candidate address is logged and compared, so a machine with several
// interfaces is recognised whichever one is listed first. If the interface
// addresses cannot be read, the error is printed and the process exits with
// status 1.
func checkIp(ip string) bool {
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	for _, address := range addrs {
		ipnet, ok := address.(*net.IPNet)
		// Skip anything that is not a non-loopback IPv4 network address.
		if !ok || ipnet.IP.IsLoopback() || ipnet.IP.To4() == nil {
			continue
		}
		logger.Info.Println(ipnet.IP.String())
		if ipnet.IP.String() == ip {
			return true
		}
	}
	return false
}
...@@ -2,9 +2,9 @@ package service ...@@ -2,9 +2,9 @@ package service
import ( import (
"git.quantgroup.cn/DevOps/enoch/service/conf" "git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster" "github.com/bsm/sarama-cluster"
"log"
"os" "os"
"os/signal" "os/signal"
) )
...@@ -29,14 +29,14 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler ...@@ -29,14 +29,14 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler
// consume errors // consume errors
go func() { go func() {
for err := range consumer.Errors() { for err := range consumer.Errors() {
log.Printf("Error: %s\n", err.Error()) logger.Error.Println("consume error:", err.Error())
} }
}() }()
// consume notifications // consume notifications
go func() { go func() {
for ntf := range consumer.Notifications() { for ntf := range consumer.Notifications() {
log.Printf("Rebalanced: %+v\n", ntf) logger.Info.Printf("Rebalanced: %+v\n", ntf)
} }
}() }()
......
package logger
import (
"io"
"log"
"os"
)
// Package-level loggers, assigned in init. Each one writes to both standard
// error and the log file, with its own prefix.
var (
// Info logs with the "Info:" prefix.
Info *log.Logger
// Warning logs with the "Warning:" prefix.
Warning *log.Logger
// Error logs with the "Error:" prefix.
Error * log.Logger
)
// init opens quantgroup.log in the working directory (creating it if needed)
// and points Info, Warning and Error at both standard error and that file.
//
// The file is opened in append mode so that a restart adds to the existing
// log instead of overwriting it from the beginning. Failure to open the file
// terminates the process.
func init() {
	file, err := os.OpenFile("quantgroup.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
	if err != nil {
		log.Fatalln("打开日志文件失败:", err)
	}
	sink := io.MultiWriter(os.Stderr, file)
	flags := log.Ldate | log.Ltime | log.Lshortfile
	Info = log.New(sink, "Info:", flags)
	Warning = log.New(sink, "Warning:", flags)
	Error = log.New(sink, "Error:", flags)
}
\ No newline at end of file
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"git.quantgroup.cn/DevOps/enoch/service/data" "git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/client/v2"
"log" "log"
...@@ -95,9 +96,9 @@ func batchWrite(pointArray []*client.Point) { ...@@ -95,9 +96,9 @@ func batchWrite(pointArray []*client.Point) {
points.AddPoints(pointArray) points.AddPoints(pointArray)
err = c.Write(points) err = c.Write(points)
fmt.Println("写入数据", len(pointArray)) logger.Info.Println("写入数据", len(pointArray))
if err != nil { if err != nil {
log.Fatal(err) logger.Error.Fatal(err)
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment