From 58919f35f5b42d7ca51e9183646f897ecbc77cb6 Mon Sep 17 00:00:00 2001 From: wangqiang Date: Thu, 23 Aug 2018 15:46:19 +0800 Subject: [PATCH] =?UTF-8?q?logkit=20update=20export=20=E5=AF=BC=E5=87=BA?= =?UTF-8?q?=E5=88=B0kodo=E7=9A=84=E5=88=86=E7=89=87=E7=AD=96=E7=95=A5?= =?UTF-8?q?=E6=97=B6=EF=BC=8C=E8=AE=BE=E7=BD=AE=E6=97=B6=E9=97=B4=E5=92=8C?= =?UTF-8?q?=E5=A4=A7=E5=B0=8F=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sender/pandora/pandora.go | 7 +- .../qiniu/pandora-go-sdk/pipeline/api.go | 132 ++++++++++++++++-- .../qiniu/pandora-go-sdk/pipeline/models.go | 55 +++++++- .../pandora-go-sdk/pipeline/schemafree.go | 15 +- vendor/vendor.json | 38 ++--- 5 files changed, 209 insertions(+), 38 deletions(-) diff --git a/sender/pandora/pandora.go b/sender/pandora/pandora.go index f13a31ec7..82457cea9 100644 --- a/sender/pandora/pandora.go +++ b/sender/pandora/pandora.go @@ -199,7 +199,7 @@ func NewSender(conf conf.MapConf) (pandoraSender sender.Sender, err error) { prefix, _ := conf.GetStringOr(sender.KeyPandoraKodoFilePrefix, "logkitauto/date=$(year)-$(mon)-$(day)/hour=$(hour)/min=$(min)/$(sec)") compress, _ := conf.GetBoolOr(sender.KeyPandoraKodoGzip, false) kodoRotateStrategy, _ := conf.GetStringOr(sender.KeyPandoraKodoRotateStrategy, "interval") - kodoRotateSize, _ := conf.GetIntOr(sender.KeyPandoraKodoRotateSize, 500*1024) + kodoRotateSize, _ := conf.GetIntOr(sender.KeyPandoraKodoRotateSize, pipeline.DefaultLogkitRotateSize) kodoRotateSize = kodoRotateSize * 1024 kodoRotateInterval, _ := conf.GetIntOr(sender.KeyPandoraKodoRotateInterval, 10*60) kodoFileRetention, _ := conf.GetIntOr(sender.KeyPandoraKodoFileRetention, 0) @@ -522,6 +522,11 @@ func newPandoraSender(opt *PandoraOption) (s *Sender, err error) { Format: s.opt.format, Compress: s.opt.kodoCompress, AutoExportKodoTokens: s.opt.tokens.KodoTokens, + RotateStrategy: s.opt.kodoRotateStrategy, + RotateSize: s.opt.kodoRotateSize, + RotateInterval: s.opt.kodoRotateInterval, + RotateSizeType: "B", + RotateNumber: s.opt.kodoRotateSize, }, ToTSDB: s.opt.enableTsdb, AutoExportToTSDBInput: pipeline.AutoExportToTSDBInput{ diff --git a/vendor/github.com/qiniu/pandora-go-sdk/pipeline/api.go b/vendor/github.com/qiniu/pandora-go-sdk/pipeline/api.go index ca142c2c4..9257e36f6 100644 --- a/vendor/github.com/qiniu/pandora-go-sdk/pipeline/api.go +++ b/vendor/github.com/qiniu/pandora-go-sdk/pipeline/api.go @@ -21,8 +21,12 @@ import ( ) const ( - systemVariableType = "system" - userVariableType = "user" + systemVariableType = "system" + userVariableType = "user" + defaultServerRotateInterval = 3600 + defaultServerRotateSize = 128 * 1024 * 1024 + DefaultLogkitRotateSize = 512000 * 1024 + DefaultLogkitRotateInterval = 600 ) func (c *Pipeline) CreateGroup(input *CreateGroupInput) (err error) { @@ -382,14 +386,19 @@ func (c *Pipeline) UpdateRepoWithKodo(input *UpdateRepoInput, ex ExportDesc) err } spec := &ExportKodoSpec{ - Bucket: bucketName, - Fields: newfields, - AccessKey: ak, - Retention: int(retention), - Compress: compress, - Email: email, - Format: format, - KeyPrefix: keyPrefix, + Bucket: bucketName, + Fields: newfields, + AccessKey: ak, + Retention: int(retention), + Compress: compress, + Email: email, + Format: format, + KeyPrefix: keyPrefix, + RotateInterval: input.Option.RotateInterval, + RotateSize: input.Option.RotateSize, + RotateStrategy: input.Option.RotateStrategy, + RotateNumber: input.Option.RotateNumber, + RotateSizeType: input.Option.RotateSizeType, } err := c.UpdateExport(&UpdateExportInput{ RepoName: input.RepoName, @@ -443,6 +452,109 @@ func (c *Pipeline) UpdateRepo(input *UpdateRepoInput) (err error) { log.Error("updateRepo list exports error", err) return } + for _, export := range exports.Exports { + if export.Type == "kodo" { + if val, ok := export.Spec["rotateStrategy"]; ok { + if rotateStrategy, ok := val.(string); ok { + input.Option.RotateStrategy = rotateStrategy + } + } + syncServer := false + if val, ok := export.Spec["rotateInterval"]; ok { + rotateInterval := 0 + switch val.(type) { + case int64: + rotateInterval = int(val.(int64)) + case int32: + rotateInterval = int(val.(int32)) + case int: + rotateInterval = val.(int) + case float64: + rotateInterval = int(val.(float64)) + case float32: + rotateInterval = int(val.(float32)) + } + // 服务端为 logkit默认值,且logkit本次设置的值input.Option.RotateInterval不为默认值,则用input.Option.RotateInterval更新服务端的值 + if (rotateInterval == DefaultLogkitRotateInterval && input.Option.RotateInterval != DefaultLogkitRotateInterval) { + // 本次 input.Option.RotateInterval 0,需要设置为默认值 + if input.Option.RotateInterval == 0 { + input.Option.RotateInterval = DefaultLogkitRotateInterval + syncServer = true + } + + } else { + // 服务端不为0和默认值,则使用服务端的值 + if (rotateInterval != defaultServerRotateInterval && rotateInterval != 0) { + input.Option.RotateInterval = rotateInterval + syncServer = true + } else { + // logkit本次设置的为0,则设置为default值 + if input.Option.RotateInterval == 0 { + input.Option.RotateInterval = defaultServerRotateInterval + syncServer = true + } + } + } + } + + if val, ok := export.Spec["rotateSize"]; ok { + rotateSize := 0 + switch val.(type) { + case int64: + rotateSize = int(val.(int64)) + case int32: + rotateSize = int(val.(int32)) + case int: + rotateSize = val.(int) + case float64: + rotateSize = int(val.(float64)) + case float32: + rotateSize = int(val.(float32)) + } + // 服务端为 logkit默认值,且logkit本次设置的值input.Option.RotateSize不为默认值,则用input.Option.RotateSize更新服务端的值 + if (rotateSize == DefaultLogkitRotateSize && input.Option.RotateSize != DefaultLogkitRotateSize) { + // 本次 input.Option.RotateSize 0,需要设置为默认值 + if input.Option.RotateSize == 0 { + input.Option.RotateSize = DefaultLogkitRotateSize + syncServer = true + } + + } else { + // 服务端不为0和默认值,则使用服务端的值 + if (rotateSize != defaultServerRotateSize && rotateSize != 0) { + input.Option.RotateSize = rotateSize + syncServer = true + } else { + // logkit本次设置的为0,则设置为default值 + if input.Option.RotateSize == 0 { + input.Option.RotateSize = defaultServerRotateSize + syncServer = true + } + } + } + } + + if syncServer { + if val, ok := export.Spec["rotateNumber"]; ok { + switch val.(type) { + case int64: + input.Option.RotateNumber = int(val.(int64)) + case float64: + input.Option.RotateNumber = int(val.(float64)) + } + } + if val, ok := export.Spec["rotateSizeType"]; ok { + if rotateSizeType, ok := val.(string); ok { + input.Option.RotateSizeType = rotateSizeType + } + } + } + if input.Option.RotateNumber == 0 { + input.Option.RotateNumber = input.Option.RotateSize + input.Option.RotateSizeType = "B" + } + } + } exs := make(map[string]ExportDesc) for _, ex := range exports.Exports { exs[ex.Name] = ex diff --git a/vendor/github.com/qiniu/pandora-go-sdk/pipeline/models.go b/vendor/github.com/qiniu/pandora-go-sdk/pipeline/models.go index e505c800f..fbc317433 100644 --- a/vendor/github.com/qiniu/pandora-go-sdk/pipeline/models.go +++ b/vendor/github.com/qiniu/pandora-go-sdk/pipeline/models.go @@ -49,6 +49,7 @@ const ( PandoraTypeArray = "array" PandoraTypeMap = "map" PandoraTypeJsonString = "jsonstring" + PandoraTypeIP = "ip" ) const ( @@ -72,6 +73,7 @@ var schemaTypes = map[string]bool{ PandoraTypeMap: true, PandoraTypeBool: true, PandoraTypeJsonString: true, + PandoraTypeIP: true, } func validateGroupName(g string) error { @@ -334,7 +336,7 @@ func (e *RepoSchemaEntry) Validate() (err error) { } if !schemaTypes[e.ValueType] { - err = reqerr.NewInvalidArgs("Schema", fmt.Sprintf("invalid field type: %s, field type should be one of \"float\", \"string\", \"date\", \"long\", \"boolean\", \"array\", \"map\" and \"jsonstring\"", e.ValueType)).WithComponent("pipleline") + err = reqerr.NewInvalidArgs("Schema", fmt.Sprintf("invalid field type: %s, field type should be one of \"float\", \"string\", \"date\", \"long\", \"boolean\", \"array\", \"map\", \"jsonstring\", and \"ip\"", e.ValueType)).WithComponent("pipleline") return } if e.ValueType == "array" { @@ -704,6 +706,8 @@ type AutoExportToKODOInput struct { RotateStrategy string RotateSize int RotateInterval int + RotateSizeType string + RotateNumber int Format string Email string Retention int //数字,单位为天 @@ -1355,11 +1359,52 @@ func (s *ExportMongoSpec) Validate() (err error) { return } +/* + LocateIPDetails struct is used to config ip locate for a specified ip field + + ShouldLoacteField: + should be set to false if do not want this ip field to be located into location info + + WantedFields: + WantedFields field is used to define the location wanted + Keys should be chosen from following strings: + ["wantCountry", "wantRegion", "wantCity", "wantIsp"] + + FieldNames: + FieldNames field is used to define the fieldname where the location info should be stored in the logdb + Keys should be chosen from following strings: + ["ipCountryFieldName", "ipRegionFieldName", "ipCityFieldName", "ipIspFieldName"] + + NOTE: If FieldNames is not provided or the fieldName is not valid, then default fieldnames will be used +*/ +type LocateIPDetails struct { + ShouldLocateField bool `json:"shouldLocateField"` + WantedFields map[string]bool `json:"wantedFields"` + FieldNames map[string]string `json:"fieldNames"` +} + +/* + LocateIPConfig struct is used to define the over all ip locating behavior + + ShouldLocateIP: + If this field is set to 'false', then no ip field will be located + + Mappings: + This mapping is used to define behavior for each ip field (if 'ShouldLcoateIP' is set as true) + Mappings field follows the following format: + {"ip_field_name_in_repo": locateIPConfig_for_the_field} +*/ +type LocateIPConfig struct { + ShouldLocateIP bool `json:"shouldLocateIP"` + Mappings map[string]*LocateIPDetails `json:"mappings"` +} + type ExportLogDBSpec struct { - DestRepoName string `json:"destRepoName"` - Doc map[string]interface{} `json:"doc"` - OmitInvalid bool `json:"omitInvalid"` - OmitEmpty bool `json:"omitEmpty"` + DestRepoName string `json:"destRepoName"` + Doc map[string]interface{} `json:"doc"` + OmitInvalid bool `json:"omitInvalid"` + OmitEmpty bool `json:"omitEmpty"` + LocateIPConfig *LocateIPConfig `json:"locateIPConfig"` } func (s *ExportLogDBSpec) Validate() (err error) { diff --git a/vendor/github.com/qiniu/pandora-go-sdk/pipeline/schemafree.go b/vendor/github.com/qiniu/pandora-go-sdk/pipeline/schemafree.go index cef985860..ea0a652e7 100644 --- a/vendor/github.com/qiniu/pandora-go-sdk/pipeline/schemafree.go +++ b/vendor/github.com/qiniu/pandora-go-sdk/pipeline/schemafree.go @@ -3,6 +3,7 @@ package pipeline import ( "encoding/json" "fmt" + "net" "reflect" "sort" "strconv" @@ -10,6 +11,7 @@ import ( "time" "github.com/qiniu/log" + "github.com/qiniu/pandora-go-sdk/base" . "github.com/qiniu/pandora-go-sdk/base/models" "github.com/qiniu/pandora-go-sdk/base/reqerr" @@ -331,6 +333,8 @@ func dataConvert(data interface{}, schema RepoSchemaEntry) (converted interface{ return newdata, nil } } + case PandoraTypeIP: + return data, nil } return data, fmt.Errorf("can not convert data[%v] type(%v) to pandora type(%v), err %v", data, reflect.TypeOf(data), schema.ValueType, err) } @@ -425,7 +429,7 @@ func checkIgnore(value interface{}, schemeType string) bool { return false } switch schemeType { - case PandoraTypeJsonString, PandoraTypeArray, PandoraTypeMap, PandoraTypeLong, PandoraTypeFloat: + case PandoraTypeJsonString, PandoraTypeArray, PandoraTypeMap, PandoraTypeLong, PandoraTypeFloat, PandoraTypeIP: if str == "" { return true } @@ -976,6 +980,7 @@ PandoraTypeArray :全部支持 PandoraTypeMap :全部支持 */ // 该函数有两个作用,1. 获取 data 中所有字段的 schema; 2. 将 data 中值为 nil, 无法判断类型的键值对,从 data 中删掉 +// 当值为string的时候,如果数据满足date格式,则推断为PandoraTypeDate,如果数据满足IP格式,则推断为PandoraTypeIP func GetTrimedDataSchema(data Data) (valueType map[string]RepoSchemaEntry) { valueType = make(map[string]RepoSchemaEntry) for k, v := range data { @@ -1064,9 +1069,11 @@ func GetTrimedDataSchema(data Data) (valueType map[string]RepoSchemaEntry) { // 由于数据为空,且无法判断类型, 所以从数据中将该条键值对删掉 delete(data, k) case string: - _, err := time.Parse(time.RFC3339, nv) - if err == nil { + // 如果数据满足date格式,则推断为PandoraTypeDate,如果数据满足IP格式,则推断为PandoraTypeIP + if _, err := time.Parse(time.RFC3339, nv); err == nil { valueType[k] = formValueType(k, PandoraTypeDate) + } else if ipAddr := net.ParseIP(nv); ipAddr != nil { + valueType[k] = formValueType(k, PandoraTypeIP) } else { valueType[k] = formValueType(k, PandoraTypeString) } @@ -1112,6 +1119,8 @@ func getDefault(t RepoSchemaEntry) (result interface{}) { case PandoraTypeBool: result = make([]bool, 0) } + case PandoraTypeIP: + result = "" } return } diff --git a/vendor/vendor.json b/vendor/vendor.json index 6cefaf599..769d728cc 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -471,56 +471,56 @@ { "checksumSHA1": "0JEyusBC8nfE7dF2sKLylHTh93Y=", "path": "github.com/qiniu/pandora-go-sdk/base", - "revision": "ed18ebdedca86c819014ba4d94c663bb32fb8e5b", - "revisionTime": "2018-08-16T05:41:57Z" + "revision": "1b026c9cd89f8b1d7b54a32eecaef582f27c40ea", + "revisionTime": "2018-08-22T06:41:56Z" }, { "checksumSHA1": "7t2Z+kAE+w1wEigJqyPp4yjDWYA=", "path": "github.com/qiniu/pandora-go-sdk/base/config", - "revision": "ed18ebdedca86c819014ba4d94c663bb32fb8e5b", - "revisionTime": "2018-08-16T05:41:57Z" + "revision": "1b026c9cd89f8b1d7b54a32eecaef582f27c40ea", + "revisionTime": "2018-08-22T06:41:56Z" }, { "checksumSHA1": "k5JfcSQ5YsAFd6DSHpSeuNWbql8=", "path": "github.com/qiniu/pandora-go-sdk/base/models", - "revision": "ed18ebdedca86c819014ba4d94c663bb32fb8e5b", - "revisionTime": "2018-08-16T05:41:57Z" + "revision": "1b026c9cd89f8b1d7b54a32eecaef582f27c40ea", + "revisionTime": "2018-08-22T06:41:56Z" }, { "checksumSHA1": "lV2zb3SZ4BKaa1XpC4Q3ktbVgDo=", "path": "github.com/qiniu/pandora-go-sdk/base/ratelimit", - "revision": "ed18ebdedca86c819014ba4d94c663bb32fb8e5b", - "revisionTime": "2018-08-16T05:41:57Z" + "revision": "1b026c9cd89f8b1d7b54a32eecaef582f27c40ea", + "revisionTime": "2018-08-22T06:41:56Z" }, { "checksumSHA1": "sLeqqUJX9pa++wH3PtRSVbaCejs=", "path": "github.com/qiniu/pandora-go-sdk/base/reqerr", - "revision": "ed18ebdedca86c819014ba4d94c663bb32fb8e5b", - "revisionTime": "2018-08-16T05:41:57Z" + "revision": "1b026c9cd89f8b1d7b54a32eecaef582f27c40ea", + "revisionTime": "2018-08-22T06:41:56Z" }, { "checksumSHA1": "oEpRonb6KY/u9OWNOxSEjyyqqXk=", "path": "github.com/qiniu/pandora-go-sdk/base/request", - "revision": "ed18ebdedca86c819014ba4d94c663bb32fb8e5b", - "revisionTime": "2018-08-16T05:41:57Z" + "revision": "1b026c9cd89f8b1d7b54a32eecaef582f27c40ea", + "revisionTime": "2018-08-22T06:41:56Z" }, { "checksumSHA1": "Hoo04OF4fQYRDb6iBAP/gZ/Z7Q4=", "path": "github.com/qiniu/pandora-go-sdk/logdb", - "revision": "ed18ebdedca86c819014ba4d94c663bb32fb8e5b", - "revisionTime": "2018-08-16T05:41:57Z" + "revision": "1b026c9cd89f8b1d7b54a32eecaef582f27c40ea", + "revisionTime": "2018-08-22T06:41:56Z" }, { - "checksumSHA1": "SPg2Z+2s4MEnKGqOwzb7DMjTgeY=", + "checksumSHA1": "ASsp2TW/LvoyuZ0JeaIHXDs+WEc=", "path": "github.com/qiniu/pandora-go-sdk/pipeline", - "revision": "ed18ebdedca86c819014ba4d94c663bb32fb8e5b", - "revisionTime": "2018-08-16T05:41:57Z" + "revision": "1b026c9cd89f8b1d7b54a32eecaef582f27c40ea", + "revisionTime": "2018-08-22T06:41:56Z" }, { "checksumSHA1": "YeSUJIE3zLnpjK3g161wiMeIbmk=", "path": "github.com/qiniu/pandora-go-sdk/tsdb", - "revision": "ed18ebdedca86c819014ba4d94c663bb32fb8e5b", - "revisionTime": "2018-08-16T05:41:57Z" + "revision": "1b026c9cd89f8b1d7b54a32eecaef582f27c40ea", + "revisionTime": "2018-08-22T06:41:56Z" }, { "checksumSHA1": "KAzbLjI9MzW2tjfcAsK75lVRp6I=",