Commit 9628d0ed authored by jingbo.wang's avatar jingbo.wang

删除influxDB中的错误数据

parent 3fe4ee2d
...@@ -103,6 +103,7 @@ func init() { ...@@ -103,6 +103,7 @@ func init() {
KafkaRecver = kafka.NewRecver(KafkaVersion, strings.Split(kafkaAddress, ","), kafkaLogger) KafkaRecver = kafka.NewRecver(KafkaVersion, strings.Split(kafkaAddress, ","), kafkaLogger)
InfluxDbAddress = Config.GetOrDefault(NamespaceApplication, "influxdb.address", "") InfluxDbAddress = Config.GetOrDefault(NamespaceApplication, "influxdb.address", "")
InfluxDbAddress = "http://172.20.6.33:8086"
DaoFileCacheDir = Config.GetOrDefault(NamespaceApplication, "dao.file.cache.dir", "/var") DaoFileCacheDir = Config.GetOrDefault(NamespaceApplication, "dao.file.cache.dir", "/var")
//初始化registry //初始化registry
......
...@@ -78,7 +78,7 @@ func buildTraceInfluxMsg(traceInfo TraceMsg) *client.Point { ...@@ -78,7 +78,7 @@ func buildTraceInfluxMsg(traceInfo TraceMsg) *client.Point {
tags := make(map[string]string) tags := make(map[string]string)
tags["sys_name"] = traceInfo.LocalEndpoint.ServiceName tags["sys_name"] = traceInfo.LocalEndpoint.ServiceName
tags["path"] = strings.ToLower(traceInfo.Tags.HttpMethod + " " + traceInfo.Tags.HttpPath) tags["path"] = strings.ToLower(traceInfo.Tags.HttpMethod + " " + traceInfo.Name)
tags["host"] = traceInfo.LocalEndpoint.Ipv4 tags["host"] = traceInfo.LocalEndpoint.Ipv4
unix := time.Unix(0, traceInfo.Timestamp*1000) //精度为微秒,*1000后为纳秒 unix := time.Unix(0, traceInfo.Timestamp*1000) //精度为微秒,*1000后为纳秒
......
sys_name=black-hole:path=get\ /contractno\=lhpgdxd231231t16e88bb63a418482250090
sys_name=black-hole:path=get\ /contractno\=lhpgdxd231231t16e88bb63a418482250090&templateid\=145
sys_name=black-hole:path=options\ /variable/142
sys_name=cash-loan-flow:path=POST\ /ex/api/qiancheng/lhp%20%20%20
sys_name=cuishou:path=OPTIONS\ /bill/0/10008645
sys_name=cuishou:path=OPTIONS\ /installment/0/100005599
sys_name=cuishou:path=OPTIONS\ /user/10008645
sys_name=cuishou:path=options\ /bill/0/10014221
sys_name=cuishou:path=options\ /maxreliefamount/1/131
sys_name=cuishou:path=options\ /installment/0/102101210
sys_name=cuishou:path=options\ /debt/1/11673039
sys_name=cuishou:path=options\ /user/10014221
sys_name=gyxd:path=options\ /ex/report/list/5028
sys_name=xyqb-mall:path=GET\ /ex/train/order_detail/trainmall1483586942176
sys_name=xyqb:path=POST\ /clear-proof/condition/100254929
sys_name=xyqb-user2:path=get\ /static/js/0.30b581b8c33f83f35112.js
sys_name=xyqb:path=get\ /clear-proof/condition/102404530
sys_name=xyqb:path=post\ /clear-proof/condition/100827030
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"git.quantgroup.cn/DevOps/enoch/pkg/global" "git.quantgroup.cn/DevOps/enoch/pkg/global"
"git.quantgroup.cn/DevOps/enoch/pkg/glog"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/models"
"strings" "strings"
...@@ -14,6 +15,7 @@ const ( ...@@ -14,6 +15,7 @@ const (
getServiceInfoSql = `SELECT * FROM %s WHERE sys_name = '%s' AND time >= %d AND time < %d;` getServiceInfoSql = `SELECT * FROM %s WHERE sys_name = '%s' AND time >= %d AND time < %d;`
tableTraceInfo = "trace_info" tableTraceInfo = "trace_info"
minTrace = 50 //小于50的调用不计入统计 minTrace = 50 //小于50的调用不计入统计
minDuration = time.Millisecond * 10
getServiceListSql = `show tag values from trace_info with key = sys_name` getServiceListSql = `show tag values from trace_info with key = sys_name`
healthCheck = `/tech/health/check` healthCheck = `/tech/health/check`
) )
...@@ -38,7 +40,6 @@ func getModRowKeyIdx(key string, row models.Row) (int, bool) { ...@@ -38,7 +40,6 @@ func getModRowKeyIdx(key string, row models.Row) (int, bool) {
func Query(sql string) (*client.Response, error) { func Query(sql string) (*client.Response, error) {
config := client.HTTPConfig{ config := client.HTTPConfig{
Addr: global.InfluxDbAddress, Addr: global.InfluxDbAddress,
Timeout: time.Second * 30,
} }
connect, err := client.NewHTTPClient(config) connect, err := client.NewHTTPClient(config)
...@@ -90,6 +91,7 @@ func GetDataFromDb(startTime time.Time, endTime time.Time) ServiceMap { ...@@ -90,6 +91,7 @@ func GetDataFromDb(startTime time.Time, endTime time.Time) ServiceMap {
sql := getSql(serviceName, tableTraceInfo, startTime, endTime) sql := getSql(serviceName, tableTraceInfo, startTime, endTime)
resp, err := Query(sql) resp, err := Query(sql)
if err != nil { if err != nil {
glog.Error("get service trace errror:", err)
continue continue
} }
...@@ -170,3 +172,31 @@ func rowToTraceList(serviceName string, row models.Row) []*Trace { ...@@ -170,3 +172,31 @@ func rowToTraceList(serviceName string, row models.Row) []*Trace {
return rtn return rtn
} }
func GetGeneralTable(startTime time.Time, endTime time.Time) {
rtn := ""
sm := GetDataFromDb(startTime, endTime)
rtn += fmt.Sprintf("API访问平均时间:%v, API访问量:%d\n", sm.GetAverageTime(), sm.GetCount())
//删除掉小于50访问量的节点
for _, service := range sm {
for path, traceList := range service.GetPathMap() {
if traceList.GetCount() < minTrace {
service.DelPath(path)
}
}
}
//生成
traceList := NewTraceList("")
for _, v := range sm {
if trace, ok := v.GetMaxMediaTrace(); ok && trace.Duration > minDuration {
traceList.AddTrace(trace)
}
}
for _, v := range traceList.GetTraceList() {
rtn += fmt.Sprintln(v.Duration, v.ServiceName, v.TimePoint, v.Id, v.Path)
}
fmt.Print(rtn)
}
package report_form package report_form
import ( import (
"fmt"
"strings"
"testing" "testing"
"time" "time"
) )
...@@ -10,19 +12,135 @@ func TestGetServiceListFromDb(t *testing.T) { ...@@ -10,19 +12,135 @@ func TestGetServiceListFromDb(t *testing.T) {
} }
func TestGetDataFromDb(t *testing.T) { func TestGetDataFromDb(t *testing.T) {
end := time.Now().Add(time.Second * -600) start := time.Now().Add(time.Second*-86400 - 300)
start := end.AddDate(0, 0, -10) end := time.Now().Add(time.Second * -300)
sm := GetDataFromDb(start, end) GetGeneralTable(start, end)
t.Log(sm.GetAverageTime(), sm.GetCount()) }
tl := NewTraceList("") //获取path列表
for _, v := range sm { func TestPath(t *testing.T) {
if trace, ok := v.GetMaxMediaTrace(); ok { fmt.Println("111")
tl.AddTrace(trace) resp, err := Query(`show series`)
if err != nil {
t.Error(err)
}
fmt.Println("222")
//f, err := os.Create("series_list")
if err != nil {
t.Error(err)
}
sum := 0
sqlList := make([]string, 0, 5120)
for _, v := range resp.Results {
for _, row := range v.Series {
for _, series := range row.Values {
msg := series[0].(string)
path := getPath(msg)
sum++
if isDel(path) {
fmt.Println(path)
sql := fmt.Sprintf("DELETE FROM trace_info WHERE path = '%s';", path)
sqlList = append(sqlList, sql)
}
}
} }
} }
list := tl.GetTraceList() fmt.Println(sum)
for _, v := range list { /*
t.Log(v.Duration, v.ServiceName, v.Id, v.Path) resp, err = Query(strings.Join(sqlList, " "))
if err != nil {
t.Error(err)
}
if resp.Err != "" {
t.Error(resp.Err)
}
*/
/*
sqlList := make([]string, 0, 5120)
for _, v := range resp.Results {
for _, row := range v.Series {
for _, series := range row.Values {
msg := series[0].(string)
seriesSplit := strings.Split(series[0].(string), ",")
if len(seriesSplit) > 3 {
path := getPath(msg)
if isDel(path) {
//if len(path) > 7 && strings.ToLower(path[0:7]) == "options" {
path = strings.ReplaceAll(path, "\\", "")
sql := fmt.Sprintf("DELETE FROM trace_info WHERE path = '%s';", path)
sqlList = append(sqlList, sql)
if len(sqlList) == 5120 {
fmt.Println("query start")
resp, err = Query(strings.Join(sqlList, " "))
if err != nil {
t.Error(err)
}
if resp.Err != "" {
t.Error(resp.Err)
}
sqlList = make([]string, 0, 5120)
}
}
}
}
}
}
resp, err = Query(strings.Join(sqlList, " "))
if err != nil {
t.Error(err)
}
if resp.Err != "" {
t.Error(resp.Err)
}
*/
}
func getPath(s string) string {
start := strings.Index(s, "path=")
ss := s[start+len("path="):]
end := strings.Index(ss, "sys_name=")
rtn := ss
if end != -1 {
rtn = ss[0 : end-1]
}
rtn = strings.ReplaceAll(rtn, "\\", "")
// rtn = strings.ToLower(rtn)
return rtn
}
var delList = []string{
`get /contractno=`,
`options /variable/`,
`POST /ex/api/qiancheng/`,
`OPTIONS /bill/0/`,
`OPTIONS /installment/`,
`OPTIONS /user/`,
`options /bill/`,
`options /debt/`,
`options /installment/`,
`options /maxreliefamount/`,
`options /user/`,
`options /ex/report/list/`,
`GET /ex/train/order_detail/`,
`get /static/js/`,
`get /clear-proof/condition/`,
}
func isDel(s string) bool {
if strings.HasPrefix(s, "options") {
return true
}
if strings.HasPrefix(s, "OPTIONS") {
return true
}
for _, prefix := range delList {
if strings.HasPrefix(s, prefix) {
return true
}
} }
return false
} }
This diff is collapsed.
...@@ -23,6 +23,10 @@ func (s *Service) AddTrace(path string, trace *Trace) { ...@@ -23,6 +23,10 @@ func (s *Service) AddTrace(path string, trace *Trace) {
s.pathMap[path].AddTrace(trace) s.pathMap[path].AddTrace(trace)
} }
func (s *Service) GetPathMap() map[string]*TraceList {
return s.pathMap
}
//删除path //删除path
func (s *Service) DelPath(path string) { func (s *Service) DelPath(path string) {
delete(s.pathMap, path) delete(s.pathMap, path)
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment