Commit 99f2d204 authored by jingbo.wang's avatar jingbo.wang

列表,查询path ok

parent a3b022c4
package main
import (
"fmt"
"git.quantgroup.cn/DevOps/enoch/pkg/dao"
"git.quantgroup.cn/DevOps/enoch/pkg/global"
"git.quantgroup.cn/DevOps/enoch/pkg/glog"
......@@ -16,6 +17,7 @@ import (
)
func handlerKafkaMsg() {
fmt.Println(global.InfluxDbAddress)
db := dao.New(global.BatchSize, time.Second*60, global.InfluxDbAddress, global.DaoFileCacheDir)
//处理调用链条信息
......
......@@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/gob"
"fmt"
"git.quantgroup.cn/DevOps/enoch/pkg/global"
"git.quantgroup.cn/DevOps/enoch/pkg/glog"
"github.com/influxdata/influxdb/client/v2"
......@@ -102,10 +101,6 @@ func (d *Dao) flashFileCache() {
continue
}
for _, v := range cachePointList {
fmt.Println(v)
}
pointList := make([]*client.Point, 0)
for _, cachePoint := range cachePointList {
point, err := client.NewPoint(cachePoint.Name, cachePoint.Tags, cachePoint.Fields, cachePoint.Time)
......
......@@ -47,6 +47,7 @@ var (
"put": {},
"delete": {},
"patch": {},
"head": {},
}
)
......@@ -94,7 +95,7 @@ func buildTraceInfluxMsg(traceInfo TraceMsg) *client.Point {
tags := make(map[string]string)
tags["sys_name"] = traceInfo.LocalEndpoint.ServiceName
if _, ok := methodMap[strings.ToLower(traceInfo.Tags.HttpMethod)]; ok {
if _, ok := methodMap[strings.ToLower(traceInfo.Name)]; ok {
tags["path"] = strings.ToLower(traceInfo.Tags.HttpMethod + " " + traceInfo.Tags.HttpPath)
} else {
tags["path"] = strings.ToLower(traceInfo.Name)
......
sys_name=black-hole:path=get\ /contractno\=lhpgdxd231231t16e88bb63a418482250090
sys_name=black-hole:path=get\ /contractno\=lhpgdxd231231t16e88bb63a418482250090&templateid\=145
sys_name=black-hole:path=options\ /variable/142
sys_name=cash-loan-flow:path=POST\ /ex/api/qiancheng/lhp%20%20%20
sys_name=cuishou:path=OPTIONS\ /bill/0/10008645
sys_name=cuishou:path=OPTIONS\ /installment/0/100005599
sys_name=cuishou:path=OPTIONS\ /user/10008645
sys_name=cuishou:path=options\ /bill/0/10014221
sys_name=cuishou:path=options\ /maxreliefamount/1/131
sys_name=cuishou:path=options\ /installment/0/102101210
sys_name=cuishou:path=options\ /debt/1/11673039
sys_name=cuishou:path=options\ /user/10014221
sys_name=gyxd:path=options\ /ex/report/list/5028
sys_name=xyqb-mall:path=GET\ /ex/train/order_detail/trainmall1483586942176
sys_name=xyqb:path=POST\ /clear-proof/condition/100254929
sys_name=xyqb-user2:path=get\ /static/js/0.30b581b8c33f83f35112.js
sys_name=xyqb:path=get\ /clear-proof/condition/102404530
sys_name=xyqb:path=post\ /clear-proof/condition/100827030
package report_form
import (
"encoding/json"
"fmt"
"git.quantgroup.cn/DevOps/enoch/pkg/glog"
"time"
)
type TracePoint struct {
ServiceName string
Path string
TraceId string
Duration time.Duration
Timestamp time.Time
}
type Path struct {
startTime time.Time //开始时间
endTime time.Time //结束时间
serviceName string //服务名称
path string //路径
count int //访问次数
averageDuration time.Duration //平均响应时间
sumDuration time.Duration //总响应时间
medianDuration time.Duration //中位响应时间
variance float64 //标准差
maxDurationTracePoint TracePoint //最大响应时间的TracePoint
}
//创建Path对象,会访问数据库
func NewPath(serviceName string, path string, startTime time.Time, endTime time.Time) (*Path, bool) {
rtn := &Path{
startTime: startTime,
endTime: endTime,
serviceName: serviceName,
path: path,
}
//初始化,备注:顺序存在依赖关系
rtn.initCount()
rtn.initSumDuration()
rtn.initAverageDuration()
rtn.initMedianDuration()
rtn.initMaxDurationTracePoint()
return rtn, true
}
//初始化访问次数
func (p *Path) initCount() {
const sqlGetCount = `SELECT count("traceId") AS count FROM trace_info ` +
`WHERE sys_name = '%s' AND path = '%s' AND time >= %d AND time < %d;`
sql := fmt.Sprintf(sqlGetCount, p.serviceName, p.path, p.startTime.UnixNano(), p.endTime.UnixNano())
glog.Debug(sql)
count, ok := queryOneAndToInt(sql)
if !ok {
glog.Error("init count query one")
return
}
p.count = count
}
//初始化总响应时间
func (p *Path) initSumDuration() {
const sqlGetAverageDuration = `SELECT sum("duration") AS average FROM trace_info ` +
`WHERE sys_name = '%s' AND path = '%s' AND time >= %d AND time < %d;`
sql := fmt.Sprintf(sqlGetAverageDuration, p.serviceName, p.path, p.startTime.UnixNano(), p.endTime.UnixNano())
glog.Debug(sql)
sum, ok := queryOneAndToInt(sql)
if !ok {
glog.Error("init average query one")
return
}
p.sumDuration = time.Duration(sum * 1e6) //毫秒转纳秒
}
//初始化平均时间
func (p *Path) initAverageDuration() {
//当没有数据时,直接返回
if p.count == 0 {
return
}
p.averageDuration = p.sumDuration / time.Duration(p.count)
}
//初始化时间中位响应时间
func (p *Path) initMedianDuration() {
const sqlGetMedianDuration = `SELECT median("duration") AS media FROM trace_info ` +
`WHERE sys_name = '%s' AND path = '%s' AND time >= %d AND time < %d;`
sql := fmt.Sprintf(sqlGetMedianDuration, p.serviceName, p.path, p.startTime.UnixNano(), p.endTime.UnixNano())
glog.Debug(sql)
median, ok := queryOneAndToInt(sql)
if !ok {
glog.Error("init median query one")
return
}
p.medianDuration = time.Duration(median * 1e6)
}
//初始化时间最大响应TracePoint
func (p *Path) initMaxDurationTracePoint() {
const sqlGetMaxDurationTracePoint = `SELECT top("duration", "traceId", 1) AS max FROM trace_info ` +
`WHERE sys_name = '%s' AND path = '%s' AND time >= %d AND time < %d;`
sql := fmt.Sprintf(sqlGetMaxDurationTracePoint, p.serviceName, p.path, p.startTime.UnixNano(), p.endTime.UnixNano())
glog.Debug(sql)
req, err := query(sql)
if err != nil {
glog.Error("init max duration trace point:", err)
return
}
if len(req.Results) != 1 {
return
}
resp := req.Results[0]
if len(resp.Series) != 1 {
return
}
row := resp.Series[0]
if len(row.Values) != 1 {
return
}
idxTime, ok1 := getModRowKeyIdx("time", row)
idxMaxDuration, ok2 := getModRowKeyIdx("max", row)
idxTraceId, ok3 := getModRowKeyIdx("traceId", row)
if !ok1 || !ok2 || !ok3 {
return
}
//时间戳转换
time3339, ok := row.Values[0][idxTime].(string)
if !ok {
return
}
timestamp, err := time.Parse(time.RFC3339, time3339)
if err != nil {
return
}
//最大时间
maxDurationJson, ok := row.Values[0][idxMaxDuration].(json.Number)
if !ok {
return
}
maxDurationInt64, err := maxDurationJson.Int64()
duration := time.Duration(maxDurationInt64 * 1e6) //毫秒转纳秒
if err != nil {
return
}
//traceId
traceId, ok := row.Values[0][idxTraceId].(string)
if !ok {
return
}
p.maxDurationTracePoint = TracePoint{
ServiceName: p.serviceName,
Path: p.path,
TraceId: traceId,
Duration: duration,
Timestamp: timestamp,
}
}
//获取开始时间
func (p *Path) GetStartTime() time.Time {
return p.startTime
}
//获取结束时间
func (p *Path) GetEndTime() time.Time {
return p.endTime
}
//获取服务名称
func (p *Path) GetServiceName() string {
return p.serviceName
}
//获取路径
func (p *Path) GetPath() string {
return p.path
}
//获取访问次数
func (p *Path) GetCount() int {
return p.count
}
//获取总响应时间
func (p *Path) GetSumDuration() time.Duration {
return p.sumDuration
}
//获取平均响应时间
func (p *Path) GetAverageDuration() time.Duration {
return p.averageDuration
}
//获取最大响应时间的TracePoint
func (p *Path) GetMaxDurationTracePoint() TracePoint {
return p.maxDurationTracePoint
}
//获取中位响应时间的TracePoint
func (p *Path) GetMedianDuration() time.Duration {
return p.medianDuration
}
//获取标准差
func (p *Path) GetVariance() float64 {
return p.variance
}
package report_form
import (
"testing"
"time"
)
func TestNewPath(t *testing.T) {
p, ok := NewPath("cuishou", "head /tech/health/check", time.Now().Add(time.Hour*-100), time.Now())
if !ok {
t.Error("new path !ok")
} else {
t.Log(p.GetCount())
t.Log(p.GetSumDuration())
t.Log(p.GetAverageDuration())
t.Log(p.GetMedianDuration())
t.Log(p.GetMaxDurationTracePoint())
}
}
package report_form
import (
"encoding/json"
"git.quantgroup.cn/DevOps/enoch/pkg/global"
"git.quantgroup.cn/DevOps/enoch/pkg/glog"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/models"
)
//查询
func query(sql string) (*client.Response, error) {
config := client.HTTPConfig{
Addr: global.InfluxDbAddress,
}
connect, err := client.NewHTTPClient(config)
defer func() { _ = connect.Close() }()
if err != nil {
return nil, err
}
rtn, err := connect.Query(client.NewQuery(sql, global.InfluxDbName, ""))
if err != nil {
return nil, err
}
return rtn, nil
}
//获取row的index
func getModRowKeyIdx(key string, row models.Row) (int, bool) {
for i, v := range row.Columns {
if v == key {
return i, true
}
}
return -1, false
}
//查询单个结果
func queryOne(sql string) (interface{}, bool) {
resp, err := query(sql)
if err != nil {
glog.Error("query One:", err)
return nil, false
}
//过滤结果
if len(resp.Results) != 1 {
return nil, false
}
res := resp.Results[0]
if len(res.Series) != 1 {
return nil, false
}
row := res.Series[0]
if len(row.Values) != 1 {
return nil, false
}
v := row.Values[0]
if len(v) != 2 {
return nil, false
}
return v[1], true
}
//json.Number格式转换
func jsonNumberToInt(i interface{}) (int, bool) {
j, ok := i.(json.Number)
if !ok {
return -1, false
}
rtn64, err := j.Int64()
if err != nil {
return -1, false
}
return int(rtn64), true
}
//查询并且转换成int
func queryOneAndToInt(sql string) (int, bool) {
req, ok := queryOne(sql)
if !ok {
return -1, false
}
rtn, ok := jsonNumberToInt(req)
if !ok {
return -1, false
}
return rtn, true
}
package report_form
/*
import (
"encoding/json"
"fmt"
......@@ -200,3 +201,4 @@ func GetGeneralTable(startTime time.Time, endTime time.Time) {
fmt.Print(rtn)
}
*/
package report_form
/*
import (
"fmt"
"strings"
"testing"
"time"
)
......@@ -48,7 +49,10 @@ func TestPath(t *testing.T) {
}
}
fmt.Println(sum)
/*
}
*/
/*
resp, err = Query(strings.Join(sqlList, " "))
if err != nil {
t.Error(err)
......@@ -56,9 +60,9 @@ func TestPath(t *testing.T) {
if resp.Err != "" {
t.Error(resp.Err)
}
*/
*/
/*
/*
sqlList := make([]string, 0, 5120)
for _, v := range resp.Results {
for _, row := range v.Series {
......@@ -96,9 +100,9 @@ func TestPath(t *testing.T) {
if resp.Err != "" {
t.Error(resp.Err)
}
*/
}
*/
/*
func getPath(s string) string {
start := strings.Index(s, "path=")
ss := s[start+len("path="):]
......@@ -144,3 +148,4 @@ func isDel(s string) bool {
}
return false
}
*/
This diff is collapsed.
package report_form
/*
import "time"
type Service struct {
......@@ -68,3 +69,4 @@ func (s *Service) GetCount() int {
}
return sum
}
*/
package report_form
/*
import "time"
type ServiceMap map[string]*Service
......@@ -50,3 +51,4 @@ func (sm ServiceMap) GetCount() int {
}
return sum
}
*/
package report_form
/*
import (
"testing"
"time"
......@@ -19,3 +20,4 @@ func TestNewServiceMap(t *testing.T) {
}
t.Log(s.GetCount())
}
*/
package report_form
/*
import (
"strconv"
"testing"
......@@ -21,3 +22,4 @@ func TestNewService(t *testing.T) {
t.Log(s.GetAverageTime())
t.Log(s.GetCount())
}
*/
package report_form
/*
import (
"math"
"sort"
......@@ -131,3 +132,4 @@ func (tl *TraceList) GetStandardDeviation() float64 {
return math.Sqrt(float64(sum))
}
*/
package report_form
/*
import (
"strconv"
"testing"
......@@ -26,3 +27,4 @@ func TestNewTraceList(t *testing.T) {
t.Log(tl.GetMediaTrace())
t.Log(tl.GetTraceList())
}
*/
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment