Commit 736aad88 authored by vrg0

Delete deprecated code

parent b6c55ef5
package alarm
import (
"encoding/json"
"git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/data"
"github.com/vrg0/go-common/logger"
"io/ioutil"
)
func Load() {
config, err := ioutil.ReadFile(conf.GlobalConfig.StrategyConfPath)
if err != nil {
logger.Error("!!!轮训监控,未找到配置文件!!!")
return
}
strategies := make([]Strategy, 0)
err = json.Unmarshal(config, &strategies)
if err != nil {
logger.Error("!!!策略文件格式错误:%s!!!\n", err)
return
}
if !CheckArray(strategies) {
logger.Error("!!!策略文件未通过校验!!!")
return
}
sql := BuildSql(strategies)
result := data.QueryMonitor(sql)
DealResult(result, strategies)
}
package alarm
import (
"encoding/json"
"git.quantgroup.cn/DevOps/enoch/service/util"
"github.com/vrg0/go-common/logger"
"net/http"
"strings"
"sync"
"time"
)
var SenderWatcher sync.Map
func Sender(target []string, title string, info string, notice Notice) {
uniqueKey := buildUniqueKey(target)
if !isExpired(uniqueKey, notice.Interval) { // still inside the alarm interval
return
}
info = uniqueKey + info
switch notice.Sender {
case DING_DING:
	senderDingDing(title, info, notice.Receiver)
case MAIL:
	senderMail(title, info, notice.Receiver)
default:
	logger.Error("策略配置错误,未匹配到对应的Sender")
}
}
func senderMail(title string, info string, receiver []string) {
for _, r := range receiver {
util.SendEmail(title, info, r)
}
}
func senderDingDing(title string, info string, receiver []string) {
bodyStr := buildDingDingMsg(title, info)
for _, r := range receiver {
data := strings.NewReader(string(bodyStr))
_, err := http.Post(r, "application/json;charset=utf-8", data)
if err != nil {
logger.Error(err)
}
}
}
func buildDingDingMsg(title string, info string) []byte {
msg := DinDingMsg{
MsgType: "link",
Link: Link{
Title: title,
Text: info,
MessageUrl: "http://172.20.6.33:3000/d/y1Ju2slik/apdexlist?refresh=1m&orgId=1",
},
}
msgStr, err := json.Marshal(msg)
if nil != err {
logger.Error("无法序列化ding ding msg", err)
}
return msgStr
}
/**
Reports whether the alarm interval for this key has elapsed, i.e. whether a new alarm may be sent.
*/
func isExpired(key string, interval int64) bool {
now := time.Now().Unix()
lastTime, hasValue := SenderWatcher.LoadOrStore(key, now)
logger.Info("---------时间间隔:old:", lastTime, ",new:", now)
if hasValue { // an earlier timestamp exists; check whether it has expired
	if now-lastTime.(int64) >= interval { // expired
SenderWatcher.Store(key, now)
return true
}
return false
}
return true
}
func buildUniqueKey(target []string) string {
length := len(target)
str := strings.Builder{}
for i := length - 1; i >= 0; i-- {
str.WriteString(target[i])
str.WriteString(":")
}
return str.String()
}
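// Minimal usage sketch of the two helpers above (hypothetical values; this
// function is illustrative only and is not called anywhere): buildUniqueKey
// joins the target values in reverse order, each followed by ":", and
// isExpired lets the first alarm for a key through while suppressing repeats
// until the interval (in seconds) has elapsed.
func exampleDedup() {
	// "payment-svc" and "10.0.0.1" are made-up example values.
	key := buildUniqueKey([]string{"payment-svc", "10.0.0.1"}) // "10.0.0.1:payment-svc:"
	if isExpired(key, 300) {                                   // true on the first call for this key
		logger.Info("would send alarm for ", key)
	}
}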
/**
Persist alarms (not implemented yet).
*/
func persistence() {
}
package alarm
import (
"fmt"
"github.com/vrg0/go-common/logger"
"strconv"
"strings"
)
type Compare struct {
}
/**
real < alter
*/
func (Compare) Less(alter string, real []string) bool {
rs := true
lastIndex := len(real) - 1
for i, r := range real {
if i != 0 || i != lastIndex {
if r == "0" {
continue
}
rs = rs && (r < alter)
}
}
return rs
}
func (Compare) Greater(alter string, real []string) bool {
rs := true
lastIndex := len(real) - 1
for i, r := range real {
if i != 0 || i != lastIndex {
rs = rs && (r > alter)
}
}
return rs
}
/**
real = alter
*/
func (Compare) Equal(alter string, real []string) bool {
rs := true
lastIndex := len(real) - 1
for i, r := range real {
if i != 0 || i != lastIndex {
rs = rs && (r == alter)
}
}
return rs
}
// Period-over-period comparison, ignoring samples where both values are below 200
func (Compare) LimitComparedWithSame(alter string, old []string, current []string) bool {
logger.Info(alter, old, current)
logger.Info("old:", strings.Join(old, ","), "new: ", strings.Join(current, ","))
rs := true
lastIndex := len(current) - 1
for i, r := range current {
if i != 0 || i != lastIndex {
rs = rs && limitCompareSame(alter, old[i], r)
}
}
return rs
}
/**
Period-over-period increase exceeds alter.
*/
func (Compare) ComparedWithSame(alter string, old []string, current []string) bool {
logger.Info("old:", strings.Join(old, ","), "new: ", strings.Join(current, ","))
rs := true
lastIndex := len(current) - 1
for i, r := range current {
if i != 0 || i != lastIndex {
rs = rs && compareSame(alter, old[i], r)
}
}
return rs
}
func limitCompareSame(alter string, old string, current string) bool {
cf := parseToFloat(current)
of := parseToFloat(old)
if cf < 200 && of < 200 {
return false
}
return (cf-of)/of > parseToFloat(alter)
}
func compareSame(alter string, old string, current string) bool {
cf := parseToFloat(current)
of := parseToFloat(old)
if of == 0 { // jingbo.wang: ignore the case where of is 0
return false
}
return (cf-of)/of > parseToFloat(alter)
}
func parseToFloat(value string) float64 {
rs, err := strconv.ParseFloat(value, 64)
if nil != err {
logger.Error(err)
}
return rs
}
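// Worked illustration of the helpers above (made-up numbers):
//   limitCompareSame("0.5", "300", "480"): of=300, cf=480, not both below the
//     200 floor, and (480-300)/300 = 0.6 > 0.5, so it reports true.
//   limitCompareSame("0.5", "100", "180"): both samples are below 200, so the
//     pair is ignored and it reports false.
//   compareSame("0.5", "0", "100"): of is 0, so the pair is ignored (false).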
type MsgBuilder struct {
}
func (MsgBuilder) Less(alter string) string {
return fmt.Sprintf(" 低于阈值:%s", alter)
}
/**
real > alter
*/
func (MsgBuilder) Greater(alter string) string {
return fmt.Sprintf(" 高于阈值:%s", alter)
}
/**
real = alter
*/
func (MsgBuilder) Equal(alter string) string {
return fmt.Sprintf(" 等于阈值:%.3s", alter)
}
/**
Period-over-period increase exceeds alter.
*/
func (MsgBuilder) ComparedWithSame(alter string) string {
return "同比超过" + alter
}
// Period-over-period comparison with a lower bound
func (MsgBuilder) LimitComparedWithSame(alter string) string {
return "同比超过" + alter
}
package alarm
import (
"fmt"
"github.com/influxdata/influxdb/client/v2"
"github.com/vrg0/go-common/logger"
"reflect"
"strings"
)
var operators = reflect.ValueOf(Compare{})
var msgBuilder = reflect.ValueOf(MsgBuilder{})
func DealResult(res []client.Result, strategies []Strategy) {
var resIndex = 0
for _, strategy := range strategies {
logger.Info("-------", strategy.Name, resIndex, "---------")
operator := strategy.Operator
operatorMethod := operators.MethodByName(operator)
buildMsgMethod := msgBuilder.MethodByName(operator)
alterValue := strategy.AlterValue
alterValueLen := len(alterValue)
ignoreTag := strategy.IgnoreTag
tags := strategy.Tag
notice := strategy.Notice
noDataAlter := strategy.NoDataAlter
personalAlterValue := strategy.PersonalAlterValue
if strategy.SqlLen == 1 { // single SQL statement
result := res[resIndex]
for _, series := range result.Series {
uniqueTag := series.Tags[tags[0]]
tagValues := getTagValues(tags, series.Tags)
if needIgnore(ignoreTag, tagValues) {
continue
}
value := make([]string,0)
for _, v := range series.Values {
value = append(value, fmt.Sprint(v[1]))
}
//value := series.Values[0][1]
if len(value) == 0 && noDataAlter { // alarm on missing data
Sender(tagValues, strategy.Name, "no data", notice)
}
currentAlterValue := getAlterValue(personalAlterValue, uniqueTag, tagValues, alterValue)
params := make([]reflect.Value, alterValueLen+1)
for j, arg := range currentAlterValue {
params[j] = reflect.ValueOf(arg)
}
params[alterValueLen] = reflect.ValueOf(value)
logger.Info(uniqueTag, ":alter", params[:1], "-", strings.Join(value, ","))
rs := operatorMethod.Call(params)
if rs[0].Bool() { // alarm strategy triggered
	//Sender(tagValues, strategy.Name, s, notice) // send alarm
	Sender(tagValues, strategy.Name, buildMsgMethod.Call(params[:1])[0].String(), notice) // send alarm
}
}
} else {
params := make(map[string][]reflect.Value)
tagValueMap := make(map[string][]string)
for i := 0; i < strategy.SqlLen; i++ {
resIndex += i
for _, series := range res[resIndex].Series {
uniqueTag := series.Tags[tags[0]]
tagValues := getTagValues(tags, series.Tags)
if needIgnore(ignoreTag, tagValues) {
continue
}
tagValueMap[uniqueTag] = tagValues
value := make([]string,0)
for _, v := range series.Values {
value = append(value, fmt.Sprint(v[1]))
}
if nil == params[uniqueTag] {
currentParams := make([]reflect.Value, strategy.SqlLen+alterValueLen)
currentAlterValue := getAlterValue(personalAlterValue, uniqueTag, tagValues, alterValue)
for k, arg := range currentAlterValue {
currentParams[k] = reflect.ValueOf(arg)
}
params[uniqueTag] = currentParams
}
params[uniqueTag][alterValueLen+i] = reflect.ValueOf(value)
}
}
for k, v := range params {
if nil == v || hasNilValue(v) {
continue
}
logger.Info(k, ":", v)
rs := operatorMethod.Call(v)
// alarm on the combined result
if rs[0].Bool() { // alarm strategy triggered
	Sender(tagValueMap[k], strategy.Name, buildMsgMethod.Call(v[:1])[0].String(), notice) // send alarm
}
}
}
// advance to the next result set
resIndex += 1
}
}
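// Note on the dispatch above: strategy.Operator is expected to name one of the
// Compare methods ("Less", "Greater", "Equal", "ComparedWithSame",
// "LimitComparedWithSame"); MethodByName resolves the same name on both
// Compare (to evaluate the series) and MsgBuilder (to build the alarm text).
// The alert thresholds are passed as the leading reflect.Value arguments,
// followed by the measured series. An Operator that names no such method
// yields an invalid reflect.Value and the Call above would panic, since
// Check below does not validate the Operator field.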
func CheckArray(strategies []Strategy) bool {
for _, strategy := range strategies {
if !Check(strategy) {
return false
}
}
return true
}
func Check(strategy Strategy) bool {
if "" == strategy.Name {
return false
}
if "" == strategy.Name {
return false
}
if strategy.SqlLen != strings.Count(strategy.Sql, ";") {
return false
}
if len(strategy.Tag) < 1 {
return false
}
length := len(strategy.AlterValue)
for _, v := range strategy.PersonalAlterValue {
if len(v) != length {
return false
}
}
return true
}
func BuildSql(strategies []Strategy) string {
var sqlBuff strings.Builder
for _, strategy := range strategies {
sqlBuff.WriteString(strategy.Sql)
}
return sqlBuff.String()
}
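// BuildSql simply concatenates every strategy's Sql field. Check above requires
// each Sql to contain exactly SqlLen ';'-separated statements, so the result
// sets returned by QueryMonitor line up positionally with the strategies as
// DealResult advances resIndex.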
func hasNilValue(params []reflect.Value) bool {
	for _, c := range params {
		if !c.IsValid() {
			return true
		}
	}
	return false
}
func needIgnore(ignoreTag map[string]bool, tags []string) bool {
for _, k := range tags {
if ignoreTag[k] {
return true
}
}
return false
}
func getAlterValue(personalAlterValue map[string][]string, uniqueTag string, tags []string, alterValue []string) []string {
value := personalAlterValue[uniqueTag]
if nil != value {
return value
}
tagLen := len(tags)
for i := 1; i < tagLen; i++ {
if nil != personalAlterValue[tags[i]] {
return personalAlterValue[tags[i]]
}
}
return alterValue
}
func getTagValues(keys []string, tag map[string]string) []string {
values := make([]string, len(keys))
for i, k := range keys {
values[i] = tag[k]
}
return values
}
package conf
import (
"encoding/json"
"github.com/vrg0/go-common/logger"
"io/ioutil"
"net/http"
)
var GlobalConfig = Config{}
func Load(denv string, didc string) {
url := "http://apollo-" + denv + ".quantgroups.com/configfiles/real_json/enoch/" + didc + "/tech.config.json"
resp, err := http.Get(url)
if err != nil {
	logger.Error(err)
	return
}
defer func() { _ = resp.Body.Close() }()
bytes, _ := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(bytes, &GlobalConfig)
GlobalConfig.Env = denv
if nil != err {
logger.Error(err)
}
}
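// For example (hypothetical arguments), Load("prod", "default") would fetch
// http://apollo-prod.quantgroups.com/configfiles/real_json/enoch/default/tech.config.json
// and unmarshal the returned JSON into GlobalConfig.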
package consumer
import (
"encoding/json"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/end_points"
"github.com/gomodule/redigo/redis"
"github.com/influxdata/influxdb/client/v2"
"github.com/vrg0/go-common/logger"
"strings"
"time"
)
type BraveMessageHandler struct {
}
var httpMethod = map[string]string{
"get": "", "GET": "", "post": "", "options": "", "put": "", "delete": "", "head": "",
}
func (BraveMessageHandler) MsgProcess(msg string) {
traceMsg := make([]end_points.TraceMsg, 3) //[]TraceMsg{}
err := json.Unmarshal([]byte(msg), &traceMsg)
if err != nil {
	logger.Error("brave 解析msg失败:", err)
	return
}
msgInfluxProcess(traceMsg)
}
func (BraveMessageHandler) Destroy() {
if len(pointSlice) > 0 {
logger.Info("braveMessageHandler 提交本地缓存数据:", len(pointSlice))
batchWrite(pointSlice)
}
}
var pointSlice = make([]*client.Point, 0, batchSize)
func msgInfluxProcess(traceMsgs []end_points.TraceMsg) {
for _, traceMsg := range traceMsgs {
if traceMsg.Kind != "SERVER" {
continue
}
path := traceMsg.Name
if strings.ToLower(traceMsg.Tags.HttpMethod) == "options" {
continue
}
if _, ok := httpMethod[path]; ok {
path = traceMsg.Tags.HttpMethod + " " + traceMsg.Tags.HttpPath
}
path = strings.ToLower(path)
sysName := traceMsg.LocalEndpoint.ServiceName
fields := make(map[string]interface{})
fields["duration"] = traceMsg.Duration / 1000
fields["traceId"] = traceMsg.TraceId
bytes, err := json.Marshal(traceMsg)
msg := string(bytes)
fields["msg"] = msg
tags := make(map[string]string)
tags["sys_name"] = sysName
tags["path"] = path
tags["host"] = traceMsg.LocalEndpoint.Ipv4
if err != nil {
logger.Fatal(err)
}
unix := time.Unix(0, traceMsg.Timestamp*1000)
if len(pointSlice) == batchSize {
go batchWrite(pointSlice)
pointSlice = make([]*client.Point, 0, batchSize)
}
point, _ := client.NewPoint("trace_info", tags, fields, unix)
pointSlice = append(pointSlice, point)
if err != nil {
logger.Fatal(err, msg)
}
}
}
func Duration(day string, fun func(sysName string, durations map[string]string)) {
conn := data.Pool.Get()
defer func() { _ = conn.Close() }()
matchPattern := "duration*" + day
reply, err := redis.Values(conn.Do("scan", 0, "match", matchPattern, "count", 10000))
if err != nil {
fmt.Print("Scan 扫描key出错", err)
}
redisKeys := reply[1].([]interface{})
for _, redisKey := range redisKeys {
reply2, err := redis.StringMap(conn.Do("zrevrangebyscore", redisKey, "+inf", 200000, "withscores"))
if err != nil {
fmt.Print("获取出错了", err)
}
fun(string(redisKey.([]uint8)), reply2)
}
}
func Counter(day string, fun func(sysName string, durations map[string]string)) {
conn := data.Pool.Get()
defer func() { _ = conn.Close() }()
matchPattern := "counter*" + day
reply, err := redis.Values(conn.Do("scan", 0, "match", matchPattern, "count", 10000))
if err != nil {
fmt.Print("Scan 扫描key出错", err)
}
redisKeys := reply[1].([]interface{})
for _, redisKey := range redisKeys {
reply2, err := redis.StringMap(conn.Do("zrevrange", redisKey, 0, 300, "withscores"))
if err != nil {
fmt.Print("获取出错了", err)
}
fun(string(redisKey.([]uint8)), reply2)
}
}
package consumer
import (
"encoding/json"
"git.quantgroup.cn/DevOps/enoch/service/end_points"
"github.com/influxdata/influxdb/client/v2"
"github.com/vrg0/go-common/logger"
"math/big"
"net"
"time"
)
var sysNameIndex = make(map[int64]bool)
var metricsPointSlice = make([]*client.Point, 0, batchSize)
var healthPointSlice = make([]*client.Point, 0, batchSize)
type HealthMessageHandler struct {
}
func (HealthMessageHandler) MsgProcess(msg string) {
chunkMsg := end_points.ChunkMsg{}
err := json.Unmarshal([]byte(msg), &chunkMsg)
if err != nil {
logger.Error("healthMessageHandler解析json失败:", err)
logger.Error(msg)
return
}
buildMsg(chunkMsg)
}
func (HealthMessageHandler) Destroy() {
if len(metricsPointSlice) > 0 {
logger.Info("metricsMessageHandler 提交本地缓存数据:", len(metricsPointSlice))
batchWrite(metricsPointSlice)
}
if len(healthPointSlice) > 0 {
logger.Info("HealthMessageHandler 提交本地缓存数据:", len(healthPointSlice))
batchWrite(healthPointSlice)
}
}
func buildHealthInfluxMsg(appName string, ip string, timestamp time.Time, submitLimit int, db *[]byte) {
tags := make(map[string]string)
tags["sys_name"] = appName
tags["host"] = ip
fields := make(map[string]interface{})
dbInfo := end_points.DBInfo{}
err := json.Unmarshal(*db, &dbInfo)
if err != nil {
dbDetails := end_points.DBDetail{}
err = json.Unmarshal(*db, &dbDetails)
if err == nil {
fields[dbDetails.Details.Database] = isOK(dbDetails.Status.Code)
}
} else {
for k, v := range dbInfo.Details {
var fieldName = v.Details.Database + "—" + k
fields[fieldName] = isOK(v.Status.Code)
}
}
if len(healthPointSlice) >= submitLimit {
go batchWrite(healthPointSlice)
healthPointSlice = make([]*client.Point, 0, batchSize)
}
point, _ := client.NewPoint("health_info", tags, fields, timestamp)
healthPointSlice = append(healthPointSlice, point)
}
func buildMetricsInfluxMsg(appName string, ip string, timestamp time.Time, submitLimit int, health end_points.Health, metrics end_points.MetricsInfo) {
tags := make(map[string]string)
fields := make(map[string]interface{})
tags["sys_name"] = appName
tags["host"] = ip
status := health.Status
fields["sever_status"] = isOK(status.Code)
redis := health.Details.Redis
fields["redis_status"] = isOK(redis.Status.Code)
diskSpace := health.Details.DiskSpace.Details
fields["disk_tol"] = diskSpace.Total
fields["disk_free"] = diskSpace.Free
fields["disk_threshold"] = diskSpace.Threshold
fields["mem_tol"] = metrics.Mem
fields["mem_free"] = metrics.MemFree
fields["heap_tol"] = metrics.Heap
fields["heap_init"] = metrics.HeapInit
fields["heap_used"] = metrics.HeapUsed
fields["nonheap_tol"] = metrics.Nonheap
fields["nonheap_init"] = metrics.NonheapInit
fields["nonheap_used"] = metrics.NonheapUsed
fields["nonheap_commit"] = metrics.NonheapCommitted
fields["thread_tol"] = metrics.ThreadsTotalStarted
fields["thread_peak"] = metrics.ThreadsPeak
fields["thread_daemon"] = metrics.ThreadsDaemon
fields["class_tol"] = metrics.Classes
fields["class_loaded"] = metrics.ClassesLoaded
fields["class_unloaded"] = metrics.ClassesUnloaded
fields["gc_parnew_count"] = metrics.GcParnewCount
fields["gc_parnew_time"] = metrics.GcParnewTime
fields["gc_concurrent_mark_sweep"] = metrics.GcConcurrentmarksweepCount
fields["gc_concurrent_mark_time"] = metrics.GcConcurrentmarksweepTime
fields["uptime"] = metrics.Uptime
fields["instance_uptime"] = metrics.InstanceUptime
fields["system_load_average"] = metrics.SystemloadAverage
fields["processors"] = metrics.Processors
if len(metricsPointSlice) >= submitLimit {
go batchWrite(metricsPointSlice)
metricsPointSlice = make([]*client.Point, 0, batchSize)
}
point, _ := client.NewPoint("machine_info", tags, fields, timestamp)
metricsPointSlice = append(metricsPointSlice, point)
}
func buildMsg(chunkMsg end_points.ChunkMsg) {
var ip = inetAtoN(chunkMsg.Ip)
sysNameIndex[ip] = true
var sysNameCount = len(sysNameIndex)
for _, p := range chunkMsg.EndPoints {
var appName = chunkMsg.AppName
var ip = chunkMsg.Ip
unix := time.Unix(0, p.Timestamp*1000000)
//metricsInfo
buildMetricsInfluxMsg(appName, ip, unix, sysNameCount, p.Health, p.Metrics)
//health_info
dbByte, _ := json.Marshal(p.Health.Details.Db)
buildHealthInfluxMsg(appName, ip, unix, sysNameCount, &dbByte)
}
}
func inetAtoN(ip string) int64 {
ret := big.NewInt(0)
ret.SetBytes(net.ParseIP(ip).To4())
return ret.Int64()
}
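// For example, inetAtoN("10.1.2.3") yields 10<<24 | 1<<16 | 2<<8 | 3 = 167838211.
// An address that does not parse as IPv4 falls through to 0, because To4()
// returns nil and big.Int treats an empty byte slice as zero.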
func isOK(code string) int {
if "UP" == code {
return 1
}
return 0
}
package consumer
import (
"git.quantgroup.cn/DevOps/enoch/service/conf"
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/vrg0/go-common/logger"
"os"
"os/signal"
"sync/atomic"
"syscall"
)
var consumerCount int32
func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler) {
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Group.Return.Notifications = true
consumer, err := cluster.NewConsumer(kafkaConf.Broker, kafkaConf.Group, []string{kafkaConf.Topic}, config)
if err != nil {
panic(err)
}
atomic.AddInt32(&consumerCount, 1)
// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, os.Interrupt)
defer func() {
_ = consumer.Close()
messageHandle.Destroy()
atomic.AddInt32(&consumerCount, -1)
logger.Info("consumer结束")
if atomic.LoadInt32(&consumerCount) == 0 {
os.Exit(0)
}
}()
// consume errors
go func() {
for err := range consumer.Errors() {
logger.Error("consume error:", err.Error())
}
}()
// consume notifications
go func() {
for ntf := range consumer.Notifications() {
logger.Info("Rebalanced: %+v\n", ntf)
}
}()
// consume messages, watch signals
for {
select {
case msg, ok := <-consumer.Messages():
if ok {
messageHandle.MsgProcess(string(msg.Value))
consumer.MarkOffset(msg, "") // mark message as processed
}
case <-signals:
return
}
}
}
package consumer
import (
"encoding/json"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/file_cache"
"github.com/influxdata/influxdb/client/v2"
"github.com/vrg0/go-common/logger"
)
func batchWrite(pointArray []*client.Point) {
if file_cache.Enabled() {
logger.Info("写入缓存")
fileWrite(pointArray)
} else {
err := httpWrite(pointArray)
if err != nil {
file_cache.OpenCache()
fileWrite(pointArray)
}
logger.Info("写入influx", len(pointArray))
}
}
func httpWrite(pointArray []*client.Point) error {
c := data.NewClient()
defer func() { _ = c.Close() }()
points, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: "monitor",
//Precision : "ms",
})
if err != nil {
return err
}
points.AddPoints(pointArray)
err = c.Write(points)
if err != nil {
return err
}
return nil
}
func ReSubmit(data []string) error {
pointSlice := make([]*client.Point, 0)
for _, v := range data {
cp := file_cache.CreateCachePoint(v)
point, err := client.NewPoint(cp.Name, cp.Tags, cp.Fields, cp.Time)
if err != nil {
logger.Error("构造client.point异常", err)
}
pointSlice = append(pointSlice, point)
if len(pointSlice) > 1000 {
err := httpWrite(pointSlice)
if err != nil {
return err
}
logger.Info("缓存重新提交:1000")
pointSlice = make([]*client.Point, 0)
}
}
if len(pointSlice) > 0 {
err := httpWrite(pointSlice)
if err != nil {
logger.Info(pointSlice)
return err
}
logger.Info("缓存重新提交:", len(pointSlice))
}
logger.Info("重新提交")
return nil
}
func fileWrite(pointArray []*client.Point) {
for _, p := range pointArray {
if p != nil {
current := file_cache.NewPoint(p)
data, err := json.Marshal(current)
if err != nil {
fmt.Println(err)
}
file_cache.Write(string(data))
}
}
}
package continuous_queries
import (
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/data"
"github.com/vrg0/go-common/logger"
"net"
"os"
"strings"
)
var (
DROP_CQ_SQL = "DROP CONTINUOUS QUERY %s ON %s;"
SHOW_CQ_SQL = "SHOW CONTINUOUS QUERIES;"
APDEX_CQ_SAT = "CREATE CONTINUOUS QUERY cq_apdex_sat_%s ON monitor RESAMPLE FOR 1h BEGIN SELECT count(traceId) AS sat INTO monitor.autogen.apdex FROM monitor.autogen.trace_info WHERE sys_name = '%s' AND \"duration\" < %d GROUP BY sys_name, time(1m) fill(0) END;"
APDEX_CQ_TOL = "CREATE CONTINUOUS QUERY cq_apdex_tol_%s ON monitor RESAMPLE FOR 1h BEGIN SELECT count(traceId) AS tol INTO monitor.autogen.apdex FROM monitor.autogen.trace_info WHERE sys_name = '%s' AND \"duration\" > %d AND \"duration\" < %d GROUP BY sys_name, time(1m) fill(0) END;"
APDEX_CQ_ALL = "CREATE CONTINUOUS QUERY cq_apdex_all ON monitor RESAMPLE FOR 1h BEGIN SELECT count(traceId) AS ct_all INTO monitor.autogen.apdex FROM monitor.autogen.trace_info GROUP BY sys_name, time(1m) fill(1) END;"
)
func query(dbName string) []string {
cq := data.QueryMonitor(SHOW_CQ_SQL)
logger.Info(cq)
cqName := make([]string, 0)
for _, row := range cq[0].Series {
if row.Name == dbName {
for _, v := range row.Values {
cqName = append(cqName, v[0].(string))
}
}
}
return cqName
}
func delete(db string, cqName []string) bool {
sql := strings.Builder{}
for _, name := range cqName {
sql.WriteString(fmt.Sprintf(DROP_CQ_SQL, name, db))
}
rs := data.QueryMonitor(sql.String())
return rs == nil
}
func create(apdexThreshold conf.ApdexThreshold) {
sysName := data.QuerySysName()
logger.Info(sysName)
sql := strings.Builder{}
for _, name := range sysName {
cqName := strings.Replace(name, "-", "_", 10)
threshold := apdexThreshold.Common
if c, ok := apdexThreshold.Personal[name]; ok {
threshold = c
}
sql.WriteString(buildSatSql(cqName, name, threshold))
sql.WriteString(buildTolSql(cqName, name, threshold))
}
sql.WriteString(APDEX_CQ_ALL)
data.QueryMonitor(sql.String())
}
func buildSatSql(cqName string, sysName string, threshold int) string {
return fmt.Sprintf(APDEX_CQ_SAT, cqName, sysName, threshold)
}
func buildTolSql(cqName string, sysName string, threshold int) string {
return fmt.Sprintf(APDEX_CQ_TOL, cqName, sysName, threshold, threshold*4)
}
func Load() {
name := query(data.MONITOR)
logger.Info("old: ", name)
delete(data.MONITOR, name)
create(conf.GlobalConfig.ApdexThreshold)
name = query(data.MONITOR)
logger.Info("new: ", name)
}
func checkIp(ip string) bool {
addrs, err := net.InterfaceAddrs()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
for _, address := range addrs {
// skip loopback addresses when checking the IP
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
logger.Info(ipnet.IP.String())
return ipnet.IP.String() == ip
}
}
}
return false
}
package data
import (
"bytes"
"git.quantgroup.cn/DevOps/enoch/service/conf"
"github.com/influxdata/influxdb/client/v2"
"github.com/vrg0/go-common/logger"
)
func NewClient() client.Client {
var httpConfig = client.HTTPConfig{
Addr: conf.GlobalConfig.InfluxDb.Host + ":" + conf.GlobalConfig.InfluxDb.Port,
}
con, err := client.NewHTTPClient(httpConfig)
if err != nil {
logger.Error(err.Error())
}
return con
}
func Query(sql string, db string) []client.Result {
con := NewClient()
q := client.Query{Command: sql, Database: db}
res, err := con.Query(q)
if nil != err {
logger.Error("influxdb client init error", err)
}
if nil != res.Error() {
logger.Error("query error", db, sql, res.Error())
}
return res.Results
}
func QueryMonitor(sql string) []client.Result {
return Query(sql, MONITOR)
}
func QuerySysName() []string {
res := QueryMonitor(SYSNAME_SQL)
var values = res[0].Series[0].Values
sysName := make([]string, 0)
for _, v := range values {
sysName = append(sysName, v[1].(string))
}
return sysName
}
func QueryHost(sysName []string) map[string][]string {
var sqlBuff bytes.Buffer
for _, name := range sysName {
sqlBuff.WriteString(HOST_SQL)
sqlBuff.WriteString("'")
sqlBuff.WriteString(name)
sqlBuff.WriteString("';")
}
res := QueryMonitor(sqlBuff.String())
nodeTree := make(map[string][]string)
for index, result := range res {
ipList := make([]string, 0)
values := result.Series[0].Values
for _, v := range values {
ipList = append(ipList, v[1].(string))
}
nodeTree[sysName[index]] = ipList
}
return nodeTree
}
package dingding
var DefaultDingURL = []string{"https://oapi.dingtalk.com/robot/send?access_token=9ffab8e4ae5f94e0fbf84aa91c9cb474d9e3d5bd0bb3c2daffe4cdfe0c2cbbc7"}
package file_cache
import (
"bufio"
"fmt"
"github.com/robfig/cron"
"github.com/vrg0/go-common/logger"
"io"
"io/ioutil"
"os"
"strconv"
"strings"
"sync"
"time"
)
const (
WRITING = ".writing"
DELETE = ".delete"
CACHE = ".cache"
)
type fileHolder struct {
fileName string
path string
file *os.File
fileSize fileSize
}
type fileSize struct {
size int
lock sync.Mutex
}
func (fs *fileSize) Get() int {
return fs.size
}
func (fs *fileSize) Add(offset int) {
fs.lock.Lock()
defer fs.lock.Unlock()
fs.size = fs.size + offset
if fs.size < 0 {
fs.size = 0
return
}
}
var currentFile *fileHolder
var fileCacheOnce sync.Once
func Load(path string) {
if path == "" {
path = "../"
}
if !strings.HasSuffix(path, "/") {
path = path + "/"
}
if !pathExists(path) {
mkdir(path)
}
currentFile = &fileHolder{}
currentFile.path = path
go func() {
fileCacheOnce.Do(func() {
mark()   // re-mark files left in the .writing state by an unclean shutdown as .cache files
Delete() // remove files already marked .delete
pickup() // count cache files left unsubmitted by the previous run
})
}()
}
func create() {
current := time.Now().Unix()
currentFile.fileName = currentFile.path + strconv.FormatInt(current, 10)
file, err := os.Create(currentFile.fileName + WRITING)
if err != nil {
logger.Error("创建缓存文件失败", err)
}
logger.Info("打开缓存文件", currentFile.fileName)
currentFile.file = file
currentFile.fileSize.Add(1)
logger.Info("文件缓存数:", currentFile.fileSize.Get())
}
func closed() {
defer os.Rename(currentFile.fileName+WRITING, currentFile.fileName+CACHE)
logger.Info("关闭缓存文件")
currentFile.file.Close()
}
func Delete() {
fileNames := scan(DELETE)
for _, name := range fileNames {
os.Remove(currentFile.path + name)
logger.Info("删除缓存文件", name)
}
}
func pickup() {
logger.Info("获取缓存文件数量")
files := scan(CACHE)
currentFile.fileSize.Add(len(files))
}
func mark() {
fileName := scan(WRITING)
logger.Info("重新标记缓存文件")
for _, name := range fileName {
os.Rename(currentFile.path+name, currentFile.path+strings.Split(name, ".")[0]+CACHE)
}
}
func Recover(submit func(data []string) error) {
if currentFile.fileSize.Get() <= 0 {
return
}
fileNames := scan(CACHE)
for _, name := range fileNames {
err := submit(Read(currentFile.path + name))
if err != nil {
logger.Error("重新提交缓存异常:", err)
return
}
os.Rename(currentFile.path+name, currentFile.path+strings.Split(name, ".")[0]+DELETE)
currentFile.fileSize.Add(-1)
logger.Info("文件缓存数:", currentFile.fileSize.Get())
}
go Delete()
}
func Write(data string) error {
writer := bufio.NewWriter(currentFile.file)
_, err := writer.WriteString(data + "\n")
if err != nil {
return err
}
go writer.Flush()
return nil
}
func scan(suffix string) []string {
files, _ := ioutil.ReadDir(currentFile.path)
fileNames := make([]string, 0)
for _, f := range files {
if strings.HasSuffix(f.Name(), suffix) {
fileNames = append(fileNames, f.Name())
}
}
return fileNames
}
func Read(fileName string) []string {
file, err := os.Open(fileName)
if err != nil {
logger.Error("未找到对应的文件:", err)
}
reader := bufio.NewReader(file)
data := make([]string, 0)
for {
line, _, err := reader.ReadLine()
if err == io.EOF {
break
}
data = append(data, string(line))
}
return data
}
func pathExists(path string) bool {
_, err := os.Stat(path)
if err == nil {
return true
}
return !os.IsNotExist(err)
}
func mkdir(path string) bool {
err := os.Mkdir(path, os.ModePerm)
return err == nil
}
func RegisterJob(submit func(data []string) error) {
c := cron.New()
err := c.AddFunc("@every 1m", func() {
if !Enabled() {
logger.Info("开始扫描缓存文件")
Recover(submit)
}
})
if err != nil {
fmt.Print("err")
}
c.Start()
}
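// Taken together with the cache switcher below, the lifecycle is: a failed
// influx write calls OpenCache, which creates a <timestamp>.writing file and
// buffers points into it as JSON lines; about one minute later Enabled()
// flips the switch back off and closed() renames the file to .cache; the cron
// job registered above then calls Recover, which re-submits each .cache file,
// renames it to .delete, and Delete finally removes it. mark() at startup
// reclaims .writing files left behind by an unclean shutdown.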
package file_cache
import (
"encoding/json"
"github.com/vrg0/go-common/logger"
"net/http"
"strings"
"sync"
"time"
)
/**
A switch controls whether the file cache is enabled; once a cache file has been written to for more than one minute, the cache is switched back off.
*/
type switcher struct {
state bool
origin int64
lock sync.Mutex
}
func (s *switcher) turnOn() {
s.lock.Lock()
defer s.lock.Unlock()
if !s.state { // currently off, so turn it on
s.state = true
s.origin = time.Now().Unix()
create()
senderDingDing()
}
}
func (s *switcher) turnOff() {
s.lock.Lock()
defer s.lock.Unlock()
if s.state {
s.state = false
s.origin = 0
closed()
}
}
func (s *switcher) status() bool {
if s.state {
current := time.Now().Unix()
diff := current - s.origin
if diff >= 60 {
logger.Info("缓存切换")
s.turnOff()
}
}
return s.state
}
var cacheSwitcher *switcher
var alterMsg string
const (
url = "https://oapi.dingtalk.com/robot/send?access_token=9ffab8e4ae5f94e0fbf84aa91c9cb474d9e3d5bd0bb3c2daffe4cdfe0c2cbbc7"
contentType = "application/json;charset=utf-8"
)
func init() {
cacheSwitcher = &switcher{}
alterMsg = buildDingDingMsg()
}
func Enabled() bool {
return cacheSwitcher.status()
}
func OpenCache() {
cacheSwitcher.turnOn()
}
func senderDingDing() {
_, err := http.Post(url, contentType, strings.NewReader(alterMsg))
if err != nil {
logger.Error(err)
}
}
func buildDingDingMsg() string {
msg := dingDingMsg{
MsgType: "text",
Text: text{
Content: "influxdb 写超时,已启用文件缓存",
},
}
msgStr, err := json.Marshal(msg)
if nil != err {
logger.Error("无法序列化ding ding msg", err)
}
return string(msgStr)
}
type dingDingMsg struct {
MsgType string
Text text
}
type text struct {
Content string
}
package job
import (
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/alarm"
"github.com/robfig/cron"
"github.com/vrg0/go-common/logger"
"net"
"os"
"time"
)
/**
Alarm cron job, runs once per minute.
*/
func AutoAlarm() {
c := cron.New()
err := c.AddFunc("@every 1m", func() {
logger.Info("开始执行定时任务", time.Now().Minute())
alarm.Load()
})
if err != nil {
fmt.Print("err")
}
c.Start()
}
func CheckIp(ip string) bool {
addrs, err := net.InterfaceAddrs()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
for _, address := range addrs {
// skip loopback addresses when checking the IP
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
logger.Info(ipnet.IP.String())
return ipnet.IP.String() == ip
}
}
}
return false
}
package node_check
import (
"context"
"git.quantgroup.cn/DevOps/enoch/service/dingding"
"git.quantgroup.cn/DevOps/enoch/service/registry"
"github.com/vrg0/go-common/ding"
"github.com/vrg0/go-common/logger"
"net/http"
"os"
"sync"
"time"
)
const (
Passing = "passing"
Critical = "critical"
)
var HandlerMap = new(sync.Map)
var HttpGetRetryCount = 3
var HttpTimeOut = time.Second * 10
var IgnoreServiceMap = make(map[string]struct{})
func init() {
//TODO make these handler hooks configurable
HandlerMap.Store("aaaa", "http://www.baidasdfasdfasdfasdfu.com/")
HandlerMap.Store("heimdallr", "http://172.20.6.33:8989/service-down")
// HandlerMap.Store("aaaa", "http://172.20.6.33:8989/service-down")
//TODO make the ignore list configurable
IgnoreServiceMap["vcc-talos"] = struct{}{}
IgnoreServiceMap["aaaa"] = struct{}{}
}
func httpGet(url string, timeout time.Duration) (*http.Response, error) {
ctx, cancel := context.WithCancel(context.TODO())
req, e := http.NewRequest("GET", url, nil)
if e != nil {
return nil, e
} else {
req = req.WithContext(ctx)
}
_ = time.AfterFunc(timeout, func() {
cancel()
})
return http.DefaultClient.Do(req)
}
func handler(serviceName string) {
if url, ok := HandlerMap.Load(serviceName); ok {
for i := 0; i < HttpGetRetryCount; i++ {
if resp, e := httpGet(url.(string), HttpTimeOut); e != nil {
logger.Error(" handler service: ", serviceName, " ", e)
} else {
logger.Info(" handler service: ", serviceName, " ", resp.StatusCode)
break
}
}
} else {
logger.Info(" handler service: ", serviceName, " ", "not found handler hook api")
}
}
type watch struct{}
func (w watch) DeleteService(serviceName string) {
//pass
servicesStatusLock.Lock()
defer servicesStatusLock.Unlock()
delete(servicesStatus, serviceName)
}
func serviceStatus(service *registry.Service) bool {
for _, node := range service.NodeMap {
if node.Status == Passing {
return true
}
}
return false
}
func serviceStr(service *registry.Service) string {
rtn := service.Name + " "
for _, node := range service.NodeMap {
rtn += node.Id + ":" + node.Status + " "
}
return rtn
}
func (w watch) UpdateNodes(service *registry.Service) {
servicesStatusLock.Lock()
defer servicesStatusLock.Unlock()
// alarm when a single node goes down
if oldService, ok := servicesStatus[service.Name]; ok {
for _, node := range service.NodeMap {
if oldNode, ok := oldService.NodeMap[node.Id]; ok {
if oldNode.Status == Passing && node.Status == Critical {
logger.Warn(service.Name, " ", node.Id, "---!!!node critical!!!---")
if _, ok := IgnoreServiceMap[service.Name]; !ok {
_ = ding.SendText(service.Name+" "+node.Id+" "+"---!!!node critical!!!---", dingding.DefaultDingURL...)
}
}
}
}
}
// alarm when the whole service goes down
// if the service exists, its old status was passing, and its current status is critical, send an alarm; otherwise just log the service status
serviceString := serviceStr(service)
if oldService, ok := servicesStatus[service.Name]; ok && serviceStatus(oldService) && !serviceStatus(service) {
logger.Warn(serviceString, "---!!!service critical!!!---")
if _, ok := IgnoreServiceMap[service.Name]; !ok {
_ = ding.SendText(serviceString+"---!!!service critical!!!---", dingding.DefaultDingURL...)
}
handler(service.Name)
} else {
logger.Info(serviceString)
}
// update the recorded service status
// deep-copy the service object
newService := registry.NewService(service.Name)
for kk, vv := range service.NodeMap {
newNode := registry.Node{
ServiceName: vv.ServiceName,
Id: vv.Id,
Port: vv.Port,
Address: vv.Address,
Status: vv.Status,
Meta: make(map[string]string),
}
for x, y := range vv.Meta {
newNode.Meta[x] = y
}
newService.NodeMap[kk] = &newNode
}
servicesStatus[service.Name] = newService
}
func (w watch) AddNode(node *registry.Node) {
//pass
}
func (w watch) DelNode(node *registry.Node) {
//pass
}
var (
servicesStatus = make(map[string]*registry.Service)
servicesStatusLock = new(sync.Mutex)
)
func InitServiceStatus() {
servicesStatus = registry.GetServiceMap()
}
func NodeCheck() {
defer func() {
/*
if e := recover(); e != nil {
logger.Error("node check panic: ", e)
_ = ding.SendText("node check panic!", dingding.DefaultDingURL...)
time.Sleep(time.Second * 1)
NodeCheck()
}
*/
}()
// initialize the registry
dc := "3c"
cluster := []string{"172.30.12.2:8500", "172.30.12.3:8500", "172.30.12.4:8500"}
if e := registry.Init("consul", map[string]interface{}{"dc": dc, "cluster": cluster}); e != nil {
logger.Info("registry init error:", e)
os.Exit(-1)
}
time.Sleep(time.Second * 1)
// initialize service status
InitServiceStatus()
// register the observer
if e := registry.SetObserver("watch", &watch{}); e != nil {
logger.Info("set observer error:", e)
os.Exit(-1)
}
select {}
}
package registry
import (
"bytes"
"encoding/gob"
"errors"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
"reflect"
"strconv"
"sync"
"time"
)
// consul-backed registry implementation
type consulRegistry struct {
	masterClient         *api.Client            // primary client
	watchClients         []*api.Client          // watch clients
	servicesWatchParse   *watch.Plan            // watcher for the service list
	serviceWatchParseMap map[string]*watch.Plan // per-service watchers
	observerMap          *sync.Map              // registered observers
	serviceMap           map[string]*Service    // service map
	serviceMapLock       *sync.RWMutex          // lock protecting serviceMap
}
// create a new consul registry
func newConsulRegistry() Registry {
return &consulRegistry{
masterClient: nil,
watchClients: make([]*api.Client, 0),
servicesWatchParse: nil,
serviceWatchParseMap: make(map[string]*watch.Plan),
observerMap: new(sync.Map),
serviceMap: make(map[string]*Service),
serviceMapLock: new(sync.RWMutex),
}
}
// internal initialization: register the constructor
func init() {
if _, ok := newRegistryMap["consul"]; !ok {
newRegistryMap["consul"] = newConsulRegistry
}
}
// Init initializes the registry.
// option   type      format   default
// cluster  []string  ip:port  ["127.0.0.1:8500"]
// dc       string    string   "dc1"
func (cr *consulRegistry) Init(options map[string]interface{}) error {
// option validation
cluster := []string{"127.0.0.1:8500"}
dc := "dc1"
if v, ok := options["cluster"]; ok {
if _, ok := v.([]string); ok {
cluster = v.([]string)
} else {
return errors.New("consul cluster []string")
}
}
if v, ok := options["dc"]; ok {
if _, ok := v.(string); ok {
dc = v.(string)
} else {
return errors.New("consul dc string")
}
}
// initialize clients from the options
cr.masterClient = newConsulClient(cluster[0], dc, time.Millisecond*100)
for _, address := range cluster {
cr.watchClients = append(cr.watchClients, newConsulClient(address, dc, time.Second*300))
}
// start watching the service list
parse, _ := watch.Parse(map[string]interface{}{"type": "services"})
parse.Handler = cr.handlerServices
cr.servicesWatchParse = parse
go cr.watch(cr.servicesWatchParse)
return nil
}
// create a new consul client
func newConsulClient(address string, dc string, waitTime time.Duration) *api.Client {
config := api.DefaultConfig()
config.Address = address
config.Datacenter = dc
config.WaitTime = waitTime
client, _ := api.NewClient(config)
return client
}
// run a watch plan against one of the watch clients
func (cr *consulRegistry) watch(parse *watch.Plan) {
sentry := 0
defer func() {
if e := recover(); e != nil {
time.Sleep(time.Second * 1)
if sentry < len(cr.watchClients)-1 {
sentry++
} else {
sentry = 0
}
}
}()
_ = parse.RunWithClientAndLogger(cr.watchClients[sentry], nil)
}
// handle changes to the service list
func (cr *consulRegistry) handlerServices(_ uint64, data interface{}) {
services := data.(map[string][]string)
// lock
cr.serviceMapLock.Lock()
defer cr.serviceMapLock.Unlock()
for service, tags := range services {
// if the service is not known yet, create it and start watching it
if _, ok := cr.serviceWatchParseMap[service]; !ok {
parse, _ := watch.Parse(map[string]interface{}{"type": "service", "service": service})
parse.Handler = cr.handlerService
cr.serviceWatchParseMap[service] = parse
cr.serviceMap[service] = NewService(service)
go cr.watch(parse)
}
// update tags
tagMap := make(map[string]struct{})
for _, tag := range tags {
tagMap[tag] = struct{}{}
}
if !reflect.DeepEqual(tagMap, cr.serviceMap[service].TagMap) {
cr.serviceMap[service].TagMap = tagMap
}
}
// handle services that have been removed
for service, parse := range cr.serviceWatchParseMap {
if _, ok := services[service]; !ok {
// delete the service
parse.Stop()
delete(cr.serviceWatchParseMap, service)
delete(cr.serviceMap, service)
// notify observers
cr.observerMap.Range(func(_, value interface{}) bool {
observer := value.(Observer)
observer.DeleteService(service)
return true
})
}
}
}
// handle changes to a single service's nodes
func (cr *consulRegistry) handlerService(_ uint64, data interface{}) {
services := data.([]*api.ServiceEntry)
cr.serviceMapLock.Lock()
defer cr.serviceMapLock.Unlock()
serviceName := services[0].Service.Service
nodeMap := make(map[string]*Node)
for _, service := range services {
if service.Service.ID == "consul" {
continue
}
// update the node
node := NewNode(service.Service.Service, service.Service.ID, service.Service.Address, service.Service.Port)
node.Meta = service.Service.Meta
for _, health := range service.Checks {
if node.Id == health.ServiceID {
node.Status = health.Status
}
}
nodeMap[service.Service.ID] = node
}
cr.serviceMap[serviceName].NodeMap = nodeMap
cr.observerMap.Range(func(_, value interface{}) bool {
observer := value.(Observer)
observer.UpdateNodes(cr.serviceMap[serviceName])
return true
})
}
// register a service node
func (cr *consulRegistry) Register(node *Node) error {
check := &api.AgentServiceCheck{
HTTP: "http://" + node.Address + ":" + strconv.Itoa(node.Port) + "/tech/health/check",
Interval: "5s",
Timeout: "10s",
// DeregisterCriticalServiceAfter: "24h",
DeregisterCriticalServiceAfter: "1m",
CheckID: node.Id,
}
registration := api.AgentServiceRegistration{
ID: node.Id,
Name: node.ServiceName,
Port: node.Port,
Address: node.Address,
Check: check,
Meta: node.Meta,
}
// retry once on failure
for i := 0; i < 2; i++ {
if e := cr.masterClient.Agent().ServiceRegister(&registration); e == nil {
cr.observerMap.Range(func(_, value interface{}) bool {
observer := value.(Observer)
observer.AddNode(node)
return true
})
return nil
}
}
return errors.New("service register fail " + node.Id)
}
// deregister a service node
func (cr *consulRegistry) Deregister(node *Node) error {
// retry once on failure
for i := 0; i < 2; i++ {
if e := cr.masterClient.Agent().ServiceDeregister(node.Id); e == nil {
cr.observerMap.Range(func(_, value interface{}) bool {
observer := value.(Observer)
observer.DelNode(node)
return true
})
return nil
}
}
return errors.New("service deregister failed " + node.ServiceName + ":" + node.Id)
}
// return a copy of the service map
func (cr *consulRegistry) GetServiceMap() map[string]*Service {
cr.serviceMapLock.RLock()
defer cr.serviceMapLock.RUnlock()
// deep copy via gob
rtn := make(map[string]*Service)
buffer := bytes.NewBuffer(nil)
_ = gob.NewEncoder(buffer).Encode(&cr.serviceMap)
_ = gob.NewDecoder(bytes.NewBuffer(buffer.Bytes())).Decode(&rtn)
return rtn
}
// return a single service
func (cr *consulRegistry) GetService(serviceName string) (*Service, error) {
cr.serviceMapLock.RLock()
defer cr.serviceMapLock.RUnlock()
// deep copy
if v, ok := cr.serviceMap[serviceName]; ok {
rtn := NewService(serviceName)
for kk, vv := range v.NodeMap {
newNode := Node{
	ServiceName: vv.ServiceName,
	Id:          vv.Id,
	Port:        vv.Port,
	Address:     vv.Address,
	Status:      vv.Status,
	Meta:        make(map[string]string),
}
for x, y := range vv.Meta {
newNode.Meta[x] = y
}
rtn.NodeMap[kk] = &newNode
}
return rtn, nil
} else {
return nil, errors.New("service not found")
}
}
// register an observer
func (cr *consulRegistry) SetObserver(name string, observer Observer) error {
cr.serviceMapLock.RLock()
defer cr.serviceMapLock.RUnlock()
if _, ok := cr.observerMap.Load(name); ok {
return errors.New("observer is exists")
} else {
for _, service := range cr.serviceMap {
// deep-copy the service object
newService := NewService(service.Name)
for kk, vv := range service.NodeMap {
newNode := Node{
ServiceName:vv.ServiceName,
Id:vv.Id,
Port:vv.Port,
Address:vv.Address,
Status:vv.Status,
Meta: make(map[string]string),
}
for x, y := range vv.Meta {
newNode.Meta[x] = y
}
newService.NodeMap[kk] = &newNode
}
observer.UpdateNodes(newService)
}
cr.observerMap.Store(name, observer)
}
return nil
}
// remove an observer
func (cr *consulRegistry) DelObserver(name string) error {
if _, ok := cr.observerMap.Load(name); ok {
cr.observerMap.Delete(name)
} else {
return errors.New("observer is not exists")
}
return nil
}
package registry
import "errors"
// Node is a service node
type Node struct {
	ServiceName string            // service name
	Id          string            // node ID
	Address     string            // address
	Port        int               // port
	Meta        map[string]string // metadata
	Status      string            // status
}
// Service is a registered service
type Service struct {
	Name    string              // service name
	NodeMap map[string]*Node    // node map
	TagMap  map[string]struct{} // tags
}
// Observer receives registry change events
type Observer interface {
	// service deleted event
	DeleteService(serviceName string)
	// nodes updated event
	UpdateNodes(service *Service)
	// node added
	AddNode(node *Node)
	// node removed
	DelNode(node *Node)
}
// Registry is the service registry interface
type Registry interface {
	// initialize
	Init(options map[string]interface{}) error
	// register a service node
	Register(node *Node) error
	// deregister a service node
	Deregister(node *Node) error
	// get the service map
	GetServiceMap() map[string]*Service
	// get a single service
	GetService(serviceName string) (*Service, error)
	// set an observer
	SetObserver(name string, observer Observer) error
	// delete an observer
	DelObserver(name string) error
}
var newRegistryMap = make(map[string]func() Registry, 0)
// New creates a registry by name
func New(name string) Registry {
if newRegistry, ok := newRegistryMap[name]; ok {
return newRegistry()
} else {
return nil
}
}
// NewNode creates a node
func NewNode(serviceName string, id string, address string, port int) *Node {
return &Node{
ServiceName: serviceName,
Id: id,
Address: address,
Port: port,
Status: "unknown",
Meta: make(map[string]string, 0),
}
}
// NewService creates a service
func NewService(name string) *Service {
return &Service{
Name: name,
NodeMap: make(map[string]*Node, 0),
TagMap: make(map[string]struct{}, 0),
}
}
// the default registry
var defaultRegistry Registry = nil
// Init initializes the default registry
func Init(name string, options map[string]interface{}) error {
if defaultRegistry != nil {
return nil
}
defaultRegistry = New(name)
if defaultRegistry == nil {
return errors.New("new registry error")
}
return defaultRegistry.Init(options)
}
// Register registers a node with the default registry
func Register(node *Node) error {
if defaultRegistry == nil {
return errors.New("default registry is not exists")
}
return defaultRegistry.Register(node)
}
// Deregister removes a node from the default registry
func Deregister(node *Node) error {
if defaultRegistry == nil {
return errors.New("default registry is not exists")
}
return defaultRegistry.Deregister(node)
}
// GetServiceMap returns the service map of the default registry
func GetServiceMap() map[string]*Service {
if defaultRegistry == nil {
return make(map[string]*Service, 0)
}
return defaultRegistry.GetServiceMap()
}
// GetService returns a single service from the default registry
func GetService(serviceName string) (*Service, error) {
if defaultRegistry == nil {
return nil, errors.New("default registry is not exists")
}
return defaultRegistry.GetService(serviceName)
}
// SetObserver registers an observer on the default registry
func SetObserver(name string, observer Observer) error {
if defaultRegistry == nil {
return errors.New("default registry is not exists")
}
return defaultRegistry.SetObserver(name, observer)
}
// DelObserver removes an observer from the default registry
func DelObserver(name string) error {
if defaultRegistry == nil {
return errors.New("default registry is not exists")
}
return defaultRegistry.DelObserver(name)
}