Commit 6531528f authored by jingbo.wang's avatar jingbo.wang

新增node-check功能

parent d63a793f
Pipeline #45 failed with stages
module git.quantgroup.cn/DevOps/enoch module git.quantgroup.cn/DevOps/enoch
require ( require (
github.com/DataDog/zstd v1.3.4 // indirect github.com/Azure/go-autorest/autorest/validation v0.2.0 // indirect
github.com/Shopify/sarama v1.20.0 github.com/Shopify/sarama v1.21.0
github.com/Shopify/toxiproxy v2.1.3+incompatible // indirect github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 // indirect
github.com/bsm/sarama-cluster v2.1.15+incompatible github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/eapache/go-resiliency v1.1.0 // indirect github.com/coredns/coredns v1.6.3 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/dnaeon/go-vcr v1.0.1 // indirect
github.com/eapache/queue v1.1.0 // indirect github.com/envoyproxy/go-control-plane v0.8.6 // indirect
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/gogo/googleapis v1.3.0 // indirect
github.com/gomodule/redigo v2.0.0+incompatible github.com/gomodule/redigo v2.0.0+incompatible
github.com/hashicorp/consul v1.4.0
github.com/hashicorp/go-discover v0.0.0-20190905142513-34a650575f6c // indirect
github.com/hashicorp/hil v0.0.0-20190212132231-97b3a9cdfa93 // indirect
github.com/hashicorp/logutils v1.0.0 // indirect
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 // indirect
github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477 // indirect
github.com/influxdata/influxdb v1.7.2 github.com/influxdata/influxdb v1.7.2
github.com/influxdata/platform v0.0.0-20181221041837-9d0ed9020c3f // indirect github.com/influxdata/platform v0.0.0-20181221041837-9d0ed9020c3f // indirect
github.com/json-iterator/go v1.1.6 github.com/json-iterator/go v1.1.7
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/mitchellh/cli v1.0.0 // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect github.com/mitchellh/hashstructure v1.0.0 // indirect
github.com/onsi/ginkgo v1.7.0 // indirect
github.com/onsi/gomega v1.4.3 // indirect
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect
github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967 github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967
github.com/shirou/gopsutil v2.18.12+incompatible // indirect
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
) )
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"git.quantgroup.cn/DevOps/enoch/service/data" "git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/file_cache" "git.quantgroup.cn/DevOps/enoch/service/file_cache"
"git.quantgroup.cn/DevOps/enoch/service/job" "git.quantgroup.cn/DevOps/enoch/service/job"
"git.quantgroup.cn/DevOps/enoch/service/node-check"
"log" "log"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
...@@ -52,10 +53,18 @@ func main() { ...@@ -52,10 +53,18 @@ func main() {
job.AutoEmailPerformInfo() job.AutoEmailPerformInfo()
} }
job.AutoAlarm() //开启服务状态监控,当服务状态异常时,调用web-hook函数
continuous_queries.Load() if denv == "dev" { //开发环境
go node_check.NodeCheck()
job.AutoAlarm()
} else if job.CheckIp("172.30.12.22") { //生产环境,只有172.30.12.22执行
go node_check.NodeCheck()
job.AutoAlarm()
continuous_queries.Load() //连续查询设置只在生产环境上执行
}
go func() { go func() {
http.ListenAndServe("0.0.0.0:"+strconv.Itoa(intPort+1), nil) _ = http.ListenAndServe("0.0.0.0:"+strconv.Itoa(intPort+1), nil)
}() }()
http.HandleFunc("/duration", service.DurationInterface) http.HandleFunc("/duration", service.DurationInterface)
......
...@@ -67,9 +67,6 @@ func buildTolSql(cqName string, sysName string, threshold int) string { ...@@ -67,9 +67,6 @@ func buildTolSql(cqName string, sysName string, threshold int) string {
} }
func Load() { func Load() {
if !checkIp("172.30.12.22"){
return
}
name := query(data.MONITOR) name := query(data.MONITOR)
logger.Info.Println("old: ", name) logger.Info.Println("old: ", name)
delete(data.MONITOR, name) delete(data.MONITOR, name)
......
package dingding
import (
"encoding/json"
"net/http"
"strings"
)
// DefaultDingURL is the default DingDing robot web-hook endpoint list.
// NOTE(review): the access token is hard-coded in source control; consider
// moving it to configuration or secret storage.
var DefaultDingURL = []string{"https://oapi.dingtalk.com/robot/send?access_token=9ffab8e4ae5f94e0fbf84aa91c9cb474d9e3d5bd0bb3c2daffe4cdfe0c2cbbc7"}
// DinDingMsg is the JSON payload for a DingDing robot web-hook message.
// Only the "text" message type is used here.
type DinDingMsg struct {
	MsgType string `json:"msgtype"`
	// Link Link `json:"link"`
	Text Text `json:"text"`
}

// Text is the body of a "text"-type DingDing message.
type Text struct {
	Content string `json:"content"`
}
// SenderDingDing posts the text message info to each DingDing web-hook URL in
// receiver. It returns the first transport error encountered; a non-2xx HTTP
// status is not treated as an error (best-effort delivery, matching callers'
// expectations).
func SenderDingDing(info string, receiver [] string) error {
	bodyStr := buildDingDingMsg(info)
	for _, r := range receiver {
		data := strings.NewReader(string(bodyStr))
		resp, e := http.Post(r, "application/json;charset=utf-8", data)
		if e != nil {
			return e
		}
		// Close the response body so the underlying connection is released
		// (the original leaked it on every send).
		_ = resp.Body.Close()
	}
	return nil
}
// buildDingDingMsg serializes info into the JSON body of a DingDing
// "text"-type message.
func buildDingDingMsg(info string) []byte {
	payload := DinDingMsg{
		MsgType: "text",
		Text: Text{
			Content: info,
		},
	}
	// Marshal cannot fail for this fixed struct shape, so the error is dropped.
	encoded, _ := json.Marshal(payload)
	return encoded
}
package dingding
import "testing"
var testDingUrl = []string{
"https://oapi.dingtalk.com/robot/send?access_token=05b2c46e460e94b32044251c0f8af666c55a4600758fda508120f962d589ea47",
}
// TestSenderDingDing is a manual integration test: it posts a real message to
// the DingDing web-hook in testDingUrl and therefore requires network access.
// NOTE(review): not suitable for automated CI runs.
func TestSenderDingDing(t *testing.T) {
	e := SenderDingDing( "叮叮测试,吧啦吧啦吧", testDingUrl)
	if e != nil {
		t.Error(e)
	}
}
...@@ -14,10 +14,6 @@ import ( ...@@ -14,10 +14,6 @@ import (
报警定时任务,每分钟执行一次 报警定时任务,每分钟执行一次
*/ */
func AutoAlarm() { func AutoAlarm() {
if !checkIp("172.30.12.22"){
return
}
c := cron.New() c := cron.New()
err := c.AddFunc("@every 1m", func() { err := c.AddFunc("@every 1m", func() {
...@@ -31,7 +27,7 @@ func AutoAlarm() { ...@@ -31,7 +27,7 @@ func AutoAlarm() {
c.Start() c.Start()
} }
func checkIp(ip string) bool { func CheckIp(ip string) bool {
addrs, err := net.InterfaceAddrs() addrs, err := net.InterfaceAddrs()
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
......
package node_check
import (
"context"
"git.quantgroup.cn/DevOps/enoch/service/dingding"
"git.quantgroup.cn/DevOps/enoch/service/registry"
"log"
"net/http"
"os"
"sync"
"time"
)
// Health status values as reported by consul health checks.
const (
	Passing = "passing"
	Critical = "critical"
)

// HandlerMap maps service name -> web-hook URL invoked when that service
// transitions to critical (see handler).
var HandlerMap = new(sync.Map)

// HttpGetRetryCount is how many times handler retries the web-hook GET.
var HttpGetRetryCount = 3

// HttpTimeOut bounds each web-hook GET request.
var HttpTimeOut = time.Second * 10
// init seeds HandlerMap with the known service -> web-hook mappings.
func init() {
	// HandlerMap.Store("aaaa", "http://www.baidasdfasdfasdfasdfu.com/")
	// HandlerMap.Store("aaaa", "http://172.20.6.33:8989/service-down")
	// TODO: make this mapping configurable instead of hard-coded.
	HandlerMap.Store("heimdallr", "http://172.20.6.33:8989/service-down")
}
func httpGet(url string, timeout time.Duration) (*http.Response, error) {
ctx, cancel := context.WithCancel(context.TODO())
req, e := http.NewRequest("GET", url, nil)
if e != nil {
return nil, e
} else {
req = req.WithContext(ctx)
}
_ = time.AfterFunc(timeout, func() {
cancel()
})
return http.DefaultClient.Do(req)
}
// handler invokes the web-hook registered for serviceName in HandlerMap, if
// any. The GET is retried up to HttpGetRetryCount times and stops after the
// first successful round trip.
func handler(serviceName string) {
	url, ok := HandlerMap.Load(serviceName)
	if !ok {
		return
	}
	for i := 0; i < HttpGetRetryCount; i++ {
		resp, e := httpGet(url.(string), HttpTimeOut)
		if e != nil {
			log.Print("ERROR", " handler service: ", e)
			continue
		}
		log.Print("INFO", " handler service: ", resp.StatusCode)
		// Close the body so the connection is released (the original leaked it).
		_ = resp.Body.Close()
		break
	}
}
// watch implements registry.Observer and drives alerting on node changes.
type watch struct{}

// DeleteService implements Observer; service removal needs no alerting here.
func (w watch) DeleteService(serviceName string) {
	//pass
}
// UpdateNodes implements Observer. Called whenever a service's node set
// changes, it derives the service's overall health (healthy if at least one
// node is passing), alerts via DingDing and fires the service web-hook on a
// passing -> critical transition, and records the new state.
func (w watch) UpdateNodes(service *registry.Service) {
	// A service is OK when any of its nodes is passing.
	serviceOK := false
	nodeStr := ""
	for _, node := range service.NodeMap {
		nodeStr += node.Id + ":" + node.Status + " "
		if node.Status == Passing { // was the literal "passing"; use the shared constant
			serviceOK = true
		}
	}
	// Alert only on the passing -> critical transition, so a service that
	// stays down does not re-alarm on every update.
	if v, ok := servicesStatus.Load(service.Name); ok && v.(string) == Passing && !serviceOK {
		log.Print("PANIC ", service.Name, " ", nodeStr, "---!!!service critical!!!---")
		_ = dingding.SenderDingDing(service.Name+" "+nodeStr+" "+"---!!!service critical!!!---", dingding.DefaultDingURL)
		handler(service.Name)
	} else {
		log.Print("INFO ", service.Name, " ", nodeStr)
	}
	// Persist the freshly computed state for the next transition check.
	if serviceOK {
		servicesStatus.Store(service.Name, Passing)
	} else {
		servicesStatus.Store(service.Name, Critical)
	}
}
// AddNode implements Observer; node additions need no handling here.
func (w watch) AddNode(node *registry.Node) {
	//pass
}

// DelNode implements Observer; node removals need no handling here.
func (w watch) DelNode(node *registry.Node) {
	//pass
}

// servicesStatus records the last known status (Passing/Critical) per service
// name; UpdateNodes reads it to detect passing -> critical transitions.
var (
	servicesStatus = new(sync.Map)
)
// InitServiceStatus seeds servicesStatus with the current health of every
// service known to the registry: Passing when at least one node passes,
// Critical otherwise.
func InitServiceStatus() {
	for _, service := range registry.GetServiceMap() {
		status := Critical
		for _, node := range service.NodeMap {
			if node.Status == Passing {
				status = Passing
				break
			}
		}
		servicesStatus.Store(service.Name, status)
	}
}
// NodeCheck runs the node health watcher forever. It connects to the consul
// cluster, seeds the initial service states, registers the watch observer and
// then blocks. If the watcher panics, the panic is logged and reported to
// DingDing, and the whole sequence restarts after one second. The restart is
// a loop rather than the original recursive call from inside recover, so
// repeated panics cannot grow the stack without bound.
func NodeCheck() {
	for {
		nodeCheckOnce()
		// nodeCheckOnce only returns after a recovered panic; back off
		// briefly before restarting.
		time.Sleep(time.Second * 1)
	}
}

// nodeCheckOnce performs one full watcher lifecycle and blocks until a panic
// occurs, which it recovers and reports. Configuration errors terminate the
// process, matching the original behavior.
func nodeCheckOnce() {
	defer func() {
		if e := recover(); e != nil {
			log.Print("node check panic: ", e)
			_ = dingding.SenderDingDing("node check panic!", dingding.DefaultDingURL)
		}
	}()
	// Registry initialization.
	// TODO: the datacenter and cluster addresses are hard-coded; move them to
	// configuration.
	dc := "3c"
	cluster := []string{"172.30.12.2:8500", "172.30.12.3:8500", "172.30.12.4:8500"}
	if e := registry.Init("consul", map[string]interface{}{"dc": dc, "cluster": cluster}); e != nil {
		log.Print("registry init error:", e)
		os.Exit(-1)
	}
	// Give the registry a moment to populate before reading the service map.
	time.Sleep(time.Second * 1)
	// Seed initial per-service health states.
	InitServiceStatus()
	// Attach the observer that performs the actual alerting.
	if e := registry.SetObserver("watch", &watch{}); e != nil {
		log.Print("set observer error:", e)
		os.Exit(-1)
	}
	// Block forever; the watch callbacks do the work.
	select {}
}
package node_check
import "testing"
// TestNodeCheck is a manual smoke test: it starts the watcher against the
// real consul cluster and then blocks forever on select{}, so it never
// terminates. NOTE(review): not suitable for automated CI runs.
func TestNodeCheck(t *testing.T) {
	go NodeCheck()
	select{}
}
package registry
import (
"bytes"
"encoding/gob"
"errors"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/watch"
"reflect"
"strconv"
"sync"
"time"
)
// consulRegistry is the consul-backed Registry implementation.
type consulRegistry struct {
	masterClient *api.Client // primary client, used for register/deregister
	watchClients []*api.Client // clients used by the blocking watch queries
	servicesWatchParse *watch.Plan // watch plan for the whole service catalog
	serviceWatchParseMap map[string]*watch.Plan // per-service watch plans
	observerMap *sync.Map // observer name -> Observer
	serviceMap map[string]*Service // service name -> Service
	serviceMapLock *sync.RWMutex // guards serviceMap and serviceWatchParseMap
}
// newConsulRegistry builds an empty consul-backed Registry with all internal
// maps and locks initialized.
func newConsulRegistry() Registry {
	cr := consulRegistry{}
	cr.masterClient = nil
	cr.watchClients = make([]*api.Client, 0)
	cr.servicesWatchParse = nil
	cr.serviceWatchParseMap = make(map[string]*watch.Plan)
	cr.observerMap = new(sync.Map)
	cr.serviceMap = make(map[string]*Service)
	cr.serviceMapLock = new(sync.RWMutex)
	return &cr
}
// init registers the consul implementation in the registry factory map.
func init() {
	if _, ok := newRegistryMap["consul"]; !ok {
		newRegistryMap["consul"] = newConsulRegistry
	}
}
// Init configures the consul registry and starts watching the service catalog.
//
// Recognized options (with defaults):
//   cluster []string  "ip:port" addresses   ["127.0.0.1:8500"]
//   dc      string    datacenter name       "dc1"
func (cr *consulRegistry) Init(options map[string]interface{}) error {
	// Validate and apply options, falling back to the defaults above.
	cluster := []string{"127.0.0.1:8500"}
	dc := "dc1"
	if v, ok := options["cluster"]; ok {
		c, valid := v.([]string)
		if !valid {
			return errors.New("consul cluster []string")
		}
		cluster = c
	}
	if v, ok := options["dc"]; ok {
		d, valid := v.(string)
		if !valid {
			return errors.New("consul dc string")
		}
		dc = d
	}
	// The master client (first address) handles registrations with a short
	// wait; one long-wait client per address serves the blocking watches.
	cr.masterClient = newConsulClient(cluster[0], dc, time.Millisecond*100)
	for _, address := range cluster {
		cr.watchClients = append(cr.watchClients, newConsulClient(address, dc, time.Second*300))
	}
	// Start watching the service catalog.
	parse, _ := watch.Parse(map[string]interface{}{"type": "services"})
	parse.Handler = cr.handlerServices
	cr.servicesWatchParse = parse
	go cr.watch(cr.servicesWatchParse)
	return nil
}
// newConsulClient constructs a consul API client for the given address,
// datacenter and blocking-query wait time.
func newConsulClient(address string, dc string, waitTime time.Duration) *api.Client {
	cfg := api.DefaultConfig()
	cfg.Address = address
	cfg.Datacenter = dc
	cfg.WaitTime = waitTime
	// NewClient only fails on malformed config; the error is dropped as in
	// the rest of this file.
	client, _ := api.NewClient(cfg)
	return client
}
// watch runs the given watch plan against one of the watch clients. If the
// plan panics, it waits one second, fails over to the next client (wrapping
// around) and runs the plan again; a normal return means the plan was stopped
// and the loop exits. The original version recovered the panic and advanced
// sentry but then simply returned, so the failover never actually ran.
func (cr *consulRegistry) watch(parse *watch.Plan) {
	sentry := 0
	for {
		recovered := false
		func() {
			defer func() {
				if e := recover(); e != nil {
					recovered = true
					time.Sleep(time.Second * 1)
					if sentry < len(cr.watchClients)-1 {
						sentry++
					} else {
						sentry = 0
					}
				}
			}()
			_ = parse.RunWithClientAndLogger(cr.watchClients[sentry], nil)
		}()
		if !recovered {
			return
		}
	}
}
// handlerServices is the watch callback for the consul service catalog. It
// creates a watch plan and Service entry for every newly seen service,
// refreshes tag sets, and tears down plans/state for services that have
// disappeared, notifying observers of the deletion.
func (cr *consulRegistry) handlerServices(_ uint64, data interface{}) {
	services := data.(map[string][]string)
	// Guard serviceMap and serviceWatchParseMap.
	cr.serviceMapLock.Lock()
	defer cr.serviceMapLock.Unlock()
	for service, tags := range services {
		// First sighting: create the Service entry and start watching it.
		if _, ok := cr.serviceWatchParseMap[service]; !ok {
			parse, _ := watch.Parse(map[string]interface{}{"type": "service", "service": service})
			parse.Handler = cr.handlerService
			cr.serviceWatchParseMap[service] = parse
			cr.serviceMap[service] = NewService(service)
			go cr.watch(parse)
		}
		// Refresh the tag set only when it actually changed.
		tagMap := make(map[string]struct{})
		for _, tag := range tags {
			tagMap[tag] = struct{}{}
		}
		if !reflect.DeepEqual(tagMap, cr.serviceMap[service].TagMap) {
			cr.serviceMap[service].TagMap = tagMap
		}
	}
	// Handle services that no longer appear in the catalog.
	for service, parse := range cr.serviceWatchParseMap {
		if _, ok := services[service]; !ok {
			// Stop the per-service watch and drop local state.
			parse.Stop()
			delete(cr.serviceWatchParseMap, service)
			delete(cr.serviceMap, service)
			// Notify observers of the removal.
			cr.observerMap.Range(func(_, value interface{}) bool {
				observer := value.(Observer)
				observer.DeleteService(service)
				return true
			})
		}
	}
}
// handlerService is the watch callback for a single service's health entries.
// It rebuilds the service's node map (skipping the consul agent itself),
// copying each node's health status from its matching check, and then
// notifies every observer.
func (cr *consulRegistry) handlerService(_ uint64, data interface{}) {
	services := data.([]*api.ServiceEntry)
	// Guard: an empty result would make the services[0] access below panic.
	if len(services) == 0 {
		return
	}
	cr.serviceMapLock.Lock()
	defer cr.serviceMapLock.Unlock()
	serviceName := services[0].Service.Service
	nodeMap := make(map[string]*Node)
	for _, service := range services {
		if service.Service.ID == "consul" {
			continue
		}
		// Rebuild the node and copy its health status from the matching check.
		node := NewNode(service.Service.Service, service.Service.ID, service.Service.Address, service.Service.Port)
		node.Meta = service.Service.Meta
		for _, health := range service.Checks {
			if node.Id == health.ServiceID {
				node.Status = health.Status
			}
		}
		nodeMap[service.Service.ID] = node
	}
	// Guard: handlerServices may not have created this entry yet (callback
	// ordering is not guaranteed); writing through a nil *Service would panic.
	svc, ok := cr.serviceMap[serviceName]
	if !ok {
		svc = NewService(serviceName)
		cr.serviceMap[serviceName] = svc
	}
	svc.NodeMap = nodeMap
	cr.observerMap.Range(func(_, value interface{}) bool {
		observer := value.(Observer)
		observer.UpdateNodes(svc)
		return true
	})
}
// Register registers node with consul, attaching an HTTP health check on
// /tech/health/check, and notifies observers on success. The consul call is
// attempted twice (one retry) before giving up.
func (cr *consulRegistry) Register(node *Node) error {
	check := &api.AgentServiceCheck{
		HTTP: "http://" + node.Address + ":" + strconv.Itoa(node.Port) + "/tech/health/check",
		Interval: "5s",
		Timeout: "10s",
		// DeregisterCriticalServiceAfter: "24h",
		DeregisterCriticalServiceAfter: "1m",
		CheckID: node.Id,
	}
	registration := api.AgentServiceRegistration{
		ID: node.Id,
		Name: node.ServiceName,
		Port: node.Port,
		Address: node.Address,
		Check: check,
		Meta: node.Meta,
	}
	// Two attempts: the original allows exactly one retry.
	for attempt := 0; attempt < 2; attempt++ {
		if e := cr.masterClient.Agent().ServiceRegister(&registration); e != nil {
			continue
		}
		cr.observerMap.Range(func(_, value interface{}) bool {
			value.(Observer).AddNode(node)
			return true
		})
		return nil
	}
	return errors.New("service register fail " + node.Id)
}
// Deregister removes node from consul by its Id and notifies observers on
// success. The consul call is attempted twice (one retry) before giving up.
func (cr *consulRegistry) Deregister(node *Node) error {
	for attempt := 0; attempt < 2; attempt++ {
		if e := cr.masterClient.Agent().ServiceDeregister(node.Id); e != nil {
			continue
		}
		cr.observerMap.Range(func(_, value interface{}) bool {
			value.(Observer).DelNode(node)
			return true
		})
		return nil
	}
	return errors.New("service deregister failed " + node.ServiceName + ":" + node.Id)
}
// GetServiceMap returns a deep copy of the current service map, produced by a
// gob encode/decode round trip so callers cannot mutate registry state.
func (cr *consulRegistry) GetServiceMap() map[string]*Service {
	cr.serviceMapLock.RLock()
	defer cr.serviceMapLock.RUnlock()
	rtn := make(map[string]*Service)
	buf := bytes.NewBuffer(nil)
	// Encode/decode errors are deliberately ignored: on failure the caller
	// simply receives an empty map.
	_ = gob.NewEncoder(buf).Encode(&cr.serviceMap)
	_ = gob.NewDecoder(bytes.NewBuffer(buf.Bytes())).Decode(&rtn)
	return rtn
}
// GetService returns a copy of the named service, or an error if it is
// unknown. The Service struct and its NodeMap container are fresh, but the
// *Node values and the TagMap are shared with registry state, so callers
// must treat them as read-only.
func (cr *consulRegistry) GetService(serviceName string) (*Service, error) {
	cr.serviceMapLock.RLock()
	defer cr.serviceMapLock.RUnlock()
	v, ok := cr.serviceMap[serviceName]
	if !ok {
		return nil, errors.New("service not found")
	}
	rtn := NewService(serviceName)
	rtn.TagMap = v.TagMap
	for id, node := range v.NodeMap {
		rtn.NodeMap[id] = node
	}
	return rtn, nil
}
// SetObserver registers a named observer. The observer immediately receives
// UpdateNodes for every currently known service, then is stored for future
// events. Returns an error if the name is already taken.
func (cr *consulRegistry) SetObserver(name string, observer Observer) error {
	cr.serviceMapLock.RLock()
	defer cr.serviceMapLock.RUnlock()
	if _, exists := cr.observerMap.Load(name); exists {
		return errors.New("observer is exists")
	}
	// Replay current state so the observer does not miss existing services.
	for _, service := range cr.serviceMap {
		observer.UpdateNodes(service)
	}
	cr.observerMap.Store(name, observer)
	return nil
}
// DelObserver removes the named observer; errors if it was never registered.
func (cr *consulRegistry) DelObserver(name string) error {
	if _, loaded := cr.observerMap.Load(name); !loaded {
		return errors.New("observer is not exists")
	}
	cr.observerMap.Delete(name)
	return nil
}
package registry
import (
"fmt"
"strconv"
"testing"
"time"
)
var r Registry = nil
// TestRefreshService is a manual integration test against a live consul
// cluster: it snapshots all services from one agent, deregisters their nodes,
// then re-registers them through a second registry instance.
// NOTE(review): requires network access to the hard-coded cluster addresses.
func TestRefreshService(t *testing.T) {
	TestNew(t)
	if e := r.Init(map[string]interface{}{"dc": "3c", "cluster": []string{"172.30.12.4:8500"}}); e != nil {
		t.Error(e)
	}
	time.Sleep(time.Second * 1)
	smap := r.GetServiceMap()
	for _, service := range smap {
		println("-------------|", service.Name, "|-------------")
		for _, node := range service.NodeMap {
			fmt.Println(node)
		}
	}
	time.Sleep(time.Second * 1)
	for _, service := range smap {
		println("-------------|", service.Name, "|-------------")
		for _, node := range service.NodeMap {
			// Deregister takes the *Node itself; the original passed node.Id
			// (a string), which does not compile against Registry.
			_ = r.Deregister(node)
			// _ = r.Register(node)
		}
	}
	time.Sleep(time.Second * 1)
	r2 := New("consul")
	if e := r2.Init(map[string]interface{}{"dc": "3c", "cluster": []string{"172.30.12.2:8500"}}); e != nil {
		t.Error(e)
	}
	for _, service := range smap {
		println("-------------|", service.Name, "|-------------")
		for _, node := range service.NodeMap {
			// _ = r.Deregister(node)
			_ = r2.Register(node)
		}
	}
}
// TestNew lazily creates the shared consul registry instance used by the
// other tests and fails if the "consul" factory is not registered.
func TestNew(t *testing.T) {
	if r == nil {
		r = New("consul")
	}
	if r == nil {
		t.Error("consul new error")
	}
}
// TestConsulRegistry_Init initializes the shared registry against a
// hard-coded cluster and then blocks forever on select{}. Manual/integration
// use only; the other tests run it in a goroutine to keep the watch alive.
func TestConsulRegistry_Init(t *testing.T) {
	TestNew(t)
	if e := r.Init(map[string]interface{}{"cluster": []string{"192.168.29.69:8500", "192.168.29.70:8500", "192.168.29.71:8500"}}); e != nil {
		t.Error(e)
	}
	select {}
}
// TestConsulRegistry_Register is a manual integration test: it starts the
// Init test in a goroutine (which never returns) and registers ten fake
// nodes, two per service name. NOTE(review): requires network access.
func TestConsulRegistry_Register(t *testing.T) {
	TestNew(t)
	go TestConsulRegistry_Init(t)
	for i := 0; i < 10; i++ {
		time.Sleep(time.Second * 1)
		node := Node{
			ServiceName: "name:" + strconv.Itoa(i/2),
			Id: strconv.Itoa(i),
			Address: "192.168.1.1",
			Port: 123,
			Meta: nil,
			Status: "",
		}
		println("re:", i)
		if e := r.Register(&node); e != nil {
			t.Error(e)
		}
	}
}
// TestConsulRegistry_Deregister registers ten nodes (via the Register test)
// and then deregisters them one by one. Manual integration test: requires a
// live consul cluster.
func TestConsulRegistry_Deregister(t *testing.T) {
	TestConsulRegistry_Register(t)
	time.Sleep(time.Second * 5)
	for i := 0; i < 10; i++ {
		time.Sleep(time.Second * 1)
		println("de:", i)
		// Deregister takes a *Node; the original passed the bare Id string,
		// which does not compile against the Registry interface. Only the Id
		// (and ServiceName, for the error message) is read by Deregister.
		if e := r.Deregister(&Node{Id: strconv.Itoa(i)}); e != nil {
			t.Error(e)
		}
	}
}
// TestConsulRegistry_GetServiceMap runs the Deregister scenario concurrently
// and prints a snapshot of the service map once per second for a minute.
// Manual integration test: requires a live consul cluster.
func TestConsulRegistry_GetServiceMap(t *testing.T) {
	go TestConsulRegistry_Deregister(t)
	for i := 0; i < 60; i++ {
		time.Sleep(time.Second * 1)
		s := r.GetServiceMap()
		for _, v := range s {
			fmt.Println("~~~~~~~~~~~~~~:", v.Name, v.TagMap, ":~~~~~~~~~~~~")
			for _, n := range v.NodeMap {
				fmt.Println(n)
			}
		}
	}
}
// TestConsulRegistry_GetService checks that a registered service can be
// fetched by name and that an unknown name yields an error. Manual
// integration test: requires a live consul cluster.
func TestConsulRegistry_GetService(t *testing.T) {
	TestConsulRegistry_Register(t)
	if s, e := r.GetService("name:0"); e != nil {
		t.Error(e)
	} else {
		fmt.Println(s.Name)
	}
	if _, e := r.GetService("xxx"); e == nil {
		t.Error("GetService err")
	}
}
// TObserver is a minimal Observer implementation used by the observer tests;
// it just prints the events it receives.
type TObserver struct {
}

// DeleteService logs service removals.
func (to *TObserver) DeleteService(serviceName string) {
	fmt.Println(serviceName)
}

// UpdateNodes logs node-set updates. The original signature,
// UpdateNodes(*Service, *Node), did not match the Observer interface, so
// SetObserver("test", &TObserver{}) failed to compile.
func (to *TObserver) UpdateNodes(service *Service) {
	fmt.Println(service)
}

// AddNode logs node registrations (method was missing from the original,
// leaving the Observer interface unsatisfied).
func (to *TObserver) AddNode(node *Node) {
	fmt.Println(node)
}

// DelNode logs node deregistrations (method was missing from the original,
// leaving the Observer interface unsatisfied).
func (to *TObserver) DelNode(node *Node) {
	fmt.Println(node)
}
// TestConsulRegistry_SetObserver attaches a TObserver while the Deregister
// scenario runs, then removes it, and finally blocks forever on select{}.
// Manual integration test: requires a live consul cluster and never returns.
func TestConsulRegistry_SetObserver(t *testing.T) {
	go TestConsulRegistry_Deregister(t)
	time.Sleep(time.Second * 5)
	_ = r.SetObserver("test", &TObserver{})
	time.Sleep(time.Second * 5)
	_ = r.DelObserver("test")
	select {}
}
package registry
import "errors"
// Node is a single registered instance of a service.
type Node struct {
	ServiceName string // owning service name
	Id string // node id
	Address string // address
	Port int // port
	Meta map[string]string // metadata
	Status string // health status (e.g. "passing", "critical", "unknown")
}

// Service groups the nodes and tags registered under one service name.
type Service struct {
	Name string // service name
	NodeMap map[string]*Node // node id -> node
	TagMap map[string]struct{} // tag set
}

// Observer receives registry change events.
type Observer interface {
	// DeleteService fires when a service disappears from the registry.
	DeleteService(serviceName string)
	// UpdateNodes fires when a service's node set changes.
	UpdateNodes(service *Service)
	// AddNode fires when a node is registered.
	AddNode(node *Node)
	// DelNode fires when a node is deregistered.
	DelNode(node *Node)
}
// Registry abstracts a service registry backend.
type Registry interface {
	// Init configures the registry from backend-specific options.
	Init(options map[string]interface{}) error
	// Register registers a node.
	Register(node *Node) error
	// Deregister removes a node.
	Deregister(node *Node) error
	// GetServiceMap returns all known services.
	GetServiceMap() map[string]*Service
	// GetService returns one service by name.
	GetService(serviceName string) (*Service, error)
	// SetObserver registers a named change observer.
	SetObserver(name string, observer Observer) error
	// DelObserver removes a named observer.
	DelObserver(name string) error
}
// newRegistryMap maps implementation name -> factory function.
var newRegistryMap = make(map[string]func() Registry)

// New creates a Registry implementation by name (e.g. "consul"); it returns
// nil when no implementation is registered under that name.
func New(name string) Registry {
	factory, ok := newRegistryMap[name]
	if !ok {
		return nil
	}
	return factory()
}
// NewNode builds a Node with status "unknown" and empty (non-nil) metadata.
func NewNode(serviceName string, id string, address string, port int) *Node {
	n := new(Node)
	n.ServiceName = serviceName
	n.Id = id
	n.Address = address
	n.Port = port
	n.Status = "unknown"
	n.Meta = make(map[string]string)
	return n
}
// NewService builds an empty Service with initialized node and tag maps.
func NewService(name string) *Service {
	s := new(Service)
	s.Name = name
	s.NodeMap = make(map[string]*Node)
	s.TagMap = make(map[string]struct{})
	return s
}
// defaultRegistry is the process-wide registry used by the package-level
// helpers below; nil until Init succeeds.
var defaultRegistry Registry = nil

// Init creates and initializes the default registry. It may succeed only
// once. If the backend's Init fails, the default registry is cleared again so
// a later Init can retry — the original left the half-initialized registry in
// place, making every subsequent Init fail with "default registry is exists".
func Init(name string, options map[string]interface{}) error {
	if defaultRegistry != nil {
		return errors.New("default registry is exists")
	}
	defaultRegistry = New(name)
	if defaultRegistry == nil {
		return errors.New("new registry error")
	}
	if err := defaultRegistry.Init(options); err != nil {
		defaultRegistry = nil
		return err
	}
	return nil
}
// Register registers node with the default registry.
func Register(node *Node) error {
	if defaultRegistry != nil {
		return defaultRegistry.Register(node)
	}
	return errors.New("default registry is not exists")
}
// Deregister removes node from the default registry.
func Deregister(node *Node) error {
	if defaultRegistry != nil {
		return defaultRegistry.Deregister(node)
	}
	return errors.New("default registry is not exists")
}
// GetServiceMap returns the default registry's service map, or an empty map
// when no default registry has been initialized.
func GetServiceMap() map[string]*Service {
	if defaultRegistry != nil {
		return defaultRegistry.GetServiceMap()
	}
	return make(map[string]*Service)
}
// GetService looks up one service by name in the default registry.
func GetService(serviceName string) (*Service, error) {
	if defaultRegistry != nil {
		return defaultRegistry.GetService(serviceName)
	}
	return nil, errors.New("default registry is not exists")
}
// SetObserver attaches a named observer to the default registry.
func SetObserver(name string, observer Observer) error {
	if defaultRegistry != nil {
		return defaultRegistry.SetObserver(name, observer)
	}
	return errors.New("default registry is not exists")
}
// DelObserver detaches a named observer from the default registry.
func DelObserver(name string) error {
	if defaultRegistry != nil {
		return defaultRegistry.DelObserver(name)
	}
	return errors.New("default registry is not exists")
}
sudo: false
language: go
go:
- 1.10.x
- 1.9.x
install:
- go get -u github.com/golang/dep/cmd/dep
- dep ensure
env:
- SCALA_VERSION=2.12 KAFKA_VERSION=0.11.0.1
- SCALA_VERSION=2.12 KAFKA_VERSION=1.0.1
- SCALA_VERSION=2.12 KAFKA_VERSION=1.1.0
script:
- make default test-race
addons:
apt:
packages:
- oracle-java8-set-default
# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
[[projects]]
name = "github.com/Shopify/sarama"
packages = ["."]
revision = "35324cf48e33d8260e1c7c18854465a904ade249"
version = "v1.17.0"
[[projects]]
name = "github.com/davecgh/go-spew"
packages = ["spew"]
revision = "346938d642f2ec3594ed81d874461961cd0faa76"
version = "v1.1.0"
[[projects]]
name = "github.com/eapache/go-resiliency"
packages = ["breaker"]
revision = "ea41b0fad31007accc7f806884dcdf3da98b79ce"
version = "v1.1.0"
[[projects]]
branch = "master"
name = "github.com/eapache/go-xerial-snappy"
packages = ["."]
revision = "bb955e01b9346ac19dc29eb16586c90ded99a98c"
[[projects]]
name = "github.com/eapache/queue"
packages = ["."]
revision = "44cc805cf13205b55f69e14bcb69867d1ae92f98"
version = "v1.1.0"
[[projects]]
branch = "master"
name = "github.com/golang/snappy"
packages = ["."]
revision = "2e65f85255dbc3072edf28d6b5b8efc472979f5a"
[[projects]]
name = "github.com/onsi/ginkgo"
packages = [
".",
"config",
"extensions/table",
"internal/codelocation",
"internal/containernode",
"internal/failer",
"internal/leafnodes",
"internal/remote",
"internal/spec",
"internal/spec_iterator",
"internal/specrunner",
"internal/suite",
"internal/testingtproxy",
"internal/writer",
"reporters",
"reporters/stenographer",
"reporters/stenographer/support/go-colorable",
"reporters/stenographer/support/go-isatty",
"types"
]
revision = "fa5fabab2a1bfbd924faf4c067d07ae414e2aedf"
version = "v1.5.0"
[[projects]]
name = "github.com/onsi/gomega"
packages = [
".",
"format",
"internal/assertion",
"internal/asyncassertion",
"internal/oraclematcher",
"internal/testingtsupport",
"matchers",
"matchers/support/goraph/bipartitegraph",
"matchers/support/goraph/edge",
"matchers/support/goraph/node",
"matchers/support/goraph/util",
"types"
]
revision = "62bff4df71bdbc266561a0caee19f0594b17c240"
version = "v1.4.0"
[[projects]]
name = "github.com/pierrec/lz4"
packages = [
".",
"internal/xxh32"
]
revision = "6b9367c9ff401dbc54fabce3fb8d972e799b702d"
version = "v2.0.2"
[[projects]]
branch = "master"
name = "github.com/rcrowley/go-metrics"
packages = ["."]
revision = "e2704e165165ec55d062f5919b4b29494e9fa790"
[[projects]]
branch = "master"
name = "golang.org/x/net"
packages = [
"html",
"html/atom",
"html/charset"
]
revision = "afe8f62b1d6bbd81f31868121a50b06d8188e1f9"
[[projects]]
branch = "master"
name = "golang.org/x/sys"
packages = ["unix"]
revision = "63fc586f45fe72d95d5240a5d5eb95e6503907d3"
[[projects]]
name = "golang.org/x/text"
packages = [
"encoding",
"encoding/charmap",
"encoding/htmlindex",
"encoding/internal",
"encoding/internal/identifier",
"encoding/japanese",
"encoding/korean",
"encoding/simplifiedchinese",
"encoding/traditionalchinese",
"encoding/unicode",
"internal/gen",
"internal/tag",
"internal/utf8internal",
"language",
"runes",
"transform",
"unicode/cldr"
]
revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0"
version = "v0.3.0"
[[projects]]
name = "gopkg.in/yaml.v2"
packages = ["."]
revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183"
version = "v2.2.1"
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "2fa33a2d1ae87e0905ef09332bb4b3fda29179f6bcd48fd3b94070774b9e458b"
solver-name = "gps-cdcl"
solver-version = 1
# Gopkg.toml example
#
# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
# for detailed Gopkg.toml documentation.
#
# required = ["github.com/user/thing/cmd/thing"]
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
#
# [[constraint]]
# name = "github.com/user/project"
# version = "1.0.0"
#
# [[constraint]]
# name = "github.com/user/project2"
# branch = "dev"
# source = "github.com/myfork/project2"
#
# [[override]]
# name = "github.com/x/y"
# version = "2.4.0"
[[constraint]]
name = "github.com/Shopify/sarama"
version = "^1.14.0"
(The MIT License)
Copyright (c) 2017 Black Square Media Ltd
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
'Software'), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
SCALA_VERSION?= 2.12
KAFKA_VERSION?= 1.1.0
KAFKA_DIR= kafka_$(SCALA_VERSION)-$(KAFKA_VERSION)
KAFKA_SRC= https://archive.apache.org/dist/kafka/$(KAFKA_VERSION)/$(KAFKA_DIR).tgz
KAFKA_ROOT= testdata/$(KAFKA_DIR)
PKG=$(shell go list ./... | grep -v vendor)
default: vet test
vet:
go vet $(PKG)
test: testdeps
KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60
test-verbose: testdeps
KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60 -v
test-race: testdeps
KAFKA_DIR=$(KAFKA_DIR) go test $(PKG) -ginkgo.slowSpecThreshold=60 -v -race
testdeps: $(KAFKA_ROOT)
doc: README.md
.PHONY: test testdeps vet doc
# ---------------------------------------------------------------------
$(KAFKA_ROOT):
@mkdir -p $(dir $@)
cd $(dir $@) && curl -sSL $(KAFKA_SRC) | tar xz
README.md: README.md.tpl $(wildcard *.go)
becca -package $(subst $(GOPATH)/src/,,$(PWD))
# Sarama Cluster
[![GoDoc](https://godoc.org/github.com/bsm/sarama-cluster?status.svg)](https://godoc.org/github.com/bsm/sarama-cluster)
[![Build Status](https://travis-ci.org/bsm/sarama-cluster.svg?branch=master)](https://travis-ci.org/bsm/sarama-cluster)
[![Go Report Card](https://goreportcard.com/badge/github.com/bsm/sarama-cluster)](https://goreportcard.com/report/github.com/bsm/sarama-cluster)
[![License](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
Cluster extensions for [Sarama](https://github.com/Shopify/sarama), the Go client library for Apache Kafka 0.9 (and later).
## Documentation
Documentation and example are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster
## Examples
Consumers have two modes of operation. In the default multiplexed mode messages (and errors) of multiple
topics and partitions are all passed to the single channel:
```go
package main
import (
"fmt"
"log"
"os"
"os/signal"
cluster "github.com/bsm/sarama-cluster"
)
func main() {
// init (custom) config, enable errors and notifications
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
// init consumer
brokers := []string{"127.0.0.1:9092"}
topics := []string{"my_topic", "other_topic"}
consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
if err != nil {
panic(err)
}
defer consumer.Close()
// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// consume errors
go func() {
for err := range consumer.Errors() {
log.Printf("Error: %s\n", err.Error())
}
}()
// consume notifications
go func() {
for ntf := range consumer.Notifications() {
log.Printf("Rebalanced: %+v\n", ntf)
}
}()
// consume messages, watch signals
for {
select {
case msg, ok := <-consumer.Messages():
if ok {
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
}
case <-signals:
return
}
}
}
```
Users who require access to individual partitions can use the partitioned mode which exposes access to partition-level
consumers:
```go
package main
import (
"fmt"
"log"
"os"
"os/signal"
cluster "github.com/bsm/sarama-cluster"
)
func main() {
// init (custom) config, set mode to ConsumerModePartitions
config := cluster.NewConfig()
config.Group.Mode = cluster.ConsumerModePartitions
// init consumer
brokers := []string{"127.0.0.1:9092"}
topics := []string{"my_topic", "other_topic"}
consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
if err != nil {
panic(err)
}
defer consumer.Close()
// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// consume partitions
for {
select {
case part, ok := <-consumer.Partitions():
if !ok {
return
}
// start a separate goroutine to consume messages
go func(pc cluster.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
}
}(part)
case <-signals:
return
}
}
}
```
## Running tests
You need to install Ginkgo & Gomega to run tests. Please see
http://onsi.github.io/ginkgo for more details.
To run tests, call:
$ make test
## Troubleshooting
### Consumer not receiving any messages?
By default, sarama's `Config.Consumer.Offsets.Initial` is set to `sarama.OffsetNewest`. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written.
If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set `Config.Consumer.Offsets.Initial` to `sarama.OffsetOldest`.
# Sarama Cluster
[![GoDoc](https://godoc.org/github.com/bsm/sarama-cluster?status.svg)](https://godoc.org/github.com/bsm/sarama-cluster)
[![Build Status](https://travis-ci.org/bsm/sarama-cluster.svg?branch=master)](https://travis-ci.org/bsm/sarama-cluster)
[![Go Report Card](https://goreportcard.com/badge/github.com/bsm/sarama-cluster)](https://goreportcard.com/report/github.com/bsm/sarama-cluster)
[![License](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
Cluster extensions for [Sarama](https://github.com/Shopify/sarama), the Go client library for Apache Kafka 0.9 (and later).
## Documentation
Documentation and example are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster
## Examples
Consumers have two modes of operation. In the default multiplexed mode messages (and errors) of multiple
topics and partitions are all passed to the single channel:
```go
package main
import (
"fmt"
"log"
"os"
"os/signal"
cluster "github.com/bsm/sarama-cluster"
)
func main() {{ "ExampleConsumer" | code }}
```
Users who require access to individual partitions can use the partitioned mode which exposes access to partition-level
consumers:
```go
package main
import (
"fmt"
"log"
"os"
"os/signal"
cluster "github.com/bsm/sarama-cluster"
)
func main() {{ "ExampleConsumer_Partitions" | code }}
```
## Running tests
You need to install Ginkgo & Gomega to run tests. Please see
http://onsi.github.io/ginkgo for more details.
To run tests, call:
$ make test
## Troubleshooting
### Consumer not receiving any messages?
By default, sarama's `Config.Consumer.Offsets.Initial` is set to `sarama.OffsetNewest`. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written.
If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set `Config.Consumer.Offsets.Initial` to `sarama.OffsetOldest`.
package cluster
import (
"math"
"sort"
"github.com/Shopify/sarama"
)
// NotificationType defines the type of notification emitted on rebalance.
type NotificationType uint8

const (
	// UnknownNotification is the zero value; it indicates an unset type.
	UnknownNotification NotificationType = iota
	// RebalanceStart is emitted when a rebalance cycle begins.
	RebalanceStart
	// RebalanceOK is emitted when a rebalance cycle completes successfully.
	RebalanceOK
	// RebalanceError is emitted when a rebalance cycle fails.
	RebalanceError
)

// String returns a human-readable description of the notification type.
func (t NotificationType) String() string {
	switch t {
	case RebalanceStart:
		return "rebalance start"
	case RebalanceOK:
		return "rebalance OK"
	case RebalanceError:
		return "rebalance error"
	default:
		return "unknown"
	}
}
// Notification are state events emitted by the consumers on rebalance
type Notification struct {
	// Type exposes the notification type (RebalanceStart, RebalanceOK or RebalanceError)
	Type NotificationType
	// Claimed contains topic/partitions that were claimed by this rebalance cycle
	Claimed map[string][]int32
	// Released contains topic/partitions that were released as part of this rebalance cycle
	Released map[string][]int32
	// Current are topic/partitions that are currently claimed to the consumer
	Current map[string][]int32
}
// newNotification creates a RebalanceStart notification that carries the
// currently claimed topic/partitions.
func newNotification(current map[string][]int32) *Notification {
	n := new(Notification)
	n.Type = RebalanceStart
	n.Current = current
	return n
}
// success builds the RebalanceOK follow-up for a RebalanceStart notification,
// computing the claimed/released deltas between the previous assignment
// (n.Current) and the new one.
func (n *Notification) success(current map[string][]int32) *Notification {
	next := &Notification{
		Type:     RebalanceOK,
		Claimed:  make(map[string][]int32),
		Released: make(map[string][]int32),
		Current:  current,
	}
	// Claimed: present in the new assignment but not in the old one.
	for topic, now := range current {
		next.Claimed[topic] = int32Slice(now).Diff(int32Slice(n.Current[topic]))
	}
	// Released: present in the old assignment but not in the new one.
	for topic, was := range n.Current {
		next.Released[topic] = int32Slice(was).Diff(int32Slice(current[topic]))
	}
	return next
}
// --------------------------------------------------------------------
// topicInfo holds the partitions of a single topic together with the IDs of
// the group members subscribed to it.
type topicInfo struct {
	Partitions []int32
	MemberIDs []string
}
// Perform assigns this topic's partitions to its members using the given
// strategy; anything other than round-robin falls back to ranges.
func (info topicInfo) Perform(s Strategy) map[string][]int32 {
	switch s {
	case StrategyRoundRobin:
		return info.RoundRobin()
	default:
		return info.Ranges()
	}
}
// Ranges assigns each (lexically sorted) member a contiguous slice of the
// partition list, splitting it as evenly as possible. Members whose range
// turns out empty are omitted from the result.
func (info topicInfo) Ranges() map[string][]int32 {
	sort.Strings(info.MemberIDs)

	members := len(info.MemberIDs)
	parts := len(info.Partitions)
	share := float64(parts) / float64(members)

	res := make(map[string][]int32, members)
	for pos, memberID := range info.MemberIDs {
		// Round the fractional boundaries to the nearest partition index.
		lo := int(math.Floor(float64(pos)*share + 0.5))
		hi := int(math.Floor(float64(pos+1)*share + 0.5))
		if chunk := info.Partitions[lo:hi]; len(chunk) > 0 {
			res[memberID] = chunk
		}
	}
	return res
}
// RoundRobin deals partitions to the (lexically sorted) members one at a
// time, cycling through the member list.
func (info topicInfo) RoundRobin() map[string][]int32 {
	sort.Strings(info.MemberIDs)

	members := len(info.MemberIDs)
	res := make(map[string][]int32, members)
	for idx, partition := range info.Partitions {
		id := info.MemberIDs[idx%members]
		res[id] = append(res[id], partition)
	}
	return res
}
// --------------------------------------------------------------------
type balancer struct {
client sarama.Client
topics map[string]topicInfo
}
// newBalancerFromMeta builds a balancer from the group members' metadata,
// registering every topic each member has subscribed to.
func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) {
	b := newBalancer(client)
	for id, meta := range members {
		for _, topic := range meta.Topics {
			if err := b.Topic(topic, id); err != nil {
				return nil, err
			}
		}
	}
	return b, nil
}
// newBalancer returns an empty balancer bound to the given client.
func newBalancer(client sarama.Client) *balancer {
	b := new(balancer)
	b.client = client
	b.topics = make(map[string]topicInfo)
	return b
}
// Topic registers memberID as a subscriber of the named topic. The first
// time a topic is seen its partition list is fetched from the cluster.
func (r *balancer) Topic(name string, memberID string) error {
	info, known := r.topics[name]
	if !known {
		partitions, err := r.client.Partitions(name)
		if err != nil {
			return err
		}
		info = topicInfo{
			Partitions: partitions,
			MemberIDs:  make([]string, 0, 1),
		}
	}
	info.MemberIDs = append(info.MemberIDs, memberID)
	r.topics[name] = info
	return nil
}
// Perform runs the assignment strategy across all registered topics and
// returns the resulting memberID -> topic -> partitions mapping.
func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 {
	out := make(map[string]map[string][]int32, 1)
	for topic, info := range r.topics {
		for member, partitions := range info.Perform(s) {
			assignments, ok := out[member]
			if !ok {
				assignments = make(map[string][]int32, 1)
				out[member] = assignments
			}
			assignments[topic] = partitions
		}
	}
	return out
}
package cluster
import (
"errors"
"sync/atomic"
"github.com/Shopify/sarama"
)
var errClientInUse = errors.New("cluster: client is already used by another consumer")
// Client is a group client: a sarama.Client that can be claimed by at most
// one consumer at a time (see claim/release).
type Client struct {
	sarama.Client
	config Config
	inUse uint32 // 1 while claimed by a consumer; flipped via atomic CAS
}
// NewClient creates a new client instance. A nil config is replaced with
// NewConfig() defaults; the config is validated before the underlying
// sarama client is created.
func NewClient(addrs []string, config *Config) (*Client, error) {
	if config == nil {
		config = NewConfig()
	}

	if err := config.Validate(); err != nil {
		return nil, err
	}

	base, err := sarama.NewClient(addrs, &config.Config)
	if err != nil {
		return nil, err
	}

	return &Client{Client: base, config: *config}, nil
}
// ClusterConfig returns a copy of the cluster configuration, so callers
// cannot mutate the client's own config.
func (c *Client) ClusterConfig() *Config {
	dup := c.config
	return &dup
}
// claim atomically marks the client as in-use; it reports false when the
// client is already owned by another consumer.
func (c *Client) claim() bool {
	return atomic.CompareAndSwapUint32(&c.inUse, 0, 1)
}
// release atomically returns the client to the unclaimed state.
func (c *Client) release() {
	atomic.CompareAndSwapUint32(&c.inUse, 1, 0)
}
package cluster
// Strategy for partition to consumer assignment
type Strategy string
const (
	// StrategyRange is the default and assigns partition ranges to consumers.
	// Example with six partitions and two consumers:
	//   C1: [0, 1, 2]
	//   C2: [3, 4, 5]
	StrategyRange Strategy = "range"
	// StrategyRoundRobin assigns partitions by alternating over consumers.
	// Example with six partitions and two consumers:
	//   C1: [0, 2, 4]
	//   C2: [1, 3, 5]
	StrategyRoundRobin Strategy = "roundrobin"
)
// Error instances are wrappers for internal errors with a context and
// may be returned through the consumer's Errors() channel
type Error struct {
	Ctx string // describes where the embedded error occurred
	error
}
package cluster
import (
"regexp"
"time"
"github.com/Shopify/sarama"
)
// minVersion is the oldest Kafka protocol version that supports the group API.
var minVersion = sarama.V0_9_0_0
// ConsumerMode selects how messages and errors are exposed by the consumer.
type ConsumerMode uint8
const (
	// ConsumerModeMultiplex merges all topics/partitions into the single
	// Messages() and Errors() channels (the default).
	ConsumerModeMultiplex ConsumerMode = iota
	// ConsumerModePartitions exposes individual partition consumers on the
	// Partitions() channel instead.
	ConsumerModePartitions
)
// Config extends sarama.Config with Group specific namespace
type Config struct {
	sarama.Config
	// Group is the namespace for group management properties
	Group struct {
		// The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange)
		PartitionStrategy Strategy
		// By default, messages and errors from the subscribed topics and partitions are all multiplexed and
		// made available through the consumer's Messages() and Errors() channels.
		//
		// Users who require low-level access can enable ConsumerModePartitions where individual partitions
		// are exposed on the Partitions() channel. Messages and errors must then be consumed on the partitions
		// themselves.
		Mode ConsumerMode
		Offsets struct {
			Retry struct {
				// The number of retries when committing offsets (defaults to 3).
				Max int
			}
			Synchronization struct {
				// The duration allowed for other clients to commit their offsets before resumption in this client, e.g. during a rebalance
				// NewConfig sets this to the Consumer.MaxProcessingTime duration of the Sarama configuration
				DwellTime time.Duration
			}
		}
		Session struct {
			// The allowed session timeout for registered consumers (defaults to 30s).
			// Must be within the allowed server range.
			Timeout time.Duration
		}
		Heartbeat struct {
			// Interval between each heartbeat (defaults to 3s). It should be no more
			// than 1/3rd of the Group.Session.Timeout setting
			Interval time.Duration
		}
		// Return specifies which group channels will be populated. If they are set to true,
		// you must read from the respective channels to prevent deadlock.
		Return struct {
			// If enabled, rebalance notification will be returned on the
			// Notifications channel (default disabled).
			Notifications bool
		}
		Topics struct {
			// An additional whitelist of topics to subscribe to.
			Whitelist *regexp.Regexp
			// An additional blacklist of topics to avoid. If set, this will take precedence over
			// the Whitelist setting.
			Blacklist *regexp.Regexp
		}
		Member struct {
			// Custom metadata to include when joining the group. The user data for all joined members
			// can be retrieved by sending a DescribeGroupRequest to the broker that is the
			// coordinator for the group.
			UserData []byte
		}
	}
}
// NewConfig returns a new configuration instance with sane defaults:
// range partitioning, 3 offset-commit retries, a 30s session timeout,
// a 3s heartbeat interval and the minimum supported Kafka version.
func NewConfig() *Config {
	cfg := &Config{Config: *sarama.NewConfig()}
	cfg.Group.PartitionStrategy = StrategyRange
	cfg.Group.Offsets.Retry.Max = 3
	// Mirror sarama's processing deadline for offset synchronization.
	cfg.Group.Offsets.Synchronization.DwellTime = cfg.Consumer.MaxProcessingTime
	cfg.Group.Session.Timeout = 30 * time.Second
	cfg.Group.Heartbeat.Interval = 3 * time.Second
	cfg.Config.Version = minVersion
	return cfg
}
// Validate checks a Config instance. It will return a
// sarama.ConfigurationError if the specified values don't make sense.
func (c *Config) Validate() error {
	// Sub-millisecond precision is lost on the wire; warn rather than fail.
	if c.Group.Heartbeat.Interval%time.Millisecond != 0 {
		sarama.Logger.Println("Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.")
	}
	if c.Group.Session.Timeout%time.Millisecond != 0 {
		sarama.Logger.Println("Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.")
	}
	if c.Group.PartitionStrategy != StrategyRange && c.Group.PartitionStrategy != StrategyRoundRobin {
		sarama.Logger.Println("Group.PartitionStrategy is not supported; range will be assumed.")
	}
	// The group API requires at least Kafka 0.9; silently raise the version.
	if !c.Version.IsAtLeast(minVersion) {
		sarama.Logger.Println("Version is not supported; 0.9. will be assumed.")
		c.Version = minVersion
	}
	if err := c.Config.Validate(); err != nil {
		return err
	}
	// validate the Group values
	switch {
	case c.Group.Offsets.Retry.Max < 0:
		return sarama.ConfigurationError("Group.Offsets.Retry.Max must be >= 0")
	case c.Group.Offsets.Synchronization.DwellTime <= 0:
		return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be > 0")
	case c.Group.Offsets.Synchronization.DwellTime > 10*time.Minute:
		return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be <= 10m")
	case c.Group.Heartbeat.Interval <= 0:
		return sarama.ConfigurationError("Group.Heartbeat.Interval must be > 0")
	case c.Group.Session.Timeout <= 0:
		return sarama.ConfigurationError("Group.Session.Timeout must be > 0")
	case !c.Metadata.Full && c.Group.Topics.Whitelist != nil:
		return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Whitelist is used")
	case !c.Metadata.Full && c.Group.Topics.Blacklist != nil:
		return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Blacklist is used")
	}
	// ensure offset is correct
	switch c.Consumer.Offsets.Initial {
	case sarama.OffsetOldest, sarama.OffsetNewest:
	default:
		return sarama.ConfigurationError("Consumer.Offsets.Initial must be either OffsetOldest or OffsetNewest")
	}
	return nil
}
This diff is collapsed.
/*
Package cluster provides cluster extensions for Sarama, enabling users
to consume topics across multiple, balanced nodes.
It requires Kafka v0.9+ and follows the steps guide, described in:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
*/
package cluster
package cluster
import (
"sync"
"github.com/Shopify/sarama"
)
// OffsetStash allows to accumulate offsets and
// mark them as processed in a bulk
type OffsetStash struct {
	offsets map[topicPartition]offsetInfo // latest stashed offset per topic/partition
	mu sync.Mutex // guards offsets
}
// NewOffsetStash inits a blank stash, ready to accept offsets.
func NewOffsetStash() *OffsetStash {
	s := new(OffsetStash)
	s.offsets = make(map[topicPartition]offsetInfo)
	return s
}
// MarkOffset stashes the provided message offset; it is a convenience
// wrapper around MarkPartitionOffset.
func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
	s.MarkPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
}
// MarkPartitionOffset stashes the offset for the provided topic/partition
// combination, keeping only the highest offset (and metadata) seen so far.
func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	key := topicPartition{Topic: topic, Partition: partition}
	info := s.offsets[key]
	if offset < info.Offset {
		return // an equal or newer offset is already stashed
	}
	info.Offset = offset
	info.Metadata = metadata
	s.offsets[key] = info
}
// ResetPartitionOffset stashes the offset for the provided topic/partition combination.
// Difference between ResetPartitionOffset and MarkPartitionOffset is that, ResetPartitionOffset supports earlier offsets
// (it only replaces a stashed offset with a smaller-or-equal one).
// Note: for a topic/partition that has never been stashed the zero-value
// Offset (0) is compared against, so only offsets <= 0 are stored.
func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	key := topicPartition{Topic: topic, Partition: partition}
	if info := s.offsets[key]; offset <= info.Offset {
		info.Offset = offset
		info.Metadata = metadata
		s.offsets[key] = info
	}
}
// ResetOffset stashes the provided message offset
// See ResetPartitionOffset for explanation
func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
	s.ResetPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
}
// Offsets returns the latest stashed offsets keyed by "topic-partition".
func (s *OffsetStash) Offsets() map[string]int64 {
	s.mu.Lock()
	defer s.mu.Unlock()

	out := make(map[string]int64, len(s.offsets))
	for key, info := range s.offsets {
		out[key.String()] = info.Offset
	}
	return out
}
package cluster
import (
"sort"
"sync"
"time"
"github.com/Shopify/sarama"
)
// PartitionConsumer allows code to consume individual partitions from the cluster.
//
// See docs for Consumer.Partitions() for more on how to implement this.
type PartitionConsumer interface {
	sarama.PartitionConsumer
	// Topic returns the consumed topic name
	Topic() string
	// Partition returns the consumed partition
	Partition() int32
	// InitialOffset returns the offset used for creating the PartitionConsumer instance.
	// The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest
	InitialOffset() int64
	// MarkOffset marks the offset of a message as processed.
	MarkOffset(offset int64, metadata string)
	// ResetOffset resets the offset to a previously processed message.
	ResetOffset(offset int64, metadata string)
}
// partitionConsumer is the concrete PartitionConsumer implementation; it
// wraps a sarama.PartitionConsumer and tracks offset/commit state.
type partitionConsumer struct {
	sarama.PartitionConsumer
	state partitionState // offset progress; guarded by mu
	mu sync.Mutex
	topic string
	partition int32
	initialOffset int64 // offset the consumer was created with
	closeOnce sync.Once
	closeErr error
	dying, dead chan none // dying: close requested; dead: run loop exited
}
// newPartitionConsumer creates a sarama partition consumer starting at the
// stashed offset (or defaultOffset when none is stored). If the stored
// offset is out of range, it retries once from defaultOffset.
func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) {
	offset := info.NextOffset(defaultOffset)
	pcm, err := manager.ConsumePartition(topic, partition, offset)
	// Resume from default offset, if requested offset is out-of-range
	if err == sarama.ErrOffsetOutOfRange {
		info.Offset = -1 // forget the stale stored offset
		offset = defaultOffset
		pcm, err = manager.ConsumePartition(topic, partition, offset)
	}
	if err != nil {
		return nil, err
	}
	return &partitionConsumer{
		PartitionConsumer: pcm,
		state: partitionState{Info: info},
		topic: topic,
		partition: partition,
		initialOffset: offset,
		dying: make(chan none),
		dead: make(chan none),
	}, nil
}
// Topic implements PartitionConsumer
func (c *partitionConsumer) Topic() string { return c.topic }
// Partition implements PartitionConsumer
func (c *partitionConsumer) Partition() int32 { return c.partition }
// InitialOffset implements PartitionConsumer
func (c *partitionConsumer) InitialOffset() int64 { return c.initialOffset }
// AsyncClose implements PartitionConsumer; it closes the underlying sarama
// consumer exactly once and signals the multiplex/waitFor loops via dying.
func (c *partitionConsumer) AsyncClose() {
	c.closeOnce.Do(func() {
		c.closeErr = c.PartitionConsumer.Close()
		close(c.dying)
	})
}
// Close implements PartitionConsumer; it triggers AsyncClose and blocks
// until the consumer's loop has fully exited (dead is closed).
func (c *partitionConsumer) Close() error {
	c.AsyncClose()
	<-c.dead
	return c.closeErr
}
// waitFor forwards this partition's errors to the shared errors channel
// (ConsumerModePartitions); it exits — closing dead — when the error
// channel closes, the stopper fires, or the consumer is dying.
func (c *partitionConsumer) waitFor(stopper <-chan none, errors chan<- error) {
	defer close(c.dead)
	for {
		select {
		case err, ok := <-c.Errors():
			if !ok {
				return // underlying consumer closed its error channel
			}
			// Forward, but stay responsive to shutdown while blocked.
			select {
			case errors <- err:
			case <-stopper:
				return
			case <-c.dying:
				return
			}
		case <-stopper:
			return
		case <-c.dying:
			return
		}
	}
}
// multiplex forwards this partition's messages and errors to the shared
// channels (ConsumerModeMultiplex); shutdown semantics match waitFor.
func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
	defer close(c.dead)
	for {
		select {
		case msg, ok := <-c.Messages():
			if !ok {
				return // underlying consumer closed its message channel
			}
			// Forward, but stay responsive to shutdown while blocked.
			select {
			case messages <- msg:
			case <-stopper:
				return
			case <-c.dying:
				return
			}
		case err, ok := <-c.Errors():
			if !ok {
				return
			}
			select {
			case errors <- err:
			case <-stopper:
				return
			case <-c.dying:
				return
			}
		case <-stopper:
			return
		case <-c.dying:
			return
		}
	}
}
// getState returns a copy of the consumer's partition state under the lock.
func (c *partitionConsumer) getState() partitionState {
	c.mu.Lock()
	defer c.mu.Unlock()

	return c.state
}
// markCommitted clears the dirty flag once the given offset has been
// committed, but only if the state still matches it (no newer mark raced in).
func (c *partitionConsumer) markCommitted(offset int64) {
	c.mu.Lock()
	if offset == c.state.Info.Offset {
		c.state.Dirty = false
	}
	c.mu.Unlock()
}
// MarkOffset implements PartitionConsumer; it records offset+1 as the next
// offset to consume, only ever moving the stored offset forward.
func (c *partitionConsumer) MarkOffset(offset int64, metadata string) {
	c.mu.Lock()
	if next := offset + 1; next > c.state.Info.Offset {
		c.state.Info.Offset = next
		c.state.Info.Metadata = metadata
		c.state.Dirty = true
	}
	c.mu.Unlock()
}
// ResetOffset implements PartitionConsumer; the inverse of MarkOffset, it
// only ever moves the stored offset backward (or keeps it equal).
func (c *partitionConsumer) ResetOffset(offset int64, metadata string) {
	c.mu.Lock()
	if next := offset + 1; next <= c.state.Info.Offset {
		c.state.Info.Offset = next
		c.state.Info.Metadata = metadata
		c.state.Dirty = true
	}
	c.mu.Unlock()
}
// --------------------------------------------------------------------
// partitionState tracks the commit progress of a single partition.
type partitionState struct {
	Info offsetInfo // next offset + metadata
	Dirty bool // true when Info has changed since the last commit
	LastCommit time.Time
}
// --------------------------------------------------------------------
// partitionMap is a concurrency-safe registry of partition consumers,
// keyed by topic/partition.
type partitionMap struct {
	data map[topicPartition]*partitionConsumer
	mu sync.RWMutex // guards data
}
// newPartitionMap returns an empty, ready-to-use partition map.
func newPartitionMap() *partitionMap {
	m := new(partitionMap)
	m.data = make(map[topicPartition]*partitionConsumer)
	return m
}
// IsSubscribedTo reports whether any consumed partition belongs to topic.
func (m *partitionMap) IsSubscribedTo(topic string) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()

	found := false
	for tp := range m.data {
		if tp.Topic == topic {
			found = true
			break
		}
	}
	return found
}
// Fetch returns the consumer for the given topic/partition, or nil when the
// partition is not part of this map.
func (m *partitionMap) Fetch(topic string, partition int32) *partitionConsumer {
	m.mu.RLock()
	defer m.mu.RUnlock()
	// A plain map read yields the zero value (nil) for a missing key; the
	// previous redundant comma-ok form was dropped (gosimple S1005).
	return m.data[topicPartition{topic, partition}]
}
// Store registers (or replaces) the consumer for a topic/partition.
func (m *partitionMap) Store(topic string, partition int32, pc *partitionConsumer) {
	m.mu.Lock()
	m.data[topicPartition{topic, partition}] = pc
	m.mu.Unlock()
}
// Snapshot captures a point-in-time copy of every partition's state.
func (m *partitionMap) Snapshot() map[topicPartition]partitionState {
	m.mu.RLock()
	defer m.mu.RUnlock()

	states := make(map[topicPartition]partitionState, len(m.data))
	for key, consumer := range m.data {
		states[key] = consumer.getState()
	}
	return states
}
// Stop closes every registered consumer concurrently and waits for all
// closes to finish. Only a read lock is held: the map is not mutated here.
func (m *partitionMap) Stop() {
	m.mu.RLock()
	defer m.mu.RUnlock()
	var wg sync.WaitGroup
	for tp := range m.data {
		wg.Add(1)
		go func(p *partitionConsumer) {
			_ = p.Close() // best-effort shutdown; close errors are discarded
			wg.Done()
		}(m.data[tp])
	}
	wg.Wait()
}
// Clear removes all entries from the map (the delete-in-range loop is the
// idiomatic map-clear pattern and reuses the existing buckets).
func (m *partitionMap) Clear() {
	m.mu.Lock()
	for tp := range m.data {
		delete(m.data, tp)
	}
	m.mu.Unlock()
}
// Info returns the consumed topics and their partitions, with each topic's
// partition list sorted ascending.
func (m *partitionMap) Info() map[string][]int32 {
	out := make(map[string][]int32)

	m.mu.RLock()
	for key := range m.data {
		out[key.Topic] = append(out[key.Topic], key.Partition)
	}
	m.mu.RUnlock()

	// Sorting happens outside the lock; the slices are no longer shared.
	for _, partitions := range out {
		sort.Sort(int32Slice(partitions))
	}
	return out
}
package cluster
import (
"fmt"
"sort"
"sync"
)
// none is a zero-size type used for pure signaling channels.
type none struct{}
// topicPartition uniquely identifies one partition of one topic.
type topicPartition struct {
	Topic     string
	Partition int32
}

// String renders the pair as "topic-partition".
func (tp *topicPartition) String() string {
	return fmt.Sprint(tp.Topic, "-", tp.Partition)
}
// offsetInfo pairs a stored offset with its commit metadata.
type offsetInfo struct {
	Offset   int64
	Metadata string
}

// NextOffset returns the stored offset when one has been recorded
// (Offset > -1) and the given fallback otherwise.
func (i offsetInfo) NextOffset(fallback int64) int64 {
	if i.Offset <= -1 {
		return fallback
	}
	return i.Offset
}
type int32Slice []int32
func (p int32Slice) Len() int { return len(p) }
func (p int32Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p int32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p int32Slice) Diff(o int32Slice) (res []int32) {
on := len(o)
for _, x := range p {
n := sort.Search(on, func(i int) bool { return o[i] >= x })
if n < on && o[n] == x {
continue
}
res = append(res, x)
}
return
}
// --------------------------------------------------------------------
// loopTomb coordinates shutdown of a set of worker goroutines: stop()
// signals them via a shared channel; Close() additionally waits for exit.
type loopTomb struct {
	c chan none // closed to signal shutdown
	o sync.Once // ensures c is closed at most once
	w sync.WaitGroup // tracks goroutines started via Go
}
// newLoopTomb returns a ready-to-use tomb.
func newLoopTomb() *loopTomb {
	return &loopTomb{c: make(chan none)}
}
// stop closes the dying channel exactly once.
func (t *loopTomb) stop() { t.o.Do(func() { close(t.c) }) }
// Close signals shutdown and blocks until all goroutines have finished.
func (t *loopTomb) Close() { t.stop(); t.w.Wait() }
// Dying returns the channel that is closed when the tomb is stopped.
func (t *loopTomb) Dying() <-chan none { return t.c }
// Go runs f in a new goroutine; when f returns (for any reason) the whole
// tomb is stopped, so sibling goroutines shut down too.
func (t *loopTomb) Go(f func(<-chan none)) {
	t.w.Add(1)
	go func() {
		defer t.stop()
		defer t.w.Done()
		f(t.c)
	}()
}
This diff is collapsed.
package watch
import (
"context"
"fmt"
consulapi "github.com/hashicorp/consul/api"
)
// watchFactory is a function that can create a new WatchFunc
// from a parameter configuration
type watchFactory func(params map[string]interface{}) (WatcherFunc, error)
// watchFuncFactory maps each watch type to its factory function
var watchFuncFactory map[string]watchFactory
// init registers the factory for every supported watch type.
func init() {
	watchFuncFactory = map[string]watchFactory{
		"key": keyWatch,
		"keyprefix": keyPrefixWatch,
		"services": servicesWatch,
		"nodes": nodesWatch,
		"service": serviceWatch,
		"checks": checksWatch,
		"event": eventWatch,
		"connect_roots": connectRootsWatch,
		"connect_leaf": connectLeafWatch,
		"agent_service": agentServiceWatch,
	}
}
// keyWatch is used to return a key watching function
func keyWatch(params map[string]interface{}) (WatcherFunc, error) {
	var stale bool
	if err := assignValueBool(params, "stale", &stale); err != nil {
		return nil, err
	}

	var key string
	if err := assignValue(params, "key", &key); err != nil {
		return nil, err
	}
	if key == "" {
		return nil, fmt.Errorf("Must specify a single key to watch")
	}

	watch := func(p *Plan) (BlockingParamVal, interface{}, error) {
		kv := p.client.KV()
		opts := makeQueryOptionsWithContext(p, stale)
		defer p.cancelFunc()

		pair, meta, err := kv.Get(key, &opts)
		switch {
		case err != nil:
			return nil, nil, err
		case pair == nil:
			// Key absent: still advance the blocking index.
			return WaitIndexVal(meta.LastIndex), nil, err
		default:
			return WaitIndexVal(meta.LastIndex), pair, err
		}
	}
	return watch, nil
}
// keyPrefixWatch is used to return a key prefix watching function
func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
	var stale bool
	if err := assignValueBool(params, "stale", &stale); err != nil {
		return nil, err
	}

	var prefix string
	if err := assignValue(params, "prefix", &prefix); err != nil {
		return nil, err
	}
	if prefix == "" {
		return nil, fmt.Errorf("Must specify a single prefix to watch")
	}

	watch := func(p *Plan) (BlockingParamVal, interface{}, error) {
		kv := p.client.KV()
		opts := makeQueryOptionsWithContext(p, stale)
		defer p.cancelFunc()

		pairs, meta, err := kv.List(prefix, &opts)
		if err != nil {
			return nil, nil, err
		}
		return WaitIndexVal(meta.LastIndex), pairs, err
	}
	return watch, nil
}
// servicesWatch is used to watch the list of available services
func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
	var stale bool
	if err := assignValueBool(params, "stale", &stale); err != nil {
		return nil, err
	}

	watch := func(p *Plan) (BlockingParamVal, interface{}, error) {
		catalog := p.client.Catalog()
		opts := makeQueryOptionsWithContext(p, stale)
		defer p.cancelFunc()

		services, meta, err := catalog.Services(&opts)
		if err != nil {
			return nil, nil, err
		}
		return WaitIndexVal(meta.LastIndex), services, err
	}
	return watch, nil
}
// nodesWatch is used to watch the list of available nodes
func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
	var stale bool
	if err := assignValueBool(params, "stale", &stale); err != nil {
		return nil, err
	}

	watch := func(p *Plan) (BlockingParamVal, interface{}, error) {
		catalog := p.client.Catalog()
		opts := makeQueryOptionsWithContext(p, stale)
		defer p.cancelFunc()

		nodes, meta, err := catalog.Nodes(&opts)
		if err != nil {
			return nil, nil, err
		}
		return WaitIndexVal(meta.LastIndex), nodes, err
	}
	return watch, nil
}
// serviceWatch is used to watch a specific service for changes,
// optionally filtered by tags and health (passingonly).
func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
	stale := false
	if err := assignValueBool(params, "stale", &stale); err != nil {
		return nil, err
	}
	var (
		service string
		tags []string
	)
	if err := assignValue(params, "service", &service); err != nil {
		return nil, err
	}
	if service == "" {
		return nil, fmt.Errorf("Must specify a single service to watch")
	}
	if err := assignValueStringSlice(params, "tag", &tags); err != nil {
		return nil, err
	}
	passingOnly := false
	if err := assignValueBool(params, "passingonly", &passingOnly); err != nil {
		return nil, err
	}
	fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
		health := p.client.Health()
		opts := makeQueryOptionsWithContext(p, stale)
		// Release the per-query context once this round completes.
		defer p.cancelFunc()
		nodes, meta, err := health.ServiceMultipleTags(service, tags, passingOnly, &opts)
		if err != nil {
			return nil, nil, err
		}
		return WaitIndexVal(meta.LastIndex), nodes, err
	}
	return fn, nil
}
// checksWatch is used to watch checks, either those of a given service or
// those in a given state; "service" and "state" are mutually exclusive and
// the state defaults to "any" when neither is given.
func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
	stale := false
	if err := assignValueBool(params, "stale", &stale); err != nil {
		return nil, err
	}
	var service, state string
	if err := assignValue(params, "service", &service); err != nil {
		return nil, err
	}
	if err := assignValue(params, "state", &state); err != nil {
		return nil, err
	}
	if service != "" && state != "" {
		return nil, fmt.Errorf("Cannot specify service and state")
	}
	if service == "" && state == "" {
		state = "any"
	}
	fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
		health := p.client.Health()
		opts := makeQueryOptionsWithContext(p, stale)
		defer p.cancelFunc()
		var checks []*consulapi.HealthCheck
		var meta *consulapi.QueryMeta
		var err error
		// Query by state when one was given, otherwise by service name.
		if state != "" {
			checks, meta, err = health.State(state, &opts)
		} else {
			checks, meta, err = health.Checks(service, &opts)
		}
		if err != nil {
			return nil, nil, err
		}
		return WaitIndexVal(meta.LastIndex), checks, err
	}
	return fn, nil
}
// eventWatch is used to watch for events, optionally filtering on name
func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
	// The stale setting doesn't apply to events.
	var name string
	if err := assignValue(params, "name", &name); err != nil {
		return nil, err
	}
	fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
		event := p.client.Event()
		opts := makeQueryOptionsWithContext(p, false)
		defer p.cancelFunc()
		events, meta, err := event.List(name, &opts)
		if err != nil {
			return nil, nil, err
		}
		// Prune to only the new events: drop everything up to and including
		// the event whose index matches the previous round's param value.
		for i := 0; i < len(events); i++ {
			if WaitIndexVal(event.IDToIndex(events[i].ID)).Equal(p.lastParamVal) {
				events = events[i+1:]
				break
			}
		}
		return WaitIndexVal(meta.LastIndex), events, err
	}
	return fn, nil
}
// connectRootsWatch is used to watch for changes to Connect Root certificates.
func connectRootsWatch(params map[string]interface{}) (WatcherFunc, error) {
	// We don't support stale since roots are cached locally in the agent.
	fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
		agent := p.client.Agent()
		opts := makeQueryOptionsWithContext(p, false)
		// Release the per-query context once this round completes.
		defer p.cancelFunc()
		roots, meta, err := agent.ConnectCARoots(&opts)
		if err != nil {
			return nil, nil, err
		}
		return WaitIndexVal(meta.LastIndex), roots, err
	}
	return fn, nil
}
// connectLeafWatch is used to watch for changes to Connect Leaf certificates
// for given local service id.
func connectLeafWatch(params map[string]interface{}) (WatcherFunc, error) {
	// We don't support stale since certs are cached locally in the agent.
	var serviceName string
	if err := assignValue(params, "service", &serviceName); err != nil {
		return nil, err
	}
	fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
		agent := p.client.Agent()
		opts := makeQueryOptionsWithContext(p, false)
		// Release the per-query context once this round completes.
		defer p.cancelFunc()
		leaf, meta, err := agent.ConnectCALeaf(serviceName, &opts)
		if err != nil {
			return nil, nil, err
		}
		return WaitIndexVal(meta.LastIndex), leaf, err
	}
	return fn, nil
}
// agentServiceWatch is used to watch for changes to a single service instance
// on the local agent. Note that this state is agent-local so the watch
// mechanism uses `hash` rather than `index` for deciding whether to block.
func agentServiceWatch(params map[string]interface{}) (WatcherFunc, error) {
	// We don't support consistency modes since it's agent local data
	var serviceID string
	if err := assignValue(params, "service_id", &serviceID); err != nil {
		return nil, err
	}
	fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
		agent := p.client.Agent()
		opts := makeQueryOptionsWithContext(p, false)
		// Release the per-query context once this round completes.
		defer p.cancelFunc()
		svc, _, err := agent.Service(serviceID, &opts)
		if err != nil {
			return nil, nil, err
		}
		// Return string ContentHash since we don't have Raft indexes to block on.
		return WaitHashVal(svc.ContentHash), svc, err
	}
	return fn, nil
}
// makeQueryOptionsWithContext builds the QueryOptions for one round of a
// blocking query: it registers a fresh cancel func on the plan (so Stop can
// abort the in-flight request) and seeds the blocking parameter (WaitIndex
// or WaitHash) from the previous round's value.
func makeQueryOptionsWithContext(p *Plan, stale bool) consulapi.QueryOptions {
	ctx, cancel := context.WithCancel(context.Background())
	p.setCancelFunc(cancel)
	opts := consulapi.QueryOptions{AllowStale: stale}
	switch param := p.lastParamVal.(type) {
	case WaitIndexVal:
		opts.WaitIndex = uint64(param)
	case WaitHashVal:
		opts.WaitHash = string(param)
	}
	return *opts.WithContext(ctx)
}
package watch
import (
"context"
"fmt"
"log"
"os"
"reflect"
"time"
consulapi "github.com/hashicorp/consul/api"
)
const (
	// retryInterval is the base retry value
	retryInterval = 5 * time.Second
	// maxBackoffTime caps the retry delay, preventing
	// exponential runaway
	maxBackoffTime = 180 * time.Second
)
// Run runs the watch plan against the agent at the given address using the
// default Consul API configuration.
func (p *Plan) Run(address string) error {
	return p.RunWithConfig(address, nil)
}
// RunWithConfig runs a watch plan against the agent at the given address,
// using conf (or the default API config when conf is nil) for the client
// and logging to p.LogOutput (stderr by default).
func (p *Plan) RunWithConfig(address string, conf *consulapi.Config) error {
	// Setup the client
	p.address = address
	if conf == nil {
		conf = consulapi.DefaultConfig()
	}
	conf.Address = address
	conf.Datacenter = p.Datacenter
	conf.Token = p.Token
	client, err := consulapi.NewClient(conf)
	if err != nil {
		return fmt.Errorf("Failed to connect to agent: %v", err)
	}
	// Create the logger
	output := p.LogOutput
	if output == nil {
		output = os.Stderr
	}
	logger := log.New(output, "", log.LstdFlags)
	return p.RunWithClientAndLogger(client, logger)
}
// RunWithClientAndLogger runs a watch plan using an external client and
// log.Logger instance. Using this, the plan's Datacenter, Token and LogOutput
// fields are ignored and the passed client is expected to be configured as
// needed.
func (p *Plan) RunWithClientAndLogger(client *consulapi.Client,
	logger *log.Logger) error {
	p.client = client
	// Loop until we are canceled
	failures := 0
OUTER:
	for !p.shouldStop() {
		// Invoke the handler
		blockParamVal, result, err := p.Watcher(p)
		// Check if we should terminate since the function
		// could have blocked for a while
		if p.shouldStop() {
			break
		}
		// Handle an error in the watch function
		if err != nil {
			// Perform an exponential backoff
			failures++
			if blockParamVal == nil {
				p.lastParamVal = nil
			} else {
				p.lastParamVal = blockParamVal.Next(p.lastParamVal)
			}
			// Quadratic backoff (failures^2 * base), capped at maxBackoffTime.
			retry := retryInterval * time.Duration(failures*failures)
			if retry > maxBackoffTime {
				retry = maxBackoffTime
			}
			logger.Printf("[ERR] consul.watch: Watch (type: %s) errored: %v, retry in %v",
				p.Type, err, retry)
			select {
			case <-time.After(retry):
				continue OUTER
			case <-p.stopCh:
				return nil
			}
		}
		// Clear the failures
		failures = 0
		// If the index is unchanged do nothing
		if p.lastParamVal != nil && p.lastParamVal.Equal(blockParamVal) {
			continue
		}
		// Update the index, look for change
		oldParamVal := p.lastParamVal
		p.lastParamVal = blockParamVal.Next(oldParamVal)
		if oldParamVal != nil && reflect.DeepEqual(p.lastResult, result) {
			continue
		}
		// Handle the updated result
		p.lastResult = result
		// If a hybrid handler exists use that
		if p.HybridHandler != nil {
			p.HybridHandler(blockParamVal, result)
		} else if p.Handler != nil {
			idx, ok := blockParamVal.(WaitIndexVal)
			if !ok {
				// NOTE(review): despite the "Skipping Handler." message below,
				// the Handler IS still invoked (with a zero index) — confirm
				// whether hash-based watches should really fall through here.
				logger.Printf("[ERR] consul.watch: Handler only supports index-based " +
					" watches but non index-based watch run. Skipping Handler.")
			}
			p.Handler(uint64(idx), result)
		}
	}
	return nil
}
// Stop is used to stop running the watch plan. It is idempotent: the first
// call marks the plan stopped, cancels any in-flight request via cancelFunc,
// and closes stopCh so the run loop can observe the shutdown; subsequent
// calls are no-ops.
func (p *Plan) Stop() {
	p.stopLock.Lock()
	defer p.stopLock.Unlock()
	if p.stop {
		// Already stopped; stopCh must only be closed once.
		return
	}
	p.stop = true
	if p.cancelFunc != nil {
		p.cancelFunc()
	}
	close(p.stopCh)
}
// shouldStop reports whether Stop has been called, via a non-blocking
// receive on stopCh (which is closed exactly once by Stop).
func (p *Plan) shouldStop() bool {
	select {
	case <-p.stopCh:
		return true
	default:
		return false
	}
}
// setCancelFunc records the context cancel function for the in-flight
// request so Stop can abort it. If the plan was already stopped while the
// request was being set up, the new cancel func is executed immediately
// instead of being stored.
func (p *Plan) setCancelFunc(cancel context.CancelFunc) {
	p.stopLock.Lock()
	defer p.stopLock.Unlock()
	if p.shouldStop() {
		// The watch is stopped and execute the new cancel func to stop watchFactory
		cancel()
		return
	}
	p.cancelFunc = cancel
}
// IsStopped reports whether Stop has been called on this plan.
func (p *Plan) IsStopped() bool {
	p.stopLock.Lock()
	defer p.stopLock.Unlock()
	return p.stop
}
package watch
import (
"context"
"fmt"
"io"
"sync"
"time"
consulapi "github.com/hashicorp/consul/api"
"github.com/mitchellh/mapstructure"
)
const DefaultTimeout = 10 * time.Second
// Plan is the parsed version of a watch specification. A watch provides
// the details of a query, which generates a view into the Consul data store.
// This view is watched for changes and a handler is invoked to take any
// appropriate actions.
type Plan struct {
	Datacenter  string
	Token       string
	Type        string
	HandlerType string
	// Exempt holds parameters excluded from generic parsing
	// (e.g. http_handler_config) for later use by the handler.
	Exempt map[string]interface{}

	// Watcher performs the (possibly blocking) query for this watch type.
	Watcher WatcherFunc
	// Handler is kept for backward compatibility but only supports watches based
	// on index param. To support hash based watches, set HybridHandler instead.
	Handler       HandlerFunc
	HybridHandler HybridHandlerFunc
	// LogOutput is the destination for watch error logs; os.Stderr is used
	// when nil.
	LogOutput io.Writer

	address string
	client  *consulapi.Client
	// lastParamVal/lastResult track the most recent blocking parameter and
	// watch result so unchanged data does not re-trigger the handler.
	lastParamVal BlockingParamVal
	lastResult   interface{}

	// stop, stopCh and stopLock coordinate Stop() with the run loop;
	// cancelFunc aborts any in-flight request when the plan is stopped.
	stop       bool
	stopCh     chan struct{}
	stopLock   sync.Mutex
	cancelFunc context.CancelFunc
}
// HttpHandlerConfig describes the HTTP endpoint invoked for handler_type
// "http". TimeoutRaw carries the user-supplied duration string; Timeout is
// the parsed value (see parseHttpHandlerConfig, which also applies defaults).
type HttpHandlerConfig struct {
	Path          string              `mapstructure:"path"`
	Method        string              `mapstructure:"method"` // defaults to POST when empty
	Timeout       time.Duration       `mapstructure:"-"`      // parsed from TimeoutRaw; defaults to DefaultTimeout
	TimeoutRaw    string              `mapstructure:"timeout"`
	Header        map[string][]string `mapstructure:"header"`
	TLSSkipVerify bool                `mapstructure:"tls_skip_verify"`
}
// BlockingParamVal is an interface representing the common operations needed for
// different styles of blocking. It's used to abstract the core watch plan from
// whether we are performing index-based or hash-based blocking.
type BlockingParamVal interface {
// Equal returns whether the other param value should be considered equal
// (i.e. representing no change in the watched resource). Equal must not panic
// if other is nil.
Equal(other BlockingParamVal) bool
// Next is called when deciding which value to use on the next blocking call.
// It assumes the BlockingParamVal value it is called on is the most recent one
// returned and passes the previous one which may be nil as context. This
// allows types to customize logic around ordering without assuming there is
// an order. For example WaitIndexVal can check that the index didn't go
// backwards and if it did then reset to 0. Most other cases should just
// return themselves (the most recent value) to be used in the next request.
Next(previous BlockingParamVal) BlockingParamVal
}
// WaitIndexVal is a type representing a Consul index that implements
// BlockingParamVal.
type WaitIndexVal uint64
// Equal implements BlockingParamVal
// Equal implements BlockingParamVal. Two values compare equal only when
// other is also a WaitIndexVal carrying the same index; any other type
// (including a nil interface) compares unequal.
func (idx WaitIndexVal) Equal(other BlockingParamVal) bool {
	otherIdx, sameType := other.(WaitIndexVal)
	return sameType && idx == otherIdx
}
// Next implements BlockingParamVal
func (idx WaitIndexVal) Next(previous BlockingParamVal) BlockingParamVal {
if previous == nil {
return idx
}
prevIdx, ok := previous.(WaitIndexVal)
if ok && prevIdx > idx {
// This value is smaller than the previous index, reset.
return WaitIndexVal(0)
}
return idx
}
// WaitHashVal is a type representing a Consul content hash that implements
// BlockingParamVal.
type WaitHashVal string
// Equal implements BlockingParamVal
func (h WaitHashVal) Equal(other BlockingParamVal) bool {
if otherHash, ok := other.(WaitHashVal); ok {
return h == otherHash
}
return false
}
// Next implements BlockingParamVal
func (h WaitHashVal) Next(previous BlockingParamVal) BlockingParamVal {
return h
}
// WatcherFunc is used to watch for a diff.
type WatcherFunc func(*Plan) (BlockingParamVal, interface{}, error)
// HandlerFunc is used to handle new data. It only works for index-based watches
// (which is almost all end points currently) and is kept for backwards
// compatibility until more places can make use of hash-based watches too.
type HandlerFunc func(uint64, interface{})
// HybridHandlerFunc is used to handle new data. It can support either
// index-based or hash-based watches via the BlockingParamVal.
type HybridHandlerFunc func(BlockingParamVal, interface{})
// Parse takes a watch query and compiles it into a WatchPlan or an error
func Parse(params map[string]interface{}) (*Plan, error) {
return ParseExempt(params, nil)
}
// ParseExempt takes a watch query and compiles it into a WatchPlan or an error.
// Any exempt parameters are stored in the Exempt map. All recognized
// parameters are consumed from params; leftovers cause an error.
func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) {
	plan := &Plan{
		stopCh: make(chan struct{}),
		Exempt: make(map[string]interface{}),
	}

	// Parse the generic parameters.
	if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil {
		return nil, err
	}
	if err := assignValue(params, "token", &plan.Token); err != nil {
		return nil, err
	}
	if err := assignValue(params, "type", &plan.Type); err != nil {
		return nil, err
	}
	// Ensure there is a watch type.
	if plan.Type == "" {
		return nil, fmt.Errorf("Watch type must be specified")
	}

	// Get the specific handler.
	if err := assignValue(params, "handler_type", &plan.HandlerType); err != nil {
		return nil, err
	}
	switch plan.HandlerType {
	case "http":
		if _, ok := params["http_handler_config"]; !ok {
			return nil, fmt.Errorf("Handler type 'http' requires 'http_handler_config' to be set")
		}
		config, err := parseHttpHandlerConfig(params["http_handler_config"])
		if err != nil {
			// Pass err as a formatting argument directly; the previous
			// fmt.Errorf(fmt.Sprintf(...)) form was redundant and trips
			// `go vet` (non-constant format string).
			return nil, fmt.Errorf("Failed to parse 'http_handler_config': %v", err)
		}
		plan.Exempt["http_handler_config"] = config
		delete(params, "http_handler_config")
	case "script":
		// Let the caller check for configuration in exempt parameters.
	}

	// Look for a factory function for the watch type.
	factory := watchFuncFactory[plan.Type]
	if factory == nil {
		return nil, fmt.Errorf("Unsupported watch type: %s", plan.Type)
	}
	// Get the watch func.
	fn, err := factory(params)
	if err != nil {
		return nil, err
	}
	plan.Watcher = fn

	// Move the exempt parameters out of params so the leftover check passes.
	if len(exempt) > 0 {
		for _, ex := range exempt {
			val, ok := params[ex]
			if ok {
				plan.Exempt[ex] = val
				delete(params, ex)
			}
		}
	}

	// Ensure all parameters are consumed.
	if len(params) != 0 {
		var bad []string
		for key := range params {
			bad = append(bad, key)
		}
		return nil, fmt.Errorf("Invalid parameters: %v", bad)
	}
	return plan, nil
}
// assignValue extracts the string parameter named name from params into out,
// removing it from params. A missing key is not an error; a present value of
// any non-string type is.
func assignValue(params map[string]interface{}, name string, out *string) error {
	raw, present := params[name]
	if !present {
		return nil
	}
	s, isString := raw.(string)
	if !isString {
		return fmt.Errorf("Expecting %s to be a string", name)
	}
	*out = s
	delete(params, name)
	return nil
}
// assignValueBool extracts the boolean parameter named name from params into
// out, removing it from params. A missing key is not an error; a present
// value of any non-bool type is.
func assignValueBool(params map[string]interface{}, name string, out *bool) error {
	raw, present := params[name]
	if !present {
		return nil
	}
	b, isBool := raw.(bool)
	if !isBool {
		return fmt.Errorf("Expecting %s to be a boolean", name)
	}
	*out = b
	delete(params, name)
	return nil
}
// assignValueStringSlice extracts the parameter named name from params into
// out, removing it from params. The value may be a single string, a
// []string, or a []interface{} whose elements are all strings; anything else
// is an error. A missing key is a no-op.
//
// Rewritten to bind the type-switch variable (avoiding repeated type
// assertions) and to drop the redundant make([]T, l, l) capacity arguments.
func assignValueStringSlice(params map[string]interface{}, name string, out *[]string) error {
	raw, ok := params[name]
	if !ok {
		return nil
	}
	var tmp []string
	switch v := raw.(type) {
	case string:
		tmp = []string{v}
	case []string:
		// Copy so later mutation of the caller's slice cannot alias *out.
		tmp = make([]string, len(v))
		copy(tmp, v)
	case []interface{}:
		tmp = make([]string, len(v))
		for i, item := range v {
			s, isString := item.(string)
			if !isString {
				return fmt.Errorf("Index %d of %s expected to be string", i, name)
			}
			tmp[i] = s
		}
	default:
		return fmt.Errorf("Expecting %s to be a string or []string", name)
	}
	*out = tmp
	delete(params, name)
	return nil
}
// parseHttpHandlerConfig decodes the 'http_handler_config' parameter block
// into an HttpHandlerConfig. 'path' is required; 'method' defaults to POST
// and 'timeout' to DefaultTimeout when omitted.
func parseHttpHandlerConfig(configParams interface{}) (*HttpHandlerConfig, error) {
	var config HttpHandlerConfig
	if err := mapstructure.Decode(configParams, &config); err != nil {
		return nil, err
	}

	if config.Path == "" {
		return nil, fmt.Errorf("Requires 'path' to be set")
	}
	if config.Method == "" {
		config.Method = "POST"
	}
	if config.TimeoutRaw == "" {
		config.Timeout = DefaultTimeout
	} else if timeout, err := time.ParseDuration(config.TimeoutRaw); err != nil {
		// Pass err directly as a formatting argument; wrapping a Sprintf in
		// Errorf was redundant and trips `go vet` (non-constant format string).
		return nil, fmt.Errorf("Failed to parse timeout: %v", err)
	} else {
		config.Timeout = timeout
	}
	return &config, nil
}
The MIT License (MIT)
Copyright (c) 2013-2016 Errplane Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
This diff is collapsed.
package client
import (
"fmt"
"io"
"net"
"time"
)
const (
// UDPPayloadSize is a reasonable default payload size for UDP packets that
// could be travelling over the internet.
UDPPayloadSize = 512
)
// UDPConfig is the config data needed to create a UDP Client.
type UDPConfig struct {
// Addr should be of the form "host:port"
// or "[ipv6-host%zone]:port".
Addr string
// PayloadSize is the maximum size of a UDP client message, optional
// Tune this based on your network. Defaults to UDPPayloadSize.
PayloadSize int
}
// NewUDPClient returns a client interface for writing to an InfluxDB UDP
// service from the given config. A zero PayloadSize falls back to
// UDPPayloadSize.
func NewUDPClient(conf UDPConfig) (Client, error) {
	addr, err := net.ResolveUDPAddr("udp", conf.Addr)
	if err != nil {
		return nil, err
	}

	conn, err := net.DialUDP("udp", nil, addr)
	if err != nil {
		return nil, err
	}

	size := conf.PayloadSize
	if size == 0 {
		size = UDPPayloadSize
	}

	client := &udpclient{
		conn:        conn,
		payloadSize: size,
	}
	return client, nil
}
// Close releases the udpclient's resources (the underlying UDP connection).
func (uc *udpclient) Close() error {
	return uc.conn.Close()
}

// udpclient implements Client over a fire-and-forget UDP connection
// (see NewUDPClient); queries are unsupported.
type udpclient struct {
	conn        io.WriteCloser // the dialed UDP connection
	payloadSize int            // maximum datagram size used by Write batching
}
// Write sends the batch over UDP, packing as many line-protocol points as
// fit into payloadSize-byte datagrams. Points whose serialized form exceeds
// the payload size and have a non-zero timestamp are split into smaller
// points. A mid-batch send error is remembered and returned after the
// remaining points are attempted; an error on the final flush is returned
// immediately.
func (uc *udpclient) Write(bp BatchPoints) error {
	var b = make([]byte, 0, uc.payloadSize) // initial buffer size, it will grow as needed
	var d, _ = time.ParseDuration("1" + bp.Precision())

	var delayedError error
	// checkBuffer flushes the pending datagram if appending n more bytes
	// would exceed the payload size; send errors are deferred.
	var checkBuffer = func(n int) {
		if len(b) > 0 && len(b)+n > uc.payloadSize {
			if _, err := uc.conn.Write(b); err != nil {
				delayedError = err
			}
			b = b[:0]
		}
	}

	for _, p := range bp.Points() {
		// Round the point's timestamp to the batch precision.
		p.pt.Round(d)
		pointSize := p.pt.StringSize() + 1 // include newline in size
		//point := p.pt.RoundedString(d) + "\n"
		checkBuffer(pointSize)
		if p.Time().IsZero() || pointSize <= uc.payloadSize {
			b = p.pt.AppendString(b)
			b = append(b, '\n')
			continue
		}

		// Oversized timestamped point: split it and emit the pieces.
		points := p.pt.Split(uc.payloadSize - 1) // account for newline character
		for _, sp := range points {
			checkBuffer(sp.StringSize() + 1)
			b = sp.AppendString(b)
			b = append(b, '\n')
		}
	}

	// Flush whatever is left in the buffer.
	if len(b) > 0 {
		if _, err := uc.conn.Write(b); err != nil {
			return err
		}
	}
	return delayedError
}
// Query is unsupported: the UDP client is write-only.
func (uc *udpclient) Query(q Query) (*Response, error) {
	return nil, fmt.Errorf("Querying via UDP is not supported")
}

// QueryAsChunk is unsupported: the UDP client is write-only.
func (uc *udpclient) QueryAsChunk(q Query) (*ChunkedResponse, error) {
	return nil, fmt.Errorf("Querying via UDP is not supported")
}

// Ping is a no-op for UDP clients: it always returns a zero duration and no
// error without contacting the server.
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
	return 0, "", nil
}
package models
import (
"errors"
"strings"
)
// ConsistencyLevel represent a required replication criteria before a write can
// be returned as successful.
//
// The consistency level is handled in open-source InfluxDB but only applicable to clusters.
type ConsistencyLevel int

const (
	// ConsistencyLevelAny allows for hinted handoff, potentially no write happened yet.
	ConsistencyLevelAny ConsistencyLevel = iota

	// ConsistencyLevelOne requires at least one data node acknowledged a write.
	ConsistencyLevelOne

	// ConsistencyLevelQuorum requires a quorum of data nodes to acknowledge a write.
	ConsistencyLevelQuorum

	// ConsistencyLevelAll requires all data nodes to acknowledge a write.
	ConsistencyLevelAll
)

var (
	// ErrInvalidConsistencyLevel is returned when parsing the string version
	// of a consistency level.
	ErrInvalidConsistencyLevel = errors.New("invalid consistency level")
)

// consistencyLevels maps the lowercase string form of each level to its value.
var consistencyLevels = map[string]ConsistencyLevel{
	"any":    ConsistencyLevelAny,
	"one":    ConsistencyLevelOne,
	"quorum": ConsistencyLevelQuorum,
	"all":    ConsistencyLevelAll,
}

// ParseConsistencyLevel converts a consistency level string to the corresponding ConsistencyLevel const.
// Matching is case-insensitive; unknown strings yield ErrInvalidConsistencyLevel.
func ParseConsistencyLevel(level string) (ConsistencyLevel, error) {
	if l, ok := consistencyLevels[strings.ToLower(level)]; ok {
		return l, nil
	}
	return 0, ErrInvalidConsistencyLevel
}
package models // import "github.com/influxdata/influxdb/models"
// from stdlib hash/fnv/fnv.go
const (
prime64 = 1099511628211
offset64 = 14695981039346656037
)
// InlineFNV64a is an alloc-free port of the standard library's fnv64a.
// See https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function.
type InlineFNV64a uint64
// NewInlineFNV64a returns a new instance of InlineFNV64a.
func NewInlineFNV64a() InlineFNV64a {
return offset64
}
// Write adds data to the running hash.
func (s *InlineFNV64a) Write(data []byte) (int, error) {
hash := uint64(*s)
for _, c := range data {
hash ^= uint64(c)
hash *= prime64
}
*s = InlineFNV64a(hash)
return len(data), nil
}
// Sum64 returns the uint64 of the current resulting hash.
func (s *InlineFNV64a) Sum64() uint64 {
return uint64(*s)
}
package models // import "github.com/influxdata/influxdb/models"
import (
"reflect"
"strconv"
"unsafe"
)
// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt.
func parseIntBytes(b []byte, base int, bitSize int) (i int64, err error) {
s := unsafeBytesToString(b)
return strconv.ParseInt(s, base, bitSize)
}
// parseUintBytes is a zero-alloc wrapper around strconv.ParseUint.
func parseUintBytes(b []byte, base int, bitSize int) (i uint64, err error) {
s := unsafeBytesToString(b)
return strconv.ParseUint(s, base, bitSize)
}
// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat.
func parseFloatBytes(b []byte, bitSize int) (float64, error) {
s := unsafeBytesToString(b)
return strconv.ParseFloat(s, bitSize)
}
// parseBoolBytes is a zero-alloc wrapper around strconv.ParseBool.
func parseBoolBytes(b []byte) (bool, error) {
return strconv.ParseBool(unsafeBytesToString(b))
}
// unsafeBytesToString converts a []byte to a string without a heap allocation.
//
// It is unsafe, and is intended to prepare input to short-lived functions
// that require strings.
func unsafeBytesToString(in []byte) string {
src := *(*reflect.SliceHeader)(unsafe.Pointer(&in))
dst := reflect.StringHeader{
Data: src.Data,
Len: src.Len,
}
s := *(*string)(unsafe.Pointer(&dst))
return s
}
This diff is collapsed.
package models
import (
"sort"
)
// Row represents a single row returned from the execution of a statement.
type Row struct {
	Name    string            `json:"name,omitempty"`
	Tags    map[string]string `json:"tags,omitempty"`
	Columns []string          `json:"columns,omitempty"`
	Values  [][]interface{}   `json:"values,omitempty"`
	Partial bool              `json:"partial,omitempty"`
}

// SameSeries returns true if r contains values for the same series as o,
// i.e. the name and the hash of the full tag set both match.
func (r *Row) SameSeries(o *Row) bool {
	return r.tagsHash() == o.tagsHash() && r.Name == o.Name
}

// tagsHash returns an FNV-1a hash of the tag key/value pairs, folded in
// sorted-key order so the hash is deterministic for a given tag set.
func (r *Row) tagsHash() uint64 {
	h := NewInlineFNV64a()
	keys := r.tagsKeys()
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte(r.Tags[k]))
	}
	return h.Sum64()
}

// tagsKeys returns a sorted list of tag keys.
func (r *Row) tagsKeys() []string {
	a := make([]string, 0, len(r.Tags))
	for k := range r.Tags {
		a = append(a, k)
	}
	sort.Strings(a)
	return a
}
// Rows represents a collection of rows. Rows implements sort.Interface.
type Rows []*Row

// Len implements sort.Interface.
func (p Rows) Len() int { return len(p) }

// Less implements sort.Interface. Rows order primarily by Name and
// secondarily by the hash of their tag set.
func (p Rows) Less(i, j int) bool {
	// Sort by name first.
	if p[i].Name != p[j].Name {
		return p[i].Name < p[j].Name
	}

	// Sort by tag set hash. Tags don't have a meaningful sort order so we
	// just compute a hash and sort by that instead. This allows the tests
	// to receive rows in a predictable order every time.
	return p[i].tagsHash() < p[j].tagsHash()
}

// Swap implements sort.Interface.
func (p Rows) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
package models
// Statistic is the representation of a statistic used by the monitoring service.
type Statistic struct {
	Name   string                 `json:"name"`
	Tags   map[string]string      `json:"tags"`
	Values map[string]interface{} `json:"values"`
}

// NewStatistic returns an initialized Statistic with the given name and
// empty (non-nil) Tags and Values maps, ready for population.
func NewStatistic(name string) Statistic {
	s := Statistic{Name: name}
	s.Tags = make(map[string]string)
	s.Values = make(map[string]interface{})
	return s
}
// StatisticTags is a map that can be merged with others without causing
// mutations to either map.
type StatisticTags map[string]string

// Merge creates a new map containing the merged contents of tags and t.
// If both tags and the receiver map contain the same key, the value in tags
// is used in the resulting map.
//
// Merge always returns a usable map.
func (t StatisticTags) Merge(tags map[string]string) map[string]string {
	merged := make(map[string]string, len(t)+len(tags))
	// Receiver entries go in first so the argument overwrites duplicates.
	for k, v := range t {
		merged[k] = v
	}
	for k, v := range tags {
		merged[k] = v
	}
	return merged
}
package models
// Helper time methods since parsing time can easily overflow and we only support a
// specific time range.
import (
"fmt"
"math"
"time"
)
const (
// MinNanoTime is the minimum time that can be represented.
//
// 1677-09-21 00:12:43.145224194 +0000 UTC
//
// The two lowest minimum integers are used as sentinel values. The
// minimum value needs to be used as a value lower than any other value for
// comparisons and another separate value is needed to act as a sentinel
// default value that is unusable by the user, but usable internally.
// Because these two values need to be used for a special purpose, we do
// not allow users to write points at these two times.
MinNanoTime = int64(math.MinInt64) + 2
// MaxNanoTime is the maximum time that can be represented.
//
// 2262-04-11 23:47:16.854775806 +0000 UTC
//
// The highest time represented by a nanosecond needs to be used for an
// exclusive range in the shard group, so the maximum time needs to be one
// less than the possible maximum number of nanoseconds representable by an
// int64 so that we don't lose a point at that one time.
MaxNanoTime = int64(math.MaxInt64) - 1
)
var (
minNanoTime = time.Unix(0, MinNanoTime).UTC()
maxNanoTime = time.Unix(0, MaxNanoTime).UTC()
// ErrTimeOutOfRange gets returned when time is out of the representable range using int64 nanoseconds since the epoch.
ErrTimeOutOfRange = fmt.Errorf("time outside range %d - %d", MinNanoTime, MaxNanoTime)
)
// SafeCalcTime safely calculates the time given. Will return error if the time is outside the
// supported range: the timestamp is scaled by the precision multiplier with
// overflow checking before being interpreted as nanoseconds since the epoch.
func SafeCalcTime(timestamp int64, precision string) (time.Time, error) {
	mult := GetPrecisionMultiplier(precision)
	if t, ok := safeSignedMult(timestamp, mult); ok {
		tme := time.Unix(0, t).UTC()
		return tme, CheckTime(tme)
	}

	// The multiplication overflowed int64.
	return time.Time{}, ErrTimeOutOfRange
}

// CheckTime checks that a time is within the safe range
// [minNanoTime, maxNanoTime]; out-of-range times yield ErrTimeOutOfRange.
func CheckTime(t time.Time) error {
	if t.Before(minNanoTime) || t.After(maxNanoTime) {
		return ErrTimeOutOfRange
	}
	return nil
}

// safeSignedMult performs the multiplication a*b and reports whether it did
// not overflow. Trivial operands (0 or 1) are exact; otherwise the result is
// validated by checking that dividing it back by b recovers a.
// NOTE(review): the guard is asymmetric (a == MinNanoTime but b == MaxNanoTime),
// presumably reflecting the (timestamp, multiplier) argument order used by
// SafeCalcTime — confirm this is intentional before reusing elsewhere.
func safeSignedMult(a, b int64) (int64, bool) {
	if a == 0 || b == 0 || a == 1 || b == 1 {
		return a * b, true
	}
	if a == MinNanoTime || b == MaxNanoTime {
		return 0, false
	}
	c := a * b
	return c, c/b == a
}
// +build uint uint64
package models
func init() {
EnableUintSupport()
}
// Package escape contains utilities for escaping parts of InfluxQL
// and InfluxDB line protocol.
package escape // import "github.com/influxdata/influxdb/pkg/escape"
import (
"bytes"
"strings"
)
// Codes is a map of bytes to be escaped.
var Codes = map[byte][]byte{
	',': []byte(`\,`),
	'"': []byte(`\"`),
	' ': []byte(`\ `),
	'=': []byte(`\=`),
}

// Bytes escapes characters on the input slice, as defined by Codes: every
// special byte is replaced by a backslash followed by that byte.
func Bytes(in []byte) []byte {
	out := in
	for c, escaped := range Codes {
		out = bytes.Replace(out, []byte{c}, escaped, -1)
	}
	return out
}
// escapeChars lists every byte that Bytes escapes.
const escapeChars = `," =`

// IsEscaped returns whether b has any escaped characters,
// i.e. whether b seems to have been processed by Bytes.
func IsEscaped(b []byte) bool {
	for {
		i := bytes.IndexByte(b, '\\')
		if i < 0 {
			// No (further) backslash anywhere: not escaped.
			return false
		}
		if i+1 < len(b) && strings.IndexByte(escapeChars, b[i+1]) >= 0 {
			return true
		}
		// Backslash not followed by an escapable char; keep scanning.
		b = b[i+1:]
	}
}
// AppendUnescaped appends the unescaped version of src to dst
// and returns the resulting slice. A backslash followed by one of the
// escapable characters (escapeChars) is collapsed to just that character;
// any other backslash, including a trailing one, is kept verbatim.
func AppendUnescaped(dst, src []byte) []byte {
	var pos int
	for len(src) > 0 {
		// Find the next backslash at or after pos.
		next := bytes.IndexByte(src[pos:], '\\')
		if next < 0 || pos+next+1 >= len(src) {
			// No further (complete) escape sequences: emit the rest as-is.
			return append(dst, src...)
		}

		if pos+next+1 < len(src) && strings.IndexByte(escapeChars, src[pos+next+1]) >= 0 {
			// Emit everything before the backslash, then restart the scan
			// just past it so the escaped character itself is kept.
			if pos+next > 0 {
				dst = append(dst, src[:pos+next]...)
			}
			src = src[pos+next+1:]
			pos = 0
		} else {
			// Not an escapable character; skip past this backslash.
			pos += next + 1
		}
	}
	return dst
}
// Unescape returns a new slice containing the unescaped version of in.
// A zero-length input yields nil; input with no backslash is returned as-is
// without allocating. Only the four escapable characters are collapsed;
// any other backslash is preserved verbatim.
func Unescape(in []byte) []byte {
	if len(in) == 0 {
		return nil
	}
	if bytes.IndexByte(in, '\\') == -1 {
		return in
	}

	// The output size will be no more than len(in). Preallocating the
	// capacity of the output is faster and uses less memory than
	// letting append() do its own (over)allocation.
	out := make([]byte, 0, len(in))

	for i := 0; i < len(in); {
		c := in[i]
		if c == '\\' && i+1 < len(in) {
			switch next := in[i+1]; next {
			case ',', '"', ' ', '=':
				// Collapse the escape to the character itself.
				out = append(out, next)
				i += 2
				continue
			}
		}
		out = append(out, c)
		i++
	}
	return out
}
package escape
import "strings"
var (
	// escaper backslash-escapes the four line-protocol special characters:
	// comma, double quote, space and equals.
	escaper = strings.NewReplacer(`,`, `\,`, `"`, `\"`, ` `, `\ `, `=`, `\=`)
	// unescaper reverses escaper.
	unescaper = strings.NewReplacer(`\,`, `,`, `\"`, `"`, `\ `, ` `, `\=`, `=`)
)

// UnescapeString returns unescaped version of in. Inputs without any
// backslash are returned unchanged without allocating.
func UnescapeString(in string) string {
	if strings.IndexByte(in, '\\') < 0 {
		return in
	}
	return unescaper.Replace(in)
}

// String returns the escaped version of in.
func String(in string) string {
	return escaper.Replace(in)
}
MIT License
Copyright (c) 2018 InfluxData
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
package models // import "github.com/influxdata/platform/models"
// FNV-1a 64-bit parameters, from stdlib hash/fnv/fnv.go.
const (
	prime64  = 1099511628211
	offset64 = 14695981039346656037
)

// InlineFNV64a is an alloc-free port of the standard library's fnv64a.
// See https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function.
type InlineFNV64a uint64

// NewInlineFNV64a returns a new instance of InlineFNV64a seeded with the
// FNV-1a offset basis.
func NewInlineFNV64a() InlineFNV64a {
	return offset64
}

// Write adds data to the running hash; it never fails and always reports
// len(data) bytes consumed, matching the io.Writer signature.
func (s *InlineFNV64a) Write(data []byte) (int, error) {
	hash := uint64(*s)
	for _, c := range data {
		hash ^= uint64(c)
		hash *= prime64
	}
	*s = InlineFNV64a(hash)
	return len(data), nil
}

// Sum64 returns the uint64 of the current resulting hash.
func (s *InlineFNV64a) Sum64() uint64 {
	return uint64(*s)
}
package models // import "github.com/influxdata/platform/models"
import (
"reflect"
"strconv"
"unsafe"
)
// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt.
func parseIntBytes(b []byte, base int, bitSize int) (i int64, err error) {
	s := unsafeBytesToString(b)
	return strconv.ParseInt(s, base, bitSize)
}

// parseUintBytes is a zero-alloc wrapper around strconv.ParseUint.
func parseUintBytes(b []byte, base int, bitSize int) (i uint64, err error) {
	s := unsafeBytesToString(b)
	return strconv.ParseUint(s, base, bitSize)
}

// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat.
func parseFloatBytes(b []byte, bitSize int) (float64, error) {
	s := unsafeBytesToString(b)
	return strconv.ParseFloat(s, bitSize)
}

// parseBoolBytes is a zero-alloc wrapper around strconv.ParseBool.
func parseBoolBytes(b []byte) (bool, error) {
	return strconv.ParseBool(unsafeBytesToString(b))
}

// unsafeBytesToString converts a []byte to a string without a heap allocation.
//
// It is unsafe, and is intended to prepare input to short-lived functions
// that require strings. The result aliases in's backing array, so in must
// not be mutated while the returned string is in use.
func unsafeBytesToString(in []byte) string {
	src := *(*reflect.SliceHeader)(unsafe.Pointer(&in))
	dst := reflect.StringHeader{
		Data: src.Data,
		Len:  src.Len,
	}
	s := *(*string)(unsafe.Pointer(&dst))
	return s
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
// +build uint uint64
package models
func init() {
EnableUintSupport()
}
This diff is collapsed.
package escape
import "strings"
var (
	// escaper backslash-escapes comma, double quote, space and equals.
	escaper = strings.NewReplacer(`,`, `\,`, `"`, `\"`, ` `, `\ `, `=`, `\=`)
	// unescaper reverses escaper.
	unescaper = strings.NewReplacer(`\,`, `,`, `\"`, `"`, `\ `, ` `, `\=`, `=`)
)

// UnescapeString returns unescaped version of in. Inputs with no backslash
// are returned unchanged without allocating.
func UnescapeString(in string) string {
	if strings.IndexByte(in, '\\') == -1 {
		return in
	}
	return unescaper.Replace(in)
}

// String returns the escaped version of in.
func String(in string) string {
	return escaper.Replace(in)
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment