From 7517d221a8ef03685e124a49d3029d57addd6d58 Mon Sep 17 00:00:00 2001
From: Mihail Stoykov
Date: Mon, 12 Aug 2019 08:57:41 +0300
Subject: [PATCH 1/2] feat(influxdb): queue batches to the influxdb if it's
 slowing down

Previously k6 would write to InfluxDB every second, but if that write
took more than one second, it would not start a second write and would
instead wait for the first one to finish. This generally led to write
times getting bigger and bigger as more and more data accumulated,
until the maximum body size InfluxDB accepts was reached, at which
point InfluxDB returned an error and k6 dropped that data.

With this commit, a configurable number of parallel writes (10 by
default) is allowed, with a new write starting every 1 second (the
interval is also now configurable). Additionally, once we reach the
limit of concurrent writes, instead of sending all the data that
accumulates we just queue the samples that were generated. This should
considerably help with not hitting InfluxDB's maximum body size.

I tested with a simple script doing batch requests for an empty local
file with 40 VUs. Without an output it was getting 8.1K RPS with 650MB
of memory usage. Before this commit the RAM usage was ~5.7GB for 5736
RPS, and practically all the data gets lost if you don't raise the max
body size; even then a lot of the data is lost while the memory usage
keeps going up. After this commit the RAM usage was ~2.4GB (or less in
some of the tests) with 6273 RPS and there was no loss of data.

Even with this commit, a 2-hour run of that simple script dies after
1 hour and 35 minutes, using around 15GB (the test system has 16). I
can't be sure whether data was lost, as InfluxDB ate 32GB of memory
trying to visualize it and I had to kill it ;(.

Some problems with this solution are:
1. We use a lot of goroutines if things start slowing down - probably
   not a big problem, but still a good idea to fix.
2. We could probably batch things better if we kept all the unsent
   samples together and cut them into chunks of, say, 50k samples.
3. By far the biggest: because the writes are slow, if the test is
   stopped (with Ctrl+C) or finishes naturally, waiting for those
   writes can take a considerable amount of time - in the above
   example the 4-minute tests generally took around 5 minutes :(

All of these could be handled better by some more sophisticated
queueing code at a later time.
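Roughly, the pattern is a buffered channel used as a semaphore: each
tick spawns a commit goroutine, and goroutines beyond the configured
limit block while newly collected samples keep queueing in the buffer.
A minimal, self-contained sketch of that pattern (sample, write and
newCollector are placeholder names, not the real k6 API; the actual
implementation is in stats/influxdb/collector.go):

    package main

    import (
    	"context"
    	"sync"
    	"time"
    )

    type sample struct{ value float64 }

    type collector struct {
    	mu          sync.Mutex
    	buffer      []sample
    	wg          sync.WaitGroup
    	semaphoreCh chan struct{} // capacity == allowed concurrent writes
    }

    func newCollector(concurrentWrites int) *collector {
    	return &collector{semaphoreCh: make(chan struct{}, concurrentWrites)}
    }

    func (c *collector) run(ctx context.Context, pushInterval time.Duration) {
    	ticker := time.NewTicker(pushInterval)
    	defer ticker.Stop()
    	for {
    		select {
    		case <-ticker.C:
    			c.wg.Add(1)
    			go c.commit()
    		case <-ctx.Done():
    			c.wg.Add(1)
    			go c.commit()
    			c.wg.Wait() // drain all in-flight writes before exiting
    			return
    		}
    	}
    }

    func (c *collector) commit() {
    	defer c.wg.Done()
    	// First grab the buffered samples, so the buffer keeps filling
    	// independently of how slow the writes are...
    	c.mu.Lock()
    	samples := c.buffer
    	c.buffer = nil
    	c.mu.Unlock()
    	// ...then wait for a free write slot; commits past the limit
    	// simply block here, holding on to their samples.
    	c.semaphoreCh <- struct{}{}
    	defer func() { <-c.semaphoreCh }()
    	write(samples)
    }

    // write stands in for the actual HTTP write to InfluxDB.
    func write(samples []sample) { time.Sleep(100 * time.Millisecond) }

    func main() {
    	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    	defer cancel()
    	newCollector(10).run(ctx, time.Second)
    }

Both knobs are exposed through the output URL query parameters added
in this patch (pushInterval and concurrentWrites), e.g.
--out influxdb=http://localhost:8086/k6?pushInterval=5s&concurrentWrites=4.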
closes #1081, fixes #1100, fixes #182 --- stats/influxdb/collector.go | 37 ++++++---- stats/influxdb/collector_test.go | 118 +++++++++++++++++++++++++++++++ stats/influxdb/config.go | 44 +++++++++--- stats/kafka/collector_test.go | 1 + 4 files changed, 179 insertions(+), 21 deletions(-) create mode 100644 stats/influxdb/collector_test.go diff --git a/stats/influxdb/collector.go b/stats/influxdb/collector.go index e473507dc06..a6414c733f3 100644 --- a/stats/influxdb/collector.go +++ b/stats/influxdb/collector.go @@ -22,6 +22,7 @@ package influxdb import ( "context" + "errors" "sync" "time" @@ -32,10 +33,6 @@ import ( "github.com/loadimpact/k6/stats" ) -const ( - pushInterval = 1 * time.Second -) - // Verify that Collector implements lib.Collector var _ lib.Collector = &Collector{} @@ -44,8 +41,10 @@ type Collector struct { Config Config BatchConf client.BatchPointsConfig - buffer []stats.Sample - bufferLock sync.Mutex + buffer []stats.Sample + bufferLock sync.Mutex + wg sync.WaitGroup + semaphoreCh chan struct{} } func New(conf Config) (*Collector, error) { @@ -54,10 +53,14 @@ func New(conf Config) (*Collector, error) { return nil, err } batchConf := MakeBatchConfig(conf) + if conf.ConcurrentWrites.Int64 <= 0 { + return nil, errors.New("influxdb's ConcurrentWrites must be a positive number") + } return &Collector{ - Client: cl, - Config: conf, - BatchConf: batchConf, + Client: cl, + Config: conf, + BatchConf: batchConf, + semaphoreCh: make(chan struct{}, conf.ConcurrentWrites.Int64), }, nil } @@ -74,13 +77,16 @@ func (c *Collector) Init() error { func (c *Collector) Run(ctx context.Context) { logrus.Debug("InfluxDB: Running!") - ticker := time.NewTicker(pushInterval) + ticker := time.NewTicker(time.Duration(c.Config.PushInterval.Duration)) for { select { case <-ticker.C: - c.commit() + c.wg.Add(1) + go c.commit() case <-ctx.Done(): - c.commit() + c.wg.Add(1) + go c.commit() + c.wg.Wait() return } } @@ -99,11 +105,16 @@ func (c *Collector) Link() string { } func (c *Collector) commit() { + defer c.wg.Done() c.bufferLock.Lock() samples := c.buffer c.buffer = nil c.bufferLock.Unlock() - + // let first get the data and then wait our turn + c.semaphoreCh <- struct{}{} + defer func() { + <-c.semaphoreCh + }() logrus.Debug("InfluxDB: Committing...") batch, err := c.batchFromSamples(samples) diff --git a/stats/influxdb/collector_test.go b/stats/influxdb/collector_test.go new file mode 100644 index 00000000000..61e0b0edc98 --- /dev/null +++ b/stats/influxdb/collector_test.go @@ -0,0 +1,118 @@ +package influxdb + +import ( + "bytes" + "context" + "io" + "net" + "net/http" + "sync" + "testing" + "time" + + "github.com/loadimpact/k6/stats" + "github.com/stretchr/testify/require" + null "gopkg.in/guregu/null.v3" +) + +func TestBadConcurrentWrites(t *testing.T) { + c := NewConfig() + t.Run("0", func(t *testing.T) { + c.ConcurrentWrites = null.IntFrom(0) + _, err := New(*c) + require.Error(t, err) + require.Equal(t, err.Error(), "influxdb's ConcurrentWrites must be a positive number") + }) + + t.Run("-2", func(t *testing.T) { + c.ConcurrentWrites = null.IntFrom(-2) + _, err := New(*c) + require.Error(t, err) + require.Equal(t, err.Error(), "influxdb's ConcurrentWrites must be a positive number") + }) + + t.Run("2", func(t *testing.T) { + c.ConcurrentWrites = null.IntFrom(2) + _, err := New(*c) + require.NoError(t, err) + }) +} + +func testCollectorCycle(t testing.TB, handler http.HandlerFunc, body func(testing.TB, *Collector)) { + s := &http.Server{ + Addr: ":", + Handler: handler, + MaxHeaderBytes: 1 << 
20, + } + l, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer func() { + _ = l.Close() + }() + + defer func() { + require.NoError(t, s.Shutdown(context.Background())) + }() + + go func() { + require.Equal(t, http.ErrServerClosed, s.Serve(l)) + }() + + config := NewConfig() + config.Addr = null.StringFrom("http://" + l.Addr().String()) + c, err := New(*config) + require.NoError(t, err) + + require.NoError(t, c.Init()) + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + defer cancel() + wg.Add(1) + go func() { + defer wg.Done() + c.Run(ctx) + }() + + body(t, c) + + cancel() + wg.Wait() +} +func TestCollector(t *testing.T) { + var samplesRead int + defer func() { + require.Equal(t, samplesRead, 20) + }() + testCollectorCycle(t, func(rw http.ResponseWriter, r *http.Request) { + var b = bytes.NewBuffer(nil) + _, _ = io.Copy(b, r.Body) + for { + s, err := b.ReadString('\n') + if len(s) > 0 { + samplesRead++ + } + if err != nil { + break + } + } + + rw.WriteHeader(204) + }, func(tb testing.TB, c *Collector) { + var samples = make(stats.Samples, 10) + for i := 0; i < len(samples); i++ { + samples[i] = stats.Sample{ + Metric: stats.New("testGauge", stats.Gauge), + Time: time.Now(), + Tags: stats.NewSampleTags(map[string]string{ + "something": "else", + "VU": "21", + "else": "something", + }), + Value: 2.0, + } + } + c.Collect([]stats.SampleContainer{samples}) + c.Collect([]stats.SampleContainer{samples}) + }) + +} diff --git a/stats/influxdb/config.go b/stats/influxdb/config.go index 52298b7290b..dc557165adb 100644 --- a/stats/influxdb/config.go +++ b/stats/influxdb/config.go @@ -24,6 +24,7 @@ import ( "net/url" "strconv" "strings" + "time" "github.com/kubernetes/helm/pkg/strvals" "github.com/loadimpact/k6/lib/types" @@ -34,11 +35,13 @@ import ( type Config struct { // Connection. - Addr null.String `json:"addr" envconfig:"INFLUXDB_ADDR"` - Username null.String `json:"username,omitempty" envconfig:"INFLUXDB_USERNAME"` - Password null.String `json:"password,omitempty" envconfig:"INFLUXDB_PASSWORD"` - Insecure null.Bool `json:"insecure,omitempty" envconfig:"INFLUXDB_INSECURE"` - PayloadSize null.Int `json:"payloadSize,omitempty" envconfig:"INFLUXDB_PAYLOAD_SIZE"` + Addr null.String `json:"addr" envconfig:"INFLUXDB_ADDR"` + Username null.String `json:"username,omitempty" envconfig:"INFLUXDB_USERNAME"` + Password null.String `json:"password,omitempty" envconfig:"INFLUXDB_PASSWORD"` + Insecure null.Bool `json:"insecure,omitempty" envconfig:"INFLUXDB_INSECURE"` + PayloadSize null.Int `json:"payloadSize,omitempty" envconfig:"INFLUXDB_PAYLOAD_SIZE"` + PushInterval types.NullDuration `json:"pushInterval,omitempty" envconfig:"INFLUXDB_PUSH_INTERVAL"` + ConcurrentWrites null.Int `json:"concurrentWrites,omitempty" envconfig:"INFLUXDB_CONCURRENT_WRITES"` // Samples. 
DB null.String `json:"db" envconfig:"INFLUXDB_DB"` @@ -50,9 +53,11 @@ type Config struct { func NewConfig() *Config { c := &Config{ - Addr: null.NewString("http://localhost:8086", false), - DB: null.NewString("k6", false), - TagsAsFields: []string{"vu", "iter", "url"}, + Addr: null.NewString("http://localhost:8086", false), + DB: null.NewString("k6", false), + TagsAsFields: []string{"vu", "iter", "url"}, + ConcurrentWrites: null.NewInt(10, false), + PushInterval: types.NewNullDuration(time.Second, false), } return c } @@ -88,6 +93,13 @@ func (c Config) Apply(cfg Config) Config { if len(cfg.TagsAsFields) > 0 { c.TagsAsFields = cfg.TagsAsFields } + if cfg.PushInterval.Valid { + c.PushInterval = cfg.PushInterval + } + + if cfg.ConcurrentWrites.Valid { + c.ConcurrentWrites = cfg.ConcurrentWrites + } return c } @@ -154,6 +166,9 @@ func ParseURL(text string) (Config, error) { case "payload_size": var size int size, err = strconv.Atoi(vs[0]) + if err != nil { + return c, err + } c.PayloadSize = null.IntFrom(int64(size)) case "precision": c.Precision = null.StringFrom(vs[0]) @@ -161,6 +176,19 @@ func ParseURL(text string) (Config, error) { c.Retention = null.StringFrom(vs[0]) case "consistency": c.Consistency = null.StringFrom(vs[0]) + + case "pushInterval": + err = c.PushInterval.UnmarshalText([]byte(vs[0])) + if err != nil { + return c, err + } + case "concurrentWrites": + var writes int + writes, err = strconv.Atoi(vs[0]) + if err != nil { + return c, err + } + c.ConcurrentWrites = null.IntFrom(int64(writes)) case "tagsAsFields": c.TagsAsFields = vs default: diff --git a/stats/kafka/collector_test.go b/stats/kafka/collector_test.go index 1c45c034036..0452857950c 100644 --- a/stats/kafka/collector_test.go +++ b/stats/kafka/collector_test.go @@ -61,6 +61,7 @@ func TestRun(t *testing.T) { func TestFormatSamples(t *testing.T) { c := Collector{} + c.Config.InfluxDBConfig.ConcurrentWrites = null.IntFrom(10) metric := stats.New("my_metric", stats.Gauge) samples := stats.Samples{ {Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})},
From 04e1379ce62154d1b9de59d485cbc5c63d37a063 Mon Sep 17 00:00:00 2001
From: Mihail Stoykov
Date: Mon, 12 Aug 2019 14:15:48 +0300
Subject: [PATCH 2/2] chore(influxdb): update the import path of influxdb lib
 to the new one
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This also upgrades it to the latest version and adds a benchmark which
shows somewhat fewer allocations:

name                       old time/op    new time/op    delta
Influxdb1Second-4            3.56ms ± 0%    3.56ms ± 1%     ~     (p=0.841 n=5+5)
Influxdb2Second-4            12.1ms ± 0%    12.1ms ± 0%     ~     (p=0.095 n=5+5)
Influxdb100Milliseconds-4     341µs ± 1%     333µs ± 1%   -2.34%  (p=0.008 n=5+5)

name                       old alloc/op   new alloc/op   delta
Influxdb1Second-4            19.1kB ± 0%    16.6kB ± 0%  -13.37%  (p=0.008 n=5+5)
Influxdb2Second-4            18.8kB ± 0%    16.2kB ± 0%  -13.63%  (p=0.016 n=5+4)
Influxdb100Milliseconds-4    18.7kB ± 0%    16.1kB ± 0%  -13.72%  (p=0.008 n=5+5)

name                       old allocs/op  new allocs/op  delta
Influxdb1Second-4               291 ± 0%       231 ± 0%  -20.62%  (p=0.008 n=5+5)
Influxdb2Second-4               291 ± 0%       231 ± 0%  -20.62%  (p=0.008 n=5+5)
Influxdb100Milliseconds-4       291 ± 0%       231 ± 0%  -20.62%  (p=0.008 n=5+5)

The same test as in fce3884b gets 6909 RPS with 2.5GB of memory usage -
a considerable bump in RPS for a small amount of memory.
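The table above is standard benchstat output. The exact invocation
used isn't recorded in the commit, but with the benchmarks added in
this patch a comparison like it would be produced roughly as follows
(file names are arbitrary):

    # before the change:
    go test -run NONE -bench Influxdb -benchmem -count 5 ./stats/influxdb > old.txt
    # after re-vendoring the new client:
    go test -run NONE -bench Influxdb -benchmem -count 5 ./stats/influxdb > new.txt
    benchstat old.txt new.txt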
--- Gopkg.lock | 12 +- stats/influxdb/bench_test.go | 59 +++ stats/influxdb/collector.go | 6 +- stats/influxdb/util.go | 2 +- stats/influxdb/util_test.go | 2 +- vendor/github.com/influxdata/influxdb/LICENSE | 20 -- .../influxdb/LICENSE_OF_DEPENDENCIES.md | 62 ---- .../influxdata/influxdb/models/consistency.go | 48 --- .../influxdata/influxdb1-client/LICENSE | 21 ++ .../models/inline_fnv.go | 2 +- .../models/inline_strconv_parse.go | 2 +- .../models/points.go | 339 +++++++++++------- .../models/rows.go | 0 .../models/statistic.go | 0 .../models/time.go | 0 .../models/uint_support.go | 0 .../pkg/escape/bytes.go | 2 +- .../pkg/escape/strings.go | 0 .../client => influxdb1-client}/v2/client.go | 216 +++++++---- .../client => influxdb1-client}/v2/udp.go | 4 + 20 files changed, 460 insertions(+), 337 deletions(-) create mode 100644 stats/influxdb/bench_test.go delete mode 100644 vendor/github.com/influxdata/influxdb/LICENSE delete mode 100644 vendor/github.com/influxdata/influxdb/LICENSE_OF_DEPENDENCIES.md delete mode 100644 vendor/github.com/influxdata/influxdb/models/consistency.go create mode 100644 vendor/github.com/influxdata/influxdb1-client/LICENSE rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/models/inline_fnv.go (91%) rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/models/inline_strconv_parse.go (94%) rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/models/points.go (91%) rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/models/rows.go (100%) rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/models/statistic.go (100%) rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/models/time.go (100%) rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/models/uint_support.go (100%) rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/pkg/escape/bytes.go (96%) rename vendor/github.com/influxdata/{influxdb => influxdb1-client}/pkg/escape/strings.go (100%) rename vendor/github.com/influxdata/{influxdb/client => influxdb1-client}/v2/client.go (83%) rename vendor/github.com/influxdata/{influxdb/client => influxdb1-client}/v2/udp.go (94%) diff --git a/Gopkg.lock b/Gopkg.lock index 459b3e502b0..955039335a0 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -256,16 +256,16 @@ version = "v1.0" [[projects]] - digest = "1:32b1db7dd0c6cee1e65ab3841b7f8d8a9f0a6d97f85229137efee95a0dfcfddd" - name = "github.com/influxdata/influxdb" + branch = "master" + digest = "1:50708c8fc92aec981df5c446581cf9f90ba9e2a5692118e0ce75d4534aaa14a2" + name = "github.com/influxdata/influxdb1-client" packages = [ - "client/v2", "models", "pkg/escape", + "v2", ] pruneopts = "NUT" - revision = "6ac835404e7e64ea7299a6eebcce1ab1ef15fe3c" - version = "v1.5.0" + revision = "8ff2fc3824fcb533795f9a2f233275f0bb18d6c5" [[projects]] branch = "master" @@ -697,7 +697,7 @@ "github.com/dustin/go-humanize", "github.com/fatih/color", "github.com/gorilla/websocket", - "github.com/influxdata/influxdb/client/v2", + "github.com/influxdata/influxdb1-client/v2", "github.com/julienschmidt/httprouter", "github.com/kelseyhightower/envconfig", "github.com/klauspost/compress/zstd", diff --git a/stats/influxdb/bench_test.go b/stats/influxdb/bench_test.go new file mode 100644 index 00000000000..e3f3b046fa8 --- /dev/null +++ b/stats/influxdb/bench_test.go @@ -0,0 +1,59 @@ +package influxdb + +import ( + "io" + "io/ioutil" + "net/http" + "testing" + "time" + + "github.com/loadimpact/k6/stats" +) + +func benchmarkInfluxdb(b *testing.B, t 
time.Duration) { + testCollectorCycle(b, func(rw http.ResponseWriter, r *http.Request) { + for { + time.Sleep(t) + m, _ := io.CopyN(ioutil.Discard, r.Body, 1<<18) // read 1/4 mb a time + if m == 0 { + break + } + } + rw.WriteHeader(204) + }, func(tb testing.TB, c *Collector) { + b = tb.(*testing.B) + b.ResetTimer() + + var samples = make(stats.Samples, 10) + for i := 0; i < len(samples); i++ { + samples[i] = stats.Sample{ + Metric: stats.New("testGauge", stats.Gauge), + Time: time.Now(), + Tags: stats.NewSampleTags(map[string]string{ + "something": "else", + "VU": "21", + "else": "something", + }), + Value: 2.0, + } + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + c.Collect([]stats.SampleContainer{samples}) + time.Sleep(time.Nanosecond * 20) + } + }) +} + +func BenchmarkInfluxdb1Second(b *testing.B) { + benchmarkInfluxdb(b, time.Second) +} + +func BenchmarkInfluxdb2Second(b *testing.B) { + benchmarkInfluxdb(b, 2*time.Second) +} + +func BenchmarkInfluxdb100Milliseconds(b *testing.B) { + benchmarkInfluxdb(b, 100*time.Millisecond) +} diff --git a/stats/influxdb/collector.go b/stats/influxdb/collector.go index a6414c733f3..0b9bb9e6584 100644 --- a/stats/influxdb/collector.go +++ b/stats/influxdb/collector.go @@ -26,11 +26,10 @@ import ( "sync" "time" - "github.com/influxdata/influxdb/client/v2" - "github.com/sirupsen/logrus" - + client "github.com/influxdata/influxdb1-client/v2" "github.com/loadimpact/k6/lib" "github.com/loadimpact/k6/stats" + "github.com/sirupsen/logrus" ) // Verify that Collector implements lib.Collector @@ -116,6 +115,7 @@ func (c *Collector) commit() { <-c.semaphoreCh }() logrus.Debug("InfluxDB: Committing...") + logrus.WithField("samples", len(samples)).Debug("InfluxDB: Writing...") batch, err := c.batchFromSamples(samples) if err != nil { diff --git a/stats/influxdb/util.go b/stats/influxdb/util.go index 4f88975836a..eeaa31cf4c9 100644 --- a/stats/influxdb/util.go +++ b/stats/influxdb/util.go @@ -23,7 +23,7 @@ package influxdb import ( "strings" - client "github.com/influxdata/influxdb/client/v2" + client "github.com/influxdata/influxdb1-client/v2" null "gopkg.in/guregu/null.v3" ) diff --git a/stats/influxdb/util_test.go b/stats/influxdb/util_test.go index 2ad746d1082..29bef1b7e0f 100644 --- a/stats/influxdb/util_test.go +++ b/stats/influxdb/util_test.go @@ -23,7 +23,7 @@ package influxdb import ( "testing" - client "github.com/influxdata/influxdb/client/v2" + client "github.com/influxdata/influxdb1-client/v2" "github.com/stretchr/testify/assert" null "gopkg.in/guregu/null.v3" ) diff --git a/vendor/github.com/influxdata/influxdb/LICENSE b/vendor/github.com/influxdata/influxdb/LICENSE deleted file mode 100644 index 63cef79ba6f..00000000000 --- a/vendor/github.com/influxdata/influxdb/LICENSE +++ /dev/null @@ -1,20 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2013-2016 Errplane Inc. - -Permission is hereby granted, free of charge, to any person obtaining a copy of -this software and associated documentation files (the "Software"), to deal in -the Software without restriction, including without limitation the rights to -use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software is furnished to do so, -subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. 
- -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/influxdata/influxdb/LICENSE_OF_DEPENDENCIES.md b/vendor/github.com/influxdata/influxdb/LICENSE_OF_DEPENDENCIES.md deleted file mode 100644 index ea6fc69f30d..00000000000 --- a/vendor/github.com/influxdata/influxdb/LICENSE_OF_DEPENDENCIES.md +++ /dev/null @@ -1,62 +0,0 @@ -- # List -- bootstrap 3.3.5 [MIT LICENSE](https://github.com/twbs/bootstrap/blob/master/LICENSE) -- collectd.org [ISC LICENSE](https://github.com/collectd/go-collectd/blob/master/LICENSE) -- github.com/BurntSushi/toml [MIT LICENSE](https://github.com/BurntSushi/toml/blob/master/COPYING) -- github.com/RoaringBitmap/roaring [APACHE LICENSE](https://github.com/RoaringBitmap/roaring/blob/master/LICENSE) -- github.com/beorn7/perks [MIT LICENSE](https://github.com/beorn7/perks/blob/master/LICENSE) -- github.com/bmizerany/pat [MIT LICENSE](https://github.com/bmizerany/pat#license) -- github.com/boltdb/bolt [MIT LICENSE](https://github.com/boltdb/bolt/blob/master/LICENSE) -- github.com/cespare/xxhash [MIT LICENSE](https://github.com/cespare/xxhash/blob/master/LICENSE.txt) -- github.com/clarkduvall/hyperloglog [MIT LICENSE](https://github.com/clarkduvall/hyperloglog/blob/master/LICENSE) -- github.com/davecgh/go-spew/spew [ISC LICENSE](https://github.com/davecgh/go-spew/blob/master/LICENSE) -- github.com/dgrijalva/jwt-go [MIT LICENSE](https://github.com/dgrijalva/jwt-go/blob/master/LICENSE) -- github.com/dgryski/go-bits [MIT LICENSE](https://github.com/dgryski/go-bits/blob/master/LICENSE) -- github.com/dgryski/go-bitstream [MIT LICENSE](https://github.com/dgryski/go-bitstream/blob/master/LICENSE) -- github.com/glycerine/go-unsnap-stream [MIT LICENSE](https://github.com/glycerine/go-unsnap-stream/blob/master/LICENSE) -- github.com/gogo/protobuf/proto [BSD LICENSE](https://github.com/gogo/protobuf/blob/master/LICENSE) -- github.com/golang/protobuf [BSD LICENSE](https://github.com/golang/protobuf/blob/master/LICENSE) -- github.com/golang/snappy [BSD LICENSE](https://github.com/golang/snappy/blob/master/LICENSE) -- github.com/google/go-cmp [BSD LICENSE](https://github.com/google/go-cmp/blob/master/LICENSE) -- github.com/influxdata/influxql [MIT LICENSE](https://github.com/influxdata/influxql/blob/master/LICENSE) -- github.com/influxdata/usage-client [MIT LICENSE](https://github.com/influxdata/usage-client/blob/master/LICENSE.txt) -- github.com/influxdata/yamux [MOZILLA PUBLIC LICENSE](https://github.com/influxdata/yamux/blob/master/LICENSE) -- github.com/influxdata/yarpc [MIT LICENSE](https://github.com/influxdata/yarpc/blob/master/LICENSE) -- github.com/jsternberg/zap-logfmt [MIT LICENSE](https://github.com/jsternberg/zap-logfmt/blob/master/LICENSE) -- github.com/jwilder/encoding [MIT LICENSE](https://github.com/jwilder/encoding/blob/master/LICENSE) -- github.com/mattn/go-isatty [MIT LICENSE](https://github.com/mattn/go-isatty/blob/master/LICENSE) -- github.com/matttproud/golang_protobuf_extensions [APACHE LICENSE](https://github.com/matttproud/golang_protobuf_extensions/blob/master/LICENSE) -- github.com/opentracing/opentracing-go [MIT 
LICENSE](https://github.com/opentracing/opentracing-go/blob/master/LICENSE) -- github.com/paulbellamy/ratecounter [MIT LICENSE](https://github.com/paulbellamy/ratecounter/blob/master/LICENSE) -- github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING) -- github.com/philhofer/fwd [MIT LICENSE](https://github.com/philhofer/fwd/blob/master/LICENSE.md) -- github.com/prometheus/client_golang [MIT LICENSE](https://github.com/prometheus/client_golang/blob/master/LICENSE) -- github.com/prometheus/client_model [MIT LICENSE](https://github.com/prometheus/client_model/blob/master/LICENSE) -- github.com/prometheus/common [APACHE LICENSE](https://github.com/prometheus/common/blob/master/LICENSE) -- github.com/prometheus/procfs [APACHE LICENSE](https://github.com/prometheus/procfs/blob/master/LICENSE) -- github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE) -- github.com/retailnext/hllpp [BSD LICENSE](https://github.com/retailnext/hllpp/blob/master/LICENSE) -- github.com/tinylib/msgp [MIT LICENSE](https://github.com/tinylib/msgp/blob/master/LICENSE) -- go.uber.org/atomic [MIT LICENSE](https://github.com/uber-go/atomic/blob/master/LICENSE.txt) -- go.uber.org/multierr [MIT LICENSE](https://github.com/uber-go/multierr/blob/master/LICENSE.txt) -- go.uber.org/zap [MIT LICENSE](https://github.com/uber-go/zap/blob/master/LICENSE.txt) -- golang.org/x/crypto [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE) -- golang.org/x/net [BSD LICENSE](https://github.com/golang/net/blob/master/LICENSE) -- golang.org/x/sys [BSD LICENSE](https://github.com/golang/sys/blob/master/LICENSE) -- golang.org/x/text [BSD LICENSE](https://github.com/golang/text/blob/master/LICENSE) -- golang.org/x/time [BSD LICENSE](https://github.com/golang/time/blob/master/LICENSE) -- jquery 2.1.4 [MIT LICENSE](https://github.com/jquery/jquery/blob/master/LICENSE.txt) -- github.com/xlab/treeprint [MIT LICENSE](https://github.com/xlab/treeprint/blob/master/LICENSE) - - - - - - - - - - - - - - diff --git a/vendor/github.com/influxdata/influxdb/models/consistency.go b/vendor/github.com/influxdata/influxdb/models/consistency.go deleted file mode 100644 index 2a3269bca11..00000000000 --- a/vendor/github.com/influxdata/influxdb/models/consistency.go +++ /dev/null @@ -1,48 +0,0 @@ -package models - -import ( - "errors" - "strings" -) - -// ConsistencyLevel represent a required replication criteria before a write can -// be returned as successful. -// -// The consistency level is handled in open-source InfluxDB but only applicable to clusters. -type ConsistencyLevel int - -const ( - // ConsistencyLevelAny allows for hinted handoff, potentially no write happened yet. - ConsistencyLevelAny ConsistencyLevel = iota - - // ConsistencyLevelOne requires at least one data node acknowledged a write. - ConsistencyLevelOne - - // ConsistencyLevelQuorum requires a quorum of data nodes to acknowledge a write. - ConsistencyLevelQuorum - - // ConsistencyLevelAll requires all data nodes to acknowledge a write. - ConsistencyLevelAll -) - -var ( - // ErrInvalidConsistencyLevel is returned when parsing the string version - // of a consistency level. - ErrInvalidConsistencyLevel = errors.New("invalid consistency level") -) - -// ParseConsistencyLevel converts a consistency level string to the corresponding ConsistencyLevel const. 
-func ParseConsistencyLevel(level string) (ConsistencyLevel, error) { - switch strings.ToLower(level) { - case "any": - return ConsistencyLevelAny, nil - case "one": - return ConsistencyLevelOne, nil - case "quorum": - return ConsistencyLevelQuorum, nil - case "all": - return ConsistencyLevelAll, nil - default: - return 0, ErrInvalidConsistencyLevel - } -} diff --git a/vendor/github.com/influxdata/influxdb1-client/LICENSE b/vendor/github.com/influxdata/influxdb1-client/LICENSE new file mode 100644 index 00000000000..83bafde92ee --- /dev/null +++ b/vendor/github.com/influxdata/influxdb1-client/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2019 InfluxData + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/influxdata/influxdb/models/inline_fnv.go b/vendor/github.com/influxdata/influxdb1-client/models/inline_fnv.go similarity index 91% rename from vendor/github.com/influxdata/influxdb/models/inline_fnv.go rename to vendor/github.com/influxdata/influxdb1-client/models/inline_fnv.go index eec1ae8b013..177885d0cb8 100644 --- a/vendor/github.com/influxdata/influxdb/models/inline_fnv.go +++ b/vendor/github.com/influxdata/influxdb1-client/models/inline_fnv.go @@ -1,4 +1,4 @@ -package models // import "github.com/influxdata/influxdb/models" +package models // import "github.com/influxdata/influxdb1-client/models" // from stdlib hash/fnv/fnv.go const ( diff --git a/vendor/github.com/influxdata/influxdb/models/inline_strconv_parse.go b/vendor/github.com/influxdata/influxdb1-client/models/inline_strconv_parse.go similarity index 94% rename from vendor/github.com/influxdata/influxdb/models/inline_strconv_parse.go rename to vendor/github.com/influxdata/influxdb1-client/models/inline_strconv_parse.go index 8db4837384a..7d171b31332 100644 --- a/vendor/github.com/influxdata/influxdb/models/inline_strconv_parse.go +++ b/vendor/github.com/influxdata/influxdb1-client/models/inline_strconv_parse.go @@ -1,4 +1,4 @@ -package models // import "github.com/influxdata/influxdb/models" +package models // import "github.com/influxdata/influxdb1-client/models" import ( "reflect" diff --git a/vendor/github.com/influxdata/influxdb/models/points.go b/vendor/github.com/influxdata/influxdb1-client/models/points.go similarity index 91% rename from vendor/github.com/influxdata/influxdb/models/points.go rename to vendor/github.com/influxdata/influxdb1-client/models/points.go index 7cfebd0717c..f51163070d5 100644 --- a/vendor/github.com/influxdata/influxdb/models/points.go +++ 
b/vendor/github.com/influxdata/influxdb1-client/models/points.go @@ -1,5 +1,5 @@ // Package models implements basic objects used throughout the TICK stack. -package models // import "github.com/influxdata/influxdb/models" +package models // import "github.com/influxdata/influxdb1-client/models" import ( "bytes" @@ -12,20 +12,27 @@ import ( "strconv" "strings" "time" + "unicode" + "unicode/utf8" - "github.com/influxdata/influxdb/pkg/escape" + "github.com/influxdata/influxdb1-client/pkg/escape" ) +type escapeSet struct { + k [1]byte + esc [2]byte +} + var ( - measurementEscapeCodes = map[byte][]byte{ - ',': []byte(`\,`), - ' ': []byte(`\ `), + measurementEscapeCodes = [...]escapeSet{ + {k: [1]byte{','}, esc: [2]byte{'\\', ','}}, + {k: [1]byte{' '}, esc: [2]byte{'\\', ' '}}, } - tagEscapeCodes = map[byte][]byte{ - ',': []byte(`\,`), - ' ': []byte(`\ `), - '=': []byte(`\=`), + tagEscapeCodes = [...]escapeSet{ + {k: [1]byte{','}, esc: [2]byte{'\\', ','}}, + {k: [1]byte{' '}, esc: [2]byte{'\\', ' '}}, + {k: [1]byte{'='}, esc: [2]byte{'\\', '='}}, } // ErrPointMustHaveAField is returned when operating on a point that does not have any fields. @@ -64,6 +71,9 @@ type Point interface { // Tags returns the tag set for the point. Tags() Tags + // ForEachTag iterates over each tag invoking fn. If fn return false, iteration stops. + ForEachTag(fn func(k, v []byte) bool) + // AddTag adds or replaces a tag value for a point. AddTag(key, value string) @@ -263,36 +273,46 @@ func ParsePointsString(buf string) ([]Point, error) { // NOTE: to minimize heap allocations, the returned Tags will refer to subslices of buf. // This can have the unintended effect preventing buf from being garbage collected. func ParseKey(buf []byte) (string, Tags) { - meas, tags := ParseKeyBytes(buf) - return string(meas), tags + name, tags := ParseKeyBytes(buf) + return string(name), tags } func ParseKeyBytes(buf []byte) ([]byte, Tags) { + return ParseKeyBytesWithTags(buf, nil) +} + +func ParseKeyBytesWithTags(buf []byte, tags Tags) ([]byte, Tags) { // Ignore the error because scanMeasurement returns "missing fields" which we ignore // when just parsing a key state, i, _ := scanMeasurement(buf, 0) - var tags Tags + var name []byte if state == tagKeyState { - tags = parseTags(buf) + tags = parseTags(buf, tags) // scanMeasurement returns the location of the comma if there are tags, strip that off - return buf[:i-1], tags + name = buf[:i-1] + } else { + name = buf[:i] } - return buf[:i], tags + return unescapeMeasurement(name), tags } func ParseTags(buf []byte) Tags { - return parseTags(buf) + return parseTags(buf, nil) } -func ParseName(buf []byte) ([]byte, error) { +func ParseName(buf []byte) []byte { // Ignore the error because scanMeasurement returns "missing fields" which we ignore // when just parsing a key state, i, _ := scanMeasurement(buf, 0) + var name []byte if state == tagKeyState { - return buf[:i-1], nil + name = buf[:i-1] + } else { + name = buf[:i] } - return buf[:i], nil + + return unescapeMeasurement(name) } // ParsePointsWithPrecision is similar to ParsePoints, but allows the @@ -315,7 +335,6 @@ func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision strin continue } - // lines which start with '#' are comments start := skipWhitespace(block, 0) // If line is all whitespace, just skip it @@ -323,6 +342,7 @@ func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision strin continue } + // lines which start with '#' are comments if block[start] == '#' { continue } @@ -348,7 +368,7 @@ 
func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision strin } func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, error) { - // scan the first block which is measurement[,tag1=value1,tag2=value=2...] + // scan the first block which is measurement[,tag1=value1,tag2=value2...] pos, key, err := scanKey(buf, 0) if err != nil { return nil, err @@ -375,7 +395,7 @@ func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, err } var maxKeyErr error - walkFields(fields, func(k, v []byte) bool { + err = walkFields(fields, func(k, v []byte) bool { if sz := seriesKeySize(key, k); sz > MaxKeyLength { maxKeyErr = fmt.Errorf("max key length exceeded: %v > %v", sz, MaxKeyLength) return false @@ -383,6 +403,10 @@ func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, err return true }) + if err != nil { + return nil, err + } + if maxKeyErr != nil { return nil, maxKeyErr } @@ -1199,23 +1223,33 @@ func scanFieldValue(buf []byte, i int) (int, []byte) { } func EscapeMeasurement(in []byte) []byte { - for b, esc := range measurementEscapeCodes { - in = bytes.Replace(in, []byte{b}, esc, -1) + for _, c := range measurementEscapeCodes { + if bytes.IndexByte(in, c.k[0]) != -1 { + in = bytes.Replace(in, c.k[:], c.esc[:], -1) + } } return in } func unescapeMeasurement(in []byte) []byte { - for b, esc := range measurementEscapeCodes { - in = bytes.Replace(in, esc, []byte{b}, -1) + if bytes.IndexByte(in, '\\') == -1 { + return in + } + + for i := range measurementEscapeCodes { + c := &measurementEscapeCodes[i] + if bytes.IndexByte(in, c.k[0]) != -1 { + in = bytes.Replace(in, c.esc[:], c.k[:], -1) + } } return in } func escapeTag(in []byte) []byte { - for b, esc := range tagEscapeCodes { - if bytes.IndexByte(in, b) != -1 { - in = bytes.Replace(in, []byte{b}, esc, -1) + for i := range tagEscapeCodes { + c := &tagEscapeCodes[i] + if bytes.IndexByte(in, c.k[0]) != -1 { + in = bytes.Replace(in, c.k[:], c.esc[:], -1) } } return in @@ -1226,9 +1260,10 @@ func unescapeTag(in []byte) []byte { return in } - for b, esc := range tagEscapeCodes { - if bytes.IndexByte(in, b) != -1 { - in = bytes.Replace(in, esc, []byte{b}, -1) + for i := range tagEscapeCodes { + c := &tagEscapeCodes[i] + if bytes.IndexByte(in, c.k[0]) != -1 { + in = bytes.Replace(in, c.esc[:], c.k[:], -1) } } return in @@ -1280,7 +1315,8 @@ func unescapeStringField(in string) string { } // NewPoint returns a new point with the given measurement name, tags, fields and timestamp. If -// an unsupported field value (NaN) or out of range time is passed, this function returns an error. +// an unsupported field value (NaN, or +/-Inf) or out of range time is passed, this function +// returns an error. 
func NewPoint(name string, tags Tags, fields Fields, t time.Time) (Point, error) { key, err := pointKey(name, tags, fields, t) if err != nil { @@ -1311,11 +1347,17 @@ func pointKey(measurement string, tags Tags, fields Fields, t time.Time) ([]byte switch value := value.(type) { case float64: // Ensure the caller validates and handles invalid field values + if math.IsInf(value, 0) { + return nil, fmt.Errorf("+/-Inf is an unsupported value for field %s", key) + } if math.IsNaN(value) { return nil, fmt.Errorf("NaN is an unsupported value for field %s", key) } case float32: // Ensure the caller validates and handles invalid field values + if math.IsInf(float64(value), 0) { + return nil, fmt.Errorf("+/-Inf is an unsupported value for field %s", key) + } if math.IsNaN(float64(value)) { return nil, fmt.Errorf("NaN is an unsupported value for field %s", key) } @@ -1441,10 +1483,14 @@ func (p *point) Tags() Tags { if p.cachedTags != nil { return p.cachedTags } - p.cachedTags = parseTags(p.key) + p.cachedTags = parseTags(p.key, nil) return p.cachedTags } +func (p *point) ForEachTag(fn func(k, v []byte) bool) { + walkTags(p.key, fn) +} + func (p *point) HasTag(tag []byte) bool { if len(p.key) == 0 { return false @@ -1504,11 +1550,14 @@ func walkTags(buf []byte, fn func(key, value []byte) bool) { // walkFields walks each field key and value via fn. If fn returns false, the iteration // is stopped. The values are the raw byte slices and not the converted types. -func walkFields(buf []byte, fn func(key, value []byte) bool) { +func walkFields(buf []byte, fn func(key, value []byte) bool) error { var i int var key, val []byte for len(buf) > 0 { i, key = scanTo(buf, 0, '=') + if i > len(buf)-2 { + return fmt.Errorf("invalid value: field-key=%s", key) + } buf = buf[i+1:] i, val = scanFieldValue(buf, 0) buf = buf[i:] @@ -1521,29 +1570,52 @@ func walkFields(buf []byte, fn func(key, value []byte) bool) { buf = buf[1:] } } + return nil } -func parseTags(buf []byte) Tags { +// parseTags parses buf into the provided destination tags, returning destination +// Tags, which may have a different length and capacity. +func parseTags(buf []byte, dst Tags) Tags { if len(buf) == 0 { return nil } - tags := make(Tags, bytes.Count(buf, []byte(","))) - p := 0 + n := bytes.Count(buf, []byte(",")) + if cap(dst) < n { + dst = make(Tags, n) + } else { + dst = dst[:n] + } + + // Ensure existing behaviour when point has no tags and nil slice passed in. + if dst == nil { + dst = Tags{} + } + + // Series keys can contain escaped commas, therefore the number of commas + // in a series key only gives an estimation of the upper bound on the number + // of tags. + var i int walkTags(buf, func(key, value []byte) bool { - tags[p].Key = key - tags[p].Value = value - p++ + dst[i].Key, dst[i].Value = key, value + i++ return true }) - return tags + return dst[:i] } // MakeKey creates a key for a set of tags. func MakeKey(name []byte, tags Tags) []byte { + return AppendMakeKey(nil, name, tags) +} + +// AppendMakeKey appends the key derived from name and tags to dst and returns the extended buffer. +func AppendMakeKey(dst []byte, name []byte, tags Tags) []byte { // unescape the name and then re-escape it to avoid double escaping. // The key should always be stored in escaped form. - return append(EscapeMeasurement(unescapeMeasurement(name)), tags.HashKey()...) + dst = append(dst, EscapeMeasurement(unescapeMeasurement(name))...) + dst = tags.AppendHashKey(dst) + return dst } // SetTags replaces the tags for the point. 
@@ -1868,28 +1940,80 @@ func NewTags(m map[string]string) Tags { return a } -// Keys returns the list of keys for a tag set. -func (a Tags) Keys() []string { - if len(a) == 0 { - return nil - } - keys := make([]string, len(a)) - for i, tag := range a { - keys[i] = string(tag.Key) +// HashKey hashes all of a tag's keys. +func (a Tags) HashKey() []byte { + return a.AppendHashKey(nil) +} + +func (a Tags) needsEscape() bool { + for i := range a { + t := &a[i] + for j := range tagEscapeCodes { + c := &tagEscapeCodes[j] + if bytes.IndexByte(t.Key, c.k[0]) != -1 || bytes.IndexByte(t.Value, c.k[0]) != -1 { + return true + } + } } - return keys + return false } -// Values returns the list of values for a tag set. -func (a Tags) Values() []string { +// AppendHashKey appends the result of hashing all of a tag's keys and values to dst and returns the extended buffer. +func (a Tags) AppendHashKey(dst []byte) []byte { + // Empty maps marshal to empty bytes. if len(a) == 0 { - return nil + return dst + } + + // Type invariant: Tags are sorted + + sz := 0 + var escaped Tags + if a.needsEscape() { + var tmp [20]Tag + if len(a) < len(tmp) { + escaped = tmp[:len(a)] + } else { + escaped = make(Tags, len(a)) + } + + for i := range a { + t := &a[i] + nt := &escaped[i] + nt.Key = escapeTag(t.Key) + nt.Value = escapeTag(t.Value) + sz += len(nt.Key) + len(nt.Value) + } + } else { + sz = a.Size() + escaped = a } - values := make([]string, len(a)) - for i, tag := range a { - values[i] = string(tag.Value) + + sz += len(escaped) + (len(escaped) * 2) // separators + + // Generate marshaled bytes. + if cap(dst)-len(dst) < sz { + nd := make([]byte, len(dst), len(dst)+sz) + copy(nd, dst) + dst = nd } - return values + buf := dst[len(dst) : len(dst)+sz] + idx := 0 + for i := range escaped { + k := &escaped[i] + if len(k.Value) == 0 { + continue + } + buf[idx] = ',' + idx++ + copy(buf[idx:], k.Key) + idx += len(k.Key) + buf[idx] = '=' + idx++ + copy(buf[idx:], k.Value) + idx += len(k.Value) + } + return dst[:len(dst)+idx] } // String returns the string representation of the tags. @@ -1911,8 +2035,8 @@ func (a Tags) String() string { // for data structures or delimiters for example. func (a Tags) Size() int { var total int - for _, t := range a { - total += t.Size() + for i := range a { + total += a[i].Size() } return total } @@ -2008,18 +2132,6 @@ func (a *Tags) SetString(key, value string) { a.Set([]byte(key), []byte(value)) } -// Delete removes a tag by key. -func (a *Tags) Delete(key []byte) { - for i, t := range *a { - if bytes.Equal(t.Key, key) { - copy((*a)[i:], (*a)[i+1:]) - (*a)[len(*a)-1] = Tag{} - *a = (*a)[:len(*a)-1] - return - } - } -} - // Map returns a map representation of the tags. func (a Tags) Map() map[string]string { m := make(map[string]string, len(a)) @@ -2029,60 +2141,6 @@ func (a Tags) Map() map[string]string { return m } -// Merge merges the tags combining the two. If both define a tag with the -// same key, the merged value overwrites the old value. -// A new map is returned. -func (a Tags) Merge(other map[string]string) Tags { - merged := make(map[string]string, len(a)+len(other)) - for _, t := range a { - merged[string(t.Key)] = string(t.Value) - } - for k, v := range other { - merged[k] = v - } - return NewTags(merged) -} - -// HashKey hashes all of a tag's keys. -func (a Tags) HashKey() []byte { - // Empty maps marshal to empty bytes. 
- if len(a) == 0 { - return nil - } - - // Type invariant: Tags are sorted - - escaped := make(Tags, 0, len(a)) - sz := 0 - for _, t := range a { - ek := escapeTag(t.Key) - ev := escapeTag(t.Value) - - if len(ev) > 0 { - escaped = append(escaped, Tag{Key: ek, Value: ev}) - sz += len(ek) + len(ev) - } - } - - sz += len(escaped) + (len(escaped) * 2) // separators - - // Generate marshaled bytes. - b := make([]byte, sz) - buf := b - idx := 0 - for _, k := range escaped { - buf[idx] = ',' - idx++ - copy(buf[idx:idx+len(k.Key)], k.Key) - idx += len(k.Key) - buf[idx] = '=' - idx++ - copy(buf[idx:idx+len(k.Value)], k.Value) - idx += len(k.Value) - } - return b[:idx] -} - // CopyTags returns a shallow copy of tags. func CopyTags(a Tags) Tags { other := make(Tags, len(a)) @@ -2326,3 +2384,30 @@ func appendField(b []byte, k string, v interface{}) []byte { return b } + +// ValidKeyToken returns true if the token used for measurement, tag key, or tag +// value is a valid unicode string and only contains printable, non-replacement characters. +func ValidKeyToken(s string) bool { + if !utf8.ValidString(s) { + return false + } + for _, r := range s { + if !unicode.IsPrint(r) || r == unicode.ReplacementChar { + return false + } + } + return true +} + +// ValidKeyTokens returns true if the measurement name and all tags are valid. +func ValidKeyTokens(name string, tags Tags) bool { + if !ValidKeyToken(name) { + return false + } + for _, tag := range tags { + if !ValidKeyToken(string(tag.Key)) || !ValidKeyToken(string(tag.Value)) { + return false + } + } + return true +} diff --git a/vendor/github.com/influxdata/influxdb/models/rows.go b/vendor/github.com/influxdata/influxdb1-client/models/rows.go similarity index 100% rename from vendor/github.com/influxdata/influxdb/models/rows.go rename to vendor/github.com/influxdata/influxdb1-client/models/rows.go diff --git a/vendor/github.com/influxdata/influxdb/models/statistic.go b/vendor/github.com/influxdata/influxdb1-client/models/statistic.go similarity index 100% rename from vendor/github.com/influxdata/influxdb/models/statistic.go rename to vendor/github.com/influxdata/influxdb1-client/models/statistic.go diff --git a/vendor/github.com/influxdata/influxdb/models/time.go b/vendor/github.com/influxdata/influxdb1-client/models/time.go similarity index 100% rename from vendor/github.com/influxdata/influxdb/models/time.go rename to vendor/github.com/influxdata/influxdb1-client/models/time.go diff --git a/vendor/github.com/influxdata/influxdb/models/uint_support.go b/vendor/github.com/influxdata/influxdb1-client/models/uint_support.go similarity index 100% rename from vendor/github.com/influxdata/influxdb/models/uint_support.go rename to vendor/github.com/influxdata/influxdb1-client/models/uint_support.go diff --git a/vendor/github.com/influxdata/influxdb/pkg/escape/bytes.go b/vendor/github.com/influxdata/influxdb1-client/pkg/escape/bytes.go similarity index 96% rename from vendor/github.com/influxdata/influxdb/pkg/escape/bytes.go rename to vendor/github.com/influxdata/influxdb1-client/pkg/escape/bytes.go index f3b31f42d36..39a33f6742c 100644 --- a/vendor/github.com/influxdata/influxdb/pkg/escape/bytes.go +++ b/vendor/github.com/influxdata/influxdb1-client/pkg/escape/bytes.go @@ -1,6 +1,6 @@ // Package escape contains utilities for escaping parts of InfluxQL // and InfluxDB line protocol. 
-package escape // import "github.com/influxdata/influxdb/pkg/escape" +package escape // import "github.com/influxdata/influxdb1-client/pkg/escape" import ( "bytes" diff --git a/vendor/github.com/influxdata/influxdb/pkg/escape/strings.go b/vendor/github.com/influxdata/influxdb1-client/pkg/escape/strings.go similarity index 100% rename from vendor/github.com/influxdata/influxdb/pkg/escape/strings.go rename to vendor/github.com/influxdata/influxdb1-client/pkg/escape/strings.go diff --git a/vendor/github.com/influxdata/influxdb/client/v2/client.go b/vendor/github.com/influxdata/influxdb1-client/v2/client.go similarity index 83% rename from vendor/github.com/influxdata/influxdb/client/v2/client.go rename to vendor/github.com/influxdata/influxdb1-client/v2/client.go index 6c2c56a114a..0cf7b5f21a2 100644 --- a/vendor/github.com/influxdata/influxdb/client/v2/client.go +++ b/vendor/github.com/influxdata/influxdb1-client/v2/client.go @@ -1,5 +1,5 @@ // Package client (v2) is the current official Go client for InfluxDB. -package client // import "github.com/influxdata/influxdb/client/v2" +package client // import "github.com/influxdata/influxdb1-client/v2" import ( "bytes" @@ -17,7 +17,7 @@ import ( "strings" "time" - "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb1-client/models" ) // HTTPConfig is the config data needed to create an HTTP Client. @@ -45,6 +45,9 @@ type HTTPConfig struct { // TLSConfig allows the user to set their own TLS config for the HTTP // Client. If set, this option overrides InsecureSkipVerify. TLSConfig *tls.Config + + // Proxy configures the Proxy function on the HTTP client. + Proxy func(req *http.Request) (*url.URL, error) } // BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct. @@ -75,6 +78,10 @@ type Client interface { // the UDP client. Query(q Query) (*Response, error) + // QueryAsChunk makes an InfluxDB Query on the database. This will fail if using + // the UDP client. + QueryAsChunk(q Query) (*ChunkedResponse, error) + // Close releases any resources a Client may be using. Close() error } @@ -99,6 +106,7 @@ func NewHTTPClient(conf HTTPConfig) (Client, error) { TLSClientConfig: &tls.Config{ InsecureSkipVerify: conf.InsecureSkipVerify, }, + Proxy: conf.Proxy, } if conf.TLSConfig != nil { tr.TLSClientConfig = conf.TLSConfig @@ -153,7 +161,7 @@ func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) { } if resp.StatusCode != http.StatusNoContent { - var err = fmt.Errorf(string(body)) + var err = errors.New(string(body)) return 0, "", err } @@ -359,6 +367,9 @@ func (c *client) Write(bp BatchPoints) error { var b bytes.Buffer for _, p := range bp.Points() { + if p == nil { + continue + } if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil { return err } @@ -400,7 +411,7 @@ func (c *client) Write(bp BatchPoints) error { } if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { - var err = fmt.Errorf(string(body)) + var err = errors.New(string(body)) return err } @@ -409,12 +420,13 @@ func (c *client) Write(bp BatchPoints) error { // Query defines a query to send to the server. type Query struct { - Command string - Database string - Precision string - Chunked bool - ChunkSize int - Parameters map[string]interface{} + Command string + Database string + RetentionPolicy string + Precision string + Chunked bool + ChunkSize int + Parameters map[string]interface{} } // NewQuery returns a query object. 
@@ -428,6 +440,19 @@ func NewQuery(command, database, precision string) Query { } } +// NewQueryWithRP returns a query object. +// The database, retention policy, and precision arguments can be empty strings if they are not needed +// for the query. Setting the retention policy only works on InfluxDB versions 1.6 or greater. +func NewQueryWithRP(command, database, retentionPolicy, precision string) Query { + return Query{ + Command: command, + Database: database, + RetentionPolicy: retentionPolicy, + Precision: precision, + Parameters: make(map[string]interface{}), + } +} + // NewQueryWithParameters returns a query object. // The database and precision arguments can be empty strings if they are not needed for the query. // parameters is a map of the parameter names used in the command to their values. @@ -450,11 +475,11 @@ type Response struct { // It returns nil if no errors occurred on any statements. func (r *Response) Error() error { if r.Err != "" { - return fmt.Errorf(r.Err) + return errors.New(r.Err) } for _, result := range r.Results { if result.Err != "" { - return fmt.Errorf(result.Err) + return errors.New(result.Err) } } return nil @@ -475,72 +500,26 @@ type Result struct { // Query sends a command to the server and returns the Response. func (c *client) Query(q Query) (*Response, error) { - u := c.url - u.Path = path.Join(u.Path, "query") - - jsonParameters, err := json.Marshal(q.Parameters) - - if err != nil { - return nil, err - } - - req, err := http.NewRequest("POST", u.String(), nil) + req, err := c.createDefaultRequest(q) if err != nil { return nil, err } - - req.Header.Set("Content-Type", "") - req.Header.Set("User-Agent", c.useragent) - - if c.username != "" { - req.SetBasicAuth(c.username, c.password) - } - params := req.URL.Query() - params.Set("q", q.Command) - params.Set("db", q.Database) - params.Set("params", string(jsonParameters)) if q.Chunked { params.Set("chunked", "true") if q.ChunkSize > 0 { params.Set("chunk_size", strconv.Itoa(q.ChunkSize)) } + req.URL.RawQuery = params.Encode() } - - if q.Precision != "" { - params.Set("epoch", q.Precision) - } - req.URL.RawQuery = params.Encode() - resp, err := c.httpClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() - // If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb - // but instead some other service. If the error code is also a 500+ code, then some - // downstream loadbalancer/proxy/etc had an issue and we should report that. - if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError { - body, err := ioutil.ReadAll(resp.Body) - if err != nil || len(body) == 0 { - return nil, fmt.Errorf("received status code %d from downstream server", resp.StatusCode) - } - - return nil, fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body) - } - - // If we get an unexpected content type, then it is also not from influx direct and therefore - // we want to know what we received and what status code was returned for debugging purposes. 
- if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" { - // Read up to 1kb of the body to help identify downstream errors and limit the impact of things - // like downstream serving a large file - body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024)) - if err != nil || len(body) == 0 { - return nil, fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode) - } - - return nil, fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body) + if err := checkResponse(resp); err != nil { + return nil, err } var response Response @@ -549,6 +528,9 @@ func (c *client) Query(q Query) (*Response, error) { for { r, err := cr.NextResponse() if err != nil { + if err == io.EOF { + break + } // If we got an error while decoding the response, send that back. return nil, err } @@ -586,10 +568,99 @@ func (c *client) Query(q Query) (*Response, error) { return &response, nil } +// QueryAsChunk sends a command to the server and returns the Response. +func (c *client) QueryAsChunk(q Query) (*ChunkedResponse, error) { + req, err := c.createDefaultRequest(q) + if err != nil { + return nil, err + } + params := req.URL.Query() + params.Set("chunked", "true") + if q.ChunkSize > 0 { + params.Set("chunk_size", strconv.Itoa(q.ChunkSize)) + } + req.URL.RawQuery = params.Encode() + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + + if err := checkResponse(resp); err != nil { + return nil, err + } + return NewChunkedResponse(resp.Body), nil +} + +func checkResponse(resp *http.Response) error { + // If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb + // but instead some other service. If the error code is also a 500+ code, then some + // downstream loadbalancer/proxy/etc had an issue and we should report that. + if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError { + body, err := ioutil.ReadAll(resp.Body) + if err != nil || len(body) == 0 { + return fmt.Errorf("received status code %d from downstream server", resp.StatusCode) + } + + return fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body) + } + + // If we get an unexpected content type, then it is also not from influx direct and therefore + // we want to know what we received and what status code was returned for debugging purposes. 
+ if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" { + // Read up to 1kb of the body to help identify downstream errors and limit the impact of things + // like downstream serving a large file + body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024)) + if err != nil || len(body) == 0 { + return fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode) + } + + return fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body) + } + return nil +} + +func (c *client) createDefaultRequest(q Query) (*http.Request, error) { + u := c.url + u.Path = path.Join(u.Path, "query") + + jsonParameters, err := json.Marshal(q.Parameters) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", u.String(), nil) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", "") + req.Header.Set("User-Agent", c.useragent) + + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + + params := req.URL.Query() + params.Set("q", q.Command) + params.Set("db", q.Database) + if q.RetentionPolicy != "" { + params.Set("rp", q.RetentionPolicy) + } + params.Set("params", string(jsonParameters)) + + if q.Precision != "" { + params.Set("epoch", q.Precision) + } + req.URL.RawQuery = params.Encode() + + return req, nil + +} + // duplexReader reads responses and writes it to another writer while // satisfying the reader interface. type duplexReader struct { - r io.Reader + r io.ReadCloser w io.Writer } @@ -601,6 +672,11 @@ func (r *duplexReader) Read(p []byte) (n int, err error) { return n, err } +// Close closes the response. +func (r *duplexReader) Close() error { + return r.r.Close() +} + // ChunkedResponse represents a response from the server that // uses chunking to stream the output. type ChunkedResponse struct { @@ -611,8 +687,12 @@ type ChunkedResponse struct { // NewChunkedResponse reads a stream and produces responses from the stream. func NewChunkedResponse(r io.Reader) *ChunkedResponse { + rc, ok := r.(io.ReadCloser) + if !ok { + rc = ioutil.NopCloser(r) + } resp := &ChunkedResponse{} - resp.duplex = &duplexReader{r: r, w: &resp.buf} + resp.duplex = &duplexReader{r: rc, w: &resp.buf} resp.dec = json.NewDecoder(resp.duplex) resp.dec.UseNumber() return resp @@ -621,10 +701,9 @@ func NewChunkedResponse(r io.Reader) *ChunkedResponse { // NextResponse reads the next line of the stream and returns a response. func (r *ChunkedResponse) NextResponse() (*Response, error) { var response Response - if err := r.dec.Decode(&response); err != nil { if err == io.EOF { - return nil, nil + return nil, err } // A decoding error happened. This probably means the server crashed // and sent a last-ditch error message to us. Ensure we have read the @@ -636,3 +715,8 @@ func (r *ChunkedResponse) NextResponse() (*Response, error) { r.buf.Reset() return &response, nil } + +// Close closes the response. 
+func (r *ChunkedResponse) Close() error { + return r.duplex.Close() +} diff --git a/vendor/github.com/influxdata/influxdb/client/v2/udp.go b/vendor/github.com/influxdata/influxdb1-client/v2/udp.go similarity index 94% rename from vendor/github.com/influxdata/influxdb/client/v2/udp.go rename to vendor/github.com/influxdata/influxdb1-client/v2/udp.go index 779a28b33f3..9867868b41c 100644 --- a/vendor/github.com/influxdata/influxdb/client/v2/udp.go +++ b/vendor/github.com/influxdata/influxdb1-client/v2/udp.go @@ -107,6 +107,10 @@ func (uc *udpclient) Query(q Query) (*Response, error) { return nil, fmt.Errorf("Querying via UDP is not supported") } +func (uc *udpclient) QueryAsChunk(q Query) (*ChunkedResponse, error) { + return nil, fmt.Errorf("Querying via UDP is not supported") +} + func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) { return 0, "", nil }
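As a usage note on the upgraded vendored client: besides the import
path change, this version adds QueryAsChunk, a Close method on
ChunkedResponse, and NextResponse now surfaces io.EOF at the end of
the stream instead of returning nil, nil. A minimal sketch of a
consumer, based on the interfaces in the diff above (the address,
database and query are placeholders):

    package main

    import (
    	"fmt"
    	"io"
    	"log"

    	client "github.com/influxdata/influxdb1-client/v2"
    )

    func main() {
    	c, err := client.NewHTTPClient(client.HTTPConfig{Addr: "http://localhost:8086"})
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer c.Close()

    	// QueryAsChunk streams results instead of buffering the whole response.
    	cr, err := c.QueryAsChunk(client.NewQuery(`SELECT * FROM "testGauge"`, "k6", ""))
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer cr.Close() // Close on ChunkedResponse is new in this version

    	for {
    		resp, err := cr.NextResponse()
    		if err == io.EOF { // NextResponse now returns io.EOF at end of stream
    			break
    		}
    		if err != nil {
    			log.Fatal(err)
    		}
    		for _, result := range resp.Results {
    			fmt.Println(result.Series)
    		}
    	}
    }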