Commit 0dce77d1 authored by jingbo.wang's avatar jingbo.wang

性能监控方案

parent 514b60fd
......@@ -9,7 +9,6 @@ require (
github.com/dnaeon/go-vcr v1.0.1 // indirect
github.com/envoyproxy/go-control-plane v0.8.6 // indirect
github.com/gogo/googleapis v1.3.0 // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/gomodule/redigo v2.0.0+incompatible
github.com/hashicorp/consul v1.4.0
github.com/hashicorp/go-discover v0.0.0-20190905142513-34a650575f6c // indirect
......@@ -20,6 +19,7 @@ require (
github.com/influxdata/platform v0.0.0-20181221041837-9d0ed9020c3f // indirect
github.com/json-iterator/go v1.1.7
github.com/mitchellh/hashstructure v1.0.0 // indirect
github.com/mkevac/debugcharts v0.0.0-20180124214838-d3203a8fa926
github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967
github.com/shirou/gopsutil v2.19.10+incompatible // indirect
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 // indirect
......
......@@ -6,6 +6,8 @@ import (
"git.quantgroup.cn/DevOps/enoch/pkg/glog"
"git.quantgroup.cn/DevOps/enoch/pkg/points"
"github.com/Shopify/sarama"
_ "github.com/mkevac/debugcharts"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
......@@ -13,11 +15,8 @@ import (
"time"
)
func main() {
glog.Info(global.AppName, "启动")
//TODO 待完善
db := dao.New(global.BatchSize, time.Second*60, "http://", "/Users/fengjunkai/llog")
func handlerKafkaMsg() {
db := dao.New(global.BatchSize, time.Second*60, global.InfluxDbAddress, global.DaoFileCacheDir)
//处理调用链条信息
braveTopic := global.Config.GetOrDefault(global.NamespaceTechSleuth, "tech.brave.kafkaTopic", "")
......@@ -51,6 +50,7 @@ func main() {
glog.Fatal("监听kafka recver失败!")
}
glog.Info(global.AppName + "启动")
//平滑退出
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
......@@ -70,38 +70,26 @@ func main() {
}
glog.Info(global.AppName + "平滑退出")
}
}
/*
var quartz bool
var denv string
var didc string
flag.BoolVar(&quartz, "quartz", false, "quartz")
flag.StringVar(&denv, "env", "dev", "环境")
flag.StringVar(&didc, "idc", "local", "机房")
flag.Parse()
conf.Load(denv, didc)
file, err := os.OpenFile("quantgroup.log", os.O_RDWR|os.O_CREATE, 0666)
defer func() { _ = file.Close() }()
if err != nil {
log.Fatalln("create file error", err)
func main() {
//性能监控
go func() {
defer func() {
if err := recover(); err != nil {
glog.Error(err)
}
}()
if err := http.ListenAndServe(":9999", nil); err != nil {
glog.Error(err)
}
}()
file_cache.Load(conf.GlobalConfig.FileCachePath)
file_cache.RegisterJob(consumer.ReSubmit)
go file_cache.Delete()
port := conf.GlobalConfig.Port
logger := log.New(file, "[Info]", log.LstdFlags|log.Llongfile)
logger.Println(conf.GlobalConfig.AppName + "项目启动, port:" + port + ",环境:" + conf.GlobalConfig.Env)
defer logger.Println("项目结束")
//初始化redis连接池
data.RedisPoolInit()
intPort, _ := strconv.Atoi(port)
//更新连续查询
//处理消息
handlerKafkaMsg()
/*
if quartz {
log.Println("启动定时任务")
job.AutoEmailPerformInfo()
......
......@@ -176,6 +176,13 @@ func (d *Dao) run() {
}
func (d *Dao) writeDb(pointList []*client.Point) error {
defer func() {
if err := recover(); err != nil {
glog.Error("pointList panic!", err)
return
}
}()
config := client.HTTPConfig{
Addr: d.dbAddress,
Timeout: time.Second * 10,
......@@ -204,6 +211,13 @@ func (d *Dao) writeDb(pointList []*client.Point) error {
}
func (d *Dao) writeFile(pointList []*client.Point) error {
defer func() {
if err := recover(); err != nil {
glog.Error("writeFile panic!", err)
return
}
}()
cachePointList := make([]CachePoint, 0)
for _, point := range pointList {
cachePointList = append(cachePointList, NewPoint(point))
......@@ -229,6 +243,13 @@ func (d *Dao) writeFile(pointList []*client.Point) error {
}
func (d *Dao) batchWrite(pointList []*client.Point) {
defer func() {
if err := recover(); err != nil {
glog.Error("batch write panic!", err)
return
}
}()
d.wg.Add(1)
defer d.wg.Done()
......
......@@ -41,6 +41,8 @@ var (
Logger *logger.Logger = nil
KafkaVersion = sarama.V1_0_0_0
KafkaRecver *kafka.Recver = nil
InfluxDbAddress = ""
DaoFileCacheDir = ""
)
type EosResult struct {
......@@ -96,6 +98,9 @@ func init() {
Logger.Info("kafkaAddress:", kafkaAddress)
}
KafkaRecver = kafka.NewRecver(KafkaVersion, strings.Split(kafkaAddress, ","), kafkaLogger)
InfluxDbAddress = Config.GetOrDefault(NamespaceApplication, "influxdb.address", "")
DaoFileCacheDir = Config.GetOrDefault(NamespaceApplication, "dao.file.cache.dir", "/var")
}
func getLoggerLevel(level string) zapcore.Level {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment