Commit c64dac66 authored by jingbo.wang's avatar jingbo.wang

init连续查询ok,支持从apollo配置

parent 651e4e91
package dao package dao
import ( import (
"encoding/json"
"fmt"
"git.quantgroup.cn/DevOps/enoch/pkg/global" "git.quantgroup.cn/DevOps/enoch/pkg/global"
"git.quantgroup.cn/DevOps/enoch/pkg/glog" "git.quantgroup.cn/DevOps/enoch/pkg/glog"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/client/v2"
"strings"
"time" "time"
) )
const (
//apdex公式:apdex = (满意样本 + 可容忍样本/2) / 样本总数
//样本总数:request的总量
ApdexCqAll = `CREATE CONTINUOUS QUERY cq_apdex_all ON monitor RESAMPLE FOR 1h BEGIN ` +
`SELECT count(traceId) AS ct_all INTO apdex FROM trace_info GROUP BY sys_name, time(1m) fill(1) END;`
//满意样本:request的响应时间[0, m)秒
ApdexCqSat = `CREATE CONTINUOUS QUERY cq_apdex_sat_%s ON monitor RESAMPLE FOR 1h BEGIN ` +
`SELECT count(traceId) AS sat INTO apdex FROM trace_info ` +
`WHERE sys_name = '%s' AND 'duration' < %d ` +
`GROUP BY time(1m) fill(0) END;`
//可容忍样本:request的响应时间[m, n)秒
ApdexCqTol = `CREATE CONTINUOUS QUERY cq_apdex_tol_%s ON monitor RESAMPLE FOR 1h BEGIN ` +
`SELECT count(traceId) AS tol INTO apdex FROM trace_info ` +
`WHERE sys_name = '%s' AND 'duration' >= %d AND 'duration' < %d ` +
`GROUP BY time(1m) fill(0) END;`
//获取全部服务
SysNameSql = "show tag values from trace_info with key = sys_name;"
//显示连续查询
ShowCqSql = "SHOW CONTINUOUS QUERIES;"
//删除连续查询
DropCqSql = "DROP CONTINUOUS QUERY %s ON %s;"
)
var (
ApdexDefaultThreshold = Threshold{
Good: 100,
Available: 400,
}
)
type Threshold struct {
Good int `json:"good"` //良好门限
Available int `json:"available"` //可用门限
}
func init() {
//获取默认门限
if data, ok := global.Config.Get("threshold", "threshold.default"); ok {
if err := json.Unmarshal([]byte(data), &ApdexDefaultThreshold); err != nil {
glog.Warn("apdex default threshold json unmarshal")
}
}
}
//获取门限,如果获取不到则返回default
func getApdexThreshold(sysName string) *Threshold {
rtn := Threshold{}
if data, ok := global.Config.Get("threshold", "threshold."+sysName); ok {
if err := json.Unmarshal([]byte(data), &rtn); err == nil {
return &rtn
}
}
return &ApdexDefaultThreshold
}
func apdexSatSql(sysName string, m int) string {
cqName := strings.Replace(sysName, "-", "_", 10)
return fmt.Sprintf(ApdexCqSat, cqName, sysName, m)
}
func apdexTolSql(sysName string, m int, n int) string {
cqName := strings.Replace(sysName, "-", "_", 10)
return fmt.Sprintf(ApdexCqTol, cqName, sysName, m, n)
}
func DbInit() { func DbInit() {
config := client.HTTPConfig{ config := client.HTTPConfig{
Addr: global.InfluxDbAddress, Addr: global.InfluxDbAddress,
...@@ -18,9 +92,65 @@ func DbInit() { ...@@ -18,9 +92,65 @@ func DbInit() {
return return
} }
//如果数据库不存在,创建数据库,可重复创建不会覆盖掉原数据库 //创建数据库
if _, err := c.Query(client.Query{Command: "create database " + global.InfluxDbName}); err != nil { if _, err := c.Query(client.Query{Command: "create database " + global.InfluxDbName}); err != nil {
glog.Error(err) glog.Error(err)
return return
} }
//删除连续查询
cqList, err := c.Query(client.Query{Command: ShowCqSql, Database: global.InfluxDbName})
if err != nil {
glog.Error(err)
return
}
cqNameList := make([]string, 0)
if len(cqList.Results) > 0 {
for _, row := range cqList.Results[0].Series {
if row.Name == global.InfluxDbName {
for _, v := range row.Values {
cqNameList = append(cqNameList, v[0].(string))
}
}
}
}
delCqSqls := strings.Builder{}
for _, cqName := range cqNameList {
delCqSqls.WriteString(fmt.Sprintf(DropCqSql, cqName, global.InfluxDbName))
}
glog.Info(delCqSqls.String())
if _, err := c.Query(client.Query{Command: delCqSqls.String(), Database: global.InfluxDbName}); err != nil {
glog.Error(err)
}
//添加连续查询(sat & tol)
serviceList, err := c.Query(client.Query{Command: SysNameSql, Database: global.InfluxDbName})
if err != nil {
glog.Error(err)
return
}
sysNameList := make([]string, 0)
if len(serviceList.Results) > 0 && len(serviceList.Results[0].Series) > 0 && len(serviceList.Results[0].Series[0].Values) > 1 {
for _, v := range serviceList.Results[0].Series[0].Values {
sysNameList = append(sysNameList, v[1].(string))
}
}
glog.Debug(sysNameList)
newCqSqls := strings.Builder{}
for _, sysName := range sysNameList {
//根据sys_name获取threshold
apdexThreshold := getApdexThreshold(sysName)
newCqSqls.WriteString(apdexSatSql(sysName, apdexThreshold.Good))
newCqSqls.WriteString(apdexTolSql(sysName, apdexThreshold.Good, apdexThreshold.Available))
}
glog.Info(newCqSqls.String())
if _, err := c.Query(client.Query{Command: newCqSqls.String(), Database: global.InfluxDbName}); err != nil {
glog.Error(err)
}
//创建连续查询(ALL)
if _, err := c.Query(client.Query{Command: ApdexCqAll, Database: global.InfluxDbName}); err != nil {
glog.Error(err)
return
}
} }
...@@ -10,12 +10,6 @@ import ( ...@@ -10,12 +10,6 @@ import (
) )
var ( var (
DROP_CQ_SQL = "DROP CONTINUOUS QUERY %s ON %s;"
SHOW_CQ_SQL = "SHOW CONTINUOUS QUERIES;"
APDEX_CQ_SAT = "CREATE CONTINUOUS QUERY cq_apdex_sat_%s ON monitor RESAMPLE FOR 1h BEGIN SELECT count(traceId) AS sat INTO monitor.autogen.apdex FROM monitor.autogen.trace_info WHERE sys_name = '%s' AND \"duration\" < %d GROUP BY sys_name, time(1m) fill(0) END;"
APDEX_CQ_TOL = "CREATE CONTINUOUS QUERY cq_apdex_tol_%s ON monitor RESAMPLE FOR 1h BEGIN SELECT count(traceId) AS tol INTO monitor.autogen.apdex FROM monitor.autogen.trace_info WHERE sys_name = '%s' AND \"duration\" > %d AND \"duration\" < %d GROUP BY sys_name, time(1m) fill(0) END;"
APDEX_CQ_ALL = "CREATE CONTINUOUS QUERY cq_apdex_all ON monitor RESAMPLE FOR 1h BEGIN SELECT count(traceId) AS ct_all INTO monitor.autogen.apdex FROM monitor.autogen.trace_info GROUP BY sys_name, time(1m) fill(1) END;"
)
func query(dbName string) []string { func query(dbName string) []string {
cq := data.QueryMonitor(SHOW_CQ_SQL) cq := data.QueryMonitor(SHOW_CQ_SQL)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment