Commit b8fbf8a2 authored by Node- 门 忠鑫's avatar Node- 门 忠鑫

# 修改缓存路径的配置方式

parent 1ef8676f
...@@ -2,3 +2,4 @@ ...@@ -2,3 +2,4 @@
*.log *.log
enoch enoch
go.sum go.sum
test.go
\ No newline at end of file
...@@ -4,7 +4,6 @@ require ( ...@@ -4,7 +4,6 @@ require (
github.com/DataDog/zstd v1.3.4 // indirect github.com/DataDog/zstd v1.3.4 // indirect
github.com/Shopify/sarama v1.20.0 github.com/Shopify/sarama v1.20.0
github.com/Shopify/toxiproxy v2.1.3+incompatible // indirect github.com/Shopify/toxiproxy v2.1.3+incompatible // indirect
github.com/Unknwon/goconfig v0.0.0-20181105214110-56bd8ab18619
github.com/bsm/sarama-cluster v2.1.15+incompatible github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/eapache/go-resiliency v1.1.0 // indirect github.com/eapache/go-resiliency v1.1.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
...@@ -12,12 +11,13 @@ require ( ...@@ -12,12 +11,13 @@ require (
github.com/gomodule/redigo v2.0.0+incompatible github.com/gomodule/redigo v2.0.0+incompatible
github.com/influxdata/influxdb v1.7.2 github.com/influxdata/influxdb v1.7.2
github.com/influxdata/platform v0.0.0-20181221041837-9d0ed9020c3f // indirect github.com/influxdata/platform v0.0.0-20181221041837-9d0ed9020c3f // indirect
github.com/json-iterator/go v1.1.6
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/onsi/ginkgo v1.7.0 // indirect github.com/onsi/ginkgo v1.7.0 // indirect
github.com/onsi/gomega v1.4.3 // indirect github.com/onsi/gomega v1.4.3 // indirect
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect
github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967 github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967
github.com/timest/env v0.0.0-20180717050204-5fce78d35255
golang.org/x/text v0.3.0
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
) )
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"git.quantgroup.cn/DevOps/enoch/service/conf" "git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/continuous_queries" "git.quantgroup.cn/DevOps/enoch/service/continuous_queries"
"git.quantgroup.cn/DevOps/enoch/service/data" "git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/file_cache"
"git.quantgroup.cn/DevOps/enoch/service/job" "git.quantgroup.cn/DevOps/enoch/service/job"
"log" "log"
"net/http" "net/http"
...@@ -29,7 +30,9 @@ func main() { ...@@ -29,7 +30,9 @@ func main() {
if err != nil { if err != nil {
log.Fatalln("create file error", err) log.Fatalln("create file error", err)
} }
file_cache.Load(conf.GlobalConfig.FileCachePath)
file_cache.RegisterJob(service.ReSubmit)
go file_cache.Delete()
port := conf.GlobalConfig.Port port := conf.GlobalConfig.Port
logger := log.New(file, "[Info]", log.LstdFlags|log.Llongfile) logger := log.New(file, "[Info]", log.LstdFlags|log.Llongfile)
...@@ -66,3 +69,5 @@ func main() { ...@@ -66,3 +69,5 @@ func main() {
} }
} }
...@@ -11,6 +11,7 @@ type Config struct { ...@@ -11,6 +11,7 @@ type Config struct {
InfluxDb InfluxDb `json:"influx_db"` InfluxDb InfluxDb `json:"influx_db"`
ApdexThreshold ApdexThreshold `json:"apdex_threshold"` ApdexThreshold ApdexThreshold `json:"apdex_threshold"`
StrategyConfPath string `json:"strategy_conf_path"` StrategyConfPath string `json:"strategy_conf_path"`
FileCachePath string `json:"file_cache_path"`
} }
type Redis struct { type Redis struct {
......
package file_cache
import (
"encoding/json"
"github.com/influxdata/influxdb/client/v2"
"github.com/json-iterator/go"
"strconv"
"time"
"unsafe"
)
type CachePoint struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Fields map[string]interface{} `json:"fields"`
Time time.Time `json:"time"`
}
func NewPoint(point *client.Point) CachePoint {
cp := CachePoint{}
cp.Name = point.Name()
cp.Time = point.Time()
cp.Fields, _ = point.Fields()
cp.Tags = point.Tags()
return cp
}
func CreateCachePoint(data string) *CachePoint {
cp := CachePoint{}
jsoniter.Unmarshal([]byte(data), &cp)
if v, ok := cp.Fields["system_load_average"]; ok {
switch v.(type) {
case float64:
break
case int:
cp.Fields["system_load_average"],_ = strconv.ParseFloat(strconv.Itoa(v.(int)), 64)
break
case int64:
cp.Fields["system_load_average"],_ = strconv.ParseFloat(strconv.FormatInt(v.(int64),10), 64)
break
}
}
return &cp
}
func init() {
decodeNumberAsInt64IfPossible := func(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
switch iter.WhatIsNext() {
case jsoniter.NumberValue:
var number json.Number
iter.ReadVal(&number)
i, err := strconv.ParseInt(string(number), 10, 64)
if err == nil {
*(*interface{})(ptr) = i
return
}
f, err := strconv.ParseFloat(string(number), 64)
if err == nil {
*(*interface{})(ptr) = f
return
}
// Not much we can do here.
default:
*(*interface{})(ptr) = iter.Read()
}
}
jsoniter.RegisterTypeDecoderFunc("interface {}", decodeNumberAsInt64IfPossible)
}
package file_cache
import (
"bufio"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/robfig/cron"
"io"
"io/ioutil"
"os"
"strconv"
"strings"
"sync"
"time"
)
const (
WRITING = ".writing"
DELETE = ".delete"
CACHE = ".cache"
)
type fileHolder struct {
fileName string
path string
file *os.File
fileSize fileSize
}
type fileSize struct {
size int
lock sync.Mutex
}
func (fs *fileSize) Get() int {
return fs.size
}
func (fs *fileSize) Add(offset int) {
fs.lock.Lock()
defer fs.lock.Unlock()
fs.size = fs.size + offset
if fs.size < 0 {
fs.size = 0
return
}
}
var currentFile *fileHolder
var fileCacheOnce sync.Once
func Load(path string) {
if path == "" {
path = "../"
}
if !strings.HasSuffix(path, "/") {
path = path + "/"
}
if !pathExists(path) {
mkdir(path)
}
currentFile = &fileHolder{}
currentFile.path = path
go func() {
fileCacheOnce.Do(func() {
mark() //将因为服务停止导致未及时更改状态的文件修改为缓存状态
Delete() //删除过期文件
pickup() //统计上次未提交的缓存文件的数量
})
}()
}
func create() {
current := time.Now().Unix()
currentFile.fileName = currentFile.path + strconv.FormatInt(current, 10)
file, err := os.Create(currentFile.fileName + WRITING)
if err != nil {
logger.Error.Println("创建缓存文件失败", err)
}
logger.Info.Println("打开缓存文件", currentFile.fileName)
currentFile.file = file
currentFile.fileSize.Add(1)
logger.Info.Println("文件缓存数:", currentFile.fileSize.Get())
}
func closed() {
defer os.Rename(currentFile.fileName+WRITING, currentFile.fileName+CACHE)
logger.Info.Println("关闭缓存文件")
currentFile.file.Close()
}
func Delete() {
fileNames := scan(DELETE)
for _, name := range fileNames {
os.Remove(currentFile.path + name)
logger.Info.Println("删除缓存文件", name)
}
}
func pickup() {
logger.Info.Println("获取缓存文件数量")
files := scan(CACHE)
currentFile.fileSize.Add(len(files))
}
func mark() {
fileName := scan(WRITING)
logger.Info.Println("重新标记缓存文件")
for _, name := range fileName {
os.Rename(currentFile.path+name, currentFile.path+strings.Split(name, ".")[0]+CACHE)
}
}
func Recover(submit func(data []string) error) {
if currentFile.fileSize.Get() <= 0 {
return
}
fileNames := scan(CACHE)
for _, name := range fileNames {
err := submit(Read(currentFile.path + name))
if err != nil {
logger.Error.Println("重新提交缓存异常:", err)
return
}
os.Rename(currentFile.path+name, currentFile.path+strings.Split(name, ".")[0]+DELETE)
currentFile.fileSize.Add(-1)
logger.Info.Println("文件缓存数:", currentFile.fileSize.Get())
}
go Delete()
}
func Write(data string) error {
writer := bufio.NewWriter(currentFile.file)
_, err := writer.WriteString(data + "\n")
if err != nil {
return err
}
go writer.Flush()
return nil
}
func scan(suffix string) []string {
files, _ := ioutil.ReadDir(currentFile.path)
fileNames := make([]string, 0)
for _, f := range files {
if strings.HasSuffix(f.Name(), suffix) {
fileNames = append(fileNames, f.Name())
}
}
return fileNames
}
func Read(fileName string) []string {
file, err := os.Open(fileName)
if err != nil {
logger.Error.Println("未找到对应的文件:", err)
}
reader := bufio.NewReader(file)
data := make([]string, 0)
for {
line, _, err := reader.ReadLine()
if err == io.EOF {
break
}
data = append(data, string(line))
}
return data
}
func pathExists(path string) bool {
_, err := os.Stat(path)
if err == nil {
return true
}
return !os.IsNotExist(err)
}
func mkdir(path string) bool {
err := os.Mkdir(path, os.ModePerm)
return err == nil
}
func RegisterJob(submit func(data []string) error) {
c := cron.New()
err := c.AddFunc("@every 1m", func() {
if !Enabled() {
logger.Info.Println("开始扫描缓存文件")
Recover(submit)
}
})
if err != nil {
fmt.Print("err")
}
c.Start()
}
package file_cache
import (
"git.quantgroup.cn/DevOps/enoch/service/log"
"sync"
"time"
)
/**
通过开关控制是否启用文件缓存,每次默认写入超过1分钟即关闭缓存
*/
type switcher struct {
state bool
origin int64
lock sync.Mutex
}
func (s *switcher) turnOn() {
s.lock.Lock()
defer s.lock.Unlock()
if !s.state { //打开状态,则关闭
s.state = true
s.origin = time.Now().Unix()
create()
}
}
func (s *switcher) turnOff() {
s.lock.Lock()
defer s.lock.Unlock()
if s.state {
s.state = false
s.origin = 0
closed()
}
}
func (s *switcher) status() bool {
if s.state {
current := time.Now().Unix()
diff := current - s.origin
if diff >= 60 {
logger.Info.Println("缓存切换")
s.turnOff()
}
}
return s.state
}
var cacheSwitcher *switcher
func init() {
cacheSwitcher = &switcher{}
}
func Enabled() bool {
return cacheSwitcher.status()
}
func OpenCache() {
cacheSwitcher.turnOn()
}
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"git.quantgroup.cn/DevOps/enoch/service/data" "git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/file_cache"
"git.quantgroup.cn/DevOps/enoch/service/log" "git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/client/v2"
...@@ -83,6 +84,22 @@ func msgInfluxProcess(traceMsgs []TraceMsg) { ...@@ -83,6 +84,22 @@ func msgInfluxProcess(traceMsgs []TraceMsg) {
} }
func batchWrite(pointArray []*client.Point) { func batchWrite(pointArray []*client.Point) {
if file_cache.Enabled() {
logger.Info.Println("写入缓存")
fileWrite(pointArray)
} else {
err := httpWrite(pointArray)
if err != nil {
file_cache.OpenCache()
fileWrite(pointArray)
}
logger.Info.Println("写入influx", len(pointArray))
}
}
func httpWrite(pointArray []*client.Point) error {
c := data.NewClient() c := data.NewClient()
defer func() { _ = c.Close() }() defer func() { _ = c.Close() }()
...@@ -91,14 +108,56 @@ func batchWrite(pointArray []*client.Point) { ...@@ -91,14 +108,56 @@ func batchWrite(pointArray []*client.Point) {
//Precision : "ms", //Precision : "ms",
}) })
if err != nil { if err != nil {
log.Fatal(err) return err
} }
points.AddPoints(pointArray) points.AddPoints(pointArray)
err = c.Write(points) err = c.Write(points)
logger.Info.Println("写入数据", len(pointArray))
if err != nil { if err != nil {
logger.Error.Fatal(err) return err
}
return nil
}
func ReSubmit(data []string) error {
pointSlice := make([]*client.Point, 0)
for _, v := range data {
cp := file_cache.CreateCachePoint(v)
point, err := client.NewPoint(cp.Name, cp.Tags, cp.Fields, cp.Time)
if err != nil {
logger.Error.Println("构造client.point异常", err)
}
pointSlice = append(pointSlice, point)
if len(pointSlice) > 1000 {
err := httpWrite(pointSlice)
if err != nil {
return err
}
logger.Info.Println("缓存重新提交:1000")
pointSlice = make([]*client.Point, 0)
}
}
if len(pointSlice) > 0 {
err := httpWrite(pointSlice)
if err != nil {
logger.Info.Println(pointSlice)
return err
}
logger.Info.Println("缓存重新提交:", len(pointSlice))
}
logger.Info.Println("重新提交")
return nil
}
func fileWrite(pointArray []*client.Point) {
for _, p := range pointArray {
if p != nil {
current := file_cache.NewPoint(p)
data, err := json.Marshal(current)
if err != nil {
fmt.Println(err)
}
file_cache.Write(string(data))
}
} }
} }
......
package main
import (
"fmt"
)
func main() {
}
func demo() {
go func() {
fmt.Print("ddddddd")
}()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment