Commit 651e4e91 authored by jingbo.wang's avatar jingbo.wang

一顿瞎p删

parent 8b222bcf
module git.quantgroup.cn/DevOps/enoch
go 1.12
require (
github.com/Azure/go-autorest/autorest/validation v0.2.0 // indirect
github.com/Shopify/sarama v1.23.1
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 // indirect
github.com/bsm/sarama-cluster v2.1.15+incompatible
github.com/coredns/coredns v1.6.3 // indirect
github.com/dnaeon/go-vcr v1.0.1 // indirect
github.com/envoyproxy/go-control-plane v0.8.6 // indirect
github.com/gogo/googleapis v1.3.0 // indirect
github.com/Shopify/sarama v1.24.1
github.com/gomodule/redigo v2.0.0+incompatible
github.com/hashicorp/consul v1.4.0
github.com/hashicorp/go-discover v0.0.0-20190905142513-34a650575f6c // indirect
github.com/hashicorp/hil v0.0.0-20190212132231-97b3a9cdfa93 // indirect
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 // indirect
github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477 // indirect
github.com/influxdata/influxdb v1.7.2
github.com/influxdata/platform v0.0.0-20181221041837-9d0ed9020c3f // indirect
github.com/json-iterator/go v1.1.7
github.com/mitchellh/hashstructure v1.0.0 // indirect
github.com/influxdata/influxdb v1.7.9
github.com/mkevac/debugcharts v0.0.0-20180124214838-d3203a8fa926
github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967
github.com/shirou/gopsutil v2.19.10+incompatible // indirect
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 // indirect
github.com/valyala/fasthttp v1.6.0
github.com/vrg0/go-common v0.0.0-20191111075058-891b371eb18a
go.uber.org/zap v1.10.0
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
go.uber.org/zap v1.13.0
)
......@@ -18,7 +18,7 @@ func DbInit() {
return
}
//如果数据库不存在,创建数据库
//如果数据库不存在,创建数据库,可重复创建不会覆盖掉原数据库
if _, err := c.Query(client.Query{Command: "create database " + global.InfluxDbName}); err != nil {
glog.Error(err)
return
......
......@@ -8,6 +8,7 @@ import (
"github.com/vrg0/go-common/conf"
"github.com/vrg0/go-common/kafka"
"github.com/vrg0/go-common/logger"
"github.com/vrg0/go-common/registry"
"github.com/vrg0/go-common/util"
"go.uber.org/zap/zapcore"
"os"
......@@ -43,6 +44,8 @@ var (
KafkaRecver *kafka.Recver = nil
InfluxDbAddress = ""
DaoFileCacheDir = ""
ConsulDc = ""
ConsulAddress = ""
)
type EosResult struct {
......@@ -101,6 +104,22 @@ func init() {
InfluxDbAddress = Config.GetOrDefault(NamespaceApplication, "influxdb.address", "")
DaoFileCacheDir = Config.GetOrDefault(NamespaceApplication, "dao.file.cache.dir", "/var")
//初始化registry
if consulDc, ok := Config.Get(NamespaceApplication, "consul.datacenter"); !ok {
Logger.Error("get must conf error: application consul.datacenter")
} else {
ConsulDc = consulDc
}
if consulAddress, ok := Config.Get(NamespaceApplication, "consul.address"); !ok {
Logger.Error("get must conf error: application consul.address")
} else {
ConsulAddress = consulAddress
}
consulCluster := strings.Split(ConsulAddress, ",")
if e := registry.Init("consul", map[string]interface{}{"dc": ConsulDc, "cluster": consulCluster}); e != nil {
Logger.Error("consul初始化失败", e)
}
}
func getLoggerLevel(level string) zapcore.Level {
......
package conf
import "time"
type Config struct {
AppName string `json:"app_name"`
Port string `json:"port"`
Env string
Redis Redis `json:"redis"`
Kafka Kafka `json:"kafka"`
InfluxDb InfluxDb `json:"influx_db"`
ApdexThreshold ApdexThreshold `json:"apdex_threshold"`
StrategyConfPath string `json:"strategy_conf_path"`
FileCachePath string `json:"file_cache_path"`
}
type Redis struct {
Host string `json:"host"`
Port string `json:"port"`
MaxIdle int `json:"max_idle"`
MaxActive int `json:"max_active"`
IdleTimeout time.Duration `json:"idle_timeout"`
}
type Kafka struct {
Brokers []string `json:"brokers"`
}
type InfluxDb struct {
Host string `json:"host"`
Port string `json:"port"`
}
type ApdexThreshold struct {
Common int `json:"common"`
Personal map[string]int `json:"personal"`
}
{
"app_name": "enoch",
"port": "9091",
"redis": {
"host": "172.30.12.2",
"port": "6379",
"max_idle": 0,
"max_active": 5000,
"idle_timeout": 120
},
"kafka": {
"brokers": ["192.168.4.100:15091", "192.168.4.100:15092", "192.168.4.100:15093"]
},
"influx_db": {
"host": "http://192.168.4.100",
"port": "8086"
}
}
package conf
type KafkaConf struct {
Broker []string
Topic string
Group string
}
func HealthTopic() KafkaConf {
return KafkaConf{
GlobalConfig.Kafka.Brokers,
"quantGroup.tech.enoch.pro",
"quantGroup-enoch-agent"}
}
func BraveTopic() KafkaConf {
return KafkaConf{
GlobalConfig.Kafka.Brokers,
"quantGroup.tech.brave.pro",
"enoch-group"}
}
package conf
import (
"encoding/json"
"git.quantgroup.cn/DevOps/enoch/service/log"
"io/ioutil"
"net/http"
)
var GlobalConfig = Config{}
func Load(denv string, didc string) {
url := "http://apollo-" + denv + ".quantgroups.com/configfiles/real_json/enoch/" + didc + "/tech.config.json"
resp, err := http.Get(url)
defer func() { _ = resp.Body.Close() }()
bytes, err := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(bytes, &GlobalConfig)
GlobalConfig.Env = denv
if nil != err {
logger.Error.Println(err)
}
}
{
"app_name": "enoch",
"port": "9091",
"redis": {
"host": "172.30.12.2",
"port": "6379",
"max_idle": 0,
"max_active": 5000,
"idle_timeout": 120
},
"kafka": {
"brokers": ["172.30.12.19:9092", "172.30.12.20:9092", "172.30.12.21:9092"]
},
"influx_db": {
"host": "http://172.20.6.33",
"port": "8086"
}
}
package consumer
import (
"encoding/json"
"fmt"
"git.quantgroup.cn/DevOps/enoch/pkg/end_points"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/gomodule/redigo/redis"
"github.com/influxdata/influxdb/client/v2"
"log"
"strings"
"time"
)
type BraveMessageHandler struct {
}
var httpMethod = map[string]string{
"get": "", "GET": "", "post": "", "options": "", "put": "", "delete": "", "head": "",
}
func (BraveMessageHandler) MsgProcess(msg string) {
traceMsg := make([]end_points.TraceMsg, 3) //[]TraceMsg{}
err := json.Unmarshal([]byte(msg), &traceMsg)
if err != nil {
logger.Error.Println("brave 解析msg失败:", err)
}
msgInfluxProcess(traceMsg)
}
func (BraveMessageHandler) Destroy() {
if len(pointSlice) > 0 {
logger.Info.Println("braveMessageHandler 提交本地缓存数据:", len(pointSlice))
batchWrite(pointSlice)
}
}
var pointSlice = make([]*client.Point, 0, batchSize)
func msgInfluxProcess(traceMsgs []end_points.TraceMsg) {
for _, traceMsg := range traceMsgs {
if traceMsg.Kind != "SERVER" {
continue
}
path := traceMsg.Name
if _, ok := httpMethod[path]; ok {
path = traceMsg.Tags.HttpMethod + " " + traceMsg.Tags.HttpPath
}
path = strings.ToLower(path)
sysName := traceMsg.LocalEndpoint.ServiceName
fields := make(map[string]interface{})
fields["duration"] = traceMsg.Duration / 1000
fields["traceId"] = traceMsg.TraceId
bytes, err := json.Marshal(traceMsg)
msg := string(bytes)
fields["msg"] = msg
tags := make(map[string]string)
tags["sys_name"] = sysName
tags["path"] = path
tags["host"] = traceMsg.LocalEndpoint.Ipv4
if err != nil {
log.Fatal(err)
}
unix := time.Unix(0, traceMsg.Timestamp*1000)
if len(pointSlice) == batchSize {
go batchWrite(pointSlice)
pointSlice = make([]*client.Point, 0, batchSize)
}
point, _ := client.NewPoint("trace_info", tags, fields, unix)
pointSlice = append(pointSlice, point)
if err != nil {
log.Fatal(err, msg)
}
}
}
func Duration(day string, fun func(sysName string, durations map[string]string)) {
conn := data.Pool.Get()
defer func() { _ = conn.Close() }()
matchPattern := "duration*" + day
reply, err := redis.Values(conn.Do("scan", 0, "match", matchPattern, "count", 10000))
if err != nil {
fmt.Print("Scan 扫描key出错", err)
}
redisKeys := reply[1].([]interface{})
for _, redisKey := range redisKeys {
reply2, err := redis.StringMap(conn.Do("zrevrangebyscore", redisKey, "+inf", 200000, "withscores"))
if err != nil {
fmt.Print("获取出错了", err)
}
fun(string(redisKey.([]uint8)), reply2)
}
}
func Counter(day string, fun func(sysName string, durations map[string]string)) {
conn := data.Pool.Get()
defer func() { _ = conn.Close() }()
matchPattern := "counter*" + day
reply, err := redis.Values(conn.Do("scan", 0, "match", matchPattern, "count", 10000))
if err != nil {
fmt.Print("Scan 扫描key出错", err)
}
redisKeys := reply[1].([]interface{})
for _, redisKey := range redisKeys {
reply2, err := redis.StringMap(conn.Do("zrevrange", redisKey, 0, 300, "withscores"))
if err != nil {
fmt.Print("获取出错了", err)
}
fun(string(redisKey.([]uint8)), reply2)
}
}
package consumer
var batchSize = 5000
package consumer
import (
"encoding/json"
"git.quantgroup.cn/DevOps/enoch/pkg/end_points"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2"
"math/big"
"net"
"time"
)
var sysNameIndex = make(map[int64]bool)
var metricsPointSlice = make([]*client.Point, 0, batchSize)
var healthPointSlice = make([]*client.Point, 0, batchSize)
type HealthMessageHandler struct {
}
func (HealthMessageHandler) MsgProcess(msg string) {
chunkMsg := end_points.ChunkMsg{}
err := json.Unmarshal([]byte(msg), &chunkMsg)
if err != nil {
logger.Error.Println("healthMessageHandler解析json失败:", err)
logger.Error.Println(msg)
return
}
buildMsg(chunkMsg)
}
func (HealthMessageHandler) Destroy() {
if len(metricsPointSlice) > 0 {
logger.Info.Println("metricsMessageHandler 提交本地缓存数据:", len(metricsPointSlice))
batchWrite(metricsPointSlice)
}
if len(healthPointSlice) > 0 {
logger.Info.Println("HealthMessageHandler 提交本地缓存数据:", len(healthPointSlice))
batchWrite(healthPointSlice)
}
}
func buildHealthInfluxMsg(appName string, ip string, timestamp time.Time, submitLimit int, db *[]byte) {
tags := make(map[string]string, )
tags["sys_name"] = appName
tags["host"] = ip
fields := make(map[string]interface{})
dbInfo := end_points.DBInfo{}
err := json.Unmarshal(*db, &dbInfo)
if err != nil {
dbDetails := end_points.DBDetail{}
err = json.Unmarshal(*db, &dbDetails)
if err == nil {
fields[dbDetails.Details.Database] = isOK(dbDetails.Status.Code)
}
}else {
for k, v := range dbInfo.Details {
var fieldName = v.Details.Database + "—" + k
fields[fieldName] = isOK(v.Status.Code)
}
}
if len(healthPointSlice) >= submitLimit {
go batchWrite(healthPointSlice)
healthPointSlice = make([]*client.Point, 0, batchSize)
}
point, _ := client.NewPoint("health_info", tags, fields, timestamp)
healthPointSlice = append(healthPointSlice, point)
}
func buildMetricsInfluxMsg(appName string, ip string, timestamp time.Time, submitLimit int, health end_points.Health, metrics end_points.MetricsInfo) {
tags := make(map[string]string, )
fields := make(map[string]interface{})
tags["sys_name"] = appName
tags["host"] = ip
status := health.Status
fields["sever_status"] = isOK(status.Code)
redis := health.Details.Redis
fields["redis_status"] = isOK(redis.Status.Code)
diskSpace := health.Details.DiskSpace.Details
fields["disk_tol"] = diskSpace.Total
fields["disk_free"] = diskSpace.Free
fields["disk_threshold"] = diskSpace.Threshold
fields["mem_tol"] = metrics.Mem
fields["mem_free"] = metrics.MemFree
fields["heap_tol"] = metrics.Heap
fields["heap_init"] = metrics.HeapInit
fields["heap_used"] = metrics.HeapUsed
fields["nonheap_tol"] = metrics.Nonheap
fields["nonheap_init"] = metrics.NonheapInit
fields["nonheap_used"] = metrics.NonheapUsed
fields["nonheap_commit"] = metrics.NonheapCommitted
fields["thread_tol"] = metrics.ThreadsTotalStarted
fields["thread_peak"] = metrics.ThreadsPeak
fields["thread_daemon"] = metrics.ThreadsDaemon
fields["class_tol"] = metrics.Classes
fields["class_loaded"] = metrics.ClassesLoaded
fields["class_unloaded"] = metrics.ClassesUnloaded
fields["gc_parnew_count"] = metrics.GcParnewCount
fields["gc_parnew_time"] = metrics.GcParnewTime
fields["gc_concurrent_mark_sweep"] = metrics.GcConcurrentmarksweepCount
fields["gc_concurrent_mark_time"] = metrics.GcConcurrentmarksweepTime
fields["uptime"] = metrics.Uptime
fields["instance_uptime"] = metrics.InstanceUptime
fields["system_load_average"] = metrics.SystemloadAverage
fields["processors"] = metrics.Processors
if len(metricsPointSlice) >= submitLimit {
go batchWrite(metricsPointSlice)
metricsPointSlice = make([]*client.Point, 0, batchSize)
}
point, _ := client.NewPoint("machine_info", tags, fields, timestamp)
metricsPointSlice = append(metricsPointSlice, point)
}
func buildMsg(chunkMsg end_points.ChunkMsg) {
var ip = inetAtoN(chunkMsg.Ip)
sysNameIndex[ip] = true
var sysNameCount = len(sysNameIndex)
for _, p := range chunkMsg.EndPoints {
var appName = chunkMsg.AppName
var ip = chunkMsg.Ip
unix := time.Unix(0, p.Timestamp*1000000)
//metricsInfo
buildMetricsInfluxMsg(appName, ip, unix, sysNameCount, p.Health, p.Metrics)
//health_info
dbByte, _ := json.Marshal(p.Health.Details.Db)
buildHealthInfluxMsg(appName, ip, unix, sysNameCount, &dbByte)
}
}
func inetAtoN(ip string) int64 {
ret := big.NewInt(0)
ret.SetBytes(net.ParseIP(ip).To4())
return ret.Int64()
}
func isOK(code string) int {
if "UP" == code {
return 1
}
return 0
}
package consumer
import (
"git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"os"
"os/signal"
"sync/atomic"
"syscall"
)
var consumerCount int32
func AgentClusterConsumer(kafkaConf conf.KafkaConf, messageHandle MessageHandler) {
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Group.Return.Notifications = true
consumer, err := cluster.NewConsumer(kafkaConf.Broker, kafkaConf.Group, []string{kafkaConf.Topic}, config)
if err != nil {
panic(err)
}
atomic.AddInt32(&consumerCount, 1)
// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, os.Interrupt)
defer func() {
_ = consumer.Close()
messageHandle.Destroy()
atomic.AddInt32(&consumerCount, -1)
logger.Info.Println("consumer结束")
if consumerCount == 0 {
os.Exit(0)
}
}()
// consume errors
go func() {
for err := range consumer.Errors() {
logger.Error.Println("consume error:", err.Error())
}
}()
// consume notifications
go func() {
for ntf := range consumer.Notifications() {
logger.Info.Printf("Rebalanced: %+v\n", ntf)
}
}()
// consume messages, watch signals
for {
select {
case msg, ok := <-consumer.Messages():
if ok {
messageHandle.MsgProcess(string(msg.Value))
consumer.MarkOffset(msg, "") // mark message as processed
}
case <-signals:
return
}
}
}
package consumer
type MessageHandler interface {
MsgProcess(msg string)
Destroy()
}
package consumer
import (
"encoding/json"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/file_cache"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/influxdata/influxdb/client/v2"
)
func batchWrite(pointArray []*client.Point) {
if file_cache.Enabled() {
logger.Info.Println("写入缓存")
fileWrite(pointArray)
} else {
err := httpWrite(pointArray)
if err != nil {
file_cache.OpenCache()
fileWrite(pointArray)
}
logger.Info.Println("写入influx", len(pointArray))
}
}
func httpWrite(pointArray []*client.Point) error {
c := data.NewClient()
defer func() { _ = c.Close() }()
points, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: "monitor",
//Precision : "ms",
})
if err != nil {
return err
}
points.AddPoints(pointArray)
err = c.Write(points)
if err != nil {
return err
}
return nil
}
func ReSubmit(data []string) error {
pointSlice := make([]*client.Point, 0)
for _, v := range data {
cp := file_cache.CreateCachePoint(v)
point, err := client.NewPoint(cp.Name, cp.Tags, cp.Fields, cp.Time)
if err != nil {
logger.Error.Println("构造client.point异常", err)
}
pointSlice = append(pointSlice, point)
if len(pointSlice) > 1000 {
err := httpWrite(pointSlice)
if err != nil {
return err
}
logger.Info.Println("缓存重新提交:1000")
pointSlice = make([]*client.Point, 0)
}
}
if len(pointSlice) > 0 {
err := httpWrite(pointSlice)
if err != nil {
logger.Info.Println(pointSlice)
return err
}
logger.Info.Println("缓存重新提交:", len(pointSlice))
}
logger.Info.Println("重新提交")
return nil
}
func fileWrite(pointArray []*client.Point) {
for _, p := range pointArray {
if p != nil {
current := file_cache.NewPoint(p)
data, err := json.Marshal(current)
if err != nil {
fmt.Println(err)
}
file_cache.Write(string(data))
}
}
}
......@@ -5,8 +5,7 @@ import (
"git.quantgroup.cn/DevOps/enoch/service/conf"
"git.quantgroup.cn/DevOps/enoch/service/data"
"git.quantgroup.cn/DevOps/enoch/service/log"
"net"
"os"
"github.com/vrg0/go-common/logger"
"strings"
)
......@@ -74,22 +73,3 @@ func Load() {
name = query(data.MONITOR)
logger.Info.Println("new: ", name)
}
func checkIp(ip string) bool {
addrs, err := net.InterfaceAddrs()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
for _, address := range addrs {
// 检查ip地址判断是否回环地址
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
logger.Info.Println(ipnet.IP.String())
return ipnet.IP.String() == ip
}
}
}
return false
}
\ No newline at end of file
package dingding
import (
"encoding/json"
"net/http"
"strings"
)
var DefaultDingURL = []string{"https://oapi.dingtalk.com/robot/send?access_token=9ffab8e4ae5f94e0fbf84aa91c9cb474d9e3d5bd0bb3c2daffe4cdfe0c2cbbc7"}
type DinDingMsg struct {
MsgType string `json:"msgtype"`
// Link Link `json:"link"`
Text Text `json:"text"`
}
type Text struct {
Content string `json:"content"`
}
func SenderDingDing(info string, receiver [] string) error {
bodyStr := buildDingDingMsg(info)
for _, r := range receiver {
data := strings.NewReader(string(bodyStr))
_, e := http.Post(r, "application/json;charset=utf-8", data)
if e != nil {
return e
}
}
return nil
}
func buildDingDingMsg(info string) []byte {
msg := DinDingMsg{
MsgType: "text",
Text:Text{
Content:info,
},
}
msgStr, _ := json.Marshal(msg)
return msgStr
}
package dingding
import "testing"
var testDingUrl = []string{
"https://oapi.dingtalk.com/robot/send?access_token=05b2c46e460e94b32044251c0f8af666c55a4600758fda508120f962d589ea47",
}
func TestSenderDingDing(t *testing.T) {
e := SenderDingDing( "叮叮测试,吧啦吧啦吧", testDingUrl)
if e != nil {
t.Error(e)
}
}
package file_cache
import (
"encoding/json"
"github.com/influxdata/influxdb/client/v2"
"github.com/json-iterator/go"
"strconv"
"time"
"unsafe"
)
type CachePoint struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Fields map[string]interface{} `json:"fields"`
Time time.Time `json:"time"`
}
func NewPoint(point *client.Point) CachePoint {
cp := CachePoint{}
cp.Name = point.Name()
cp.Time = point.Time()
cp.Fields, _ = point.Fields()
cp.Tags = point.Tags()
return cp
}
func CreateCachePoint(data string) *CachePoint {
cp := CachePoint{}
jsoniter.Unmarshal([]byte(data), &cp)
if v, ok := cp.Fields["system_load_average"]; ok {
switch v.(type) {
case float64:
break
case int:
cp.Fields["system_load_average"],_ = strconv.ParseFloat(strconv.Itoa(v.(int)), 64)
break
case int64:
cp.Fields["system_load_average"],_ = strconv.ParseFloat(strconv.FormatInt(v.(int64),10), 64)
break
}
}
return &cp
}
func init() {
decodeNumberAsInt64IfPossible := func(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
switch iter.WhatIsNext() {
case jsoniter.NumberValue:
var number json.Number
iter.ReadVal(&number)
i, err := strconv.ParseInt(string(number), 10, 64)
if err == nil {
*(*interface{})(ptr) = i
return
}
f, err := strconv.ParseFloat(string(number), 64)
if err == nil {
*(*interface{})(ptr) = f
return
}
// Not much we can do here.
default:
*(*interface{})(ptr) = iter.Read()
}
}
jsoniter.RegisterTypeDecoderFunc("interface {}", decodeNumberAsInt64IfPossible)
}
package file_cache
import (
"bufio"
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/robfig/cron"
"io"
"io/ioutil"
"os"
"strconv"
"strings"
"sync"
"time"
)
const (
WRITING = ".writing"
DELETE = ".delete"
CACHE = ".cache"
)
type fileHolder struct {
fileName string
path string
file *os.File
fileSize fileSize
}
type fileSize struct {
size int
lock sync.Mutex
}
func (fs *fileSize) Get() int {
return fs.size
}
func (fs *fileSize) Add(offset int) {
fs.lock.Lock()
defer fs.lock.Unlock()
fs.size = fs.size + offset
if fs.size < 0 {
fs.size = 0
return
}
}
var currentFile *fileHolder
var fileCacheOnce sync.Once
func Load(path string) {
if path == "" {
path = "../"
}
if !strings.HasSuffix(path, "/") {
path = path + "/"
}
if !pathExists(path) {
mkdir(path)
}
currentFile = &fileHolder{}
currentFile.path = path
go func() {
fileCacheOnce.Do(func() {
mark() //将因为服务停止导致未及时更改状态的文件修改为缓存状态
Delete() //删除过期文件
pickup() //统计上次未提交的缓存文件的数量
})
}()
}
func create() {
current := time.Now().Unix()
currentFile.fileName = currentFile.path + strconv.FormatInt(current, 10)
file, err := os.Create(currentFile.fileName + WRITING)
if err != nil {
logger.Error.Println("创建缓存文件失败", err)
}
logger.Info.Println("打开缓存文件", currentFile.fileName)
currentFile.file = file
currentFile.fileSize.Add(1)
logger.Info.Println("文件缓存数:", currentFile.fileSize.Get())
}
func closed() {
defer os.Rename(currentFile.fileName+WRITING, currentFile.fileName+CACHE)
logger.Info.Println("关闭缓存文件")
currentFile.file.Close()
}
func Delete() {
fileNames := scan(DELETE)
for _, name := range fileNames {
os.Remove(currentFile.path + name)
logger.Info.Println("删除缓存文件", name)
}
}
func pickup() {
logger.Info.Println("获取缓存文件数量")
files := scan(CACHE)
currentFile.fileSize.Add(len(files))
}
func mark() {
fileName := scan(WRITING)
logger.Info.Println("重新标记缓存文件")
for _, name := range fileName {
os.Rename(currentFile.path+name, currentFile.path+strings.Split(name, ".")[0]+CACHE)
}
}
func Recover(submit func(data []string) error) {
if currentFile.fileSize.Get() <= 0 {
return
}
fileNames := scan(CACHE)
for _, name := range fileNames {
err := submit(Read(currentFile.path + name))
if err != nil {
logger.Error.Println("重新提交缓存异常:", err)
return
}
os.Rename(currentFile.path+name, currentFile.path+strings.Split(name, ".")[0]+DELETE)
currentFile.fileSize.Add(-1)
logger.Info.Println("文件缓存数:", currentFile.fileSize.Get())
}
go Delete()
}
func Write(data string) error {
writer := bufio.NewWriter(currentFile.file)
_, err := writer.WriteString(data + "\n")
if err != nil {
return err
}
go writer.Flush()
return nil
}
func scan(suffix string) []string {
files, _ := ioutil.ReadDir(currentFile.path)
fileNames := make([]string, 0)
for _, f := range files {
if strings.HasSuffix(f.Name(), suffix) {
fileNames = append(fileNames, f.Name())
}
}
return fileNames
}
func Read(fileName string) []string {
file, err := os.Open(fileName)
if err != nil {
logger.Error.Println("未找到对应的文件:", err)
}
reader := bufio.NewReader(file)
data := make([]string, 0)
for {
line, _, err := reader.ReadLine()
if err == io.EOF {
break
}
data = append(data, string(line))
}
return data
}
func pathExists(path string) bool {
_, err := os.Stat(path)
if err == nil {
return true
}
return !os.IsNotExist(err)
}
func mkdir(path string) bool {
err := os.Mkdir(path, os.ModePerm)
return err == nil
}
func RegisterJob(submit func(data []string) error) {
c := cron.New()
err := c.AddFunc("@every 1m", func() {
if !Enabled() {
logger.Info.Println("开始扫描缓存文件")
Recover(submit)
}
})
if err != nil {
fmt.Print("err")
}
c.Start()
}
package file_cache
import (
"encoding/json"
"git.quantgroup.cn/DevOps/enoch/service/log"
"net/http"
"strings"
"sync"
"time"
)
/**
通过开关控制是否启用文件缓存,每次默认写入超过1分钟即关闭缓存
*/
type switcher struct {
state bool
origin int64
lock sync.Mutex
}
func (s *switcher) turnOn() {
s.lock.Lock()
defer s.lock.Unlock()
if !s.state { //打开状态,则关闭
s.state = true
s.origin = time.Now().Unix()
create()
senderDingDing()
}
}
func (s *switcher) turnOff() {
s.lock.Lock()
defer s.lock.Unlock()
if s.state {
s.state = false
s.origin = 0
closed()
}
}
func (s *switcher) status() bool {
if s.state {
current := time.Now().Unix()
diff := current - s.origin
if diff >= 60 {
logger.Info.Println("缓存切换")
s.turnOff()
}
}
return s.state
}
var cacheSwitcher *switcher
var alterMsg string
const (
url = "https://oapi.dingtalk.com/robot/send?access_token=9ffab8e4ae5f94e0fbf84aa91c9cb474d9e3d5bd0bb3c2daffe4cdfe0c2cbbc7"
contentType = "application/json;charset=utf-8"
)
func init() {
cacheSwitcher = &switcher{}
alterMsg = buildDingDingMsg()
}
func Enabled() bool {
return cacheSwitcher.status()
}
func OpenCache() {
cacheSwitcher.turnOn()
}
func senderDingDing() {
_, err := http.Post(url, contentType, strings.NewReader(alterMsg))
if err != nil {
logger.Error.Println(err)
}
}
func buildDingDingMsg() string {
msg := dingDingMsg{
MsgType: "text",
Text: text{
Content: "influxdb 写超时,已启用文件缓存",
},
}
msgStr, err := json.Marshal(msg)
if nil != err {
logger.Error.Println("无法序列化ding ding msg", err)
}
return string(msgStr)
}
type dingDingMsg struct {
MsgType string
Text text
}
type text struct {
Content string
}
package job
import (
"fmt"
"git.quantgroup.cn/DevOps/enoch/service/alarm"
"git.quantgroup.cn/DevOps/enoch/service/log"
"github.com/robfig/cron"
"net"
"os"
"time"
)
/**
报警定时任务,每分钟执行一次
*/
func AutoAlarm() {
c := cron.New()
err := c.AddFunc("@every 1m", func() {
logger.Info.Println("开始执行定时任务", time.Now().Minute())
alarm.Load()
})
if err != nil {
fmt.Print("err")
}
c.Start()
}
func CheckIp(ip string) bool {
addrs, err := net.InterfaceAddrs()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
for _, address := range addrs {
// 检查ip地址判断是否回环地址
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
logger.Info.Println(ipnet.IP.String())
return ipnet.IP.String() == ip
}
}
}
return false
}
package job
import (
"fmt"
"git.quantgroup.cn/DevOps/enoch/service"
"github.com/robfig/cron"
"time"
)
func AutoEmailPerformInfo() {
c := cron.New()
err := c.AddFunc("@midnight", func() {
day := time.Now().Add(-1 * 24 * time.Hour).Format("20060102")
service.CounterCalcAndSendEmail(day)
service.DurationCalcAndSendEmail(day)
})
if err != nil {
fmt.Print("err")
}
c.Start()
}
package logger
import (
"io"
"log"
"os"
)
var (
Info *log.Logger
Warning *log.Logger
Error * log.Logger
)
func init(){
file, err := os.OpenFile("quantgroup.log", os.O_RDWR|os.O_CREATE, 0666)
if err!=nil{
log.Fatalln("打开日志文件失败:",err)
}
Info = log.New(io.MultiWriter(os.Stderr,file),"Info:",log.Ldate | log.Ltime | log.Lshortfile)
Warning = log.New(io.MultiWriter(os.Stderr,file),"Warning:",log.Ldate | log.Ltime | log.Lshortfile)
Error = log.New(io.MultiWriter(os.Stderr,file),"Error:",log.Ldate | log.Ltime | log.Lshortfile)
}
\ No newline at end of file
package node_check
/*
import (
"context"
"git.quantgroup.cn/DevOps/enoch/service/dingding"
"git.quantgroup.cn/DevOps/enoch/service/log"
"git.quantgroup.cn/DevOps/enoch/service/registry"
"github.com/hashicorp/consul/logger"
"net/http"
"os"
"sync"
......@@ -192,3 +194,4 @@ func NodeCheck() {
select {}
}
*/
package node_check
/*
import "testing"
func TestNodeCheck(t *testing.T) {
......@@ -7,3 +8,4 @@ func TestNodeCheck(t *testing.T) {
select{}
}
*/
package registry
import (
"bytes"
"encoding/gob"
"errors"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/watch"
"reflect"
"strconv"
"sync"
"time"
)
//consul注册器
type consulRegistry struct {
masterClient *api.Client //主客户端
watchClients []*api.Client //监视器客户端
servicesWatchParse *watch.Plan //服务列表监视器
serviceWatchParseMap map[string]*watch.Plan //服务监视器映射
observerMap *sync.Map //观察者映射
serviceMap map[string]*Service //服务映射
serviceMapLock *sync.RWMutex //服务映射锁
}
//新建consul注册器
func newConsulRegistry() Registry {
return &consulRegistry{
masterClient: nil,
watchClients: make([]*api.Client, 0),
servicesWatchParse: nil,
serviceWatchParseMap: make(map[string]*watch.Plan),
observerMap: new(sync.Map),
serviceMap: make(map[string]*Service),
serviceMapLock: new(sync.RWMutex),
}
}
//内部初始化
func init() {
if _, ok := newRegistryMap["consul"]; !ok {
newRegistryMap["consul"] = newConsulRegistry
}
}
//初始化
//参数名 类型 格式 默认值
//cluster []string ip:port ["127.0.0.1:8500"]
//dc string string "dc1"
func (cr *consulRegistry) Init(options map[string]interface{}) error {
//参数过滤
cluster := []string{"127.0.0.1:8500"}
dc := "dc1"
if v, ok := options["cluster"]; ok {
if _, ok := v.([]string); ok {
cluster = v.([]string)
} else {
return errors.New("consul cluster []string")
}
}
if v, ok := options["dc"]; ok {
if _, ok := v.(string); ok {
dc = v.(string)
} else {
return errors.New("consul dc string")
}
}
//初始化参数
cr.masterClient = newConsulClient(cluster[0], dc, time.Millisecond*100)
for _, address := range cluster {
cr.watchClients = append(cr.watchClients, newConsulClient(address, dc, time.Second*300))
}
//服务初始化
parse, _ := watch.Parse(map[string]interface{}{"type": "services"})
parse.Handler = cr.handlerServices
cr.servicesWatchParse = parse
go cr.watch(cr.servicesWatchParse)
return nil
}
//新建consul客户端
func newConsulClient(address string, dc string, waitTime time.Duration) *api.Client {
config := api.DefaultConfig()
config.Address = address
config.Datacenter = dc
config.WaitTime = waitTime
client, _ := api.NewClient(config)
return client
}
//监控
func (cr *consulRegistry) watch(parse *watch.Plan) {
sentry := 0
defer func() {
if e := recover(); e != nil {
time.Sleep(time.Second * 1)
if sentry < len(cr.watchClients)-1 {
sentry++
} else {
sentry = 0
}
}
}()
_ = parse.RunWithClientAndLogger(cr.watchClients[sentry], nil)
}
//监控服务列表的变化
func (cr *consulRegistry) handlerServices(_ uint64, data interface{}) {
services := data.(map[string][]string)
//锁
cr.serviceMapLock.Lock()
defer cr.serviceMapLock.Unlock()
for service, tags := range services {
//如果服务不存在,则创建服务、监听服务
if _, ok := cr.serviceWatchParseMap[service]; !ok {
parse, _ := watch.Parse(map[string]interface{}{"type": "service", "service": service})
parse.Handler = cr.handlerService
cr.serviceWatchParseMap[service] = parse
cr.serviceMap[service] = NewService(service)
go cr.watch(parse)
}
//更新标签
tagMap := make(map[string]struct{})
for _, tag := range tags {
tagMap[tag] = struct{}{}
}
if !reflect.DeepEqual(tagMap, cr.serviceMap[service].TagMap) {
cr.serviceMap[service].TagMap = tagMap
}
}
//处理被删除的服务
for service, parse := range cr.serviceWatchParseMap {
if _, ok := services[service]; !ok {
//删除服务
parse.Stop()
delete(cr.serviceWatchParseMap, service)
delete(cr.serviceMap, service)
//执行观察者
cr.observerMap.Range(func(_, value interface{}) bool {
observer := value.(Observer)
observer.DeleteService(service)
return true
})
}
}
}
//观察服务的变化
func (cr *consulRegistry) handlerService(_ uint64, data interface{}) {
services := data.([]*api.ServiceEntry)
cr.serviceMapLock.Lock()
defer cr.serviceMapLock.Unlock()
serviceName := services[0].Service.Service
nodeMap := make(map[string]*Node)
for _, service := range services {
if service.Service.ID == "consul" {
continue
}
//更新节点
node := NewNode(service.Service.Service, service.Service.ID, service.Service.Address, service.Service.Port)
node.Meta = service.Service.Meta
for _, health := range service.Checks {
if node.Id == health.ServiceID {
node.Status = health.Status
}
}
nodeMap[service.Service.ID] = node
}
cr.serviceMap[serviceName].NodeMap = nodeMap
cr.observerMap.Range(func(_, value interface{}) bool {
observer := value.(Observer)
observer.UpdateNodes(cr.serviceMap[serviceName])
return true
})
}
//服务注册
func (cr *consulRegistry) Register(node *Node) error {
check := &api.AgentServiceCheck{
HTTP: "http://" + node.Address + ":" + strconv.Itoa(node.Port) + "/tech/health/check",
Interval: "5s",
Timeout: "10s",
// DeregisterCriticalServiceAfter: "24h",
DeregisterCriticalServiceAfter: "1m",
CheckID: node.Id,
}
registration := api.AgentServiceRegistration{
ID: node.Id,
Name: node.ServiceName,
Port: node.Port,
Address: node.Address,
Check: check,
Meta: node.Meta,
}
//可以重试一次
for i := 0; i < 2; i++ {
if e := cr.masterClient.Agent().ServiceRegister(&registration); e == nil {
cr.observerMap.Range(func(_, value interface{}) bool {
observer := value.(Observer)
observer.AddNode(node)
return true
})
return nil
}
}
return errors.New("service register fail " + node.Id)
}
//服务注销
func (cr *consulRegistry) Deregister(node *Node) error {
//可重试一次
for i := 0; i < 2; i++ {
if e := cr.masterClient.Agent().ServiceDeregister(node.Id); e == nil {
cr.observerMap.Range(func(_, value interface{}) bool {
observer := value.(Observer)
observer.DelNode(node)
return true
})
return nil
}
}
return errors.New("service deregister failed " + node.ServiceName + ":" + node.Id)
}
//获取服务映射
func (cr *consulRegistry) GetServiceMap() map[string]*Service {
cr.serviceMapLock.RLock()
defer cr.serviceMapLock.RUnlock()
//深拷贝
rtn := make(map[string]*Service)
buffer := bytes.NewBuffer(nil)
_ = gob.NewEncoder(buffer).Encode(&cr.serviceMap)
_ = gob.NewDecoder(bytes.NewBuffer(buffer.Bytes())).Decode(&rtn)
return rtn
}
//获取服务
func (cr *consulRegistry) GetService(serviceName string) (*Service, error) {
cr.serviceMapLock.RLock()
defer cr.serviceMapLock.RUnlock()
//深拷贝
if v, ok := cr.serviceMap[serviceName]; ok {
rtn := NewService(serviceName)
for kk, vv := range v.NodeMap {
newNode := Node{
ServiceName:vv.ServiceName,
Id:vv.Id,
Port:vv.Port,
Address:vv.Address,
Status:vv.Status,
}
for x, y := range vv.Meta {
newNode.Meta[x] = y
}
rtn.NodeMap[kk] = &newNode
}
return rtn, nil
} else {
return nil, errors.New("service not found")
}
}
//设置观察者
func (cr *consulRegistry) SetObserver(name string, observer Observer) error {
cr.serviceMapLock.RLock()
defer cr.serviceMapLock.RUnlock()
if _, ok := cr.observerMap.Load(name); ok {
return errors.New("observer is exists")
} else {
for _, service := range cr.serviceMap {
//深拷贝对象
newService := NewService(service.Name)
for kk, vv := range service.NodeMap {
newNode := Node{
ServiceName:vv.ServiceName,
Id:vv.Id,
Port:vv.Port,
Address:vv.Address,
Status:vv.Status,
}
for x, y := range vv.Meta {
newNode.Meta[x] = y
}
newService.NodeMap[kk] = &newNode
}
observer.UpdateNodes(newService)
}
cr.observerMap.Store(name, observer)
}
return nil
}
//删除观察
func (cr *consulRegistry) DelObserver(name string) error {
if _, ok := cr.observerMap.Load(name); ok {
cr.observerMap.Delete(name)
} else {
return errors.New("observer is not exists")
}
return nil
}
package registry
import (
"fmt"
"strconv"
"testing"
"time"
)
var r Registry = nil
func TestRefreshService(t *testing.T) {
TestNew(t)
if e := r.Init(map[string]interface{}{"dc": "3c", "cluster": []string{"172.30.12.4:8500"}}); e != nil {
t.Error(e)
}
time.Sleep(time.Second * 1)
smap := r.GetServiceMap()
for _, service := range smap {
println("-------------|", service.Name, "|-------------")
for _, node := range service.NodeMap {
fmt.Println(node)
}
}
time.Sleep(time.Second * 1)
for _, service := range smap {
println("-------------|", service.Name, "|-------------")
for _, node := range service.NodeMap {
_ = r.Deregister(node.Id)
// _ = r.Register(node)
}
}
time.Sleep(time.Second * 1)
r2 := New("consul")
if e := r2.Init(map[string]interface{}{"dc": "3c", "cluster": []string{"172.30.12.2:8500"}}); e != nil {
t.Error(e)
}
for _, service := range smap {
println("-------------|", service.Name, "|-------------")
for _, node := range service.NodeMap {
// _ = r.Deregister(node.Id)
_ = r2.Register(node)
}
}
}
func TestNew(t *testing.T) {
if r == nil {
r = New("consul")
}
if r == nil {
t.Error("consul new error")
}
}
func TestConsulRegistry_Init(t *testing.T) {
TestNew(t)
if e := r.Init(map[string]interface{}{"cluster": []string{"192.168.29.69:8500", "192.168.29.70:8500", "192.168.29.71:8500"}}); e != nil {
t.Error(e)
}
select {}
}
func TestConsulRegistry_Register(t *testing.T) {
TestNew(t)
go TestConsulRegistry_Init(t)
for i := 0; i < 10; i++ {
time.Sleep(time.Second * 1)
node := Node{
ServiceName: "name:" + strconv.Itoa(i/2),
Id: strconv.Itoa(i),
Address: "192.168.1.1",
Port: 123,
Meta: nil,
Status: "",
}
println("re:", i)
if e := r.Register(&node); e != nil {
t.Error(e)
}
}
}
func TestConsulRegistry_Deregister(t *testing.T) {
TestConsulRegistry_Register(t)
time.Sleep(time.Second * 5)
for i := 0; i < 10; i++ {
time.Sleep(time.Second * 1)
println("de:", i)
if e := r.Deregister(strconv.Itoa(i)); e != nil {
t.Error(e)
}
}
}
func TestConsulRegistry_GetServiceMap(t *testing.T) {
go TestConsulRegistry_Deregister(t)
for i := 0; i < 60; i++ {
time.Sleep(time.Second * 1)
s := r.GetServiceMap()
for _, v := range s {
fmt.Println("~~~~~~~~~~~~~~:", v.Name, v.TagMap, ":~~~~~~~~~~~~")
for _, n := range v.NodeMap {
fmt.Println(n)
}
}
}
}
func TestConsulRegistry_GetService(t *testing.T) {
TestConsulRegistry_Register(t)
if s, e := r.GetService("name:0"); e != nil {
t.Error(e)
} else {
fmt.Println(s.Name)
}
if _, e := r.GetService("xxx"); e == nil {
t.Error("GetService err")
}
}
type TObserver struct {
}
func (to *TObserver) DeleteService(serviceName string) {
fmt.Println(serviceName)
}
func (to *TObserver) UpdateNodes(_ *Service, node *Node) {
fmt.Println(node)
}
func TestConsulRegistry_SetObserver(t *testing.T) {
go TestConsulRegistry_Deregister(t)
time.Sleep(time.Second * 5)
_ = r.SetObserver("test", &TObserver{})
time.Sleep(time.Second * 5)
_ = r.DelObserver("test")
select {}
}
package registry
import "errors"
//节点
type Node struct {
ServiceName string //服务名称
Id string //节点编号
Address string //地址
Port int //端口
Meta map[string]string //元数据
Status string //状态
}
//服务
type Service struct {
Name string //服务名称
NodeMap map[string]*Node //节点映射
TagMap map[string]struct{} //标签
}
//观察者
type Observer interface {
//删除服务事件
DeleteService(serviceName string)
//更新节点事件
UpdateNodes(service *Service)
//添加节点
AddNode(node *Node)
//删除节点
DelNode(node *Node)
}
//注册器
type Registry interface {
//初始化
Init(options map[string]interface{}) error
//服务注册
Register(node *Node) error
//服务注销
Deregister(node *Node) error
//获取服务映射
GetServiceMap() map[string]*Service
//获取服务
GetService(serviceName string) (*Service, error)
//设置观察者
SetObserver(name string, observer Observer) error
//删除观察者
DelObserver(name string) error
}
var newRegistryMap = make(map[string]func() Registry, 0)
//新建注册器
func New(name string) Registry {
if newRegistry, ok := newRegistryMap[name]; ok {
return newRegistry()
} else {
return nil
}
}
//新建节点
func NewNode(serviceName string, id string, address string, port int) *Node {
return &Node{
ServiceName: serviceName,
Id: id,
Address: address,
Port: port,
Status: "unknown",
Meta: make(map[string]string, 0),
}
}
//新建服务
func NewService(name string) *Service {
return &Service{
Name: name,
NodeMap: make(map[string]*Node, 0),
TagMap: make(map[string]struct{}, 0),
}
}
//默认注册器
var defaultRegistry Registry = nil
//初始化
func Init(name string, options map[string]interface{}) error {
if defaultRegistry != nil {
return errors.New("default registry is exists")
}
defaultRegistry = New(name)
if defaultRegistry == nil {
return errors.New("new registry error")
}
return defaultRegistry.Init(options)
}
//服务注册
func Register(node *Node) error {
if defaultRegistry == nil {
return errors.New("default registry is not exists")
}
return defaultRegistry.Register(node)
}
//服务注销
func Deregister(node *Node) error {
if defaultRegistry == nil {
return errors.New("default registry is not exists")
}
return defaultRegistry.Deregister(node)
}
//获取服务映射
func GetServiceMap() map[string]*Service {
if defaultRegistry == nil {
return make(map[string]*Service, 0)
}
return defaultRegistry.GetServiceMap()
}
//获取服务
func GetService(serviceName string) (*Service, error) {
if defaultRegistry == nil {
return nil, errors.New("default registry is not exists")
}
return defaultRegistry.GetService(serviceName)
}
//设置观察者
func SetObserver(name string, observer Observer) error {
if defaultRegistry == nil {
return errors.New("default registry is not exists")
}
return defaultRegistry.SetObserver(name, observer)
}
//删除观察者
func DelObserver(name string) error {
if defaultRegistry == nil {
return errors.New("default registry is not exists")
}
return defaultRegistry.DelObserver(name)
}
package util
import (
"crypto/tls"
"gopkg.in/gomail.v2"
)
type HostInfo struct {
address string
port int
}
func SendEmail(title string, content string, receiver ... string) {
hostInfo := HostInfo{"mail.quantgroup.cn", 587}
m := gomail.NewMessage()
m.SetHeader("From", "program@quantgroup.cn")
//m.SetHeader("To", []string{receiver})
m.SetHeader("To", receiver...)
m.SetHeader("Subject", title)
m.SetBody("text/plain", content)
userName := "program@quantgroup.cn"
pwd := "Fuck147999!!!"
d := gomail.NewDialer(hostInfo.address, hostInfo.port, userName, pwd)
// 解决 x509: certificate signed by unknown authority
d.TLSConfig = &tls.Config{InsecureSkipVerify: true}
if err := d.DialAndSend(m); err != nil {
panic(err)
}
}
language: go
go:
- 1.8.x
- 1.9.x
- 1.10.x
os:
- linux
- osx
install:
- "wget http://sun.aei.polsl.pl/~sdeor/corpus/mr.bz2"
- "bzip2 -d mr.bz2"
script:
- "go build"
- "PAYLOAD=`pwd`/mr go test -v"
- "PAYLOAD=`pwd`/mr go test -bench ."
Simplified BSD License
Copyright (c) 2016, Datadog <info@datadoghq.com>
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the names of its contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Zstd Go Wrapper
[C Zstd Homepage](https://github.com/Cyan4973/zstd)
The current headers and C files are from *v1.3.4* (Commit
[2555975](https://github.com/facebook/zstd/releases/tag/v1.3.4)).
## Usage
There are two main APIs:
* simple Compress/Decompress
* streaming API (io.Reader/io.Writer)
The compress/decompress APIs mirror that of lz4, while the streaming API was
designed to be a drop-in replacement for zlib.
### Simple `Compress/Decompress`
```go
// Compress compresses the byte array given in src and writes it to dst.
// If you already have a buffer allocated, you can pass it to prevent allocation
// If not, you can pass nil as dst.
// If the buffer is too small, it will be reallocated, resized, and returned bu the function
// If dst is nil, this will allocate the worst case size (CompressBound(src))
Compress(dst, src []byte) ([]byte, error)
```
```go
// CompressLevel is the same as Compress but you can pass another compression level
CompressLevel(dst, src []byte, level int) ([]byte, error)
```
```go
// Decompress will decompress your payload into dst.
// If you already have a buffer allocated, you can pass it to prevent allocation
// If not, you can pass nil as dst (allocates a 4*src size as default).
// If the buffer is too small, it will retry 3 times by doubling the dst size
// After max retries, it will switch to the slower stream API to be sure to be able
// to decompress. Currently switches if compression ratio > 4*2**3=32.
Decompress(dst, src []byte) ([]byte, error)
```
### Stream API
```go
// NewWriter creates a new object that can optionally be initialized with
// a precomputed dictionary. If dict is nil, compress without a dictionary.
// The dictionary array should not be changed during the use of this object.
// You MUST CALL Close() to write the last bytes of a zstd stream and free C objects.
NewWriter(w io.Writer) *Writer
NewWriterLevel(w io.Writer, level int) *Writer
NewWriterLevelDict(w io.Writer, level int, dict []byte) *Writer
// Write compresses the input data and write it to the underlying writer
(w *Writer) Write(p []byte) (int, error)
// Close flushes the buffer and frees C zstd objects
(w *Writer) Close() error
```
```go
// NewReader returns a new io.ReadCloser that will decompress data from the
// underlying reader. If a dictionary is provided to NewReaderDict, it must
// not be modified until Close is called. It is the caller's responsibility
// to call Close, which frees up C objects.
NewReader(r io.Reader) io.ReadCloser
NewReaderDict(r io.Reader, dict []byte) io.ReadCloser
```
### Benchmarks (benchmarked with v0.5.0)
The author of Zstd also wrote lz4. Zstd is intended to occupy a speed/ratio
level similar to what zlib currently provides. In our tests, the can always
be made to be better than zlib by chosing an appropriate level while still
keeping compression and decompression time faster than zlib.
You can run the benchmarks against your own payloads by using the Go benchmarks tool.
Just export your payload filepath as the `PAYLOAD` environment variable and run the benchmarks:
```go
go test -bench .
```
Compression of a 7Mb pdf zstd (this wrapper) vs [czlib](https://github.com/DataDog/czlib):
```
BenchmarkCompression 5 221056624 ns/op 67.34 MB/s
BenchmarkDecompression 100 18370416 ns/op 810.32 MB/s
BenchmarkFzlibCompress 2 610156603 ns/op 24.40 MB/s
BenchmarkFzlibDecompress 20 81195246 ns/op 183.33 MB/s
```
Ratio is also better by a margin of ~20%.
Compression speed is always better than zlib on all the payloads we tested;
However, [czlib](https://github.com/DataDog/czlib) has optimisations that make it
faster at decompressiong small payloads:
```
Testing with size: 11... czlib: 8.97 MB/s, zstd: 3.26 MB/s
Testing with size: 27... czlib: 23.3 MB/s, zstd: 8.22 MB/s
Testing with size: 62... czlib: 31.6 MB/s, zstd: 19.49 MB/s
Testing with size: 141... czlib: 74.54 MB/s, zstd: 42.55 MB/s
Testing with size: 323... czlib: 155.14 MB/s, zstd: 99.39 MB/s
Testing with size: 739... czlib: 235.9 MB/s, zstd: 216.45 MB/s
Testing with size: 1689... czlib: 116.45 MB/s, zstd: 345.64 MB/s
Testing with size: 3858... czlib: 176.39 MB/s, zstd: 617.56 MB/s
Testing with size: 8811... czlib: 254.11 MB/s, zstd: 824.34 MB/s
Testing with size: 20121... czlib: 197.43 MB/s, zstd: 1339.11 MB/s
Testing with size: 45951... czlib: 201.62 MB/s, zstd: 1951.57 MB/s
```
zstd starts to shine with payloads > 1KB
### Stability - Current state: STABLE
The C library seems to be pretty stable and according to the author has been tested and fuzzed.
For the Go wrapper, the test cover most usual cases and we have succesfully tested it on all staging and prod data.
BSD License
For Zstandard software
Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name Facebook nor the names of its contributors may be used to
endorse or promote products derived from this software without specific
prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
This diff is collapsed.
/*
* Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
* You may select, at your option, one of the above-listed licenses.
*/
#ifndef ZSTD_COMPILER_H
#define ZSTD_COMPILER_H
/*-*******************************************************
* Compiler specifics
*********************************************************/
/* force inlining */
#if defined (__GNUC__) || defined(__cplusplus) || defined(__STDC_VERSION__) && __STDC_VERSION__ >= 199901L /* C99 */
# define INLINE_KEYWORD inline
#else
# define INLINE_KEYWORD
#endif
#if defined(__GNUC__)
# define FORCE_INLINE_ATTR __attribute__((always_inline))
#elif defined(_MSC_VER)
# define FORCE_INLINE_ATTR __forceinline
#else
# define FORCE_INLINE_ATTR
#endif
/**
* FORCE_INLINE_TEMPLATE is used to define C "templates", which take constant
* parameters. They must be inlined for the compiler to elimininate the constant
* branches.
*/
#define FORCE_INLINE_TEMPLATE static INLINE_KEYWORD FORCE_INLINE_ATTR
/**
* HINT_INLINE is used to help the compiler generate better code. It is *not*
* used for "templates", so it can be tweaked based on the compilers
* performance.
*
* gcc-4.8 and gcc-4.9 have been shown to benefit from leaving off the
* always_inline attribute.
*
* clang up to 5.0.0 (trunk) benefit tremendously from the always_inline
* attribute.
*/
#if !defined(__clang__) && defined(__GNUC__) && __GNUC__ >= 4 && __GNUC_MINOR__ >= 8 && __GNUC__ < 5
# define HINT_INLINE static INLINE_KEYWORD
#else
# define HINT_INLINE static INLINE_KEYWORD FORCE_INLINE_ATTR
#endif
/* force no inlining */
#ifdef _MSC_VER
# define FORCE_NOINLINE static __declspec(noinline)
#else
# ifdef __GNUC__
# define FORCE_NOINLINE static __attribute__((__noinline__))
# else
# define FORCE_NOINLINE static
# endif
#endif
/* target attribute */
#ifndef __has_attribute
#define __has_attribute(x) 0 /* Compatibility with non-clang compilers. */
#endif
#if defined(__GNUC__)
# define TARGET_ATTRIBUTE(target) __attribute__((__target__(target)))
#else
# define TARGET_ATTRIBUTE(target)
#endif
/* Enable runtime BMI2 dispatch based on the CPU.
* Enabled for clang & gcc >=4.8 on x86 when BMI2 isn't enabled by default.
*/
#ifndef DYNAMIC_BMI2
#if (defined(__clang__) && __has_attribute(__target__)) \
|| (defined(__GNUC__) \
&& (__GNUC__ >= 5 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8))) \
&& (defined(__x86_64__) || defined(_M_X86)) \
&& !defined(__BMI2__)
# define DYNAMIC_BMI2 1
#else
# define DYNAMIC_BMI2 0
#endif
#endif
/* prefetch */
#if defined(_MSC_VER) && (defined(_M_X64) || defined(_M_I86)) /* _mm_prefetch() is not defined outside of x86/x64 */
# include <mmintrin.h> /* https://msdn.microsoft.com/fr-fr/library/84szxsww(v=vs.90).aspx */
# define PREFETCH(ptr) _mm_prefetch((const char*)ptr, _MM_HINT_T0)
#elif defined(__GNUC__)
# define PREFETCH(ptr) __builtin_prefetch(ptr, 0, 0)
#else
# define PREFETCH(ptr) /* disabled */
#endif
/* disable warnings */
#ifdef _MSC_VER /* Visual Studio */
# include <intrin.h> /* For Visual 2005 */
# pragma warning(disable : 4100) /* disable: C4100: unreferenced formal parameter */
# pragma warning(disable : 4127) /* disable: C4127: conditional expression is constant */
# pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */
# pragma warning(disable : 4214) /* disable: C4214: non-int bitfields */
# pragma warning(disable : 4324) /* disable: C4324: padded structure */
#endif
#endif /* ZSTD_COMPILER_H */
This diff is collapsed.
/*
* Copyright (c) 2018-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
* You may select, at your option, one of the above-listed licenses.
*/
#ifndef ZSTD_COMMON_CPU_H
#define ZSTD_COMMON_CPU_H
/**
* Implementation taken from folly/CpuId.h
* https://github.com/facebook/folly/blob/master/folly/CpuId.h
*/
#include <string.h>
#include "mem.h"
#ifdef _MSC_VER
#include <intrin.h>
#endif
typedef struct {
U32 f1c;
U32 f1d;
U32 f7b;
U32 f7c;
} ZSTD_cpuid_t;
MEM_STATIC ZSTD_cpuid_t ZSTD_cpuid(void) {
U32 f1c = 0;
U32 f1d = 0;
U32 f7b = 0;
U32 f7c = 0;
#ifdef _MSC_VER
int reg[4];
__cpuid((int*)reg, 0);
{
int const n = reg[0];
if (n >= 1) {
__cpuid((int*)reg, 1);
f1c = (U32)reg[2];
f1d = (U32)reg[3];
}
if (n >= 7) {
__cpuidex((int*)reg, 7, 0);
f7b = (U32)reg[1];
f7c = (U32)reg[2];
}
}
#elif defined(__i386__) && defined(__PIC__) && !defined(__clang__) && defined(__GNUC__)
/* The following block like the normal cpuid branch below, but gcc
* reserves ebx for use of its pic register so we must specially
* handle the save and restore to avoid clobbering the register
*/
U32 n;
__asm__(
"pushl %%ebx\n\t"
"cpuid\n\t"
"popl %%ebx\n\t"
: "=a"(n)
: "a"(0)
: "ecx", "edx");
if (n >= 1) {
U32 f1a;
__asm__(
"pushl %%ebx\n\t"
"cpuid\n\t"
"popl %%ebx\n\t"
: "=a"(f1a), "=c"(f1c), "=d"(f1d)
: "a"(1)
:);
}
if (n >= 7) {
__asm__(
"pushl %%ebx\n\t"
"cpuid\n\t"
"movl %%ebx, %%eax\n\r"
"popl %%ebx"
: "=a"(f7b), "=c"(f7c)
: "a"(7), "c"(0)
: "edx");
}
#elif defined(__x86_64__) || defined(_M_X64) || defined(__i386__)
U32 n;
__asm__("cpuid" : "=a"(n) : "a"(0) : "ebx", "ecx", "edx");
if (n >= 1) {
U32 f1a;
__asm__("cpuid" : "=a"(f1a), "=c"(f1c), "=d"(f1d) : "a"(1) : "ebx");
}
if (n >= 7) {
U32 f7a;
__asm__("cpuid"
: "=a"(f7a), "=b"(f7b), "=c"(f7c)
: "a"(7), "c"(0)
: "edx");
}
#endif
{
ZSTD_cpuid_t cpuid;
cpuid.f1c = f1c;
cpuid.f1d = f1d;
cpuid.f7b = f7b;
cpuid.f7c = f7c;
return cpuid;
}
}
#define X(name, r, bit) \
MEM_STATIC int ZSTD_cpuid_##name(ZSTD_cpuid_t const cpuid) { \
return ((cpuid.r) & (1U << bit)) != 0; \
}
/* cpuid(1): Processor Info and Feature Bits. */
#define C(name, bit) X(name, f1c, bit)
C(sse3, 0)
C(pclmuldq, 1)
C(dtes64, 2)
C(monitor, 3)
C(dscpl, 4)
C(vmx, 5)
C(smx, 6)
C(eist, 7)
C(tm2, 8)
C(ssse3, 9)
C(cnxtid, 10)
C(fma, 12)
C(cx16, 13)
C(xtpr, 14)
C(pdcm, 15)
C(pcid, 17)
C(dca, 18)
C(sse41, 19)
C(sse42, 20)
C(x2apic, 21)
C(movbe, 22)
C(popcnt, 23)
C(tscdeadline, 24)
C(aes, 25)
C(xsave, 26)
C(osxsave, 27)
C(avx, 28)
C(f16c, 29)
C(rdrand, 30)
#undef C
#define D(name, bit) X(name, f1d, bit)
D(fpu, 0)
D(vme, 1)
D(de, 2)
D(pse, 3)
D(tsc, 4)
D(msr, 5)
D(pae, 6)
D(mce, 7)
D(cx8, 8)
D(apic, 9)
D(sep, 11)
D(mtrr, 12)
D(pge, 13)
D(mca, 14)
D(cmov, 15)
D(pat, 16)
D(pse36, 17)
D(psn, 18)
D(clfsh, 19)
D(ds, 21)
D(acpi, 22)
D(mmx, 23)
D(fxsr, 24)
D(sse, 25)
D(sse2, 26)
D(ss, 27)
D(htt, 28)
D(tm, 29)
D(pbe, 31)
#undef D
/* cpuid(7): Extended Features. */
#define B(name, bit) X(name, f7b, bit)
B(bmi1, 3)
B(hle, 4)
B(avx2, 5)
B(smep, 7)
B(bmi2, 8)
B(erms, 9)
B(invpcid, 10)
B(rtm, 11)
B(mpx, 14)
B(avx512f, 16)
B(avx512dq, 17)
B(rdseed, 18)
B(adx, 19)
B(smap, 20)
B(avx512ifma, 21)
B(pcommit, 22)
B(clflushopt, 23)
B(clwb, 24)
B(avx512pf, 26)
B(avx512er, 27)
B(avx512cd, 28)
B(sha, 29)
B(avx512bw, 30)
B(avx512vl, 31)
#undef B
#define C(name, bit) X(name, f7c, bit)
C(prefetchwt1, 0)
C(avx512vbmi, 1)
#undef C
#undef X
#endif /* ZSTD_COMMON_CPU_H */
This diff is collapsed.
/*
* divsufsort.h for libdivsufsort-lite
* Copyright (c) 2003-2008 Yuta Mori All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
#ifndef _DIVSUFSORT_H
#define _DIVSUFSORT_H 1
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
/*- Prototypes -*/
/**
* Constructs the suffix array of a given string.
* @param T [0..n-1] The input string.
* @param SA [0..n-1] The output array of suffixes.
* @param n The length of the given string.
* @param openMP enables OpenMP optimization.
* @return 0 if no error occurred, -1 or -2 otherwise.
*/
int
divsufsort(const unsigned char *T, int *SA, int n, int openMP);
/**
* Constructs the burrows-wheeler transformed string of a given string.
* @param T [0..n-1] The input string.
* @param U [0..n-1] The output string. (can be T)
* @param A [0..n-1] The temporary array. (can be NULL)
* @param n The length of the given string.
* @param num_indexes The length of secondary indexes array. (can be NULL)
* @param indexes The secondary indexes array. (can be NULL)
* @param openMP enables OpenMP optimization.
* @return The primary index if no error occurred, -1 or -2 otherwise.
*/
int
divbwt(const unsigned char *T, unsigned char *U, int *A, int n, unsigned char * num_indexes, int * indexes, int openMP);
#ifdef __cplusplus
} /* extern "C" */
#endif /* __cplusplus */
#endif /* _DIVSUFSORT_H */
/*
Common functions of New Generation Entropy library
Copyright (C) 2016, Yann Collet.
BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
You can contact the author at :
- FSE+HUF source repository : https://github.com/Cyan4973/FiniteStateEntropy
- Public forum : https://groups.google.com/forum/#!forum/lz4c
*************************************************************************** */
/* *************************************
* Dependencies
***************************************/
#include "mem.h"
#include "error_private.h" /* ERR_*, ERROR */
#define FSE_STATIC_LINKING_ONLY /* FSE_MIN_TABLELOG */
#include "fse.h"
#define HUF_STATIC_LINKING_ONLY /* HUF_TABLELOG_ABSOLUTEMAX */
#include "huf.h"
/*=== Version ===*/
unsigned FSE_versionNumber(void) { return FSE_VERSION_NUMBER; }
/*=== Error Management ===*/
unsigned FSE_isError(size_t code) { return ERR_isError(code); }
const char* FSE_getErrorName(size_t code) { return ERR_getErrorName(code); }
unsigned HUF_isError(size_t code) { return ERR_isError(code); }
const char* HUF_getErrorName(size_t code) { return ERR_getErrorName(code); }
/*-**************************************************************
* FSE NCount encoding-decoding
****************************************************************/
size_t FSE_readNCount (short* normalizedCounter, unsigned* maxSVPtr, unsigned* tableLogPtr,
const void* headerBuffer, size_t hbSize)
{
const BYTE* const istart = (const BYTE*) headerBuffer;
const BYTE* const iend = istart + hbSize;
const BYTE* ip = istart;
int nbBits;
int remaining;
int threshold;
U32 bitStream;
int bitCount;
unsigned charnum = 0;
int previous0 = 0;
if (hbSize < 4) return ERROR(srcSize_wrong);
bitStream = MEM_readLE32(ip);
nbBits = (bitStream & 0xF) + FSE_MIN_TABLELOG; /* extract tableLog */
if (nbBits > FSE_TABLELOG_ABSOLUTE_MAX) return ERROR(tableLog_tooLarge);
bitStream >>= 4;
bitCount = 4;
*tableLogPtr = nbBits;
remaining = (1<<nbBits)+1;
threshold = 1<<nbBits;
nbBits++;
while ((remaining>1) & (charnum<=*maxSVPtr)) {
if (previous0) {
unsigned n0 = charnum;
while ((bitStream & 0xFFFF) == 0xFFFF) {
n0 += 24;
if (ip < iend-5) {
ip += 2;
bitStream = MEM_readLE32(ip) >> bitCount;
} else {
bitStream >>= 16;
bitCount += 16;
} }
while ((bitStream & 3) == 3) {
n0 += 3;
bitStream >>= 2;
bitCount += 2;
}
n0 += bitStream & 3;
bitCount += 2;
if (n0 > *maxSVPtr) return ERROR(maxSymbolValue_tooSmall);
while (charnum < n0) normalizedCounter[charnum++] = 0;
if ((ip <= iend-7) || (ip + (bitCount>>3) <= iend-4)) {
ip += bitCount>>3;
bitCount &= 7;
bitStream = MEM_readLE32(ip) >> bitCount;
} else {
bitStream >>= 2;
} }
{ int const max = (2*threshold-1) - remaining;
int count;
if ((bitStream & (threshold-1)) < (U32)max) {
count = bitStream & (threshold-1);
bitCount += nbBits-1;
} else {
count = bitStream & (2*threshold-1);
if (count >= threshold) count -= max;
bitCount += nbBits;
}
count--; /* extra accuracy */
remaining -= count < 0 ? -count : count; /* -1 means +1 */
normalizedCounter[charnum++] = (short)count;
previous0 = !count;
while (remaining < threshold) {
nbBits--;
threshold >>= 1;
}
if ((ip <= iend-7) || (ip + (bitCount>>3) <= iend-4)) {
ip += bitCount>>3;
bitCount &= 7;
} else {
bitCount -= (int)(8 * (iend - 4 - ip));
ip = iend - 4;
}
bitStream = MEM_readLE32(ip) >> (bitCount & 31);
} } /* while ((remaining>1) & (charnum<=*maxSVPtr)) */
if (remaining != 1) return ERROR(corruption_detected);
if (bitCount > 32) return ERROR(corruption_detected);
*maxSVPtr = charnum-1;
ip += (bitCount+7)>>3;
return ip-istart;
}
/*! HUF_readStats() :
Read compact Huffman tree, saved by HUF_writeCTable().
`huffWeight` is destination buffer.
`rankStats` is assumed to be a table of at least HUF_TABLELOG_MAX U32.
@return : size read from `src` , or an error Code .
Note : Needed by HUF_readCTable() and HUF_readDTableX?() .
*/
size_t HUF_readStats(BYTE* huffWeight, size_t hwSize, U32* rankStats,
U32* nbSymbolsPtr, U32* tableLogPtr,
const void* src, size_t srcSize)
{
U32 weightTotal;
const BYTE* ip = (const BYTE*) src;
size_t iSize;
size_t oSize;
if (!srcSize) return ERROR(srcSize_wrong);
iSize = ip[0];
/* memset(huffWeight, 0, hwSize); *//* is not necessary, even though some analyzer complain ... */
if (iSize >= 128) { /* special header */
oSize = iSize - 127;
iSize = ((oSize+1)/2);
if (iSize+1 > srcSize) return ERROR(srcSize_wrong);
if (oSize >= hwSize) return ERROR(corruption_detected);
ip += 1;
{ U32 n;
for (n=0; n<oSize; n+=2) {
huffWeight[n] = ip[n/2] >> 4;
huffWeight[n+1] = ip[n/2] & 15;
} } }
else { /* header compressed with FSE (normal case) */
FSE_DTable fseWorkspace[FSE_DTABLE_SIZE_U32(6)]; /* 6 is max possible tableLog for HUF header (maybe even 5, to be tested) */
if (iSize+1 > srcSize) return ERROR(srcSize_wrong);
oSize = FSE_decompress_wksp(huffWeight, hwSize-1, ip+1, iSize, fseWorkspace, 6); /* max (hwSize-1) values decoded, as last one is implied */
if (FSE_isError(oSize)) return oSize;
}
/* collect weight stats */
memset(rankStats, 0, (HUF_TABLELOG_MAX + 1) * sizeof(U32));
weightTotal = 0;
{ U32 n; for (n=0; n<oSize; n++) {
if (huffWeight[n] >= HUF_TABLELOG_MAX) return ERROR(corruption_detected);
rankStats[huffWeight[n]]++;
weightTotal += (1 << huffWeight[n]) >> 1;
} }
if (weightTotal == 0) return ERROR(corruption_detected);
/* get last non-null symbol weight (implied, total must be 2^n) */
{ U32 const tableLog = BIT_highbit32(weightTotal) + 1;
if (tableLog > HUF_TABLELOG_MAX) return ERROR(corruption_detected);
*tableLogPtr = tableLog;
/* determine last weight */
{ U32 const total = 1 << tableLog;
U32 const rest = total - weightTotal;
U32 const verif = 1 << BIT_highbit32(rest);
U32 const lastWeight = BIT_highbit32(rest) + 1;
if (verif != rest) return ERROR(corruption_detected); /* last value must be a clean power of 2 */
huffWeight[oSize] = (BYTE)lastWeight;
rankStats[lastWeight]++;
} }
/* check tree construction validity */
if ((rankStats[1] < 2) || (rankStats[1] & 1)) return ERROR(corruption_detected); /* by construction : at least 2 elts of rank 1, must be even */
/* results */
*nbSymbolsPtr = (U32)(oSize+1);
return iSize+1;
}
/*
* Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
* You may select, at your option, one of the above-listed licenses.
*/
/* The purpose of this file is to have a single list of error strings embedded in binary */
#include "error_private.h"
const char* ERR_getErrorString(ERR_enum code)
{
static const char* const notErrorCode = "Unspecified error code";
switch( code )
{
case PREFIX(no_error): return "No error detected";
case PREFIX(GENERIC): return "Error (generic)";
case PREFIX(prefix_unknown): return "Unknown frame descriptor";
case PREFIX(version_unsupported): return "Version not supported";
case PREFIX(frameParameter_unsupported): return "Unsupported frame parameter";
case PREFIX(frameParameter_windowTooLarge): return "Frame requires too much memory for decoding";
case PREFIX(corruption_detected): return "Corrupted block detected";
case PREFIX(checksum_wrong): return "Restored data doesn't match checksum";
case PREFIX(parameter_unsupported): return "Unsupported parameter";
case PREFIX(parameter_outOfBound): return "Parameter is out of bound";
case PREFIX(init_missing): return "Context should be init first";
case PREFIX(memory_allocation): return "Allocation error : not enough memory";
case PREFIX(workSpace_tooSmall): return "workSpace buffer is not large enough";
case PREFIX(stage_wrong): return "Operation not authorized at current processing stage";
case PREFIX(tableLog_tooLarge): return "tableLog requires too much memory : unsupported";
case PREFIX(maxSymbolValue_tooLarge): return "Unsupported max Symbol Value : too large";
case PREFIX(maxSymbolValue_tooSmall): return "Specified maxSymbolValue is too small";
case PREFIX(dictionary_corrupted): return "Dictionary is corrupted";
case PREFIX(dictionary_wrong): return "Dictionary mismatch";
case PREFIX(dictionaryCreation_failed): return "Cannot create Dictionary from provided samples";
case PREFIX(dstSize_tooSmall): return "Destination buffer is too small";
case PREFIX(srcSize_wrong): return "Src size is incorrect";
/* following error codes are not stable and may be removed or changed in a future version */
case PREFIX(frameIndex_tooLarge): return "Frame index is too large";
case PREFIX(seekableIO): return "An I/O error occurred when reading/seeking";
case PREFIX(maxCode):
default: return notErrorCode;
}
}
/*
* Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
* You may select, at your option, one of the above-listed licenses.
*/
/* Note : this module is expected to remain private, do not expose it */
#ifndef ERROR_H_MODULE
#define ERROR_H_MODULE
#if defined (__cplusplus)
extern "C" {
#endif
/* ****************************************
* Dependencies
******************************************/
#include <stddef.h> /* size_t */
#include "zstd_errors.h" /* enum list */
/* ****************************************
* Compiler-specific
******************************************/
#if defined(__GNUC__)
# define ERR_STATIC static __attribute__((unused))
#elif defined (__cplusplus) || (defined (__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) /* C99 */)
# define ERR_STATIC static inline
#elif defined(_MSC_VER)
# define ERR_STATIC static __inline
#else
# define ERR_STATIC static /* this version may generate warnings for unused static functions; disable the relevant warning */
#endif
/*-****************************************
* Customization (error_public.h)
******************************************/
typedef ZSTD_ErrorCode ERR_enum;
#define PREFIX(name) ZSTD_error_##name
/*-****************************************
* Error codes handling
******************************************/
#undef ERROR /* reported already defined on VS 2015 (Rich Geldreich) */
#define ERROR(name) ZSTD_ERROR(name)
#define ZSTD_ERROR(name) ((size_t)-PREFIX(name))
ERR_STATIC unsigned ERR_isError(size_t code) { return (code > ERROR(maxCode)); }
ERR_STATIC ERR_enum ERR_getErrorCode(size_t code) { if (!ERR_isError(code)) return (ERR_enum)0; return (ERR_enum) (0-code); }
/*-****************************************
* Error Strings
******************************************/
const char* ERR_getErrorString(ERR_enum code); /* error_private.c */
ERR_STATIC const char* ERR_getErrorName(size_t code)
{
return ERR_getErrorString(ERR_getErrorCode(code));
}
#if defined (__cplusplus)
}
#endif
#endif /* ERROR_H_MODULE */
package zstd
/*
#define ZSTD_STATIC_LINKING_ONLY
#include "zstd.h"
*/
import "C"
// ErrorCode is an error returned by the zstd library.
type ErrorCode int
// Error returns the error string given by zstd
func (e ErrorCode) Error() string {
return C.GoString(C.ZSTD_getErrorName(C.size_t(e)))
}
func cIsError(code int) bool {
return int(C.ZSTD_isError(C.size_t(code))) != 0
}
// getError returns an error for the return code, or nil if it's not an error
func getError(code int) error {
if code < 0 && cIsError(code) {
return ErrorCode(code)
}
return nil
}
// IsDstSizeTooSmallError returns whether the error correspond to zstd standard sDstSizeTooSmall error
func IsDstSizeTooSmallError(e error) bool {
if e != nil && e.Error() == "Destination buffer is too small" {
return true
}
return false
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
/*
* Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
* You may select, at your option, one of the above-listed licenses.
*/
/* ====== Dependencies ======= */
#include <stddef.h> /* size_t */
#include "pool.h"
#include "zstd_internal.h" /* ZSTD_malloc, ZSTD_free */
/* ====== Compiler specifics ====== */
#if defined(_MSC_VER)
# pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */
#endif
#ifdef ZSTD_MULTITHREAD
#include "threading.h" /* pthread adaptation */
/* A job is a function and an opaque argument */
typedef struct POOL_job_s {
POOL_function function;
void *opaque;
} POOL_job;
struct POOL_ctx_s {
ZSTD_customMem customMem;
/* Keep track of the threads */
ZSTD_pthread_t *threads;
size_t numThreads;
/* The queue is a circular buffer */
POOL_job *queue;
size_t queueHead;
size_t queueTail;
size_t queueSize;
/* The number of threads working on jobs */
size_t numThreadsBusy;
/* Indicates if the queue is empty */
int queueEmpty;
/* The mutex protects the queue */
ZSTD_pthread_mutex_t queueMutex;
/* Condition variable for pushers to wait on when the queue is full */
ZSTD_pthread_cond_t queuePushCond;
/* Condition variables for poppers to wait on when the queue is empty */
ZSTD_pthread_cond_t queuePopCond;
/* Indicates if the queue is shutting down */
int shutdown;
};
/* POOL_thread() :
Work thread for the thread pool.
Waits for jobs and executes them.
@returns : NULL on failure else non-null.
*/
static void* POOL_thread(void* opaque) {
POOL_ctx* const ctx = (POOL_ctx*)opaque;
if (!ctx) { return NULL; }
for (;;) {
/* Lock the mutex and wait for a non-empty queue or until shutdown */
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
while (ctx->queueEmpty && !ctx->shutdown) {
ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
}
/* empty => shutting down: so stop */
if (ctx->queueEmpty) {
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
return opaque;
}
/* Pop a job off the queue */
{ POOL_job const job = ctx->queue[ctx->queueHead];
ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
ctx->numThreadsBusy++;
ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
/* Unlock the mutex, signal a pusher, and run the job */
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
job.function(job.opaque);
/* If the intended queue size was 0, signal after finishing job */
if (ctx->queueSize == 1) {
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
ctx->numThreadsBusy--;
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
} }
} /* for (;;) */
/* Unreachable */
}
POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
}
POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
POOL_ctx* ctx;
/* Check the parameters */
if (!numThreads) { return NULL; }
/* Allocate the context and zero initialize */
ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem);
if (!ctx) { return NULL; }
/* Initialize the job queue.
* It needs one extra space since one space is wasted to differentiate empty
* and full queues.
*/
ctx->queueSize = queueSize + 1;
ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem);
ctx->queueHead = 0;
ctx->queueTail = 0;
ctx->numThreadsBusy = 0;
ctx->queueEmpty = 1;
(void)ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
(void)ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
(void)ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
ctx->shutdown = 0;
/* Allocate space for the thread handles */
ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
ctx->numThreads = 0;
ctx->customMem = customMem;
/* Check for errors */
if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
/* Initialize the threads */
{ size_t i;
for (i = 0; i < numThreads; ++i) {
if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
ctx->numThreads = i;
POOL_free(ctx);
return NULL;
} }
ctx->numThreads = numThreads;
}
return ctx;
}
/*! POOL_join() :
Shutdown the queue, wake any sleeping threads, and join all of the threads.
*/
static void POOL_join(POOL_ctx* ctx) {
/* Shut down the queue */
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
ctx->shutdown = 1;
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
/* Wake up sleeping threads */
ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
/* Join all of the threads */
{ size_t i;
for (i = 0; i < ctx->numThreads; ++i) {
ZSTD_pthread_join(ctx->threads[i], NULL);
} }
}
void POOL_free(POOL_ctx *ctx) {
if (!ctx) { return; }
POOL_join(ctx);
ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
ZSTD_pthread_cond_destroy(&ctx->queuePopCond);
ZSTD_free(ctx->queue, ctx->customMem);
ZSTD_free(ctx->threads, ctx->customMem);
ZSTD_free(ctx, ctx->customMem);
}
size_t POOL_sizeof(POOL_ctx *ctx) {
if (ctx==NULL) return 0; /* supports sizeof NULL */
return sizeof(*ctx)
+ ctx->queueSize * sizeof(POOL_job)
+ ctx->numThreads * sizeof(ZSTD_pthread_t);
}
/**
* Returns 1 if the queue is full and 0 otherwise.
*
* If the queueSize is 1 (the pool was created with an intended queueSize of 0),
* then a queue is empty if there is a thread free and no job is waiting.
*/
static int isQueueFull(POOL_ctx const* ctx) {
if (ctx->queueSize > 1) {
return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
} else {
return ctx->numThreadsBusy == ctx->numThreads ||
!ctx->queueEmpty;
}
}
static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
{
POOL_job const job = {function, opaque};
assert(ctx != NULL);
if (ctx->shutdown) return;
ctx->queueEmpty = 0;
ctx->queue[ctx->queueTail] = job;
ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
ZSTD_pthread_cond_signal(&ctx->queuePopCond);
}
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
{
assert(ctx != NULL);
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
/* Wait until there is space in the queue for the new job */
while (isQueueFull(ctx) && (!ctx->shutdown)) {
ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
}
POOL_add_internal(ctx, function, opaque);
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
}
int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
{
assert(ctx != NULL);
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
if (isQueueFull(ctx)) {
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
return 0;
}
POOL_add_internal(ctx, function, opaque);
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
return 1;
}
#else /* ZSTD_MULTITHREAD not defined */
/* ========================== */
/* No multi-threading support */
/* ========================== */
/* We don't need any data, but if it is empty, malloc() might return NULL. */
struct POOL_ctx_s {
int dummy;
};
static POOL_ctx g_ctx;
POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
}
POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
(void)numThreads;
(void)queueSize;
(void)customMem;
return &g_ctx;
}
void POOL_free(POOL_ctx* ctx) {
assert(!ctx || ctx == &g_ctx);
(void)ctx;
}
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
(void)ctx;
function(opaque);
}
int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
(void)ctx;
function(opaque);
return 1;
}
size_t POOL_sizeof(POOL_ctx* ctx) {
if (ctx==NULL) return 0; /* supports sizeof NULL */
assert(ctx == &g_ctx);
return sizeof(*ctx);
}
#endif /* ZSTD_MULTITHREAD */
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment