Commit 7313cc4d authored by Node- 门 忠鑫's avatar Node- 门 忠鑫

Merge branch 'continuousQueries' into 'master'

Continuous queries



See merge request !2
parents 32f44d11 6830ff91
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"flag" "flag"
"git.quantgroup.cn/DevOps/enoch/service" "git.quantgroup.cn/DevOps/enoch/service"
"git.quantgroup.cn/DevOps/enoch/service/conf" "git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/continuous_queries"
"git.quantgroup.cn/DevOps/enoch/service/data" "git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/job" "git.quantgroup.cn/DevOps/enoch/service/job"
"log" "log"
...@@ -38,7 +39,6 @@ func main() { ...@@ -38,7 +39,6 @@ func main() {
//初始化redis连接池 //初始化redis连接池
data.RedisPoolInit() data.RedisPoolInit()
go service.AgentClusterConsumer(conf.HealthTopic(), service.HealthMessageHandler{}) go service.AgentClusterConsumer(conf.HealthTopic(), service.HealthMessageHandler{})
go service.AgentClusterConsumer(conf.BraveTopic(), service.BraveMessageHandler{}) go service.AgentClusterConsumer(conf.BraveTopic(), service.BraveMessageHandler{})
intPort, _ := strconv.Atoi(port) intPort, _ := strconv.Atoi(port)
...@@ -49,7 +49,7 @@ func main() { ...@@ -49,7 +49,7 @@ func main() {
} }
job.AutoAlarm() job.AutoAlarm()
continuous_queries.Load()
go func() { go func() {
http.ListenAndServe("0.0.0.0:"+strconv.Itoa(intPort+1), nil) http.ListenAndServe("0.0.0.0:"+strconv.Itoa(intPort+1), nil)
}() }()
......
package alarm
const (
MONITOR = "monitor"
SYSNAME_SQL = "show tag values from machine_info with key = sys_name"
HOST_SQL = "show tag values from machine_info with key= host where sys_name = "
)
package alarm
import (
"bytes"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2"
)
func Query(sql string, db string) []client.Result {
con := data.NewClient()
q := client.Query{Command: sql, Database: db}
res, err := con.Query(q)
if nil != err {
logger.Error.Println("influxdb client init error", err)
}
if nil != res.Error() {
logger.Error.Println("query error", db, sql, res.Error())
}
return res.Results
}
func QueryMonitor(sql string) []client.Result {
return Query(sql, MONITOR)
}
func QuerySysName() []string {
res := QueryMonitor(SYSNAME_SQL)
var values = res[0].Series[0].Values
sysName := make([]string, 0)
for _, v := range values {
sysName = append(sysName, v[1].(string))
}
return sysName
}
func QueryHost(sysName []string) map[string][]string {
var sqlBuff bytes.Buffer
for _, name := range sysName {
sqlBuff.WriteString(HOST_SQL)
sqlBuff.WriteString("'")
sqlBuff.WriteString(name)
sqlBuff.WriteString("';")
}
res := QueryMonitor(sqlBuff.String())
nodeTree := make(map[string][]string)
for index, result := range res {
ipList := make([]string, 0)
values := result.Series[0].Values
for _, v := range values {
ipList = append(ipList, v[1].(string))
}
nodeTree[sysName[index]] = ipList
}
return nodeTree
}
...@@ -3,18 +3,19 @@ package alarm ...@@ -3,18 +3,19 @@ package alarm
import ( import (
"encoding/json" "encoding/json"
"git.quantgroup.cn/DevOps/enoch/service/conf" "git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/log" "git.quantgroup.cn/DevOps/enoch/service/log"
"io/ioutil" "io/ioutil"
) )
func Load() { func Load() {
data,err := ioutil.ReadFile(conf.GlobalConfig.StrategyConfPath) config,err := ioutil.ReadFile(conf.GlobalConfig.StrategyConfPath)
if err != nil { if err != nil {
logger.Error.Fatal("未找到配置文件") logger.Error.Fatal("未找到配置文件")
} }
strategies := make([]Strategy, 0) strategies := make([]Strategy, 0)
err = json.Unmarshal(data, &strategies) err = json.Unmarshal(config, &strategies)
if err != nil { if err != nil {
logger.Error.Fatal("策略文件格式错误:", err) logger.Error.Fatal("策略文件格式错误:", err)
...@@ -25,7 +26,7 @@ func Load() { ...@@ -25,7 +26,7 @@ func Load() {
} }
sql := BuildSql(strategies) sql := BuildSql(strategies)
result := QueryMonitor(sql) result := data.QueryMonitor(sql)
DealResult(result, strategies) DealResult(result, strategies)
} }
...@@ -144,7 +144,7 @@ func BuildSql(strategies []Strategy) string { ...@@ -144,7 +144,7 @@ func BuildSql(strategies []Strategy) string {
func hasNilValue(params []reflect.Value) bool { func hasNilValue(params []reflect.Value) bool {
for _, c := range params { for _, c := range params {
return c.IsValid() return !c.IsValid()
} }
return false return false
......
...@@ -9,6 +9,7 @@ type Config struct { ...@@ -9,6 +9,7 @@ type Config struct {
Redis Redis `json:"redis"` Redis Redis `json:"redis"`
Kafka Kafka `json:"kafka"` Kafka Kafka `json:"kafka"`
InfluxDb InfluxDb `json:"influx_db"` InfluxDb InfluxDb `json:"influx_db"`
ApdexThreshold ApdexThreshold `json:"apdex_threshold"`
StrategyConfPath string `json:"strategy_conf_path"` StrategyConfPath string `json:"strategy_conf_path"`
} }
...@@ -29,3 +30,7 @@ type InfluxDb struct { ...@@ -29,3 +30,7 @@ type InfluxDb struct {
Port string `json:"port"` Port string `json:"port"`
} }
type ApdexThreshold struct {
Common int `json:"common"`
Personal map[string]int `json:"personal"`
}
package continuous_queries
import (
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/log"
"net"
"os"
"strings"
)
var (
DROP_CQ_SQL = "DROP CONTINUOUS QUERY %s ON %s;"
SHOW_CQ_SQL = "SHOW CONTINUOUS QUERIES;"
APDEX_CQ_SAT = "CREATE CONTINUOUS QUERY cq_apdex_sat_%s ON monitor RESAMPLE FOR 1h BEGIN SELECT count(traceId) AS sat INTO monitor.autogen.apdex FROM monitor.autogen.trace_info WHERE sys_name = '%s' AND \"duration\" < %d GROUP BY sys_name, time(1m) fill(0) END;"
APDEX_CQ_TOL = "CREATE CONTINUOUS QUERY cq_apdex_tol_%s ON monitor RESAMPLE FOR 1h BEGIN SELECT count(traceId) AS tol INTO monitor.autogen.apdex FROM monitor.autogen.trace_info WHERE sys_name = '%s' AND \"duration\" > %d AND \"duration\" < %d GROUP BY sys_name, time(1m) fill(0) END;"
APDEX_CQ_ALL = "CREATE CONTINUOUS QUERY cq_apdex_all ON monitor RESAMPLE FOR 1h BEGIN SELECT count(traceId) AS ct_all INTO monitor.autogen.apdex FROM monitor.autogen.trace_info GROUP BY sys_name, time(1m) fill(1) END;"
)
func query(dbName string) []string {
cq := data.QueryMonitor(SHOW_CQ_SQL)
logger.Info.Println(cq)
cqName := make([]string, 0)
for _, row := range cq[0].Series {
if row.Name == dbName {
for _, v := range row.Values {
cqName = append(cqName, v[0].(string))
}
}
}
return cqName
}
func delete(db string, cqName [] string) bool {
sql := strings.Builder{}
for _, name := range cqName {
sql.WriteString(fmt.Sprintf(DROP_CQ_SQL, name, db))
}
rs := data.QueryMonitor(sql.String())
return rs == nil
}
func create(apdexThreshold conf.ApdexThreshold) {
sysName := data.QuerySysName()
logger.Info.Println(sysName)
sql := strings.Builder{}
for _, name := range sysName {
cqName := strings.Replace(name, "-", "_", 10)
threshold := apdexThreshold.Common
if c, ok := apdexThreshold.Personal[name]; ok {
threshold = c
}
sql.WriteString(buildSatSql(cqName, name, threshold))
sql.WriteString(buildTolSql(cqName, name, threshold))
}
sql.WriteString(APDEX_CQ_ALL)
data.QueryMonitor(sql.String())
}
func buildSatSql(cqName string, sysName string, threshold int) string {
return fmt.Sprintf(APDEX_CQ_SAT, cqName, sysName, threshold)
}
func buildTolSql(cqName string, sysName string, threshold int) string {
return fmt.Sprintf(APDEX_CQ_TOL, cqName, sysName, threshold, threshold*4)
}
func Load() {
if !checkIp("172.30.12.22"){
return
}
name := query(data.MONITOR)
logger.Info.Println("old: ", name)
delete(data.MONITOR, name)
create(conf.GlobalConfig.ApdexThreshold)
name = query(data.MONITOR)
logger.Info.Println("new: ", name)
}
func checkIp(ip string) bool {
addrs, err := net.InterfaceAddrs()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
for _, address := range addrs {
// 检查ip地址判断是否回环地址
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
logger.Info.Println(ipnet.IP.String())
return ipnet.IP.String() == ip
}
}
}
return false
}
\ No newline at end of file
package data
const (
MONITOR = "monitor"
SYSNAME_SQL = "show tag values from trace_info with key = sys_name;"
HOST_SQL = "show tag values from trace_info with key= host where sys_name = "
)
package data package data
import ( import (
"bytes"
"git.quantgroup.cn/DevOps/enoch/service/conf" "git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/log" "git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/client/v2"
...@@ -17,5 +18,56 @@ func NewClient() client.Client { ...@@ -17,5 +18,56 @@ func NewClient() client.Client {
} }
return con return con
}
func Query(sql string, db string) []client.Result {
con := NewClient()
q := client.Query{Command: sql, Database: db}
res, err := con.Query(q)
if nil != err {
logger.Error.Println("influxdb client init error", err)
}
if nil != res.Error() {
logger.Error.Println("query error", db, sql, res.Error())
}
return res.Results
}
func QueryMonitor(sql string) []client.Result {
return Query(sql, MONITOR)
}
func QuerySysName() []string {
res := QueryMonitor(SYSNAME_SQL)
var values = res[0].Series[0].Values
sysName := make([]string, 0)
for _, v := range values {
sysName = append(sysName, v[1].(string))
}
return sysName
}
func QueryHost(sysName []string) map[string][]string {
var sqlBuff bytes.Buffer
for _, name := range sysName {
sqlBuff.WriteString(HOST_SQL)
sqlBuff.WriteString("'")
sqlBuff.WriteString(name)
sqlBuff.WriteString("';")
}
res := QueryMonitor(sqlBuff.String())
nodeTree := make(map[string][]string)
for index, result := range res {
ipList := make([]string, 0)
values := result.Series[0].Values
for _, v := range values {
ipList = append(ipList, v[1].(string))
}
nodeTree[sysName[index]] = ipList
}
return nodeTree
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment