Commit 1525d70c authored by meng.cheng's avatar meng.cheng

添加监控

parent 68ccadcd
...@@ -3,4 +3,6 @@ ...@@ -3,4 +3,6 @@
enoch enoch
go.sum go.sum
test.go test.go
cache* cache*
\ No newline at end of file /enoch.cache_file
/main.cache_file
module git.quantgroup.cn/DevOps/enoch module git.quantgroup.cn/DevOps/enoch
go 1.12 go 1.18
require ( require (
github.com/Shopify/sarama v1.23.1 github.com/Shopify/sarama v1.32.0
github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/buaazp/fasthttprouter v0.1.1 github.com/buaazp/fasthttprouter v0.1.1
github.com/gomodule/redigo v2.0.0+incompatible github.com/influxdata/influxdb v1.9.6
github.com/google/go-cmp v0.3.1 // indirect
github.com/gorilla/websocket v1.4.1 // indirect
github.com/hashicorp/consul/api v1.2.0
github.com/influxdata/influxdb v1.7.9
github.com/mkevac/debugcharts v0.0.0-20180124214838-d3203a8fa926 github.com/mkevac/debugcharts v0.0.0-20180124214838-d3203a8fa926
github.com/onsi/ginkgo v1.12.0 // indirect github.com/valyala/fasthttp v1.6.0
github.com/onsi/gomega v1.9.0 // indirect github.com/vrg0/go-common v0.0.0-20200413134954-468cfa333e9c
github.com/robfig/cron v1.2.0 go.uber.org/zap v1.21.0
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
)
require (
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/fatih/color v1.9.0 // indirect
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/websocket v1.4.1 // indirect
github.com/hashicorp/consul/api v1.12.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/hashicorp/go-hclog v0.12.0 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/hashicorp/serf v0.9.6 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.14.4 // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.1.2 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/shima-park/agollo v1.1.0 // indirect
github.com/shirou/gopsutil v2.19.11+incompatible // indirect github.com/shirou/gopsutil v2.19.11+incompatible // indirect
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 // indirect github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 // indirect
github.com/valyala/fasthttp v1.6.0 github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/vrg0/go-common v0.0.0-20191213082238-e4e6080702f1 go.uber.org/atomic v1.7.0 // indirect
go.uber.org/zap v1.13.0 go.uber.org/multierr v1.6.0 // indirect
golang.org/x/sys v0.0.0-20200219091948-cb0a6d8edb6c // indirect golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gopkg.in/yaml.v2 v2.4.0 // indirect
) )
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"git.quantgroup.cn/DevOps/enoch/pkg/global" "git.quantgroup.cn/DevOps/enoch/pkg/global"
"git.quantgroup.cn/DevOps/enoch/pkg/glog" "git.quantgroup.cn/DevOps/enoch/pkg/glog"
node_check "git.quantgroup.cn/DevOps/enoch/pkg/node-check" node_check "git.quantgroup.cn/DevOps/enoch/pkg/node-check"
_ "git.quantgroup.cn/DevOps/enoch/pkg/node-up-down"
"git.quantgroup.cn/DevOps/enoch/pkg/points" "git.quantgroup.cn/DevOps/enoch/pkg/points"
"git.quantgroup.cn/DevOps/enoch/pkg/report-form" "git.quantgroup.cn/DevOps/enoch/pkg/report-form"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
...@@ -88,7 +89,7 @@ func isMaster() bool { ...@@ -88,7 +89,7 @@ func isMaster() bool {
} }
//生产环境,但LocalIp不是"172.30.12.22",不执行初始化操作 //生产环境,但LocalIp不是"172.30.12.22",不执行初始化操作
if !global.IsDev() && global.LocalIp != "172.30.12.22" { if !global.IsDev() && global.LocalIp != global.MasterAddress {
return false return false
} }
...@@ -127,13 +128,20 @@ func main() { ...@@ -127,13 +128,20 @@ func main() {
//对外api //对外api
api_server.ListenAndServe() api_server.ListenAndServe()
//time.Sleep(time.Second * 100000)
//创建server
//处理消息(阻塞) //处理消息(阻塞)
handlerKafkaMsg() handlerKafkaMsg()
//阻塞main
//select {}
} }
func init() { func init() {
api_server.HandlerFunc(api_server.GET, "/tech/health/check", healthCheck) api_server.HandlerFunc(api_server.GET, "/tech/health/check", healthCheck)
api_server.HandlerFunc(api_server.HEAD, "/tech/health/check", healthCheck) api_server.HandlerFunc(api_server.HEAD, "/tech/health/check", healthCheck)
} }
func healthCheck(ctx *fasthttp.RequestCtx) { func healthCheck(ctx *fasthttp.RequestCtx) {
......
...@@ -43,6 +43,7 @@ var ( ...@@ -43,6 +43,7 @@ var (
Logger *logger.Logger = nil Logger *logger.Logger = nil
KafkaVersion = sarama.V1_0_0_0 KafkaVersion = sarama.V1_0_0_0
KafkaRecver *kafka.Recver = nil KafkaRecver *kafka.Recver = nil
MasterAddress = ""
InfluxDbAddress = "" InfluxDbAddress = ""
DaoFileCacheDir = "" DaoFileCacheDir = ""
ConsulDc = "" ConsulDc = ""
...@@ -111,6 +112,7 @@ func init() { ...@@ -111,6 +112,7 @@ func init() {
KafkaRecver = kafka.NewRecver(KafkaVersion, strings.Split(kafkaAddress, ","), kafkaLogger) KafkaRecver = kafka.NewRecver(KafkaVersion, strings.Split(kafkaAddress, ","), kafkaLogger)
InfluxDbAddress = Config.GetOrDefault(NamespaceApplication, "influxdb.address", "") InfluxDbAddress = Config.GetOrDefault(NamespaceApplication, "influxdb.address", "")
MasterAddress = Config.GetOrDefault(NamespaceApplication, "master.address", "")
//InfluxDbAddress = "http://172.20.6.33:8086" //InfluxDbAddress = "http://172.20.6.33:8086"
DaoFileCacheDir = Config.GetOrDefault(NamespaceApplication, "dao.file.cache.dir", "/var") DaoFileCacheDir = Config.GetOrDefault(NamespaceApplication, "dao.file.cache.dir", "/var")
ReportFormDir = Config.GetOrDefault(NamespaceApplication, "report.form.dir", "/var") ReportFormDir = Config.GetOrDefault(NamespaceApplication, "report.form.dir", "/var")
......
package node_up_down
import (
"encoding/json"
"errors"
"fmt"
"git.quantgroup.cn/DevOps/enoch/pkg/api-server"
"git.quantgroup.cn/DevOps/enoch/pkg/glog"
"github.com/valyala/fasthttp"
"github.com/vrg0/go-common/consul-kv"
"github.com/vrg0/go-common/registry"
"strconv"
"strings"
)
func init() {
api_server.HandlerFunc(api_server.POST, "/upstreams/:nginx_upstream_name/targets", manage)
}
//函数改名
func manage(ctx *fasthttp.RequestCtx) {
//参数获取
bodyParam := make(map[string]interface{})
body := ctx.Request.Body()
if err := json.Unmarshal(body, &bodyParam); err != nil {
errStr := fmt.Sprint("服务上下线 Json解析失败: ", string(body), err)
ctx.SetBodyString(errStr)
ctx.SetStatusCode(400)
glog.Warn(errStr)
return
}
sysName, ok1 := ctx.UserValue("nginx_upstream_name").(string)
target, ok2 := bodyParam["target"].(string)
weight, ok3 := bodyParam["weight"].(float64)
if !ok1 || !ok2 || !ok3 {
errStr := fmt.Sprint("服务上下线 参数读取失败失败:", string(body))
ctx.SetBodyString(errStr)
ctx.SetStatusCode(400)
glog.Warn(errStr)
return
}
//强制上下线,默认关闭
force := false
if f, ok := bodyParam["force"].(bool); ok {
force = f
}
//上下线
if weight > 0 {
glog.Info("服务上线开始: ", sysName, " ", target, " force=", force)
} else {
glog.Info("服务下线开始: ", sysName, " ", target, " force=", force)
}
if err := doUpDown(target, sysName, weight, force); err != nil {
ctx.SetBodyString(err.Error())
ctx.SetStatusCode(400)
glog.Warn(err)
return
} else if weight > 0 {
glog.Info("服务上线成功: ", sysName, " ", target, " force=", force)
} else {
glog.Info("服务下线成功: ", sysName, " ", target, " force=", force)
}
}
//节点上下线
func doUpDown(target string, sysName string, weight float64, force bool) error {
endpointList := strings.Split(target, ",")
for _, endpoint := range endpointList {
ipPortStr := strings.Split(endpoint, ":")
if len(ipPortStr) != 2 {
return errors.New("failure of target resolution: " + endpoint)
}
ip := ipPortStr[0]
portStr := ipPortStr[1]
port, err := strconv.Atoi(portStr)
if err != nil {
return errors.New("failure of target resolution: " + portStr)
}
node := registry.NewNode(sysName, endpoint, ip, port)
if weight > 0 {
//获取metadata(非必须,可获取失败)
v, err := consul_kv.GetValue("node:" + sysName + ":" + endpoint)
if err == nil {
meta := make(map[string]string)
if e := json.Unmarshal([]byte(v), &meta); e == nil {
node.Meta = meta
}
}
//上线
if err := registry.Register(node); err != nil {
return errors.New("服务上线失败:" + err.Error())
}
} else {
// 仅剩一个节点的时候不可以下线,强制下线忽略此步骤
if !force {
if service, ok := registry.GetService(node.ServiceName); ok && len(service.NodeMap) == 1 {
if _, ok := service.NodeMap[node.Id]; ok {
return errors.New("服务下线失败:服务仅剩一个可用节点")
}
}
}
//下线
if err := registry.Deregister(node); err != nil {
return errors.New("服务下线失败:" + err.Error())
}
}
}
return nil
}
package node_up_down
import (
"fmt"
"git.quantgroup.cn/DevOps/enoch/pkg/api-server"
"git.quantgroup.cn/DevOps/enoch/pkg/global"
"github.com/buaazp/fasthttprouter"
"github.com/valyala/fasthttp"
"github.com/vrg0/go-common/registry"
"strings"
"testing"
)
func init() {
consulHosts := strings.Split(global.Config.GetOrDefault(global.NamespaceApplication, "consul.address", ""), ",")
fmt.Println(consulHosts)
consulConfig := map[string]interface{}{"dc": "3c", "cluster": consulHosts} //dc固定为3c
if err := registry.Init("consul", consulConfig); err != nil {
panic(err)
}
api_server.ListenAndServe()
}
//正常上线
var nodeUpFlag = false
func TestNodeUp(t *testing.T) {
if nodeUpFlag {
return
} else {
nodeUpFlag = true
}
//开启服务server
router := fasthttprouter.New()
router.Handle("GET", "/tech/health/check", func(ctx *fasthttp.RequestCtx) {
ctx.SetBodyString("Pong!")
})
s := &fasthttp.Server{
Handler: router.Handler,
}
go func() {
_ = s.ListenAndServe(":12345")
}()
//正常服务注册
req := &fasthttp.Request{}
req.SetRequestURI("http://127.0.0.1:5555/upstreams/aaaa/targets")
req.SetBody([]byte(fmt.Sprintf(`{"target":"%s:12345", "weight":11}`, global.LocalIp)))
req.Header.SetContentType("application/json")
req.Header.SetMethod("POST")
resp := &fasthttp.Response{}
client := &fasthttp.Client{}
if err := client.Do(req, resp); err != nil {
t.Error(err)
}
if resp.StatusCode() != 200 {
t.Error(string(resp.Body()))
}
}
//异常测试:预期返回400错误码,Json解析失败
func TestNodeUp2(t *testing.T) {
fasthttpArgs := &fasthttp.Args{} //空body
if code, body, err := fasthttp.Post(nil, "http://127.0.0.1:5555/upstreams/aaaa/targets", fasthttpArgs); err != nil {
t.Error(err)
} else if code != 400 || !strings.Contains(string(body), "Json解析失败") {
t.Error(code, string(body))
}
}
/* 注:此条测试需要 go-common registry 接入后进行
//异常上线:服务启动失败,注册超时
func TestNodeUp3(t *testing.T) {
//正常服务注册
req := &fasthttp.Request{}
req.SetRequestURI("http://127.0.0.1:5555/upstreams/aaaa/targets")
req.SetBody([]byte(fmt.Sprintf(`{"target":"%s:22112", "weight":11}`, global.LocalIp)))
req.Header.SetContentType("application/json")
req.Header.SetMethod("POST")
resp := &fasthttp.Response{}
client := &fasthttp.Client{}
if err := client.Do(req, resp); err != nil {
t.Error(err)
}
if resp.StatusCode() != 400 || !strings.Contains(string(resp.Body()), "timeout"){
t.Error(string(resp.Body()))
}
}
*/
//正常下线
func TestNodeDown(t *testing.T) {
TestNodeUp(t)
//下线
req := &fasthttp.Request{}
req.SetRequestURI("http://127.0.0.1:5555/upstreams/aaaa/targets")
req.SetBody([]byte(fmt.Sprintf(`{"target":"%s:12345", "weight":-11}`, global.LocalIp)))
req.Header.SetContentType("application/json")
req.Header.SetMethod("POST")
resp := &fasthttp.Response{}
client := &fasthttp.Client{}
if err := client.Do(req, resp); err != nil {
t.Error(err)
}
if resp.StatusCode() != 200 {
t.Error(string(resp.Body()))
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment