diff --git a/CHANGELOG.md b/CHANGELOG.md index b4292bb5..6190dce0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,13 @@ -## unreleased +## Unreleased +### Features +- [#285](https://github.com/influxdata/influxdb-client-go/pull/285) Added *Client.Ping()* function as the only validation method available in both OSS and Cloud. + ### Bug fixes +- [#285](https://github.com/influxdata/influxdb-client-go/pull/285) Functions *Client.Health()* and *Client.Ready()* correctly report an error when called against InfluxDB Cloud. -### Features +### Breaking change +- [#285](https://github.com/influxdata/influxdb-client-go/pull/285) Function *Client.Ready()* now returns `*domain.Ready` with full uptime info. + ## 2.5.1[2021-09-17] ### Bug fixes diff --git a/README.md b/README.md index 1c561dd9..6d1d33ca 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ This repository contains the reference Go client for InfluxDB 2. - [Queries in Detail](#queries) - [Concurrency](#concurrency) - [Proxy and redirects](#proxy-and-redirects) + - [Checking Server State](#checking-server-state) - [InfluxDB 1.8 API compatibility](#influxdb-18-api-compatibility) - [Contributing](#contributing) - [License](#license) @@ -531,6 +532,17 @@ httpClient := &http.Client{ client := influxdb2.NewClientWithOptions("http://localhost:8086", token, influxdb2.DefaultOptions().SetHTTPClient(httpClient)) ``` +### Checking Server State +There are three functions for checking whether a server is up and ready for communication: + +| Function| Description | Availability | +|:----------|:----------|:----------| +| [Health()](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2#Client.Health) | Detailed info about the server status, along with version string | OSS | +| [Ready()](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2#Client.Ready) | Server uptime info | OSS | +| [Ping()](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2#Client.Ping) | Whether a server is up | OSS, Cloud | + +Only the [Ping()](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2#Client.Ping) function works in InfluxDB Cloud server. + ## InfluxDB 1.8 API compatibility [InfluxDB 1.8.0 introduced forward compatibility APIs](https://docs.influxdata.com/influxdb/latest/tools/api/#influxdb-2-0-api-compatibility-endpoints) for InfluxDB 2.0. This allow you to easily move from InfluxDB 1.x to InfluxDB 2.0 Cloud or open source. diff --git a/api/query_test.go b/api/query_test.go index 1d00d694..4e70c0d4 100644 --- a/api/query_test.go +++ b/api/query_test.go @@ -108,28 +108,28 @@ func TestQueryCVSResultSingleTable(t *testing.T) { func TestQueryCVSResultMultiTables(t *testing.T) { csvTable := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string #group,false,false,true,true,false,false,true,true,true,true -#default,_result,,,,,,,,, +#default,_result1,,,,,,,,, ,result,table,_start,_stop,_time,_value,_field,_measurement,a,b ,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf ,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string #group,false,false,true,true,false,false,true,true,true,true -#default,_result,,,,,,,,, +#default,_result2,,,,,,,,, ,result,table,_start,_stop,_time,_value,_field,_measurement,a,b ,,1,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,4,i,test,1,adsfasdf ,,1,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,-1,i,test,1,adsfasdf #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,boolean,string,string,string,string #group,false,false,true,true,false,false,true,true,true,true -#default,_result,,,,,,,,, +#default,_result3,,,,,,,,, ,result,table,_start,_stop,_time,_value,_field,_measurement,a,b ,,2,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.62797864Z,false,f,test,0,adsfasdf ,,2,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.969100374Z,true,f,test,0,adsfasdf #datatype,string,long,dateTime:RFC3339Nano,dateTime:RFC3339Nano,dateTime:RFC3339Nano,unsignedLong,string,string,string,string #group,false,false,true,true,false,false,true,true,true,true -#default,_result,,,,,,,,, +#default,_result4,,,,,,,,, ,result,table,_start,_stop,_time,_value,_field,_measurement,a,b ,,3,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.62797864Z,0,i,test,0,adsfasdf ,,3,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.969100374Z,2,i,test,0,adsfasdf @@ -137,7 +137,7 @@ func TestQueryCVSResultMultiTables(t *testing.T) { ` expectedTable1 := query.NewFluxTableMetadataFull(0, []*query.FluxColumn{ - query.NewFluxColumnFull("string", "_result", "result", false, 0), + query.NewFluxColumnFull("string", "_result1", "result", false, 0), query.NewFluxColumnFull("long", "", "table", false, 1), query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", true, 2), query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", true, 3), @@ -151,7 +151,7 @@ func TestQueryCVSResultMultiTables(t *testing.T) { ) expectedRecord11 := query.NewFluxRecord(0, map[string]interface{}{ - "result": "_result", + "result": "_result1", "table": int64(0), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"), @@ -165,7 +165,7 @@ func TestQueryCVSResultMultiTables(t *testing.T) { ) expectedRecord12 := query.NewFluxRecord(0, map[string]interface{}{ - "result": "_result", + "result": "_result1", "table": int64(0), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"), @@ -180,7 +180,7 @@ func TestQueryCVSResultMultiTables(t *testing.T) { expectedTable2 := query.NewFluxTableMetadataFull(1, []*query.FluxColumn{ - query.NewFluxColumnFull("string", "_result", "result", false, 0), + query.NewFluxColumnFull("string", "_result2", "result", false, 0), query.NewFluxColumnFull("long", "", "table", false, 1), query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", true, 2), query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", true, 3), @@ -194,7 +194,7 @@ func TestQueryCVSResultMultiTables(t *testing.T) { ) expectedRecord21 := query.NewFluxRecord(1, map[string]interface{}{ - "result": "_result", + "result": "_result2", "table": int64(1), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"), @@ -208,7 +208,7 @@ func TestQueryCVSResultMultiTables(t *testing.T) { ) expectedRecord22 := query.NewFluxRecord(1, map[string]interface{}{ - "result": "_result", + "result": "_result2", "table": int64(1), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"), @@ -223,7 +223,7 @@ func TestQueryCVSResultMultiTables(t *testing.T) { expectedTable3 := query.NewFluxTableMetadataFull(2, []*query.FluxColumn{ - query.NewFluxColumnFull("string", "_result", "result", false, 0), + query.NewFluxColumnFull("string", "_result3", "result", false, 0), query.NewFluxColumnFull("long", "", "table", false, 1), query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", true, 2), query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", true, 3), @@ -237,7 +237,7 @@ func TestQueryCVSResultMultiTables(t *testing.T) { ) expectedRecord31 := query.NewFluxRecord(2, map[string]interface{}{ - "result": "_result", + "result": "_result3", "table": int64(2), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"), @@ -251,7 +251,7 @@ func TestQueryCVSResultMultiTables(t *testing.T) { ) expectedRecord32 := query.NewFluxRecord(2, map[string]interface{}{ - "result": "_result", + "result": "_result3", "table": int64(2), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"), @@ -266,7 +266,7 @@ func TestQueryCVSResultMultiTables(t *testing.T) { expectedTable4 := query.NewFluxTableMetadataFull(3, []*query.FluxColumn{ - query.NewFluxColumnFull("string", "_result", "result", false, 0), + query.NewFluxColumnFull("string", "_result4", "result", false, 0), query.NewFluxColumnFull("long", "", "table", false, 1), query.NewFluxColumnFull("dateTime:RFC3339Nano", "", "_start", true, 2), query.NewFluxColumnFull("dateTime:RFC3339Nano", "", "_stop", true, 3), @@ -280,7 +280,7 @@ func TestQueryCVSResultMultiTables(t *testing.T) { ) expectedRecord41 := query.NewFluxRecord(3, map[string]interface{}{ - "result": "_result", + "result": "_result4", "table": int64(3), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"), @@ -294,7 +294,7 @@ func TestQueryCVSResultMultiTables(t *testing.T) { ) expectedRecord42 := query.NewFluxRecord(3, map[string]interface{}{ - "result": "_result", + "result": "_result4", "table": int64(3), "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"), "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"), @@ -382,6 +382,7 @@ func TestQueryCVSResultMultiTables(t *testing.T) { require.Equal(t, queryResult.table, expectedTable4) require.NotNil(t, queryResult.Record()) require.Equal(t, queryResult.Record(), expectedRecord42) + assert.Equal(t, "_result4", queryResult.Record().ValueByKey("result")) require.False(t, queryResult.Next()) require.Nil(t, queryResult.Err()) @@ -474,7 +475,7 @@ func TestQueryRawResult(t *testing.T) { ``, `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string`, `#group,false,false,true,true,false,false,true,true,true,true`, - `#default,_result,,,,,,,,,`, + `#default,_result2,,,,,,,,,`, `,result,table,_start,_stop,_time,_value,_field,_measurement,a,b`, `,,1,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,4,i,test,1,adsfasdf`, `,,1,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,1,i,test,1,adsfasdf`, diff --git a/api/write_test.go b/api/write_test.go index 98d57cd8..3de04b5c 100644 --- a/api/write_test.go +++ b/api/write_test.go @@ -84,26 +84,17 @@ func TestGzipWithFlushing(t *testing.T) { } func TestFlushInterval(t *testing.T) { service := test.NewTestService(t, "http://localhost:8888") - writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(10).SetFlushInterval(10)) + writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(10).SetFlushInterval(30)) points := test.GenPoints(5) for _, p := range points { writeAPI.WritePoint(p) } require.Len(t, service.Lines(), 0) - <-time.After(time.Millisecond * 15) + <-time.After(time.Millisecond * 50) require.Len(t, service.Lines(), 5) writeAPI.Close() service.Close() - writeAPI = NewWriteAPI("my-org", "my-bucket", service, writeAPI.writeOptions.SetFlushInterval(50)) - for _, p := range points { - writeAPI.WritePoint(p) - } - require.Len(t, service.Lines(), 0) - <-time.After(time.Millisecond * 60) - require.Len(t, service.Lines(), 5) - - writeAPI.Close() } func TestRetry(t *testing.T) { diff --git a/client.go b/client.go index 0f9ebc68..9a8041e8 100644 --- a/client.go +++ b/client.go @@ -29,11 +29,13 @@ type Client interface { // and returns details about newly created entities along with the authorization object. // Retention period of zero will result to infinite retention. Setup(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int) (*domain.OnboardingResponse, error) - // Ready checks InfluxDB server is running. It doesn't validate authentication params. - Ready(ctx context.Context) (bool, error) + // Ready returns InfluxDB uptime info of server. It doesn't validate authentication params. + Ready(ctx context.Context) (*domain.Ready, error) // Health returns an InfluxDB server health check result. Read the HealthCheck.Status field to get server status. // Health doesn't validate authentication params. Health(ctx context.Context) (*domain.HealthCheck, error) + // Ping validates whether InfluxDB server is running. It doesn't validate authentication params. + Ping(ctx context.Context) (bool, error) // Close ensures all ongoing asynchronous write clients finish. // Also closes all idle connections, in case of HTTP client was created internally. Close() @@ -136,16 +138,20 @@ func (c *clientImpl) HTTPService() http.Service { return c.httpService } -func (c *clientImpl) Ready(ctx context.Context) (bool, error) { +func (c *clientImpl) Ready(ctx context.Context) (*domain.Ready, error) { params := &domain.GetReadyParams{} response, err := c.apiClient.GetReadyWithResponse(ctx, params) if err != nil { - return false, err + return nil, err } if response.JSONDefault != nil { - return false, domain.ErrorToHTTPError(response.JSONDefault, response.StatusCode()) + return nil, domain.ErrorToHTTPError(response.JSONDefault, response.StatusCode()) } - return true, nil + if response.JSON200 == nil { //response with status 2xx, but not JSON + return nil, errors.New("cannot read Ready response") + + } + return response.JSON200, nil } func (c *clientImpl) Setup(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int) (*domain.OnboardingResponse, error) { @@ -189,10 +195,21 @@ func (c *clientImpl) Health(ctx context.Context) (*domain.HealthCheck, error) { //unhealthy server return response.JSON503, nil } + if response.JSON200 == nil { //response with status 2xx, but not JSON + return nil, errors.New("cannot read Health response") + } return response.JSON200, nil } +func (c *clientImpl) Ping(ctx context.Context) (bool, error) { + resp, err := c.apiClient.GetPingWithResponse(ctx) + if err != nil { + return false, err + } + return resp.StatusCode() == 204, nil +} + func createKey(org, bucket string) string { return org + "\t" + bucket } diff --git a/client_e2e_test.go b/client_e2e_test.go index 7a07307c..d7b65cde 100644 --- a/client_e2e_test.go +++ b/client_e2e_test.go @@ -1,3 +1,4 @@ +//go:build e2e // +build e2e // Copyright 2020-2021 InfluxData, Inc. All rights reserved. @@ -63,13 +64,14 @@ func TestSetup(t *testing.T) { func TestReady(t *testing.T) { client := influxdb2.NewClient(serverURL, "") - ok, err := client.Ready(context.Background()) - if err != nil { - t.Error(err) - } - if !ok { - t.Fail() - } + ready, err := client.Ready(context.Background()) + require.NoError(t, err) + require.NotNil(t, ready) + require.NotNil(t, ready.Started) + assert.True(t, ready.Started.Before(time.Now())) + dur, err := time.ParseDuration(*ready.Up) + require.NoError(t, err) + assert.True(t, dur.Seconds() > 0) } func TestHealth(t *testing.T) { @@ -83,6 +85,14 @@ func TestHealth(t *testing.T) { assert.Equal(t, domain.HealthCheckStatusPass, health.Status) } +func TestPing(t *testing.T) { + client := influxdb2.NewClient(serverURL, "") + + ok, err := client.Ping(context.Background()) + require.NoError(t, err) + assert.True(t, ok) +} + func TestWrite(t *testing.T) { client := influxdb2.NewClientWithOptions(serverURL, authToken, influxdb2.DefaultOptions().SetLogLevel(3)) writeAPI := client.WriteAPI("my-org", "my-bucket") @@ -161,6 +171,14 @@ func TestQuery(t *testing.T) { } +func TestPingV1(t *testing.T) { + client := influxdb2.NewClient(serverV1URL, "") + + ok, err := client.Ping(context.Background()) + require.NoError(t, err) + assert.True(t, ok) +} + func TestHealthV1Compatibility(t *testing.T) { client := influxdb2.NewClient(serverV1URL, "") diff --git a/client_test.go b/client_test.go index 4066d9ab..cd0311ed 100644 --- a/client_test.go +++ b/client_test.go @@ -79,7 +79,7 @@ func TestUserAgent(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { <-time.After(100 * time.Millisecond) if r.Header.Get("User-Agent") == http2.UserAgent { - w.WriteHeader(http.StatusOK) + w.WriteHeader(http.StatusNoContent) } else { w.WriteHeader(http.StatusNotFound) } @@ -87,9 +87,9 @@ func TestUserAgent(t *testing.T) { defer server.Close() c := NewClient(server.URL, "x") - ready, err := c.Ready(context.Background()) - assert.True(t, ready) - assert.Nil(t, err) + up, err := c.Ping(context.Background()) + require.NoError(t, err) + assert.True(t, up) err = c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i") assert.NoError(t, err) @@ -167,3 +167,29 @@ func TestServerErrorEmptyBody(t *testing.T) { require.Error(t, err) assert.Equal(t, "Unexpected status code 404", err.Error()) } + +func TestReadyFail(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Write([]byte(``)) + })) + + defer server.Close() + c := NewClient(server.URL, "x") + r, err := c.Ready(context.Background()) + assert.Error(t, err) + assert.Nil(t, r) +} + +func TestHealthFail(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + w.Write([]byte(``)) + })) + + defer server.Close() + c := NewClient(server.URL, "x") + h, err := c.Health(context.Background()) + assert.Error(t, err) + assert.Nil(t, h) +} diff --git a/internal/write/service_test.go b/internal/write/service_test.go index 9101ce56..a0525712 100644 --- a/internal/write/service_test.go +++ b/internal/write/service_test.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + ilog "log" "strings" "sync" "testing" @@ -132,6 +133,7 @@ func TestRetryStrategy(t *testing.T) { func TestBufferOverwrite(t *testing.T) { log.Log.SetLogLevel(log.DebugLevel) + ilog.SetFlags(ilog.Ldate | ilog.Lmicroseconds) hs := test.NewTestService(t, "http://localhost:8086") // Buffer limit 15000, bach is 5000 => buffer for 3 batches opts := write.DefaultOptions().SetRetryInterval(1).SetRetryBufferLimit(15000) @@ -165,7 +167,6 @@ func TestBufferOverwrite(t *testing.T) { assert.Equal(t, 3, srv.retryQueue.list.Len()) // Write early and overwrite - <-time.After(time.Millisecond) b4 := NewBatch("4\n", opts.RetryInterval(), opts.MaxRetryTime()) // No write will occur, because retry delay has not passed yet // However new bach will be added to retry queue. Retry queue has limit 3,