Commit 17c7a4e6 authored by Node- 门 忠鑫's avatar Node- 门 忠鑫

# 连续查询动态配置

parent 156788a1
......@@ -4,6 +4,7 @@ import (
"flag"
"git.quantgroup.cn/DevOps/enoch/service"
"git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/continuous_queries"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/job"
"log"
......@@ -38,7 +39,6 @@ func main() {
//初始化redis连接池
data.RedisPoolInit()
go service.AgentClusterConsumer(conf.HealthTopic(), service.HealthMessageHandler{})
go service.AgentClusterConsumer(conf.BraveTopic(), service.BraveMessageHandler{})
intPort, _ := strconv.Atoi(port)
......@@ -49,7 +49,7 @@ func main() {
}
job.AutoAlarm()
continuous_queries.Load()
go func() {
http.ListenAndServe("0.0.0.0:"+strconv.Itoa(intPort+1), nil)
}()
......
......@@ -9,6 +9,7 @@ type Config struct {
Redis Redis `json:"redis"`
Kafka Kafka `json:"kafka"`
InfluxDb InfluxDb `json:"influx_db"`
ApdexThreshold ApdexThreshold `json:"apdex_threshold"`
StrategyConfPath string `json:"strategy_conf_path"`
}
......@@ -29,3 +30,7 @@ type InfluxDb struct {
Port string `json:"port"`
}
type ApdexThreshold struct {
Common int `json:"common"`
Personal map[string]int `json:"personal"`
}
package continuous_queries
import (
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/log"
"net"
"os"
"strings"
)
var (
DROP_CQ_SQL = "DROP CONTINUOUS QUERY %s ON %s"
SHOW_CQ_SQL = ""
DROP_CQ_SQL = "DROP CONTINUOUS QUERY %s ON %s;"
SHOW_CQ_SQL = "SHOW CONTINUOUS QUERIES;"
APDEX_CQ_SAT = "CREATE CONTINUOUS QUERY cq_apdex_sat_%s ON monitor RESAMPLE FOR 1h BEGIN SELECT count(traceId) AS sat INTO monitor.autogen.apdex FROM monitor.autogen.trace_info WHERE sys_name = '%s' AND \"duration\" < %d GROUP BY sys_name, time(1m) fill(0) END;"
APDEX_CQ_TOL = "CREATE CONTINUOUS QUERY cq_apdex_tol_%s ON monitor RESAMPLE FOR 1h BEGIN SELECT count(traceId) AS tol INTO monitor.autogen.apdex FROM monitor.autogen.trace_info WHERE sys_name = '%s' AND \"duration\" > %d AND \"duration\" < %d GROUP BY sys_name, time(1m) fill(0) END;"
APDEX_CQ_ALL = "CREATE CONTINUOUS QUERY cq_apdex_all ON monitor RESAMPLE FOR 1h BEGIN SELECT count(traceId) AS ct_all INTO monitor.autogen.apdex FROM monitor.autogen.trace_info GROUP BY sys_name, time(1m) fill(1) END;"
)
func query(db string) {
func query(dbName string) []string {
cq := data.QueryMonitor(SHOW_CQ_SQL)
logger.Info.Println(cq)
cqName := make([]string, 0)
for _, row := range cq[0].Series {
if row.Name == dbName {
for _, v := range row.Values {
cqName = append(cqName, v[0].(string))
}
}
}
return cqName
}
func delete(db string, cqName [] string) bool {
sql := strings.Builder{}
for _, name := range cqName {
sql.WriteString(fmt.Sprintf(DROP_CQ_SQL, name, db))
}
rs := data.QueryMonitor(sql.String())
return rs == nil
}
func clear(db string) bool {
func create(apdexThreshold conf.ApdexThreshold) {
sysName := data.QuerySysName()
logger.Info.Println(sysName)
sql := strings.Builder{}
for _, name := range sysName {
cqName := strings.Replace(name, "-", "_", 10)
threshold := apdexThreshold.Common
if c, ok := apdexThreshold.Personal[name]; ok {
threshold = c
}
sql.WriteString(buildSatSql(cqName, name, threshold))
sql.WriteString(buildTolSql(cqName, name, threshold))
}
sql.WriteString(APDEX_CQ_ALL)
data.QueryMonitor(sql.String())
}
func buildSatSql(cqName string, sysName string, threshold int) string {
return fmt.Sprintf(APDEX_CQ_SAT, cqName, sysName, threshold)
}
func delete(db string, cqName string) bool {
func buildTolSql(cqName string, sysName string, threshold int) string {
return fmt.Sprintf(APDEX_CQ_TOL, cqName, sysName, threshold, threshold*4)
}
func Load() {
if !checkIp("172.30.12.22"){
return
}
name := query(data.MONITOR)
logger.Info.Println("old: ", name)
delete(data.MONITOR, name)
create(conf.GlobalConfig.ApdexThreshold)
name = query(data.MONITOR)
logger.Info.Println("new: ", name)
}
func create(sql string) {
}
func checkIp(ip string) bool {
addrs, err := net.InterfaceAddrs()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
for _, address := range addrs {
// 检查ip地址判断是否回环地址
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
logger.Info.Println(ipnet.IP.String())
return ipnet.IP.String() == ip
}
}
}
return false
}
\ No newline at end of file
......@@ -2,7 +2,7 @@ package data
const (
MONITOR = "monitor"
SYSNAME_SQL = "show tag values from machine_info with key = sys_name"
HOST_SQL = "show tag values from machine_info with key= host where sys_name = "
SYSNAME_SQL = "show tag values from trace_info with key = sys_name;"
HOST_SQL = "show tag values from trace_info with key= host where sys_name = "
)
......@@ -20,7 +20,6 @@ func NewClient() client.Client {
return con
}
func Query(sql string, db string) []client.Result {
con := NewClient()
q := client.Query{Command: sql, Database: db}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment