Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
E
enoch
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
DevOps
enoch
Commits
1690b4b1
Commit
1690b4b1
authored
Sep 26, 2019
by
jingbo.wang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
接入 go-common/logger & go-common/ding
parent
9c377c78
Changes
21
Show whitespace changes
Inline
Side-by-side
Showing
21 changed files
with
118 additions
and
203 deletions
+118
-203
go.mod
go.mod
+9
-22
main.go
main.go
+23
-12
load.go
service/alarm/load.go
+4
-4
notice_engine.go
service/alarm/notice_engine.go
+5
-5
operator.go
service/alarm/operator.go
+5
-6
query_engine.go
service/alarm/query_engine.go
+4
-4
load_conf.go
service/conf/load_conf.go
+2
-2
brave_message_handler.go
service/consumer/brave_message_handler.go
+5
-6
health_message_handler.go
service/consumer/health_message_handler.go
+5
-5
kafka_agent_sarama.go
service/consumer/kafka_agent_sarama.go
+4
-4
util.go
service/consumer/util.go
+8
-8
apdex_continuous_query.go
service/continuous_queries/apdex_continuous_query.go
+6
-6
influxdb_conf.go
service/data/influxdb_conf.go
+4
-4
ding.go
service/dingding/ding.go
+0
-42
ding_test.go
service/dingding/ding_test.go
+0
-15
file_cache.go
service/file_cache/file_cache.go
+12
-12
switcher.go
service/file_cache/switcher.go
+4
-4
alarm.go
service/job/alarm.go
+3
-3
conf.go
service/log/conf.go
+0
-25
node_check.go
service/node-check/node_check.go
+14
-13
consul.go
service/registry/consul.go
+1
-1
No files found.
go.mod
View file @
1690b4b1
module git.quantgroup.cn/DevOps/enoch
module git.quantgroup.cn/DevOps/enoch
go 1.12
require (
require (
github.com/Azure/go-autorest/autorest/validation v0.2.0 // indirect
github.com/Shopify/sarama v1.23.1
github.com/Shopify/sarama v1.21.0
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 // indirect
github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/coredns/coredns v1.6.3 // indirect
github.com/dnaeon/go-vcr v1.0.1 // indirect
github.com/envoyproxy/go-control-plane v0.8.6 // indirect
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/gogo/googleapis v1.3.0 // indirect
github.com/gomodule/redigo v2.0.0+incompatible
github.com/gomodule/redigo v2.0.0+incompatible
github.com/hashicorp/consul v1.4.0
github.com/hashicorp/consul/api v1.2.0
github.com/hashicorp/go-discover v0.0.0-20190905142513-34a650575f6c // indirect
github.com/influxdata/influxdb v1.7.8
github.com/hashicorp/hil v0.0.0-20190212132231-97b3a9cdfa93 // indirect
github.com/hashicorp/logutils v1.0.0 // indirect
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 // indirect
github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477 // indirect
github.com/influxdata/influxdb v1.7.2
github.com/influxdata/platform v0.0.0-20181221041837-9d0ed9020c3f // indirect
github.com/json-iterator/go v1.1.7
github.com/json-iterator/go v1.1.7
github.com/mitchellh/cli v1.0.0 // indirect
github.com/onsi/ginkgo v1.10.1 // indirect
github.com/mitchellh/hashstructure v1.0.0 // indirect
github.com/onsi/gomega v1.7.0 // indirect
github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967
github.com/robfig/cron v1.2.0
github.com/shirou/gopsutil v2.18.12+incompatible // indirect
github.com/vrg0/go-common v0.0.0-20190925101101-e6595edace1b
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
)
)
main.go
View file @
1690b4b1
...
@@ -10,37 +10,48 @@ import (
...
@@ -10,37 +10,48 @@ import (
"git.quantgroup.cn/DevOps/enoch/service/file_cache"
"git.quantgroup.cn/DevOps/enoch/service/file_cache"
"git.quantgroup.cn/DevOps/enoch/service/job"
"git.quantgroup.cn/DevOps/enoch/service/job"
"git.quantgroup.cn/DevOps/enoch/service/node-check"
"git.quantgroup.cn/DevOps/enoch/service/node-check"
"
log
"
"
github.com/vrg0/go-common/logger
"
"net/http"
"net/http"
_
"net/http/pprof"
_
"net/http/pprof"
"os"
"os"
"strconv"
"strconv"
)
)
func
main
()
{
var
quartz
bool
var
quartz
bool
var
denv
string
var
denv
string
var
didc
string
var
didc
string
func
init
()
{
flag
.
BoolVar
(
&
quartz
,
"quartz"
,
false
,
"quartz"
)
flag
.
BoolVar
(
&
quartz
,
"quartz"
,
false
,
"quartz"
)
flag
.
StringVar
(
&
denv
,
"env"
,
"dev"
,
"环境"
)
flag
.
StringVar
(
&
denv
,
"env"
,
"dev"
,
"环境"
)
flag
.
StringVar
(
&
didc
,
"idc"
,
"local"
,
"机房"
)
flag
.
StringVar
(
&
didc
,
"idc"
,
"local"
,
"机房"
)
flag
.
Parse
()
flag
.
Parse
()
logPath
:=
"./enoch.log"
if
denv
==
"pro"
{
logPath
=
"/home/quant_group/enoch/enoch.log"
}
if
e
:=
logger
.
Init
(
denv
==
"pro"
,
logPath
);
e
!=
nil
{
panic
(
e
)
}
}
func
main
()
{
conf
.
Load
(
denv
,
didc
)
conf
.
Load
(
denv
,
didc
)
file
,
err
:=
os
.
OpenFile
(
"quantgroup.log"
,
os
.
O_RDWR
|
os
.
O_CREATE
,
0666
)
file
,
err
:=
os
.
OpenFile
(
"quantgroup.log"
,
os
.
O_RDWR
|
os
.
O_CREATE
,
0666
)
defer
func
()
{
_
=
file
.
Close
()
}()
defer
func
()
{
_
=
file
.
Close
()
}()
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Fatalln
(
"create file error"
,
err
)
log
ger
.
Fatal
(
"create file error"
,
err
)
}
}
file_cache
.
Load
(
conf
.
GlobalConfig
.
FileCachePath
)
file_cache
.
Load
(
conf
.
GlobalConfig
.
FileCachePath
)
file_cache
.
RegisterJob
(
consumer
.
ReSubmit
)
file_cache
.
RegisterJob
(
consumer
.
ReSubmit
)
go
file_cache
.
Delete
()
go
file_cache
.
Delete
()
port
:=
conf
.
GlobalConfig
.
Port
port
:=
conf
.
GlobalConfig
.
Port
logger
:=
log
.
New
(
file
,
"[Info]"
,
log
.
LstdFlags
|
log
.
Llongfile
)
logger
.
Info
(
conf
.
GlobalConfig
.
AppName
+
"项目启动, port:"
+
port
+
",环境:"
+
conf
.
GlobalConfig
.
Env
)
defer
logger
.
Info
(
"项目结束"
)
logger
.
Println
(
conf
.
GlobalConfig
.
AppName
+
"项目启动, port:"
+
port
+
",环境:"
+
conf
.
GlobalConfig
.
Env
)
defer
logger
.
Println
(
"项目结束"
)
//初始化redis连接池
//初始化redis连接池
data
.
RedisPoolInit
()
data
.
RedisPoolInit
()
...
@@ -49,7 +60,7 @@ func main() {
...
@@ -49,7 +60,7 @@ func main() {
intPort
,
_
:=
strconv
.
Atoi
(
port
)
intPort
,
_
:=
strconv
.
Atoi
(
port
)
if
quartz
{
if
quartz
{
log
.
Println
(
"启动定时任务"
)
log
ger
.
Info
(
"启动定时任务"
)
job
.
AutoEmailPerformInfo
()
job
.
AutoEmailPerformInfo
()
}
}
...
@@ -75,7 +86,7 @@ func main() {
...
@@ -75,7 +86,7 @@ func main() {
err
=
http
.
ListenAndServe
(
":"
+
port
,
nil
)
err
=
http
.
ListenAndServe
(
":"
+
port
,
nil
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Fatalln
(
"服务启动失败"
,
err
)
log
ger
.
Fatal
(
"服务启动失败"
,
err
)
}
}
}
}
...
...
service/alarm/load.go
View file @
1690b4b1
...
@@ -4,14 +4,14 @@ import (
...
@@ -4,14 +4,14 @@ import (
"encoding/json"
"encoding/json"
"git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git
.quantgroup.cn/DevOps/enoch/service/log
"
"git
hub.com/vrg0/go-common/logger
"
"io/ioutil"
"io/ioutil"
)
)
func
Load
()
{
func
Load
()
{
config
,
err
:=
ioutil
.
ReadFile
(
conf
.
GlobalConfig
.
StrategyConfPath
)
config
,
err
:=
ioutil
.
ReadFile
(
conf
.
GlobalConfig
.
StrategyConfPath
)
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Error
.
Print
(
"!!!轮训监控,未找到配置文件!!!"
)
logger
.
Error
(
"!!!轮训监控,未找到配置文件!!!"
)
return
return
}
}
...
@@ -19,12 +19,12 @@ func Load() {
...
@@ -19,12 +19,12 @@ func Load() {
err
=
json
.
Unmarshal
(
config
,
&
strategies
)
err
=
json
.
Unmarshal
(
config
,
&
strategies
)
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Error
.
Printf
(
"!!!策略文件格式错误:%s!!!
\n
"
,
err
)
logger
.
Error
(
"!!!策略文件格式错误:%s!!!
\n
"
,
err
)
return
return
}
}
if
!
CheckArray
(
strategies
)
{
if
!
CheckArray
(
strategies
)
{
logger
.
Error
.
Print
(
"!!!策略文件未通过校验!!!"
)
logger
.
Error
(
"!!!策略文件未通过校验!!!"
)
return
return
}
}
...
...
service/alarm/notice_engine.go
View file @
1690b4b1
...
@@ -2,8 +2,8 @@ package alarm
...
@@ -2,8 +2,8 @@ package alarm
import
(
import
(
"encoding/json"
"encoding/json"
"git.quantgroup.cn/DevOps/enoch/service/log"
"git.quantgroup.cn/DevOps/enoch/service/util"
"git.quantgroup.cn/DevOps/enoch/service/util"
"github.com/vrg0/go-common/logger"
"net/http"
"net/http"
"strings"
"strings"
"sync"
"sync"
...
@@ -25,7 +25,7 @@ func Sender(target []string, title string, info string, notice Notice) {
...
@@ -25,7 +25,7 @@ func Sender(target []string, title string, info string, notice Notice) {
case
MAIL
:
case
MAIL
:
senderMail
(
title
,
info
,
notice
.
Receiver
)
senderMail
(
title
,
info
,
notice
.
Receiver
)
default
:
default
:
logger
.
Error
.
Println
(
"策略配置错误,未匹配到对应的Sender"
)
logger
.
Error
(
"策略配置错误,未匹配到对应的Sender"
)
}
}
}
}
...
@@ -42,7 +42,7 @@ func senderDingDing(title string, info string, receiver [] string) {
...
@@ -42,7 +42,7 @@ func senderDingDing(title string, info string, receiver [] string) {
data
:=
strings
.
NewReader
(
string
(
bodyStr
))
data
:=
strings
.
NewReader
(
string
(
bodyStr
))
_
,
err
:=
http
.
Post
(
r
,
"application/json;charset=utf-8"
,
data
)
_
,
err
:=
http
.
Post
(
r
,
"application/json;charset=utf-8"
,
data
)
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Error
.
Println
(
err
)
logger
.
Error
(
err
)
}
}
}
}
}
}
...
@@ -61,7 +61,7 @@ func buildDingDingMsg(title string, info string) []byte {
...
@@ -61,7 +61,7 @@ func buildDingDingMsg(title string, info string) []byte {
msgStr
,
err
:=
json
.
Marshal
(
msg
)
msgStr
,
err
:=
json
.
Marshal
(
msg
)
if
nil
!=
err
{
if
nil
!=
err
{
logger
.
Error
.
Println
(
"无法序列化ding ding msg"
,
err
)
logger
.
Error
(
"无法序列化ding ding msg"
,
err
)
}
}
return
msgStr
return
msgStr
...
@@ -73,7 +73,7 @@ func buildDingDingMsg(title string, info string) []byte {
...
@@ -73,7 +73,7 @@ func buildDingDingMsg(title string, info string) []byte {
func
isExpired
(
key
string
,
interval
int64
)
bool
{
func
isExpired
(
key
string
,
interval
int64
)
bool
{
now
:=
time
.
Now
()
.
Unix
()
now
:=
time
.
Now
()
.
Unix
()
lastTime
,
hasValue
:=
SenderWatcher
.
LoadOrStore
(
key
,
now
)
lastTime
,
hasValue
:=
SenderWatcher
.
LoadOrStore
(
key
,
now
)
logger
.
Info
.
Println
(
"---------时间间隔:old:"
,
lastTime
,
",new:"
,
now
)
logger
.
Info
(
"---------时间间隔:old:"
,
lastTime
,
",new:"
,
now
)
if
hasValue
{
// 存在旧值,判断是否过期
if
hasValue
{
// 存在旧值,判断是否过期
if
now
-
lastTime
.
(
int64
)
>=
interval
{
//过期
if
now
-
lastTime
.
(
int64
)
>=
interval
{
//过期
SenderWatcher
.
Store
(
key
,
now
)
SenderWatcher
.
Store
(
key
,
now
)
...
...
service/alarm/operator.go
View file @
1690b4b1
...
@@ -2,8 +2,7 @@ package alarm
...
@@ -2,8 +2,7 @@ package alarm
import
(
import
(
"fmt"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/vrg0/go-common/logger"
"log"
"strconv"
"strconv"
"strings"
"strings"
)
)
...
@@ -55,8 +54,8 @@ func (Compare) Equal(alter string, real []string) bool {
...
@@ -55,8 +54,8 @@ func (Compare) Equal(alter string, real []string) bool {
//限制同比
//限制同比
func
(
Compare
)
LimitComparedWithSame
(
alter
string
,
old
[]
string
,
current
[]
string
)
bool
{
func
(
Compare
)
LimitComparedWithSame
(
alter
string
,
old
[]
string
,
current
[]
string
)
bool
{
log
.
Println
(
alter
,
old
,
current
)
log
ger
.
Info
(
alter
,
old
,
current
)
logger
.
Info
.
Println
(
"old:"
,
strings
.
Join
(
old
,
","
),
"new: "
,
strings
.
Join
(
current
,
","
))
logger
.
Info
(
"old:"
,
strings
.
Join
(
old
,
","
),
"new: "
,
strings
.
Join
(
current
,
","
))
rs
:=
true
rs
:=
true
lastIndex
:=
len
(
current
)
-
1
lastIndex
:=
len
(
current
)
-
1
for
i
,
r
:=
range
current
{
for
i
,
r
:=
range
current
{
...
@@ -71,7 +70,7 @@ func (Compare) LimitComparedWithSame(alter string, old []string, current []strin
...
@@ -71,7 +70,7 @@ func (Compare) LimitComparedWithSame(alter string, old []string, current []strin
同比超过alter
同比超过alter
*/
*/
func
(
Compare
)
ComparedWithSame
(
alter
string
,
old
[]
string
,
current
[]
string
)
bool
{
func
(
Compare
)
ComparedWithSame
(
alter
string
,
old
[]
string
,
current
[]
string
)
bool
{
logger
.
Info
.
Println
(
"old:"
,
strings
.
Join
(
old
,
","
),
"new: "
,
strings
.
Join
(
current
,
","
))
logger
.
Info
(
"old:"
,
strings
.
Join
(
old
,
","
),
"new: "
,
strings
.
Join
(
current
,
","
))
rs
:=
true
rs
:=
true
lastIndex
:=
len
(
current
)
-
1
lastIndex
:=
len
(
current
)
-
1
for
i
,
r
:=
range
current
{
for
i
,
r
:=
range
current
{
...
@@ -104,7 +103,7 @@ func compareSame(alter string, old string, current string) bool {
...
@@ -104,7 +103,7 @@ func compareSame(alter string, old string, current string) bool {
func
parseToFloat
(
value
string
)
float64
{
func
parseToFloat
(
value
string
)
float64
{
rs
,
err
:=
strconv
.
ParseFloat
(
value
,
64
)
rs
,
err
:=
strconv
.
ParseFloat
(
value
,
64
)
if
nil
!=
err
{
if
nil
!=
err
{
logger
.
Error
.
Println
(
err
)
logger
.
Error
(
err
)
}
}
return
rs
return
rs
}
}
...
...
service/alarm/query_engine.go
View file @
1690b4b1
...
@@ -2,8 +2,8 @@ package alarm
...
@@ -2,8 +2,8 @@ package alarm
import
(
import
(
"fmt"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/client/v2"
"github.com/vrg0/go-common/logger"
"reflect"
"reflect"
"strings"
"strings"
)
)
...
@@ -14,7 +14,7 @@ var msgBuilder = reflect.ValueOf(MsgBuilder{})
...
@@ -14,7 +14,7 @@ var msgBuilder = reflect.ValueOf(MsgBuilder{})
func
DealResult
(
res
[]
client
.
Result
,
strategies
[]
Strategy
)
{
func
DealResult
(
res
[]
client
.
Result
,
strategies
[]
Strategy
)
{
var
resIndex
=
0
var
resIndex
=
0
for
_
,
strategy
:=
range
strategies
{
for
_
,
strategy
:=
range
strategies
{
logger
.
Info
.
Println
(
"-------"
,
strategy
.
Name
,
resIndex
,
"---------"
)
logger
.
Info
(
"-------"
,
strategy
.
Name
,
resIndex
,
"---------"
)
operator
:=
strategy
.
Operator
operator
:=
strategy
.
Operator
operatorMethod
:=
operators
.
MethodByName
(
operator
)
operatorMethod
:=
operators
.
MethodByName
(
operator
)
buildMsgMethod
:=
msgBuilder
.
MethodByName
(
operator
)
buildMsgMethod
:=
msgBuilder
.
MethodByName
(
operator
)
...
@@ -50,7 +50,7 @@ func DealResult(res []client.Result, strategies []Strategy) {
...
@@ -50,7 +50,7 @@ func DealResult(res []client.Result, strategies []Strategy) {
params
[
j
]
=
reflect
.
ValueOf
(
arg
)
params
[
j
]
=
reflect
.
ValueOf
(
arg
)
}
}
params
[
alterValueLen
]
=
reflect
.
ValueOf
(
value
)
params
[
alterValueLen
]
=
reflect
.
ValueOf
(
value
)
logger
.
Info
.
Println
(
uniqueTag
,
":alter"
,
params
[
:
1
],
"-"
,
strings
.
Join
(
value
,
","
))
logger
.
Info
(
uniqueTag
,
":alter"
,
params
[
:
1
],
"-"
,
strings
.
Join
(
value
,
","
))
rs
:=
operatorMethod
.
Call
(
params
)
rs
:=
operatorMethod
.
Call
(
params
)
if
rs
[
0
]
.
Bool
()
{
//触发报警策略
if
rs
[
0
]
.
Bool
()
{
//触发报警策略
//Sender(tagValues, strategy.Name, s, notice) // 报警
//Sender(tagValues, strategy.Name, s, notice) // 报警
...
@@ -94,7 +94,7 @@ func DealResult(res []client.Result, strategies []Strategy) {
...
@@ -94,7 +94,7 @@ func DealResult(res []client.Result, strategies []Strategy) {
continue
continue
}
}
logger
.
Info
.
Println
(
k
,
":"
,
v
)
logger
.
Info
(
k
,
":"
,
v
)
rs
:=
operatorMethod
.
Call
(
v
)
rs
:=
operatorMethod
.
Call
(
v
)
//结果报警
//结果报警
if
rs
[
0
]
.
Bool
()
{
//触发报警策略
if
rs
[
0
]
.
Bool
()
{
//触发报警策略
...
...
service/conf/load_conf.go
View file @
1690b4b1
...
@@ -2,7 +2,7 @@ package conf
...
@@ -2,7 +2,7 @@ package conf
import
(
import
(
"encoding/json"
"encoding/json"
"git
.quantgroup.cn/DevOps/enoch/service/log
"
"git
hub.com/vrg0/go-common/logger
"
"io/ioutil"
"io/ioutil"
"net/http"
"net/http"
)
)
...
@@ -17,6 +17,6 @@ func Load(denv string, didc string) {
...
@@ -17,6 +17,6 @@ func Load(denv string, didc string) {
err
=
json
.
Unmarshal
(
bytes
,
&
GlobalConfig
)
err
=
json
.
Unmarshal
(
bytes
,
&
GlobalConfig
)
GlobalConfig
.
Env
=
denv
GlobalConfig
.
Env
=
denv
if
nil
!=
err
{
if
nil
!=
err
{
logger
.
Error
.
Println
(
err
)
logger
.
Error
(
err
)
}
}
}
}
service/consumer/brave_message_handler.go
View file @
1690b4b1
...
@@ -5,10 +5,9 @@ import (
...
@@ -5,10 +5,9 @@ import (
"fmt"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/end_points"
"git.quantgroup.cn/DevOps/enoch/service/end_points"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/gomodule/redigo/redis"
"github.com/gomodule/redigo/redis"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/client/v2"
"
log
"
"
github.com/vrg0/go-common/logger
"
"time"
"time"
)
)
...
@@ -24,13 +23,13 @@ func (BraveMessageHandler) MsgProcess(msg string) {
...
@@ -24,13 +23,13 @@ func (BraveMessageHandler) MsgProcess(msg string) {
traceMsg
:=
make
([]
end_points
.
TraceMsg
,
3
)
//[]TraceMsg{}
traceMsg
:=
make
([]
end_points
.
TraceMsg
,
3
)
//[]TraceMsg{}
err
:=
json
.
Unmarshal
([]
byte
(
msg
),
&
traceMsg
)
err
:=
json
.
Unmarshal
([]
byte
(
msg
),
&
traceMsg
)
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Error
.
Println
(
"brave 解析msg失败:"
,
err
)
logger
.
Error
(
"brave 解析msg失败:"
,
err
)
}
}
msgInfluxProcess
(
traceMsg
)
msgInfluxProcess
(
traceMsg
)
}
}
func
(
BraveMessageHandler
)
Destroy
()
{
func
(
BraveMessageHandler
)
Destroy
()
{
if
len
(
pointSlice
)
>
0
{
if
len
(
pointSlice
)
>
0
{
logger
.
Info
.
Println
(
"braveMessageHandler 提交本地缓存数据:"
,
len
(
pointSlice
))
logger
.
Info
(
"braveMessageHandler 提交本地缓存数据:"
,
len
(
pointSlice
))
batchWrite
(
pointSlice
)
batchWrite
(
pointSlice
)
}
}
}
}
...
@@ -67,7 +66,7 @@ func msgInfluxProcess(traceMsgs []end_points.TraceMsg) {
...
@@ -67,7 +66,7 @@ func msgInfluxProcess(traceMsgs []end_points.TraceMsg) {
tags
[
"host"
]
=
traceMsg
.
LocalEndpoint
.
Ipv4
tags
[
"host"
]
=
traceMsg
.
LocalEndpoint
.
Ipv4
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Fatal
(
err
)
log
ger
.
Fatal
(
err
)
}
}
unix
:=
time
.
Unix
(
0
,
traceMsg
.
Timestamp
*
1000
)
unix
:=
time
.
Unix
(
0
,
traceMsg
.
Timestamp
*
1000
)
...
@@ -80,7 +79,7 @@ func msgInfluxProcess(traceMsgs []end_points.TraceMsg) {
...
@@ -80,7 +79,7 @@ func msgInfluxProcess(traceMsgs []end_points.TraceMsg) {
pointSlice
=
append
(
pointSlice
,
point
)
pointSlice
=
append
(
pointSlice
,
point
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Fatal
(
err
,
msg
)
log
ger
.
Fatal
(
err
,
msg
)
}
}
}
}
...
...
service/consumer/health_message_handler.go
View file @
1690b4b1
...
@@ -3,8 +3,8 @@ package consumer
...
@@ -3,8 +3,8 @@ package consumer
import
(
import
(
"encoding/json"
"encoding/json"
"git.quantgroup.cn/DevOps/enoch/service/end_points"
"git.quantgroup.cn/DevOps/enoch/service/end_points"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/client/v2"
"github.com/vrg0/go-common/logger"
"math/big"
"math/big"
"net"
"net"
"time"
"time"
...
@@ -21,8 +21,8 @@ func (HealthMessageHandler) MsgProcess(msg string) {
...
@@ -21,8 +21,8 @@ func (HealthMessageHandler) MsgProcess(msg string) {
chunkMsg
:=
end_points
.
ChunkMsg
{}
chunkMsg
:=
end_points
.
ChunkMsg
{}
err
:=
json
.
Unmarshal
([]
byte
(
msg
),
&
chunkMsg
)
err
:=
json
.
Unmarshal
([]
byte
(
msg
),
&
chunkMsg
)
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Error
.
Println
(
"healthMessageHandler解析json失败:"
,
err
)
logger
.
Error
(
"healthMessageHandler解析json失败:"
,
err
)
logger
.
Error
.
Println
(
msg
)
logger
.
Error
(
msg
)
return
return
}
}
buildMsg
(
chunkMsg
)
buildMsg
(
chunkMsg
)
...
@@ -30,12 +30,12 @@ func (HealthMessageHandler) MsgProcess(msg string) {
...
@@ -30,12 +30,12 @@ func (HealthMessageHandler) MsgProcess(msg string) {
func
(
HealthMessageHandler
)
Destroy
()
{
func
(
HealthMessageHandler
)
Destroy
()
{
if
len
(
metricsPointSlice
)
>
0
{
if
len
(
metricsPointSlice
)
>
0
{
logger
.
Info
.
Println
(
"metricsMessageHandler 提交本地缓存数据:"
,
len
(
metricsPointSlice
))
logger
.
Info
(
"metricsMessageHandler 提交本地缓存数据:"
,
len
(
metricsPointSlice
))
batchWrite
(
metricsPointSlice
)
batchWrite
(
metricsPointSlice
)
}
}
if
len
(
healthPointSlice
)
>
0
{
if
len
(
healthPointSlice
)
>
0
{
logger
.
Info
.
Println
(
"HealthMessageHandler 提交本地缓存数据:"
,
len
(
healthPointSlice
))
logger
.
Info
(
"HealthMessageHandler 提交本地缓存数据:"
,
len
(
healthPointSlice
))
batchWrite
(
healthPointSlice
)
batchWrite
(
healthPointSlice
)
}
}
}
}
...
...
service/consumer/kafka_agent_sarama.go
View file @
1690b4b1
...
@@ -2,9 +2,9 @@ package consumer
...
@@ -2,9 +2,9 @@ package consumer
import
(
import
(
"git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/Shopify/sarama"
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/bsm/sarama-cluster"
"github.com/vrg0/go-common/logger"
"os"
"os"
"os/signal"
"os/signal"
"sync/atomic"
"sync/atomic"
...
@@ -32,7 +32,7 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler
...
@@ -32,7 +32,7 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler
_
=
consumer
.
Close
()
_
=
consumer
.
Close
()
messageHandle
.
Destroy
()
messageHandle
.
Destroy
()
atomic
.
AddInt32
(
&
consumerCount
,
-
1
)
atomic
.
AddInt32
(
&
consumerCount
,
-
1
)
logger
.
Info
.
Println
(
"consumer结束"
)
logger
.
Info
(
"consumer结束"
)
if
consumerCount
==
0
{
if
consumerCount
==
0
{
os
.
Exit
(
0
)
os
.
Exit
(
0
)
}
}
...
@@ -41,14 +41,14 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler
...
@@ -41,14 +41,14 @@ func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler
// consume errors
// consume errors
go
func
()
{
go
func
()
{
for
err
:=
range
consumer
.
Errors
()
{
for
err
:=
range
consumer
.
Errors
()
{
logger
.
Error
.
Println
(
"consume error:"
,
err
.
Error
())
logger
.
Error
(
"consume error:"
,
err
.
Error
())
}
}
}()
}()
// consume notifications
// consume notifications
go
func
()
{
go
func
()
{
for
ntf
:=
range
consumer
.
Notifications
()
{
for
ntf
:=
range
consumer
.
Notifications
()
{
logger
.
Info
.
Printf
(
"Rebalanced: %+v
\n
"
,
ntf
)
logger
.
Info
(
"Rebalanced: %+v
\n
"
,
ntf
)
}
}
}()
}()
...
...
service/consumer/util.go
View file @
1690b4b1
...
@@ -5,13 +5,13 @@ import (
...
@@ -5,13 +5,13 @@ import (
"fmt"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/file_cache"
"git.quantgroup.cn/DevOps/enoch/service/file_cache"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/client/v2"
"github.com/vrg0/go-common/logger"
)
)
func
batchWrite
(
pointArray
[]
*
client
.
Point
)
{
func
batchWrite
(
pointArray
[]
*
client
.
Point
)
{
if
file_cache
.
Enabled
()
{
if
file_cache
.
Enabled
()
{
logger
.
Info
.
Println
(
"写入缓存"
)
logger
.
Info
(
"写入缓存"
)
fileWrite
(
pointArray
)
fileWrite
(
pointArray
)
}
else
{
}
else
{
err
:=
httpWrite
(
pointArray
)
err
:=
httpWrite
(
pointArray
)
...
@@ -19,7 +19,7 @@ func batchWrite(pointArray []*client.Point) {
...
@@ -19,7 +19,7 @@ func batchWrite(pointArray []*client.Point) {
file_cache
.
OpenCache
()
file_cache
.
OpenCache
()
fileWrite
(
pointArray
)
fileWrite
(
pointArray
)
}
}
logger
.
Info
.
Println
(
"写入influx"
,
len
(
pointArray
))
logger
.
Info
(
"写入influx"
,
len
(
pointArray
))
}
}
}
}
...
@@ -48,7 +48,7 @@ func ReSubmit(data []string) error {
...
@@ -48,7 +48,7 @@ func ReSubmit(data []string) error {
cp
:=
file_cache
.
CreateCachePoint
(
v
)
cp
:=
file_cache
.
CreateCachePoint
(
v
)
point
,
err
:=
client
.
NewPoint
(
cp
.
Name
,
cp
.
Tags
,
cp
.
Fields
,
cp
.
Time
)
point
,
err
:=
client
.
NewPoint
(
cp
.
Name
,
cp
.
Tags
,
cp
.
Fields
,
cp
.
Time
)
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Error
.
Println
(
"构造client.point异常"
,
err
)
logger
.
Error
(
"构造client.point异常"
,
err
)
}
}
pointSlice
=
append
(
pointSlice
,
point
)
pointSlice
=
append
(
pointSlice
,
point
)
if
len
(
pointSlice
)
>
1000
{
if
len
(
pointSlice
)
>
1000
{
...
@@ -56,19 +56,19 @@ func ReSubmit(data []string) error {
...
@@ -56,19 +56,19 @@ func ReSubmit(data []string) error {
if
err
!=
nil
{
if
err
!=
nil
{
return
err
return
err
}
}
logger
.
Info
.
Println
(
"缓存重新提交:1000"
)
logger
.
Info
(
"缓存重新提交:1000"
)
pointSlice
=
make
([]
*
client
.
Point
,
0
)
pointSlice
=
make
([]
*
client
.
Point
,
0
)
}
}
}
}
if
len
(
pointSlice
)
>
0
{
if
len
(
pointSlice
)
>
0
{
err
:=
httpWrite
(
pointSlice
)
err
:=
httpWrite
(
pointSlice
)
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Info
.
Println
(
pointSlice
)
logger
.
Info
(
pointSlice
)
return
err
return
err
}
}
logger
.
Info
.
Println
(
"缓存重新提交:"
,
len
(
pointSlice
))
logger
.
Info
(
"缓存重新提交:"
,
len
(
pointSlice
))
}
}
logger
.
Info
.
Println
(
"重新提交"
)
logger
.
Info
(
"重新提交"
)
return
nil
return
nil
}
}
...
...
service/continuous_queries/apdex_continuous_query.go
View file @
1690b4b1
...
@@ -4,7 +4,7 @@ import (
...
@@ -4,7 +4,7 @@ import (
"fmt"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git
.quantgroup.cn/DevOps/enoch/service/log
"
"git
hub.com/vrg0/go-common/logger
"
"net"
"net"
"os"
"os"
"strings"
"strings"
...
@@ -20,7 +20,7 @@ var (
...
@@ -20,7 +20,7 @@ var (
func
query
(
dbName
string
)
[]
string
{
func
query
(
dbName
string
)
[]
string
{
cq
:=
data
.
QueryMonitor
(
SHOW_CQ_SQL
)
cq
:=
data
.
QueryMonitor
(
SHOW_CQ_SQL
)
logger
.
Info
.
Println
(
cq
)
logger
.
Info
(
cq
)
cqName
:=
make
([]
string
,
0
)
cqName
:=
make
([]
string
,
0
)
for
_
,
row
:=
range
cq
[
0
]
.
Series
{
for
_
,
row
:=
range
cq
[
0
]
.
Series
{
if
row
.
Name
==
dbName
{
if
row
.
Name
==
dbName
{
...
@@ -43,7 +43,7 @@ func delete(db string, cqName [] string) bool {
...
@@ -43,7 +43,7 @@ func delete(db string, cqName [] string) bool {
func
create
(
apdexThreshold
conf
.
ApdexThreshold
)
{
func
create
(
apdexThreshold
conf
.
ApdexThreshold
)
{
sysName
:=
data
.
QuerySysName
()
sysName
:=
data
.
QuerySysName
()
logger
.
Info
.
Println
(
sysName
)
logger
.
Info
(
sysName
)
sql
:=
strings
.
Builder
{}
sql
:=
strings
.
Builder
{}
for
_
,
name
:=
range
sysName
{
for
_
,
name
:=
range
sysName
{
cqName
:=
strings
.
Replace
(
name
,
"-"
,
"_"
,
10
)
cqName
:=
strings
.
Replace
(
name
,
"-"
,
"_"
,
10
)
...
@@ -68,11 +68,11 @@ func buildTolSql(cqName string, sysName string, threshold int) string {
...
@@ -68,11 +68,11 @@ func buildTolSql(cqName string, sysName string, threshold int) string {
func
Load
()
{
func
Load
()
{
name
:=
query
(
data
.
MONITOR
)
name
:=
query
(
data
.
MONITOR
)
logger
.
Info
.
Println
(
"old: "
,
name
)
logger
.
Info
(
"old: "
,
name
)
delete
(
data
.
MONITOR
,
name
)
delete
(
data
.
MONITOR
,
name
)
create
(
conf
.
GlobalConfig
.
ApdexThreshold
)
create
(
conf
.
GlobalConfig
.
ApdexThreshold
)
name
=
query
(
data
.
MONITOR
)
name
=
query
(
data
.
MONITOR
)
logger
.
Info
.
Println
(
"new: "
,
name
)
logger
.
Info
(
"new: "
,
name
)
}
}
...
@@ -86,7 +86,7 @@ func checkIp(ip string) bool {
...
@@ -86,7 +86,7 @@ func checkIp(ip string) bool {
// 检查ip地址判断是否回环地址
// 检查ip地址判断是否回环地址
if
ipnet
,
ok
:=
address
.
(
*
net
.
IPNet
);
ok
&&
!
ipnet
.
IP
.
IsLoopback
()
{
if
ipnet
,
ok
:=
address
.
(
*
net
.
IPNet
);
ok
&&
!
ipnet
.
IP
.
IsLoopback
()
{
if
ipnet
.
IP
.
To4
()
!=
nil
{
if
ipnet
.
IP
.
To4
()
!=
nil
{
logger
.
Info
.
Println
(
ipnet
.
IP
.
String
())
logger
.
Info
(
ipnet
.
IP
.
String
())
return
ipnet
.
IP
.
String
()
==
ip
return
ipnet
.
IP
.
String
()
==
ip
}
}
}
}
...
...
service/data/influxdb_conf.go
View file @
1690b4b1
...
@@ -3,8 +3,8 @@ package data
...
@@ -3,8 +3,8 @@ package data
import
(
import
(
"bytes"
"bytes"
"git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/client/v2"
"github.com/vrg0/go-common/logger"
)
)
func
NewClient
()
client
.
Client
{
func
NewClient
()
client
.
Client
{
...
@@ -14,7 +14,7 @@ func NewClient() client.Client {
...
@@ -14,7 +14,7 @@ func NewClient() client.Client {
con
,
err
:=
client
.
NewHTTPClient
(
httpConfig
)
con
,
err
:=
client
.
NewHTTPClient
(
httpConfig
)
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Error
.
Println
(
err
.
Error
())
logger
.
Error
(
err
.
Error
())
}
}
return
con
return
con
...
@@ -26,10 +26,10 @@ func Query(sql string, db string) []client.Result {
...
@@ -26,10 +26,10 @@ func Query(sql string, db string) []client.Result {
res
,
err
:=
con
.
Query
(
q
)
res
,
err
:=
con
.
Query
(
q
)
if
nil
!=
err
{
if
nil
!=
err
{
logger
.
Error
.
Println
(
"influxdb client init error"
,
err
)
logger
.
Error
(
"influxdb client init error"
,
err
)
}
}
if
nil
!=
res
.
Error
()
{
if
nil
!=
res
.
Error
()
{
logger
.
Error
.
Println
(
"query error"
,
db
,
sql
,
res
.
Error
())
logger
.
Error
(
"query error"
,
db
,
sql
,
res
.
Error
())
}
}
return
res
.
Results
return
res
.
Results
}
}
...
...
service/dingding/ding.go
View file @
1690b4b1
package
dingding
package
dingding
import
(
"encoding/json"
"net/http"
"strings"
)
var
DefaultDingURL
=
[]
string
{
"https://oapi.dingtalk.com/robot/send?access_token=9ffab8e4ae5f94e0fbf84aa91c9cb474d9e3d5bd0bb3c2daffe4cdfe0c2cbbc7"
}
var
DefaultDingURL
=
[]
string
{
"https://oapi.dingtalk.com/robot/send?access_token=9ffab8e4ae5f94e0fbf84aa91c9cb474d9e3d5bd0bb3c2daffe4cdfe0c2cbbc7"
}
type
DinDingMsg
struct
{
MsgType
string
`json:"msgtype"`
// Link Link `json:"link"`
Text
Text
`json:"text"`
}
type
Text
struct
{
Content
string
`json:"content"`
}
func
SenderDingDing
(
info
string
,
receiver
[]
string
)
error
{
bodyStr
:=
buildDingDingMsg
(
info
)
for
_
,
r
:=
range
receiver
{
data
:=
strings
.
NewReader
(
string
(
bodyStr
))
_
,
e
:=
http
.
Post
(
r
,
"application/json;charset=utf-8"
,
data
)
if
e
!=
nil
{
return
e
}
}
return
nil
}
func
buildDingDingMsg
(
info
string
)
[]
byte
{
msg
:=
DinDingMsg
{
MsgType
:
"text"
,
Text
:
Text
{
Content
:
info
,
},
}
msgStr
,
_
:=
json
.
Marshal
(
msg
)
return
msgStr
}
service/dingding/ding_test.go
deleted
100644 → 0
View file @
9c377c78
package
dingding
import
"testing"
var
testDingUrl
=
[]
string
{
"https://oapi.dingtalk.com/robot/send?access_token=05b2c46e460e94b32044251c0f8af666c55a4600758fda508120f962d589ea47"
,
}
func
TestSenderDingDing
(
t
*
testing
.
T
)
{
e
:=
SenderDingDing
(
"叮叮测试,吧啦吧啦吧"
,
testDingUrl
)
if
e
!=
nil
{
t
.
Error
(
e
)
}
}
service/file_cache/file_cache.go
View file @
1690b4b1
...
@@ -3,8 +3,8 @@ package file_cache
...
@@ -3,8 +3,8 @@ package file_cache
import
(
import
(
"bufio"
"bufio"
"fmt"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/robfig/cron"
"github.com/robfig/cron"
"github.com/vrg0/go-common/logger"
"io"
"io"
"io/ioutil"
"io/ioutil"
"os"
"os"
...
@@ -76,17 +76,17 @@ func create() {
...
@@ -76,17 +76,17 @@ func create() {
currentFile
.
fileName
=
currentFile
.
path
+
strconv
.
FormatInt
(
current
,
10
)
currentFile
.
fileName
=
currentFile
.
path
+
strconv
.
FormatInt
(
current
,
10
)
file
,
err
:=
os
.
Create
(
currentFile
.
fileName
+
WRITING
)
file
,
err
:=
os
.
Create
(
currentFile
.
fileName
+
WRITING
)
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Error
.
Println
(
"创建缓存文件失败"
,
err
)
logger
.
Error
(
"创建缓存文件失败"
,
err
)
}
}
logger
.
Info
.
Println
(
"打开缓存文件"
,
currentFile
.
fileName
)
logger
.
Info
(
"打开缓存文件"
,
currentFile
.
fileName
)
currentFile
.
file
=
file
currentFile
.
file
=
file
currentFile
.
fileSize
.
Add
(
1
)
currentFile
.
fileSize
.
Add
(
1
)
logger
.
Info
.
Println
(
"文件缓存数:"
,
currentFile
.
fileSize
.
Get
())
logger
.
Info
(
"文件缓存数:"
,
currentFile
.
fileSize
.
Get
())
}
}
func
closed
()
{
func
closed
()
{
defer
os
.
Rename
(
currentFile
.
fileName
+
WRITING
,
currentFile
.
fileName
+
CACHE
)
defer
os
.
Rename
(
currentFile
.
fileName
+
WRITING
,
currentFile
.
fileName
+
CACHE
)
logger
.
Info
.
Println
(
"关闭缓存文件"
)
logger
.
Info
(
"关闭缓存文件"
)
currentFile
.
file
.
Close
()
currentFile
.
file
.
Close
()
}
}
...
@@ -94,19 +94,19 @@ func Delete() {
...
@@ -94,19 +94,19 @@ func Delete() {
fileNames
:=
scan
(
DELETE
)
fileNames
:=
scan
(
DELETE
)
for
_
,
name
:=
range
fileNames
{
for
_
,
name
:=
range
fileNames
{
os
.
Remove
(
currentFile
.
path
+
name
)
os
.
Remove
(
currentFile
.
path
+
name
)
logger
.
Info
.
Println
(
"删除缓存文件"
,
name
)
logger
.
Info
(
"删除缓存文件"
,
name
)
}
}
}
}
func
pickup
()
{
func
pickup
()
{
logger
.
Info
.
Println
(
"获取缓存文件数量"
)
logger
.
Info
(
"获取缓存文件数量"
)
files
:=
scan
(
CACHE
)
files
:=
scan
(
CACHE
)
currentFile
.
fileSize
.
Add
(
len
(
files
))
currentFile
.
fileSize
.
Add
(
len
(
files
))
}
}
func
mark
()
{
func
mark
()
{
fileName
:=
scan
(
WRITING
)
fileName
:=
scan
(
WRITING
)
logger
.
Info
.
Println
(
"重新标记缓存文件"
)
logger
.
Info
(
"重新标记缓存文件"
)
for
_
,
name
:=
range
fileName
{
for
_
,
name
:=
range
fileName
{
os
.
Rename
(
currentFile
.
path
+
name
,
currentFile
.
path
+
strings
.
Split
(
name
,
"."
)[
0
]
+
CACHE
)
os
.
Rename
(
currentFile
.
path
+
name
,
currentFile
.
path
+
strings
.
Split
(
name
,
"."
)[
0
]
+
CACHE
)
}
}
...
@@ -120,12 +120,12 @@ func Recover(submit func(data []string) error) {
...
@@ -120,12 +120,12 @@ func Recover(submit func(data []string) error) {
for
_
,
name
:=
range
fileNames
{
for
_
,
name
:=
range
fileNames
{
err
:=
submit
(
Read
(
currentFile
.
path
+
name
))
err
:=
submit
(
Read
(
currentFile
.
path
+
name
))
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Error
.
Println
(
"重新提交缓存异常:"
,
err
)
logger
.
Error
(
"重新提交缓存异常:"
,
err
)
return
return
}
}
os
.
Rename
(
currentFile
.
path
+
name
,
currentFile
.
path
+
strings
.
Split
(
name
,
"."
)[
0
]
+
DELETE
)
os
.
Rename
(
currentFile
.
path
+
name
,
currentFile
.
path
+
strings
.
Split
(
name
,
"."
)[
0
]
+
DELETE
)
currentFile
.
fileSize
.
Add
(
-
1
)
currentFile
.
fileSize
.
Add
(
-
1
)
logger
.
Info
.
Println
(
"文件缓存数:"
,
currentFile
.
fileSize
.
Get
())
logger
.
Info
(
"文件缓存数:"
,
currentFile
.
fileSize
.
Get
())
}
}
go
Delete
()
go
Delete
()
}
}
...
@@ -154,7 +154,7 @@ func scan(suffix string) []string {
...
@@ -154,7 +154,7 @@ func scan(suffix string) []string {
func
Read
(
fileName
string
)
[]
string
{
func
Read
(
fileName
string
)
[]
string
{
file
,
err
:=
os
.
Open
(
fileName
)
file
,
err
:=
os
.
Open
(
fileName
)
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Error
.
Println
(
"未找到对应的文件:"
,
err
)
logger
.
Error
(
"未找到对应的文件:"
,
err
)
}
}
reader
:=
bufio
.
NewReader
(
file
)
reader
:=
bufio
.
NewReader
(
file
)
data
:=
make
([]
string
,
0
)
data
:=
make
([]
string
,
0
)
...
@@ -188,7 +188,7 @@ func RegisterJob(submit func(data []string) error) {
...
@@ -188,7 +188,7 @@ func RegisterJob(submit func(data []string) error) {
c
:=
cron
.
New
()
c
:=
cron
.
New
()
err
:=
c
.
AddFunc
(
"@every 1m"
,
func
()
{
err
:=
c
.
AddFunc
(
"@every 1m"
,
func
()
{
if
!
Enabled
()
{
if
!
Enabled
()
{
logger
.
Info
.
Println
(
"开始扫描缓存文件"
)
logger
.
Info
(
"开始扫描缓存文件"
)
Recover
(
submit
)
Recover
(
submit
)
}
}
})
})
...
...
service/file_cache/switcher.go
View file @
1690b4b1
...
@@ -2,7 +2,7 @@ package file_cache
...
@@ -2,7 +2,7 @@ package file_cache
import
(
import
(
"encoding/json"
"encoding/json"
"git
.quantgroup.cn/DevOps/enoch/service/log
"
"git
hub.com/vrg0/go-common/logger
"
"net/http"
"net/http"
"strings"
"strings"
"sync"
"sync"
...
@@ -44,7 +44,7 @@ func (s *switcher) status() bool {
...
@@ -44,7 +44,7 @@ func (s *switcher) status() bool {
current
:=
time
.
Now
()
.
Unix
()
current
:=
time
.
Now
()
.
Unix
()
diff
:=
current
-
s
.
origin
diff
:=
current
-
s
.
origin
if
diff
>=
60
{
if
diff
>=
60
{
logger
.
Info
.
Println
(
"缓存切换"
)
logger
.
Info
(
"缓存切换"
)
s
.
turnOff
()
s
.
turnOff
()
}
}
}
}
...
@@ -75,7 +75,7 @@ func OpenCache() {
...
@@ -75,7 +75,7 @@ func OpenCache() {
func
senderDingDing
()
{
func
senderDingDing
()
{
_
,
err
:=
http
.
Post
(
url
,
contentType
,
strings
.
NewReader
(
alterMsg
))
_
,
err
:=
http
.
Post
(
url
,
contentType
,
strings
.
NewReader
(
alterMsg
))
if
err
!=
nil
{
if
err
!=
nil
{
logger
.
Error
.
Println
(
err
)
logger
.
Error
(
err
)
}
}
}
}
...
@@ -88,7 +88,7 @@ func buildDingDingMsg() string {
...
@@ -88,7 +88,7 @@ func buildDingDingMsg() string {
}
}
msgStr
,
err
:=
json
.
Marshal
(
msg
)
msgStr
,
err
:=
json
.
Marshal
(
msg
)
if
nil
!=
err
{
if
nil
!=
err
{
logger
.
Error
.
Println
(
"无法序列化ding ding msg"
,
err
)
logger
.
Error
(
"无法序列化ding ding msg"
,
err
)
}
}
return
string
(
msgStr
)
return
string
(
msgStr
)
}
}
...
...
service/job/alarm.go
View file @
1690b4b1
...
@@ -3,8 +3,8 @@ package job
...
@@ -3,8 +3,8 @@ package job
import
(
import
(
"fmt"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/alarm"
"git.quantgroup.cn/DevOps/enoch/service/alarm"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/robfig/cron"
"github.com/robfig/cron"
"github.com/vrg0/go-common/logger"
"net"
"net"
"os"
"os"
"time"
"time"
...
@@ -17,7 +17,7 @@ func AutoAlarm() {
...
@@ -17,7 +17,7 @@ func AutoAlarm() {
c
:=
cron
.
New
()
c
:=
cron
.
New
()
err
:=
c
.
AddFunc
(
"@every 1m"
,
func
()
{
err
:=
c
.
AddFunc
(
"@every 1m"
,
func
()
{
logger
.
Info
.
Println
(
"开始执行定时任务"
,
time
.
Now
()
.
Minute
())
logger
.
Info
(
"开始执行定时任务"
,
time
.
Now
()
.
Minute
())
alarm
.
Load
()
alarm
.
Load
()
})
})
if
err
!=
nil
{
if
err
!=
nil
{
...
@@ -37,7 +37,7 @@ func CheckIp(ip string) bool {
...
@@ -37,7 +37,7 @@ func CheckIp(ip string) bool {
// 检查ip地址判断是否回环地址
// 检查ip地址判断是否回环地址
if
ipnet
,
ok
:=
address
.
(
*
net
.
IPNet
);
ok
&&
!
ipnet
.
IP
.
IsLoopback
()
{
if
ipnet
,
ok
:=
address
.
(
*
net
.
IPNet
);
ok
&&
!
ipnet
.
IP
.
IsLoopback
()
{
if
ipnet
.
IP
.
To4
()
!=
nil
{
if
ipnet
.
IP
.
To4
()
!=
nil
{
logger
.
Info
.
Println
(
ipnet
.
IP
.
String
())
logger
.
Info
(
ipnet
.
IP
.
String
())
return
ipnet
.
IP
.
String
()
==
ip
return
ipnet
.
IP
.
String
()
==
ip
}
}
}
}
...
...
service/log/conf.go
deleted
100644 → 0
View file @
9c377c78
package
logger
import
(
"io"
"log"
"os"
)
var
(
Info
*
log
.
Logger
Warning
*
log
.
Logger
Error
*
log
.
Logger
)
func
init
(){
file
,
err
:=
os
.
OpenFile
(
"quantgroup.log"
,
os
.
O_RDWR
|
os
.
O_CREATE
,
0666
)
if
err
!=
nil
{
log
.
Fatalln
(
"打开日志文件失败:"
,
err
)
}
Info
=
log
.
New
(
io
.
MultiWriter
(
os
.
Stderr
,
file
),
"Info:"
,
log
.
Ldate
|
log
.
Ltime
|
log
.
Lshortfile
)
Warning
=
log
.
New
(
io
.
MultiWriter
(
os
.
Stderr
,
file
),
"Warning:"
,
log
.
Ldate
|
log
.
Ltime
|
log
.
Lshortfile
)
Error
=
log
.
New
(
io
.
MultiWriter
(
os
.
Stderr
,
file
),
"Error:"
,
log
.
Ldate
|
log
.
Ltime
|
log
.
Lshortfile
)
}
\ No newline at end of file
service/node-check/node_check.go
View file @
1690b4b1
...
@@ -3,8 +3,9 @@ package node_check
...
@@ -3,8 +3,9 @@ package node_check
import
(
import
(
"context"
"context"
"git.quantgroup.cn/DevOps/enoch/service/dingding"
"git.quantgroup.cn/DevOps/enoch/service/dingding"
"git.quantgroup.cn/DevOps/enoch/service/log"
"git.quantgroup.cn/DevOps/enoch/service/registry"
"git.quantgroup.cn/DevOps/enoch/service/registry"
"github.com/vrg0/go-common/ding"
"github.com/vrg0/go-common/logger"
"net/http"
"net/http"
"os"
"os"
"sync"
"sync"
...
@@ -53,14 +54,14 @@ func handler(serviceName string) {
...
@@ -53,14 +54,14 @@ func handler(serviceName string) {
if
url
,
ok
:=
HandlerMap
.
Load
(
serviceName
);
ok
{
if
url
,
ok
:=
HandlerMap
.
Load
(
serviceName
);
ok
{
for
i
:=
0
;
i
<
HttpGetRetryCount
;
i
++
{
for
i
:=
0
;
i
<
HttpGetRetryCount
;
i
++
{
if
resp
,
e
:=
httpGet
(
url
.
(
string
),
HttpTimeOut
);
e
!=
nil
{
if
resp
,
e
:=
httpGet
(
url
.
(
string
),
HttpTimeOut
);
e
!=
nil
{
logger
.
Error
.
Print
(
" handler service: "
,
serviceName
,
" "
,
e
)
logger
.
Error
(
" handler service: "
,
serviceName
,
" "
,
e
)
}
else
{
}
else
{
logger
.
Info
.
Print
(
" handler service: "
,
serviceName
,
" "
,
resp
.
StatusCode
)
logger
.
Info
(
" handler service: "
,
serviceName
,
" "
,
resp
.
StatusCode
)
break
break
}
}
}
}
}
else
{
}
else
{
logger
.
Info
.
Print
(
" handler service: "
,
serviceName
,
" "
,
"not found handler hook api"
)
logger
.
Info
(
" handler service: "
,
serviceName
,
" "
,
"not found handler hook api"
)
}
}
}
}
...
@@ -103,9 +104,9 @@ func (w watch) UpdateNodes(service *registry.Service) {
...
@@ -103,9 +104,9 @@ func (w watch) UpdateNodes(service *registry.Service) {
for
_
,
node
:=
range
service
.
NodeMap
{
for
_
,
node
:=
range
service
.
NodeMap
{
if
oldNode
,
ok
:=
oldService
.
NodeMap
[
node
.
Id
];
ok
{
if
oldNode
,
ok
:=
oldService
.
NodeMap
[
node
.
Id
];
ok
{
if
oldNode
.
Status
==
Passing
&&
node
.
Status
==
Critical
{
if
oldNode
.
Status
==
Passing
&&
node
.
Status
==
Critical
{
logger
.
Warn
ing
.
Print
(
service
.
Name
,
" "
,
node
.
Id
,
"---!!!node critical!!!---"
)
logger
.
Warn
(
service
.
Name
,
" "
,
node
.
Id
,
"---!!!node critical!!!---"
)
if
_
,
ok
:=
IgnoreServiceMap
[
service
.
Name
];
!
ok
{
if
_
,
ok
:=
IgnoreServiceMap
[
service
.
Name
];
!
ok
{
_
=
ding
ding
.
SenderDingDing
(
service
.
Name
+
" "
+
node
.
Id
+
" "
+
"---!!!node critical!!!---"
,
dingding
.
DefaultDingURL
)
_
=
ding
.
SendText
(
service
.
Name
+
" "
+
node
.
Id
+
" "
+
"---!!!node critical!!!---"
,
dingding
.
DefaultDingURL
...
)
}
}
}
}
}
}
...
@@ -116,13 +117,13 @@ func (w watch) UpdateNodes(service *registry.Service) {
...
@@ -116,13 +117,13 @@ func (w watch) UpdateNodes(service *registry.Service) {
//如果 服务存在,并且服务的old状态为passing,并且服务的now状态为critical,则报警,否贼记录服务状态
//如果 服务存在,并且服务的old状态为passing,并且服务的now状态为critical,则报警,否贼记录服务状态
serviceString
:=
serviceStr
(
service
)
serviceString
:=
serviceStr
(
service
)
if
oldService
,
ok
:=
servicesStatus
[
service
.
Name
];
ok
&&
serviceStatus
(
oldService
)
&&
!
serviceStatus
(
service
)
{
if
oldService
,
ok
:=
servicesStatus
[
service
.
Name
];
ok
&&
serviceStatus
(
oldService
)
&&
!
serviceStatus
(
service
)
{
logger
.
Warn
ing
.
Print
(
serviceString
,
"---!!!service critical!!!---"
)
logger
.
Warn
(
serviceString
,
"---!!!service critical!!!---"
)
if
_
,
ok
:=
IgnoreServiceMap
[
service
.
Name
];
!
ok
{
if
_
,
ok
:=
IgnoreServiceMap
[
service
.
Name
];
!
ok
{
_
=
ding
ding
.
SenderDingDing
(
serviceString
+
"---!!!service critical!!!---"
,
dingding
.
DefaultDingURL
)
_
=
ding
.
SendText
(
serviceString
+
"---!!!service critical!!!---"
,
dingding
.
DefaultDingURL
...
)
}
}
handler
(
service
.
Name
)
handler
(
service
.
Name
)
}
else
{
}
else
{
logger
.
Info
.
Print
(
serviceString
)
logger
.
Info
(
serviceString
)
}
}
//更新服务状态
//更新服务状态
...
@@ -165,8 +166,8 @@ func InitServiceStatus() {
...
@@ -165,8 +166,8 @@ func InitServiceStatus() {
func
NodeCheck
()
{
func
NodeCheck
()
{
defer
func
()
{
defer
func
()
{
if
e
:=
recover
();
e
!=
nil
{
if
e
:=
recover
();
e
!=
nil
{
logger
.
Info
.
Print
(
"node check panic: "
,
e
)
logger
.
Info
(
"node check panic: "
,
e
)
_
=
ding
ding
.
SenderDingDing
(
"node check panic!"
,
dingding
.
DefaultDingURL
)
_
=
ding
.
SendText
(
"node check panic!"
,
dingding
.
DefaultDingURL
...
)
time
.
Sleep
(
time
.
Second
*
1
)
time
.
Sleep
(
time
.
Second
*
1
)
NodeCheck
()
NodeCheck
()
}
}
...
@@ -176,7 +177,7 @@ func NodeCheck() {
...
@@ -176,7 +177,7 @@ func NodeCheck() {
dc
:=
"3c"
dc
:=
"3c"
cluster
:=
[]
string
{
"172.30.12.2:8500"
,
"172.30.12.3:8500"
,
"172.30.12.4:8500"
}
cluster
:=
[]
string
{
"172.30.12.2:8500"
,
"172.30.12.3:8500"
,
"172.30.12.4:8500"
}
if
e
:=
registry
.
Init
(
"consul"
,
map
[
string
]
interface
{}{
"dc"
:
dc
,
"cluster"
:
cluster
});
e
!=
nil
{
if
e
:=
registry
.
Init
(
"consul"
,
map
[
string
]
interface
{}{
"dc"
:
dc
,
"cluster"
:
cluster
});
e
!=
nil
{
logger
.
Info
.
Print
(
"registry init error:"
,
e
)
logger
.
Info
(
"registry init error:"
,
e
)
os
.
Exit
(
-
1
)
os
.
Exit
(
-
1
)
}
}
time
.
Sleep
(
time
.
Second
*
1
)
time
.
Sleep
(
time
.
Second
*
1
)
...
@@ -186,7 +187,7 @@ func NodeCheck() {
...
@@ -186,7 +187,7 @@ func NodeCheck() {
//设置观察者
//设置观察者
if
e
:=
registry
.
SetObserver
(
"watch"
,
&
watch
{});
e
!=
nil
{
if
e
:=
registry
.
SetObserver
(
"watch"
,
&
watch
{});
e
!=
nil
{
logger
.
Info
.
Print
(
"set observer error:"
,
e
)
logger
.
Info
(
"set observer error:"
,
e
)
os
.
Exit
(
-
1
)
os
.
Exit
(
-
1
)
}
}
...
...
service/registry/consul.go
View file @
1690b4b1
...
@@ -5,7 +5,7 @@ import (
...
@@ -5,7 +5,7 @@ import (
"encoding/gob"
"encoding/gob"
"errors"
"errors"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/consul/
api/
watch"
"reflect"
"reflect"
"strconv"
"strconv"
"sync"
"sync"
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment