Commit 514b60fd authored by jingbo.wang

Graceful shutdown

parent 11f73d5e
@@ -58,12 +58,15 @@ func main() {
	case <-sigterm:
		if braveConsumer != nil {
			braveConsumer.Close()
			glog.Info("braveConsumer graceful shutdown")
		}
		if healthConsumer != nil {
			healthConsumer.Close()
			glog.Info("healthConsumer graceful shutdown")
		}
		if db != nil {
			db.Close()
			glog.Info("dao graceful shutdown")
		}
		glog.Info(global.AppName + " graceful shutdown")
	}
...
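For context, the `sigterm` channel this case reads from is wired up outside the hunk; a minimal sketch of the standard `os/signal` setup it presumably relies on (not visible in this diff):

```go
// Presumed setup, assuming imports of "os", "os/signal", and "syscall":
sigterm := make(chan os.Signal, 1)                      // buffered so a signal is never dropped
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) // forward INT/TERM to the channel
```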
package dao

import (
	"context"
	"encoding/json"
	"git.quantgroup.cn/DevOps/enoch/pkg/global"
	"git.quantgroup.cn/DevOps/enoch/pkg/glog"
@@ -9,6 +10,7 @@ import (
	"os"
	"path"
	"strings"
	"sync"
	"time"
)
@@ -25,9 +27,14 @@ type Dao struct {
	flashTime    time.Duration
	dbAddress    string
	cacheFileDir string
	isClose      bool // graceful-shutdown flag: set by Close, checked by MsgProcess
	ctx          context.Context
	ctxCancel    context.CancelFunc
	wg           *sync.WaitGroup // tracks background goroutines so Close can wait for them
}
func New(batchSize int, flashTime time.Duration, dbAddress string, cacheFileDir string) *Dao {
	ctx, cancel := context.WithCancel(context.Background())
	rtn := &Dao{
		batchSize: batchSize,
		size:      0,
@@ -35,6 +42,10 @@ func New(batchSize int, flashTime time.Duration, dbAddress string, cacheFileDir
		flashTime:    flashTime,
		dbAddress:    dbAddress,
		cacheFileDir: cacheFileDir,
		isClose:      false,
		ctx:          ctx,
		ctxCancel:    cancel,
		wg:           new(sync.WaitGroup),
	}
	if stat, err := os.Stat(cacheFileDir); err != nil || !stat.IsDir() {
@@ -51,58 +62,69 @@ func New(batchSize int, flashTime time.Duration, dbAddress string, cacheFileDir
}
// Close shuts down the Dao gracefully, flushing the cache before returning
func (d *Dao) Close() {
	d.isClose = true // stop accepting new points in MsgProcess
	d.ctxCancel()    // wake every goroutine blocked on d.ctx.Done()
	d.wg.Wait()      // block until run() and flashFileCache() have exited
}
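Close follows the usual cancel-then-wait shape: flag the Dao as closed, cancel the shared context, then block on the WaitGroup until the background loops drain. A hypothetical caller, with constructor arguments invented purely for illustration:

```go
// Hypothetical usage sketch; the argument values and cache path are invented.
d := dao.New(1000, 5*time.Second, "http://127.0.0.1:8086", "/tmp/enoch-cache")
// ... feed points via d.MsgProcess(point) ...
d.Close() // returns only once the background flush loops have drained and exited
```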
func (d *Dao) flashFileCache() {
	d.wg.Add(1)
	timer := time.NewTimer(0) // zero duration: the first scan runs immediately
	for {
		select {
		case <-timer.C:
			fileList, err := ioutil.ReadDir(d.cacheFileDir)
			if err != nil || len(fileList) == 0 {
				timer.Reset(d.flashTime) // re-arm; a bare continue would leave the timer dead
				continue
			}
			for _, file := range fileList {
				sl := strings.Split(file.Name(), ":")
				if len(sl) == 0 || sl[0] != filePrefixCache {
					continue
				}
				// read the cached file
				filePath := path.Join(d.cacheFileDir, file.Name())
				data, err := ioutil.ReadFile(filePath)
				if err != nil {
					glog.Error("can not read file:", filePath, err)
					continue
				}
				cachePointList := make([]CachePoint, 0)
				if err := json.Unmarshal(data, &cachePointList); err != nil {
					glog.Error("can not unmarshal file:", filePath)
					continue
				}
				pointList := make([]*client.Point, 0)
				for _, cachePoint := range cachePointList {
					point, err := client.NewPoint(cachePoint.Name, cachePoint.Tags, cachePoint.Fields, cachePoint.Time)
					if err != nil {
						continue
					}
					pointList = append(pointList, point)
				}
				if err := d.writeDb(pointList); err != nil {
					glog.Warn("flash file cache: can not write db")
					continue
				} else {
					glog.Info("file cache written to db:", filePath)
				}
				if err := os.Remove(filePath); err != nil {
					glog.Error("failed to delete file:", filePath)
				}
			}
			timer.Reset(d.flashTime)
		case <-d.ctx.Done():
			glog.Info("flash file cache graceful shutdown")
			timer.Stop()
			d.wg.Done()
			return
		}
	}
}
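Replacing the old `time.Sleep(d.flashTime)` with a timer inside a `select` is what makes this loop interruptible: a sleeping goroutine cannot observe cancellation, while a `select` wakes the moment the context is cancelled. A stripped-down sketch of the pattern, with hypothetical names:

```go
// Generic interruptible-ticker pattern (hypothetical worker, not from this commit).
func worker(ctx context.Context, interval time.Duration, doWork func()) {
	timer := time.NewTimer(0) // zero duration: the first tick fires immediately
	defer timer.Stop()
	for {
		select {
		case <-timer.C:
			doWork()
			timer.Reset(interval) // re-arm only after the work finishes
		case <-ctx.Done():
			return // cancellation interrupts even a long wait
		}
	}
}
```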
@@ -111,11 +133,16 @@ func (d *Dao) MsgProcess(point *client.Point) {
		return
	}
	if d.isClose {
		return // the Dao is shutting down; drop the point instead of blocking on the channel
	}
	d.channel <- point
}
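One caveat: `isClose` is written by `Close` and read by `MsgProcess` from different goroutines with no synchronization, which the race detector (`go test -race`) would flag. A common fix, sketched here with a hypothetical `closed int32` field replacing the bool:

```go
// Sketch only: assumes the Dao field `isClose bool` is replaced by `closed int32`.
func (d *Dao) markClosed() {
	atomic.StoreInt32(&d.closed, 1) // safe concurrent write
}

func (d *Dao) isClosed() bool {
	return atomic.LoadInt32(&d.closed) == 1 // safe concurrent read
}
```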
// When the list is full or the timer fires, flush the data to the database
func (d *Dao) run() {
	d.wg.Add(1)
	defer func() {
		if err := recover(); err != nil {
			glog.Error(err)
@@ -138,6 +165,12 @@ func (d *Dao) run() {
			go d.batchWrite(pointList)
			pointList = make([]*client.Point, 0, d.batchSize)
			timer.Reset(d.flashTime)
		case <-d.ctx.Done():
			go d.batchWrite(pointList) // flush whatever is still buffered
			glog.Info("influx writer graceful shutdown")
			timer.Stop()
			d.wg.Done()
			return
		}
	}
}
@@ -196,6 +229,9 @@ func (d *Dao) writeFile(pointList []*client.Point) error {
}

func (d *Dao) batchWrite(pointList []*client.Point) {
	d.wg.Add(1)
	defer d.wg.Done()
	if len(pointList) == 0 {
		return
	}
...
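A second caveat worth flagging: `wg.Add(1)` runs inside the spawned goroutine itself, so when the `ctx.Done()` branch of `run` fires its final `go d.batchWrite(pointList)` and then calls `wg.Done()`, `Close`'s `wg.Wait()` can return before that last flush has registered itself; the `sync` docs require an `Add` that starts from a zero counter to happen before `Wait`. A sketch of the safer ordering at the call site:

```go
// Safer ordering sketch: register before spawning, so Wait cannot miss the flush.
// Assumes batchWrite itself no longer calls wg.Add/wg.Done.
d.wg.Add(1)
go func(points []*client.Point) {
	defer d.wg.Done()
	d.batchWrite(points)
}(pointList)
```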