Commit 84b776f3 authored by jingbo.wang's avatar jingbo.wang

报表 trace OK

parent 2fdc36e6
......@@ -88,7 +88,11 @@ func main() {
//初始化数据库(创建DB,创建连续查询)
dao.DbInit()
//处理消息
//告警策略
//对外api
//处理消息(阻塞)
handlerKafkaMsg()
/*
if quartz {
......
......@@ -72,7 +72,7 @@ func buildTraceInfluxMsg(traceInfo TraceMsg) *client.Point {
}
fields := make(map[string]interface{})
fields["duration"] = traceInfo.Duration / 1000
fields["duration"] = traceInfo.Duration / 1000 //精度为微秒,/1000后为毫秒
fields["traceId"] = traceInfo.TraceId
fields["msg"] = util.BytesString(bytes)
......@@ -81,7 +81,7 @@ func buildTraceInfluxMsg(traceInfo TraceMsg) *client.Point {
tags["path"] = strings.ToLower(traceInfo.Tags.HttpMethod + " " + traceInfo.Tags.HttpPath)
tags["host"] = traceInfo.LocalEndpoint.Ipv4
unix := time.Unix(0, traceInfo.Timestamp*1000)
unix := time.Unix(0, traceInfo.Timestamp*1000) //精度为微秒,*1000后为纳秒
point, err := client.NewPoint("trace_info", tags, fields, unix)
if err != nil {
glog.Error("trace client new point:", err, traceInfo)
......
package report_form
import (
"encoding/json"
"fmt"
"git.quantgroup.cn/DevOps/enoch/pkg/global"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/models"
"strings"
"time"
)
// Query templates and filter constants used when loading trace data from InfluxDB.
const (
// getServiceInfoSql selects all columns of one measurement for a single sys_name
// within the half-open time range [start, end); the bounds are formatted with %d.
getServiceInfoSql = `SELECT * FROM %s WHERE sys_name = '%s' AND time >= %d AND time < %d;`
// tableTraceInfo is the measurement that holds trace points.
tableTraceInfo = "trace_info"
minTrace = 50 // calls numbering fewer than 50 are not included in the statistics
// getServiceListSql lists the distinct sys_name tag values of trace_info.
getServiceListSql = `show tag values from trace_info with key = sys_name`
// healthCheck is the request path of the health-check endpoint.
healthCheck = `/tech/health/check`
)
// getSql renders getServiceInfoSql for the given measurement and service name,
// restricting the time column to [start, end) expressed as Unix nanoseconds.
func getSql(sysName string, table string, start time.Time, end time.Time) string {
	fromNano := start.UnixNano()
	toNano := end.UnixNano()
	return fmt.Sprintf(getServiceInfoSql, table, sysName, fromNano, toNano)
}
// getModRowKeyIdx looks up the position of the column named key in row.Columns.
// It returns the zero-based index and true when the column exists, or -1 and
// false when it does not.
func getModRowKeyIdx(key string, row models.Row) (int, bool) {
	for idx := 0; idx < len(row.Columns); idx++ {
		if row.Columns[idx] == key {
			return idx, true
		}
	}
	return -1, false
}
// Query executes sql against the InfluxDB server at global.InfluxDbAddress,
// using the database global.InfluxDbName, through a short-lived HTTP client
// with a 30-second timeout.
//
// It returns the raw response, or the error produced while creating the
// client or while executing the query.
func Query(sql string) (*client.Response, error) {
	config := client.HTTPConfig{
		Addr:    global.InfluxDbAddress,
		Timeout: time.Second * 30,
	}
	connect, err := client.NewHTTPClient(config)
	if err != nil {
		return nil, err
	}
	// Register the close only after the client is known to be valid: deferring
	// it before the error check would call Close on a nil client and panic.
	// The close error is deliberately ignored; the query result is already in hand.
	defer func() { _ = connect.Close() }()

	rtn, err := connect.Query(client.NewQuery(sql, global.InfluxDbName, ""))
	if err != nil {
		return nil, err
	}
	return rtn, nil
}
// GetServiceNameListFromDb runs getServiceListSql and collects every string
// found in the "value" column of the returned series.
//
// A failed query yields an empty (non-nil) slice; series without a "value"
// column and non-string cells are skipped.
func GetServiceNameListFromDb() []string {
	names := make([]string, 0)
	resp, err := Query(getServiceListSql)
	if err != nil {
		return names
	}
	for _, result := range resp.Results {
		for _, series := range result.Series {
			col, found := getModRowKeyIdx("value", series)
			if !found {
				continue
			}
			for _, record := range series.Values {
				if name, isString := record[col].(string); isString {
					names = append(names, name)
				}
			}
		}
	}
	return names
}
// GetDataFromDb loads the traces of every known service whose time falls in
// [startTime, endTime) and groups them by service name and path.
//
// Services whose query fails are skipped, so the returned map may be partial.
func GetDataFromDb(startTime time.Time, endTime time.Time) *ServiceMap {
	names := GetServiceNameListFromDb()
	collected := NewServiceMap()
	for _, name := range names {
		resp, err := Query(getSql(name, tableTraceInfo, startTime, endTime))
		if err != nil {
			continue
		}
		for _, result := range resp.Results {
			for _, series := range result.Series {
				for _, tr := range rowToTraceList(series) {
					collected.AddTrace(name, tr.Path, tr)
				}
			}
		}
	}
	return collected
}
// rowToTraceList converts one InfluxDB result series into Trace values.
//
// The series must contain the columns duration, path, time, traceId and host;
// if any of them is missing an empty slice is returned. Individual records
// are skipped when a cell has an unexpected type, when the timestamp is not
// RFC 3339, when the path is not of the form "<method> <path>", or when the
// request targets the health-check endpoint.
func rowToTraceList(row models.Row) []*Trace {
	// Expected columns: [time duration host msg path sys_name traceId]
	rtn := make([]*Trace, 0, len(row.Values))

	durationIndex, ok := getModRowKeyIdx("duration", row)
	if !ok {
		return rtn
	}
	pathIndex, ok := getModRowKeyIdx("path", row)
	if !ok {
		return rtn
	}
	timeIndex, ok := getModRowKeyIdx("time", row)
	if !ok {
		return rtn
	}
	idIndex, ok := getModRowKeyIdx("traceId", row)
	if !ok {
		return rtn
	}
	hostIndex, ok := getModRowKeyIdx("host", row)
	if !ok {
		return rtn
	}

	for _, value := range row.Values {
		durationRaw, ok := value[durationIndex].(json.Number)
		if !ok {
			continue
		}
		durationInt64, err := durationRaw.Int64()
		if err != nil {
			continue
		}
		// The stored duration is in milliseconds; scale it to a time.Duration.
		duration := time.Duration(durationInt64) * time.Millisecond

		path, ok := value[pathIndex].(string)
		if !ok {
			continue
		}
		// path is "<method> <path>"; drop malformed entries and health checks.
		pathSplit := strings.Split(path, " ")
		if len(pathSplit) != 2 || pathSplit[1] == healthCheck {
			continue
		}

		timeRaw, ok := value[timeIndex].(string)
		if !ok {
			continue
		}
		timePoint, err := time.Parse(time.RFC3339, timeRaw)
		if err != nil {
			continue
		}

		id, ok := value[idIndex].(string)
		if !ok {
			continue
		}
		// Checked assertion: an unchecked one would panic on a non-string host
		// and leave the following test looking at a stale ok value.
		host, ok := value[hostIndex].(string)
		if !ok {
			continue
		}

		rtn = append(rtn, NewTrace(id, host, path, duration, timePoint))
	}
	return rtn
}
package report_form
import (
"testing"
"time"
)
// TestGetServiceListFromDb logs the service names returned by
// GetServiceNameListFromDb; it makes no assertions.
func TestGetServiceListFromDb(t *testing.T) {
	names := GetServiceNameListFromDb()
	t.Log(names)
}
// TestGetDataFromDb loads the ten-day window that ends ten minutes ago and
// logs the overall average duration and trace count; it makes no assertions.
func TestGetDataFromDb(t *testing.T) {
	windowEnd := time.Now().Add(-600 * time.Second)
	windowStart := windowEnd.AddDate(0, 0, -10)

	loaded := GetDataFromDb(windowStart, windowEnd)
	average := loaded.GetAverageTime()
	total := loaded.GetCount()
	t.Log(average, total)
}
package report_form
import "time"
// Service groups the traces recorded for one service by request path.
type Service struct {
// name is the service name given to NewService.
name string
// pathMap maps a request path to the list of traces recorded for it.
pathMap map[string]*TraceList
}
// NewService returns an empty Service called name with an initialized path map.
func NewService(name string) *Service {
	svc := new(Service)
	svc.name = name
	svc.pathMap = make(map[string]*TraceList)
	return svc
}
// AddTrace appends trace to the list kept for path, creating that list the
// first time the path is seen.
func (s *Service) AddTrace(path string, trace *Trace) {
	list, exists := s.pathMap[path]
	if !exists {
		list = NewTraceList(path)
		s.pathMap[path] = list
	}
	list.AddTrace(trace)
}
// DelPath removes path, together with every trace recorded under it, from the
// service. Deleting a path that is not present is a no-op.
func (s *Service) DelPath(path string) {
delete(s.pathMap, path)
}
// GetAverageTime returns the mean duration over every trace of the service,
// across all paths, or 0 when the service holds no traces.
func (s *Service) GetAverageTime() time.Duration {
	total := s.GetCount()
	if total == 0 {
		return 0
	}
	var elapsed time.Duration
	for _, list := range s.pathMap {
		for idx := range list.traceList {
			elapsed += list.traceList[idx].Duration
		}
	}
	return elapsed / time.Duration(total)
}
// GetCount returns the number of traces recorded for the service, summed
// over all of its paths.
func (s *Service) GetCount() int {
	var total int
	for _, list := range s.pathMap {
		total = total + list.GetCount()
	}
	return total
}
package report_form
import "time"
// ServiceMap indexes Service values by service name.
type ServiceMap struct {
// serviceMap maps a service name to the Service holding its traces.
serviceMap map[string]*Service
}
// NewServiceMap returns an empty ServiceMap with an initialized service index.
func NewServiceMap() *ServiceMap {
	sm := new(ServiceMap)
	sm.serviceMap = make(map[string]*Service)
	return sm
}
// AddTrace records trace under the given service and path, creating the
// Service entry the first time the service name is seen.
func (sm *ServiceMap) AddTrace(service string, path string, trace *Trace) {
	svc, exists := sm.serviceMap[service]
	if !exists {
		svc = NewService(service)
		sm.serviceMap[service] = svc
	}
	svc.AddTrace(path, trace)
}
// GetService returns the Service registered under the given name and whether
// such an entry exists.
func (sm *ServiceMap) GetService(service string) (*Service, bool) {
	if found, present := sm.serviceMap[service]; present {
		return found, true
	}
	return nil, false
}
// GetAverageTime returns the mean duration over every trace of every service
// in the map, or 0 when the map holds no traces.
func (sm *ServiceMap) GetAverageTime() time.Duration {
	total := sm.GetCount()
	if total == 0 {
		return 0
	}
	var elapsed time.Duration
	for _, svc := range sm.serviceMap {
		for _, list := range svc.pathMap {
			for idx := range list.traceList {
				elapsed += list.traceList[idx].Duration
			}
		}
	}
	return elapsed / time.Duration(total)
}
// GetCount returns the number of traces held in the map, summed over all
// services.
func (sm *ServiceMap) GetCount() int {
	var total int
	for _, svc := range sm.serviceMap {
		total = total + svc.GetCount()
	}
	return total
}
package report_form
import (
"testing"
"time"
)
// TestNewServiceMap checks that a trace added through ServiceMap.AddTrace
// creates the service entry and is counted exactly once.
func TestNewServiceMap(t *testing.T) {
	const (
		service = "123"
		path    = "/hello"
		host    = "1.1.1.1"
	)
	sm := NewServiceMap()
	sm.AddTrace(service, path, NewTrace("1", host, path, 1, time.Now()))

	s, ok := sm.GetService(service)
	if !ok {
		// Fatal rather than Error: s is nil here, so continuing would
		// dereference a nil *Service in GetCount.
		t.Fatal("can not found service:", service)
	}
	if got := s.GetCount(); got != 1 {
		t.Errorf("service count = %d, want 1", got)
	}
	t.Log(s.GetCount())
}
package report_form
import (
"strconv"
"testing"
"time"
)
// TestNewService adds ten traces with durations 1..10ns under one path and
// checks the service-level count and average.
func TestNewService(t *testing.T) {
	const (
		path = "/hello"
		host = "1.1.1.1"
	)
	s := NewService("aaaa")
	for i := 1; i <= 10; i++ {
		s.AddTrace(path, NewTrace(strconv.Itoa(i), host, path, time.Duration(i), time.Now().Add(time.Duration(i))))
	}

	if got := s.GetCount(); got != 10 {
		t.Errorf("GetCount() = %d, want 10", got)
	}
	// Sum is 55ns over 10 traces; Duration division truncates 5.5 to 5.
	if got, want := s.GetAverageTime(), time.Duration(5); got != want {
		t.Errorf("GetAverageTime() = %v, want %v", got, want)
	}
	t.Log(s.GetAverageTime())
	t.Log(s.GetCount())
}
package report_form
import (
"math"
"sort"
"time"
)
// Trace is a single recorded request.
type Trace struct {
// Id is the trace identifier.
Id string
// Host is the host associated with the trace.
Host string
// Path is the request path the trace is grouped under.
Path string
// Duration is how long the request took.
Duration time.Duration
// TimePoint is the timestamp of the trace.
TimePoint time.Time
}
// NewTrace returns a Trace populated with the given identifier, host, path,
// duration and timestamp.
func NewTrace(id string, host string, path string, duration time.Duration, timePoint time.Time) *Trace {
	tr := new(Trace)
	tr.Id = id
	tr.Host = host
	tr.Path = path
	tr.Duration = duration
	tr.TimePoint = timePoint
	return tr
}
// TraceList holds the traces recorded for one path.
type TraceList struct {
// Path is the request path shared by the traces in this list.
Path string
// traceList holds the traces; sortTime reorders it in place.
traceList []*Trace
// isSort is set by sortTime once traceList has been sorted and cleared
// again by AddTrace.
isSort bool
}
// NewTraceList returns an empty TraceList for path.
func NewTraceList(path string) *TraceList {
	list := new(TraceList)
	list.Path = path
	list.traceList = []*Trace{}
	return list
}
// AddTrace appends trace to the list and marks the list as unsorted so the
// next ordered accessor sorts it again.
func (tl *TraceList) AddTrace(trace *Trace) {
	tl.isSort = false
	tl.traceList = append(tl.traceList, trace)
}
// GetTraceList sorts the list via sortTime if needed and returns the
// underlying slice (not a copy).
func (tl *TraceList) GetTraceList() []*Trace {
	tl.sortTime()
	ordered := tl.traceList
	return ordered
}
// GetPath returns the path this list was created for.
func (tl *TraceList) GetPath() string {
	path := tl.Path
	return path
}
// sortTime sorts traceList in place by Duration, longest first, and records
// that the list is sorted so repeated calls are no-ops until AddTrace is
// called again.
func (tl *TraceList) sortTime() {
	if tl.isSort {
		return
	}
	sort.Slice(tl.traceList, func(i, j int) bool {
		// Strictly greater: the comparator must report false for equal
		// durations to be a valid ordering for sort.Slice.
		return tl.traceList[i].Duration > tl.traceList[j].Duration
	})
	tl.isSort = true
}
// GetAverageTime returns the mean duration of the traces in the list, or 0
// when the list is empty.
func (tl *TraceList) GetAverageTime() time.Duration {
	n := len(tl.traceList)
	if n == 0 {
		return 0
	}
	var elapsed time.Duration
	for idx := 0; idx < n; idx++ {
		elapsed += tl.traceList[idx].Duration
	}
	return elapsed / time.Duration(n)
}
// GetMediaTrace returns the median trace: the element at index len/2 after
// the list has been sorted by sortTime. The boolean is false when the list
// is empty.
func (tl *TraceList) GetMediaTrace() (*Trace, bool) {
	n := len(tl.traceList)
	if n == 0 {
		return nil, false
	}
	tl.sortTime()
	middle := n / 2
	return tl.traceList[middle], true
}
// GetMaxTrace returns the first trace of the list after sorting by sortTime.
// The boolean is false when the list is empty.
func (tl *TraceList) GetMaxTrace() (*Trace, bool) {
	if tl.GetCount() == 0 {
		return nil, false
	}
	tl.sortTime()
	first := tl.traceList[0]
	return first, true
}
// GetMinTrace returns the last trace of the list after sorting by sortTime.
// The boolean is false when the list is empty.
func (tl *TraceList) GetMinTrace() (*Trace, bool) {
	n := len(tl.traceList)
	if n == 0 {
		return nil, false
	}
	tl.sortTime()
	last := tl.traceList[n-1]
	return last, true
}
// GetCount returns the number of traces in the list, i.e. the request count
// for this path.
func (tl *TraceList) GetCount() int {
	n := len(tl.traceList)
	return n
}
// GetStandardDeviation returns the population standard deviation of the trace
// durations, in nanoseconds, or 0 when the list is empty. A larger value
// means the response times are less stable.
//
// The computation is done in float64: squaring time.Duration values
// (int64 nanoseconds) overflows once deviations reach a few seconds.
func (tl *TraceList) GetStandardDeviation() float64 {
	n := len(tl.traceList)
	if n == 0 {
		return 0
	}

	var total float64
	for _, v := range tl.traceList {
		total += float64(v.Duration)
	}
	mean := total / float64(n)

	var squared float64
	for _, v := range tl.traceList {
		diff := float64(v.Duration) - mean
		squared += diff * diff
	}
	// Divide by n before taking the root; without it the result grows with
	// the sample size and is not a standard deviation.
	return math.Sqrt(squared / float64(n))
}
package report_form
import (
"strconv"
"testing"
"time"
)
// TestNewTraceList adds ten traces with durations 1..10ns and checks the
// count, average, path and the max/min/median accessors. The standard
// deviation and the full list are only logged.
func TestNewTraceList(t *testing.T) {
	const (
		path = "/hello"
		host = "1.1.1.1"
	)
	tl := NewTraceList(path)
	for i := 1; i <= 10; i++ {
		tl.AddTrace(NewTrace(strconv.Itoa(i), host, path, time.Duration(i), time.Now().Add(time.Duration(i))))
	}

	if got := tl.GetCount(); got != 10 {
		t.Errorf("GetCount() = %d, want 10", got)
	}
	// Sum is 55ns over 10 traces; Duration division truncates 5.5 to 5.
	if got, want := tl.GetAverageTime(), time.Duration(5); got != want {
		t.Errorf("GetAverageTime() = %v, want %v", got, want)
	}
	if got := tl.GetPath(); got != path {
		t.Errorf("GetPath() = %q, want %q", got, path)
	}
	if longest, ok := tl.GetMaxTrace(); !ok || longest.Duration != 10 {
		t.Errorf("GetMaxTrace() = %v, %v; want duration 10ns", longest, ok)
	}
	if shortest, ok := tl.GetMinTrace(); !ok || shortest.Duration != 1 {
		t.Errorf("GetMinTrace() = %v, %v; want duration 1ns", shortest, ok)
	}
	// Sorted longest-first, index len/2 == 5 holds the 5ns trace.
	if median, ok := tl.GetMediaTrace(); !ok || median.Duration != 5 {
		t.Errorf("GetMediaTrace() = %v, %v; want duration 5ns", median, ok)
	}

	t.Log(tl.GetStandardDeviation())
	t.Log(tl.GetTraceList())
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment