Commit e9cfb095 authored by jingbo.wang's avatar jingbo.wang

监控节点的健康状态,并且dingding告警

parent a9ddbfc4
...@@ -2,12 +2,13 @@ package main ...@@ -2,12 +2,13 @@ package main
import ( import (
"fmt" "fmt"
api_server "git.quantgroup.cn/DevOps/enoch/pkg/api-server" "git.quantgroup.cn/DevOps/enoch/pkg/api-server"
"git.quantgroup.cn/DevOps/enoch/pkg/dao" "git.quantgroup.cn/DevOps/enoch/pkg/dao"
"git.quantgroup.cn/DevOps/enoch/pkg/global" "git.quantgroup.cn/DevOps/enoch/pkg/global"
"git.quantgroup.cn/DevOps/enoch/pkg/glog" "git.quantgroup.cn/DevOps/enoch/pkg/glog"
node_check "git.quantgroup.cn/DevOps/enoch/pkg/node-check"
"git.quantgroup.cn/DevOps/enoch/pkg/points" "git.quantgroup.cn/DevOps/enoch/pkg/points"
report_form "git.quantgroup.cn/DevOps/enoch/pkg/report-form" "git.quantgroup.cn/DevOps/enoch/pkg/report-form"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
_ "github.com/mkevac/debugcharts" _ "github.com/mkevac/debugcharts"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
...@@ -79,6 +80,7 @@ func handlerKafkaMsg() { ...@@ -79,6 +80,7 @@ func handlerKafkaMsg() {
} }
} }
//TODO 可优化成Raft算法,目前是固定节点的
func isMaster() bool { func isMaster() bool {
//开发环境,但InfluxDb的Ip是生产环境ip的时候,不执行初始化操作 //开发环境,但InfluxDb的Ip是生产环境ip的时候,不执行初始化操作
if global.IsDev() && strings.Contains(global.InfluxDbAddress, "172.16") { if global.IsDev() && strings.Contains(global.InfluxDbAddress, "172.16") {
...@@ -116,10 +118,10 @@ func main() { ...@@ -116,10 +118,10 @@ func main() {
report_form.RegularReport(global.ReportFormDir) report_form.RegularReport(global.ReportFormDir)
//每周1早10点发送邮件 //每周1早10点发送邮件
report_form.RegularMail(global.ReportFormDir) report_form.RegularMail(global.ReportFormDir)
//节点健康状态检查
node_check.NodeHealthCheckAndNotify()
//TODO 告警策略 //TODO 告警策略
//TODO node状态监控
} }
//对外api //对外api
......
...@@ -81,8 +81,11 @@ func init() { ...@@ -81,8 +81,11 @@ func init() {
} }
kvMap := make(map[string]string) kvMap := make(map[string]string)
for k, v := range result.Details { for k, v := range result.Details {
if _, ok := v.(string); ok { if vString, ok := v.(string); ok {
kvMap[k] = v.(string) kvMap[k] = vString
}
if vFloat64, ok := v.(float64); ok {
kvMap[k] = strconv.Itoa(int(vFloat64))
} }
} }
Config.RefreshKvMap(kvMap) Config.RefreshKvMap(kvMap)
...@@ -122,11 +125,13 @@ func init() { ...@@ -122,11 +125,13 @@ func init() {
Logger.Error("get must conf error: application consul.datacenter") Logger.Error("get must conf error: application consul.datacenter")
} else { } else {
ConsulDc = consulDc ConsulDc = consulDc
Logger.Debug("consul dc", ConsulDc)
} }
if consulAddress, ok := Config.Get(NamespaceApplication, "consul.address"); !ok { if consulAddress, ok := Config.Get(NamespaceApplication, "consul.address"); !ok {
Logger.Error("get must conf error: application consul.address") Logger.Error("get must conf error: application consul.address")
} else { } else {
ConsulAddress = consulAddress ConsulAddress = consulAddress
Logger.Debug("consul address", ConsulAddress)
} }
consulCluster := strings.Split(ConsulAddress, ",") consulCluster := strings.Split(ConsulAddress, ",")
if e := registry.Init("consul", map[string]interface{}{"dc": ConsulDc, "cluster": consulCluster}); e != nil { if e := registry.Init("consul", map[string]interface{}{"dc": ConsulDc, "cluster": consulCluster}); e != nil {
......
package node_check package node_check
func NodeHealthCheckAndNotify() {
}
/*
import ( import (
"encoding/json"
"fmt"
"git.quantgroup.cn/DevOps/enoch/pkg/global"
"git.quantgroup.cn/DevOps/enoch/pkg/glog" "git.quantgroup.cn/DevOps/enoch/pkg/glog"
"github.com/vrg0/go-common/notify"
"github.com/vrg0/go-common/registry" "github.com/vrg0/go-common/registry"
"strings"
"sync" "sync"
) )
const ( var (
Passing = "passing" notifyDingDing = newNotifyDingDing()
Critical = "critical"
) )
//节点健康状态检查和告警 func newNotifyDingDing() *notify.Notify {
cfgStr := global.Config.GetOrDefault(global.NamespaceApplication, "notify.dingding", `[]`)
dstList := make([]string, 0)
_ = json.Unmarshal([]byte(cfgStr), &dstList)
return notify.New(dstList)
}
func NodeHealthCheckAndNotify() { func NodeHealthCheckAndNotify() {
//设置观察者节点 //设置观察者节点
if err := registry.SetObserver(&watch{}); err != nil { if err := registry.SetObserver(newWatch()); err != nil {
glog.Info("设置观察者节点失败", err) glog.Info("设置观察者节点失败", err)
return return
} }
} }
//registry观察者,观察服务的状态,当节点挂掉时告警,当服务挂掉时告警
type watch struct { type watch struct {
serviceMap map[string]*registry.Service serviceMap map[string]*registry.Service
serviceMapLock *sync.Mutex serviceMapLock *sync.Mutex
} }
func newWatch() *watch {
return &watch{
serviceMap: make(map[string]*registry.Service),
serviceMapLock: new(sync.Mutex),
}
}
func (w watch) DeleteService(serviceName string) { func (w watch) DeleteService(serviceName string) {
w.serviceMapLock.Lock() w.serviceMapLock.Lock()
defer w.serviceMapLock.Unlock() defer w.serviceMapLock.Unlock()
//服务下线
glog.Info("服务下线:", serviceName) glog.Info("服务下线:", serviceName)
//删除服务 //删除服务
...@@ -43,220 +58,69 @@ func (w watch) UpdateNodes(service *registry.Service) { ...@@ -43,220 +58,69 @@ func (w watch) UpdateNodes(service *registry.Service) {
w.serviceMapLock.Lock() w.serviceMapLock.Lock()
defer w.serviceMapLock.Unlock() defer w.serviceMapLock.Unlock()
if s, ok := w.serviceMap[service.Name]; !ok { //状态检查&告警
glog.Info("服务上线:", service.Name) w.statusCheckAndNotify(service)
} else {
for id, node := range service.NodeMap {
if node.Status == Critical {
}
}
}
//更新服务状态 //更新本地缓存的节点状态
w.serviceMap[service.Name] = service w.serviceMap[service.Name] = service
} }
*/ func (w watch) statusCheckAndNotify(service *registry.Service) {
/* //服务信息初始化
type watch struct{} //服务不存在
func (w watch) DeleteService(serviceName string) { if _, ok := w.serviceMap[service.Name]; !ok {
//pass glog.Info("服务信息初始化:", service.Name)
servicesStatusLock.Lock() return
defer servicesStatusLock.Unlock() }
//服务存在,但所有节点都是critical 或 没有节点
delete(servicesStatus, serviceName) if service, ok := w.serviceMap[service.Name]; ok {
} allNodeCritical := true
func (w watch) UpdateNodes(service *registry.Service) {
servicesStatusLock.Lock()
defer servicesStatusLock.Unlock()
//单个节点挂了告警
if oldService, ok := servicesStatus[service.Name]; ok {
for _, node := range service.NodeMap { for _, node := range service.NodeMap {
if oldNode, ok := oldService.NodeMap[node.Id]; ok { if node.Status == registry.Passing {
if oldNode.Status == Passing && node.Status == Critical { allNodeCritical = false
logger.Warning.Print(service.Name, " ", node.Id, "---!!!node critical!!!---") break
if _, ok := IgnoreServiceMap[service.Name]; !ok {
_ = dingding.SenderDingDing(service.Name+" "+node.Id+" "+"---!!!node critical!!!---", dingding.DefaultDingURL)
}
}
} }
} }
} if allNodeCritical {
glog.Info("服务信息初始化:", service.Name)
//整个服务挂了告警 return
//如果 服务存在,并且服务的old状态为passing,并且服务的now状态为critical,则报警,否贼记录服务状态
serviceString := serviceStr(service)
if oldService, ok := servicesStatus[service.Name]; ok && serviceStatus(oldService) && !serviceStatus(service) {
logger.Warning.Print(serviceString, "---!!!service critical!!!---")
if _, ok := IgnoreServiceMap[service.Name]; !ok {
_ = dingding.SenderDingDing(serviceString+"---!!!service critical!!!---", dingding.DefaultDingURL)
} }
handler(service.Name)
} else {
logger.Info.Print(serviceString)
} }
//更新服务状态 //服务挂掉告警(服务的全部节点都挂掉)
//深拷贝对象 if len(service.NodeMap) != 0 {
newService := registry.NewService(service.Name) serviceUp := false
for kk, vv := range service.NodeMap { for _, node := range service.NodeMap {
newNode := registry.Node{ if node.Status == registry.Passing {
ServiceName:vv.ServiceName, serviceUp = true
Id:vv.Id,
Port:vv.Port,
Address:vv.Address,
Status:vv.Status,
}
for x, y := range vv.Meta {
newNode.Meta[x] = y
}
newService.NodeMap[kk] = &newNode
}
servicesStatus[service.Name] = newService
}
*/
/*
//节点的健康状态告警
func NodeCheck() {
//服务状态初始化
InitServiceStatus()
//设置观察者节点
if e := registry.SetObserver(&watch{}); e != nil {
glog.Info("设置观察者节点失败", e)
return
}
}
func InitServiceStatus() {
if s, ok := registry.GetServiceMap(); ok {
servicesStatus = s
}
}
*/
/*
var HandlerMap = new(sync.Map)
var HttpGetRetryCount = 3
var HttpTimeOut = time.Second * 10
var IgnoreServiceMap = make(map[string]struct{})
func init() {
//TODO 灵活配置
HandlerMap.Store("aaaa", "http://www.baidasdfasdfasdfasdfu.com/")
HandlerMap.Store("heimdallr", "http://172.20.6.33:8989/service-down")
// HandlerMap.Store("aaaa", "http://172.20.6.33:8989/service-down")
//TODO 灵活配置
IgnoreServiceMap["vcc-talos"] = struct{}{}
IgnoreServiceMap["aaaa"] = struct{}{}
}
func httpGet(url string, timeout time.Duration) (*http.Response, error) {
ctx, cancel := context.WithCancel(context.TODO())
req, e := http.NewRequest("GET", url, nil)
if e != nil {
return nil, e
} else {
req = req.WithContext(ctx)
}
_ = time.AfterFunc(timeout, func() {
cancel()
})
return http.DefaultClient.Do(req)
}
func handler(serviceName string) {
if url, ok := HandlerMap.Load(serviceName); ok {
for i := 0; i < HttpGetRetryCount; i++ {
if resp, e := httpGet(url.(string), HttpTimeOut); e != nil {
logger.Error.Print(" handler service: ", serviceName, " ", e)
} else {
logger.Info.Print(" handler service: ", serviceName, " ", resp.StatusCode)
break break
} }
} }
} else {
logger.Info.Print(" handler service: ", serviceName, " ", "not found handler hook api")
}
}
func serviceStatus(service *registry.Service) bool { if !serviceUp {
for _, node := range service.NodeMap { sb := new(strings.Builder)
if node.Status == Passing { sb.WriteString(fmt.Sprintf("服务健康状态异常:%s", service.Name))
return true for _, node := range service.NodeMap {
sb.WriteString(fmt.Sprintf(" %s:%s", node.Id, node.Status))
}
sb.WriteString("\n")
glog.Warn(sb.String())
notifyDingDing.SendText(sb.String())
return
} }
} }
return false //节点挂掉告警(服务的部分节点挂掉)
}
func serviceStr(service *registry.Service) string {
rtn := service.Name + " "
for _, node := range service.NodeMap { for _, node := range service.NodeMap {
rtn += node.Id + ":" + node.Status + " " //如果节点状态从passing变为其他,则告警
} if oldService, ok := w.serviceMap[node.ServiceName]; ok {
if oldNode, ok := oldService.NodeMap[node.Id]; ok {
return rtn if oldNode.Status == registry.Passing && node.Status != registry.Passing {
} s := fmt.Sprintf("服务节点健康状态异常:%s %s:%s", service.Name, node.Id, node.Status)
glog.Warn(s)
notifyDingDing.SendText(s)
func (w watch) AddNode(node *registry.Node) { }
//pass }
}
func (w watch) DelNode(node *registry.Node) {
//pass
}
var (
servicesStatus = make(map[string]*registry.Service)
servicesStatusLock = new(sync.Mutex)
)
func InitServiceStatus() {
servicesStatus = registry.GetServiceMap()
}
func NodeCheck() {
defer func() {
if e := recover(); e != nil {
logger.Info.Print("node check panic: ", e)
_ = dingding.SenderDingDing("node check panic!", dingding.DefaultDingURL)
time.Sleep(time.Second * 1)
NodeCheck()
} }
}()
//注册器初始化
dc := "3c"
cluster := []string{"172.30.12.2:8500", "172.30.12.3:8500", "172.30.12.4:8500"}
if e := registry.Init("consul", map[string]interface{}{"dc": dc, "cluster": cluster}); e != nil {
logger.Info.Print("registry init error:", e)
os.Exit(-1)
} }
time.Sleep(time.Second * 1)
//服务状态初始化
InitServiceStatus()
//设置观察者
if e := registry.SetObserver("watch", &watch{}); e != nil {
logger.Info.Print("set observer error:", e)
os.Exit(-1)
}
select {}
} }
*/
package node_check package node_check
/*
import "testing" import "testing"
func TestNodeCheck(t *testing.T) { func TestNodeCheck(t *testing.T) {
go NodeCheck() NodeHealthCheckAndNotify()
select{} select {}
} }
*/
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment