Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Unifying retry strategy among InfluxDB 2 clients #179

Merged
merged 1 commit into from
Aug 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

### Features
1. [#165](https://github.com/influxdata/influxdb-client-go/pull/165) Allow overriding the http.Client for the http service.
1. [#179](https://github.com/influxdata/influxdb-client-go/pull/179) Unifying retry strategy among InfluxDB 2 clients: added exponential backoff.

### Bug fixes
1. [#175](https://github.com/influxdata/influxdb-client-go/pull/175) Fixed WriteAPIs management. Keeping single instance for each org and bucket pair.
Expand Down
27 changes: 20 additions & 7 deletions api/write/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,21 @@ type Options struct {
batchSize uint
// Interval, in ms, in which is buffer flushed if it has not been already written (by reaching batch size) . Default 1000ms
flushInterval uint
// Default retry interval in ms, if not sent by server. Default 1000ms
retryInterval uint
// Maximum count of retry attempts of failed writes
maxRetries uint
// Maximum number of points to keep for retry. Should be multiple of BatchSize. Default 10,000
retryBufferLimit uint
// Precision to use in writes for timestamp. In unit of duration: time.Nanosecond, time.Microsecond, time.Millisecond, time.Second
// Default time.Nanosecond
precision time.Duration
// Whether to use GZip compression in requests. Default false
useGZip bool
// Tags added to each point during writing. If a point already has a tag with the same key, it is left unchanged.
defaultTags map[string]string
// Default retry interval in ms, if not sent by server. Default 5,000ms
retryInterval uint
// Maximum count of retry attempts of failed writes
maxRetries uint
// Maximum number of points to keep for retry. Should be multiple of BatchSize. Default 50,000
retryBufferLimit uint
// Maximum retry interval, default 5min (300,000ms)
maxRetryInterval uint
}

// BatchSize returns size of batch
Expand Down Expand Up @@ -84,6 +86,17 @@ func (o *Options) SetRetryBufferLimit(retryBufferLimit uint) *Options {
return o
}

// MaxRetryInterval return maximum retry interval in ms. Default 5min.
func (o *Options) MaxRetryInterval() uint {
return o.maxRetryInterval
}

// SetMaxRetryInterval set maximum retry interval in ms
func (o *Options) SetMaxRetryInterval(maxRetryIntervalMs uint) *Options {
o.maxRetryInterval = maxRetryIntervalMs
return o
}

// Precision returns time precision for writes
func (o *Options) Precision() time.Duration {
return o.precision
Expand Down Expand Up @@ -124,5 +137,5 @@ func (o *Options) DefaultTags() map[string]string {

// DefaultOptions returns Options object with default values
func DefaultOptions() *Options {
return &Options{batchSize: 5000, maxRetries: 3, retryInterval: 1000, flushInterval: 1000, precision: time.Nanosecond, useGZip: false, retryBufferLimit: 10000, defaultTags: make(map[string]string)}
return &Options{batchSize: 5000, maxRetries: 3, retryInterval: 5000, maxRetryInterval: 300000, flushInterval: 1000, precision: time.Nanosecond, useGZip: false, retryBufferLimit: 50000, defaultTags: make(map[string]string)}
}
11 changes: 7 additions & 4 deletions api/write/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ func TestDefaultOptions(t *testing.T) {
assert.Equal(t, false, opts.UseGZip())
assert.Equal(t, uint(1000), opts.FlushInterval())
assert.Equal(t, time.Nanosecond, opts.Precision())
assert.Equal(t, uint(10000), opts.RetryBufferLimit())
assert.Equal(t, uint(1000), opts.RetryInterval())
assert.Equal(t, uint(50000), opts.RetryBufferLimit())
assert.Equal(t, uint(5000), opts.RetryInterval())
assert.Equal(t, uint(3), opts.MaxRetries())
assert.Equal(t, uint(300000), opts.MaxRetryInterval())
assert.Len(t, opts.DefaultTags(), 0)
}

Expand All @@ -31,16 +32,18 @@ func TestSettingsOptions(t *testing.T) {
SetFlushInterval(5000).
SetPrecision(time.Millisecond).
SetRetryBufferLimit(5).
SetRetryInterval(5000).
SetRetryInterval(1000).
SetMaxRetries(7).
SetMaxRetryInterval(150000).
AddDefaultTag("a", "1").
AddDefaultTag("b", "2")
assert.Equal(t, uint(5), opts.BatchSize())
assert.Equal(t, true, opts.UseGZip())
assert.Equal(t, uint(5000), opts.FlushInterval())
assert.Equal(t, time.Millisecond, opts.Precision())
assert.Equal(t, uint(5), opts.RetryBufferLimit())
assert.Equal(t, uint(5000), opts.RetryInterval())
assert.Equal(t, uint(1000), opts.RetryInterval())
assert.Equal(t, uint(7), opts.MaxRetries())
assert.Equal(t, uint(150000), opts.MaxRetryInterval())
assert.Len(t, opts.DefaultTags(), 2)
}
22 changes: 11 additions & 11 deletions api/writeAPIBlocking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,56 +6,56 @@ package api

import (
"context"

"sync"
"testing"
"time"

"github.com/influxdata/influxdb-client-go/api/write"
"github.com/influxdata/influxdb-client-go/internal/http"
"github.com/influxdata/influxdb-client-go/internal/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestWritePoint(t *testing.T) {
service := newTestService(t, "http://localhost:8888")
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
points := genPoints(10)
err := writeAPI.WritePoint(context.Background(), points...)
require.Nil(t, err)
require.Len(t, service.lines, 10)
require.Len(t, service.Lines(), 10)
for i, p := range points {
line := write.PointToLineProtocol(p, writeAPI.writeOptions.Precision())
//cut off last \n char
line = line[:len(line)-1]
assert.Equal(t, service.lines[i], line)
assert.Equal(t, service.Lines()[i], line)
}
}

func TestWriteRecord(t *testing.T) {
service := newTestService(t, "http://localhost:8888")
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
lines := genRecords(10)
err := writeAPI.WriteRecord(context.Background(), lines...)
require.Nil(t, err)
require.Len(t, service.lines, 10)
require.Len(t, service.Lines(), 10)
for i, l := range lines {
assert.Equal(t, l, service.lines[i])
assert.Equal(t, l, service.Lines()[i])
}
service.Close()

err = writeAPI.WriteRecord(context.Background())
require.Nil(t, err)
require.Len(t, service.lines, 0)
require.Len(t, service.Lines(), 0)

service.replyError = &http.Error{Code: "invalid", Message: "data"}
service.SetReplyError(&http.Error{Code: "invalid", Message: "data"})
err = writeAPI.WriteRecord(context.Background(), lines...)
require.NotNil(t, err)
require.Equal(t, "invalid: data", err.Error())
}

func TestWriteContextCancel(t *testing.T) {
service := newTestService(t, "http://localhost:8888")
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
lines := genRecords(10)
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -70,5 +70,5 @@ func TestWriteContextCancel(t *testing.T) {
cancel()
wg.Wait()
require.Equal(t, context.Canceled, err)
assert.Len(t, service.lines, 0)
assert.Len(t, service.Lines(), 0)
}
144 changes: 13 additions & 131 deletions api/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,8 @@
package api

import (
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"strings"
"sync"
"testing"
Expand All @@ -20,124 +15,11 @@ import (
"github.com/influxdata/influxdb-client-go/api/write"
ihttp "github.com/influxdata/influxdb-client-go/internal/http"
"github.com/influxdata/influxdb-client-go/internal/log"
"github.com/influxdata/influxdb-client-go/internal/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type testHTTPService struct {
serverURL string
authorization string
lines []string
t *testing.T
wasGzip bool
requestHandler func(c *testHTTPService, url string, body io.Reader) error
replyError *ihttp.Error
lock sync.Mutex
}

func (t *testHTTPService) ServerURL() string {
return t.serverURL
}

func (t *testHTTPService) ServerAPIURL() string {
return t.serverURL
}

func (t *testHTTPService) Authorization() string {
return t.authorization
}

func (t *testHTTPService) HTTPClient() *http.Client {
return nil
}

func (t *testHTTPService) Close() {
t.lock.Lock()
if len(t.lines) > 0 {
t.lines = t.lines[:0]
}
t.wasGzip = false
t.replyError = nil
t.requestHandler = nil
t.lock.Unlock()
}

func (t *testHTTPService) ReplyError() *ihttp.Error {
t.lock.Lock()
defer t.lock.Unlock()
return t.replyError
}

func (t *testHTTPService) SetAuthorization(_ string) {

}
func (t *testHTTPService) GetRequest(_ context.Context, _ string, _ ihttp.RequestCallback, _ ihttp.ResponseCallback) *ihttp.Error {
return nil
}
func (t *testHTTPService) DoHTTPRequest(_ *http.Request, _ ihttp.RequestCallback, _ ihttp.ResponseCallback) *ihttp.Error {
return nil
}

func (t *testHTTPService) DoHTTPRequestWithResponse(_ *http.Request, _ ihttp.RequestCallback) (*http.Response, error) {
return nil, nil
}

func (t *testHTTPService) PostRequest(_ context.Context, url string, body io.Reader, requestCallback ihttp.RequestCallback, _ ihttp.ResponseCallback) *ihttp.Error {
req, err := http.NewRequest("POST", url, nil)
if err != nil {
return ihttp.NewError(err)
}
if requestCallback != nil {
requestCallback(req)
}
if req.Header.Get("Content-Encoding") == "gzip" {
body, _ = gzip.NewReader(body)
t.wasGzip = true
}
assert.Equal(t.t, fmt.Sprintf("%swrite?bucket=my-bucket&org=my-org&precision=ns", t.serverURL), url)

if t.ReplyError() != nil {
return t.ReplyError()
}
if t.requestHandler != nil {
err = t.requestHandler(t, url, body)
} else {
err = t.decodeLines(body)
}

if err != nil {
return ihttp.NewError(err)
} else {
return nil
}
}

func (t *testHTTPService) decodeLines(body io.Reader) error {
bytes, err := ioutil.ReadAll(body)
if err != nil {
return err
}
lines := strings.Split(string(bytes), "\n")
lines = lines[:len(lines)-1]
t.lock.Lock()
t.lines = append(t.lines, lines...)
t.lock.Unlock()
return nil
}

func (t *testHTTPService) Lines() []string {
t.lock.Lock()
defer t.lock.Unlock()
return t.lines
}

func newTestService(t *testing.T, serverURL string) *testHTTPService {
return &testHTTPService{
t: t,
serverURL: serverURL + "/api/v2/",
}
}

func genPoints(num int) []*write.Point {
points := make([]*write.Point, num)
rand.Seed(321)
Expand Down Expand Up @@ -182,7 +64,7 @@ func genRecords(num int) []string {
}

func TestWriteAPIWriteDefaultTag(t *testing.T) {
service := newTestService(t, "http://localhost:8888")
service := test.NewTestService(t, "http://localhost:8888")
opts := write.DefaultOptions().
SetBatchSize(1)
opts.AddDefaultTag("dft", "a")
Expand All @@ -201,7 +83,7 @@ func TestWriteAPIWriteDefaultTag(t *testing.T) {
}

func TestWriteAPIImpl_Write(t *testing.T) {
service := newTestService(t, "http://localhost:8888")
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
points := genPoints(10)
for _, p := range points {
Expand All @@ -218,7 +100,7 @@ func TestWriteAPIImpl_Write(t *testing.T) {
}

func TestGzipWithFlushing(t *testing.T) {
service := newTestService(t, "http://localhost:8888")
service := test.NewTestService(t, "http://localhost:8888")
log.Log.SetDebugLevel(4)
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetUseGZip(true))
points := genPoints(5)
Expand All @@ -230,7 +112,7 @@ func TestGzipWithFlushing(t *testing.T) {
end := time.Now()
fmt.Printf("Flash duration: %dns\n", end.Sub(start).Nanoseconds())
assert.Len(t, service.Lines(), 5)
assert.True(t, service.wasGzip)
assert.True(t, service.WasGzip())

service.Close()
writeAPI.writeOptions.SetUseGZip(false)
Expand All @@ -239,12 +121,12 @@ func TestGzipWithFlushing(t *testing.T) {
}
writeAPI.waitForFlushing()
assert.Len(t, service.Lines(), 5)
assert.False(t, service.wasGzip)
assert.False(t, service.WasGzip())

writeAPI.Close()
}
func TestFlushInterval(t *testing.T) {
service := newTestService(t, "http://localhost:8888")
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(10).SetFlushInterval(500))
points := genPoints(5)
for _, p := range points {
Expand All @@ -268,7 +150,7 @@ func TestFlushInterval(t *testing.T) {
}

func TestRetry(t *testing.T) {
service := newTestService(t, "http://localhost:8888")
service := test.NewTestService(t, "http://localhost:8888")
log.Log.SetDebugLevel(5)
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000))
points := genPoints(15)
Expand All @@ -278,10 +160,10 @@ func TestRetry(t *testing.T) {
writeAPI.waitForFlushing()
require.Len(t, service.Lines(), 5)
service.Close()
service.replyError = &ihttp.Error{
service.SetReplyError(&ihttp.Error{
StatusCode: 429,
RetryAfter: 5,
}
})
for i := 0; i < 5; i++ {
writeAPI.WritePoint(points[i])
}
Expand All @@ -305,13 +187,13 @@ func TestRetry(t *testing.T) {
}

func TestWriteError(t *testing.T) {
service := newTestService(t, "http://localhost:8888")
service := test.NewTestService(t, "http://localhost:8888")
log.Log.SetDebugLevel(3)
service.replyError = &ihttp.Error{
service.SetReplyError(&ihttp.Error{
StatusCode: 400,
Code: "write",
Message: "error",
}
})
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
errCh := writeAPI.Errors()
var recErr error
Expand Down
Loading