diff --git a/README.md b/README.md index 65d5000..91f654d 100644 --- a/README.md +++ b/README.md @@ -14,31 +14,28 @@ go get github.com/GreptimeTeam/greptimedb-client-go ## Example -you can visit [Example][example] for usage details. +you can visit [Documentation][document] for usage details and documentation, ## Usage #### Datatype Supported -- int32 -- int64 -- int (as int64) -- uint32 -- uint64 -- uint (as uint64) -- float32 -- float64 +- int8, int16, int32, int64, int +- uint8, uint16, uint32, uint64, uint +- float32, float64 - bool - []byte - string - time.Time -- int8, int16 (as int32) // not recommended -- uint8, uint16 (as uint32) // not recommended -#### Precision for Timestamp +#### Timestamp -The default precision is `Millisecond`, you can set a different precision, -once the precision is set, you can not change it anymore. +you can customize timestamp index via calling methods of [Metric][metric_doc] + +##### precision + +The default timestamp column precision is `Millisecond`, you can set a different precision. +And once the precision is setted, you can not change it any more. - time.Second - time.Millisecond @@ -49,17 +46,24 @@ once the precision is set, you can not change it anymore. metric.SetTimePrecision(time.Microsecond) ``` -#### Stream Insert +##### alias -You can send several insert request by `Send()` and notify DB no more messages by `CloseAndRecv()` +The default timestamp column name is `ts`, if you want to use another name, you +can change it: -you can visit [stream_client_test.go](stream_client_test.go) for details +```go +metric.SetTimestampAlias("timestamp") +``` #### Prometheus -We also support querying with RangePromql and Promql(TODO). +How to query with RangePromql and Promql, you can visit [promql_test.go](query_promql_test.go) for details + +#### Stream Insert + +You can send several insert requests by `Send()` and notify GreptimeDB no more messages by `CloseAndRecv()` -you can visit [promql_test.go](promql_test.go) for details +You can visit [stream_client_test.go](stream_client_test.go) for details ## License @@ -67,4 +71,5 @@ This greptimedb-client-go uses the __Apache 2.0 license__ to strike a balance between open contributions and allowing you to use the software however you want. -[example]: https://pkg.go.dev/github.com/GreptimeTeam/greptimedb-client-go#example-package +[document]: https://pkg.go.dev/github.com/GreptimeTeam/greptimedb-client-go +[metric_doc]: https://pkg.go.dev/github.com/GreptimeTeam/greptimedb-client-go#Metric diff --git a/client.go b/client.go index 81221da..8464015 100644 --- a/client.go +++ b/client.go @@ -61,8 +61,8 @@ func NewClient(cfg *Config) (*Client, error) { }, nil } -// Insert helps to insert multiple rows into greptimedb -func (c *Client) Insert(ctx context.Context, req InsertRequest) (uint32, error) { +// Insert helps to insert multiple rows of multiple tables into greptimedb +func (c *Client) Insert(ctx context.Context, req InsertsRequest) (uint32, error) { request, err := req.build(c.cfg) if err != nil { return 0, err diff --git a/client_test.go b/client_test.go index ec1c834..e138eca 100644 --- a/client_test.go +++ b/client_test.go @@ -17,7 +17,10 @@ package greptime import ( "context" "fmt" + "net/http" + "net/url" "strconv" + "strings" "testing" "time" @@ -39,14 +42,14 @@ type monitor struct { } var ( - database = "public" - host = "127.0.0.1" - port = 0 + database = "public" + host = "127.0.0.1" + grpcPort, httpPort = 4001, 4000 ) func init() { repo := "greptime/greptimedb" - tag := "0.2.0-nightly-20230328" + tag := "0.3.0-alpha" var err error pool, err := dockertest.NewPool("") @@ -63,8 +66,11 @@ func init() { resource, err := pool.RunWithOptions(&dockertest.RunOptions{ Repository: repo, Tag: tag, - ExposedPorts: []string{"4001", "4002"}, - Entrypoint: []string{"greptime", "standalone", "start", "--rpc-addr=0.0.0.0:4001", "--mysql-addr=0.0.0.0:4002"}, + ExposedPorts: []string{"4000", "4001", "4002"}, + Entrypoint: []string{"greptime", "standalone", "start", + "--http-addr=0.0.0.0:4000", + "--rpc-addr=0.0.0.0:4001", + "--mysql-addr=0.0.0.0:4002"}, }, func(config *dc.HostConfig) { // set AutoRemove to true so that stopped container goes away by itself config.AutoRemove = true @@ -85,12 +91,13 @@ func init() { log.WithError(nil).Warn("Expire container failed") } - pool.MaxWait = 60 * time.Second + pool.MaxWait = 30 * time.Second if err := pool.Retry(func() error { // TODO(vinland-avalon): some functions, like ping() to check if container is ready time.Sleep(time.Second) - port, err = strconv.Atoi(resource.GetPort(("4001/tcp"))) + httpPort, err = strconv.Atoi(resource.GetPort(("4000/tcp"))) + grpcPort, err = strconv.Atoi(resource.GetPort(("4001/tcp"))) if err != nil { return err } @@ -104,19 +111,31 @@ func newClient(t *testing.T) *Client { options := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), } - cfg := NewCfg(host).WithPort(port).WithDatabase(database).WithDialOptions(options...) + cfg := NewCfg(host).WithPort(grpcPort).WithDatabase(database).WithDialOptions(options...) client, err := NewClient(cfg) assert.Nil(t, err) return client } +func createTable(t *testing.T, schema string) { + data := url.Values{} + data.Set("sql", schema) + body := strings.NewReader(data.Encode()) + uri := fmt.Sprintf("http://localhost:%d/v1/sql?db=%s", httpPort, database) + resp, err := http.DefaultClient.Post(uri, "application/x-www-form-urlencoded", body) + assert.Nil(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + + defer resp.Body.Close() +} + func TestInvalidClient(t *testing.T) { options := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), grpc.WithTimeout(time.Second), } - cfg := NewCfg("invalid host").WithPort(port).WithDatabase(database).WithDialOptions(options...) + cfg := NewCfg("invalid host").WithPort(grpcPort).WithDatabase(database).WithDialOptions(options...) client, err := NewClient(cfg) assert.Nil(t, client) assert.NotNil(t, err) @@ -165,9 +184,11 @@ func TestInsertAndQueryWithSql(t *testing.T) { } req := InsertRequest{} - req.WithDatabase(database).WithTable(table).WithMetric(metric) + req.WithTable(table).WithMetric(metric) + reqs := InsertsRequest{} + reqs.WithDatabase(database).Insert(req) - n, err := client.Insert(context.Background(), req) + n, err := client.Insert(context.Background(), reqs) assert.Nil(t, err) assert.Equal(t, uint32(len(insertMonitors)), n) @@ -229,8 +250,11 @@ func TestPrecisionSecond(t *testing.T) { // We set the precision as nanosecond metric.SetTimePrecision(time.Nanosecond) req := InsertRequest{} - req.WithTable(table).WithMetric(metric).WithDatabase(database) - n, err := client.Insert(context.Background(), req) + req.WithTable(table).WithMetric(metric) + reqs := InsertsRequest{} + reqs.WithDatabase(database).Insert(req) + + n, err := client.Insert(context.Background(), reqs) assert.Nil(t, err) assert.Equal(t, uint32(1), n) @@ -279,9 +303,11 @@ func TestNilInColumn(t *testing.T) { metric.AddSeries(series2) req := InsertRequest{} - req.WithTable(table).WithMetric(metric).WithDatabase(database) + req.WithTable(table).WithMetric(metric) + reqs := InsertsRequest{} + reqs.WithDatabase(database).Insert(req) - n, err := client.Insert(context.Background(), req) + n, err := client.Insert(context.Background(), reqs) assert.Nil(t, err) assert.Equal(t, uint32(len(insertMonitors)), n) @@ -320,7 +346,7 @@ func TestNoNeedAuth(t *testing.T) { grpc.WithTransportCredentials(insecure.NewCredentials()), } // Client can always connect to a no-auth database, even the usernames and passwords are wrong - cfg := NewCfg(host).WithPort(port).WithDatabase(database).WithAuth("user", "pwd").WithDialOptions(options...) + cfg := NewCfg(host).WithPort(grpcPort).WithDatabase(database).WithAuth("user", "pwd").WithDialOptions(options...) client, err := NewClient(cfg) assert.Nil(t, err) @@ -331,8 +357,10 @@ func TestNoNeedAuth(t *testing.T) { metric.AddSeries(series) req := InsertRequest{} - req.WithTable(table).WithMetric(metric).WithDatabase(database) - n, err := client.Insert(context.Background(), req) + req.WithTable(table).WithMetric(metric) + reqs := InsertsRequest{} + reqs.WithDatabase(database).Insert(req) + n, err := client.Insert(context.Background(), reqs) assert.Nil(t, err) assert.Equal(t, uint32(1), n) @@ -360,7 +388,9 @@ func TestInsertSameColumnWithDifferentType(t *testing.T) { req := InsertRequest{} req.WithTable(table).WithMetric(metric) - n, err := client.Insert(context.Background(), req) + reqs := InsertsRequest{} + reqs.WithDatabase(database).Insert(req) + n, err := client.Insert(context.Background(), reqs) assert.Nil(t, err) assert.Equal(t, uint32(1), n) @@ -373,7 +403,9 @@ func TestInsertSameColumnWithDifferentType(t *testing.T) { req = InsertRequest{} req.WithTable(table).WithMetric(metric) - n, err = client.Insert(context.Background(), req) + reqs = InsertsRequest{} + reqs.WithDatabase(database).Insert(req) + n, err = client.Insert(context.Background(), reqs) assert.NotNil(t, err) assert.ErrorContains(t, err, "Type of column count does not match type in schema, expect Int64(Int64Type), given Float64(Float64Type)") } @@ -392,7 +424,9 @@ func TestInsertTimestampWithDifferentPrecision(t *testing.T) { req := InsertRequest{} req.WithTable(table).WithMetric(metric) - n, err := client.Insert(context.Background(), req) + reqs := InsertsRequest{} + reqs.WithDatabase(database).Insert(req) + n, err := client.Insert(context.Background(), reqs) assert.Nil(t, err) assert.Equal(t, uint32(1), n) @@ -406,7 +440,9 @@ func TestInsertTimestampWithDifferentPrecision(t *testing.T) { req = InsertRequest{} req.WithTable(table).WithMetric(metric) - n, err = client.Insert(context.Background(), req) + reqs = InsertsRequest{} + reqs.WithDatabase(database).Insert(req) + n, err = client.Insert(context.Background(), reqs) assert.NotNil(t, err) assert.ErrorContains(t, err, "Type of column ts does not match type in schema, expect Timestamp(Second(TimestampSecondType)), given Timestamp(Millisecond(TimestampMillisecondType))") } @@ -426,7 +462,9 @@ func TestGetNonMatchedTypeColumn(t *testing.T) { req := InsertRequest{} req.WithTable(table).WithMetric(metric) - n, err := client.Insert(context.Background(), req) + reqs := InsertsRequest{} + reqs.WithDatabase(database).Insert(req) + n, err := client.Insert(context.Background(), reqs) assert.Nil(t, err) assert.Equal(t, uint32(1), n) @@ -478,7 +516,9 @@ func TestGetNotExistColumn(t *testing.T) { req := InsertRequest{} req.WithTable(table).WithMetric(metric) - n, err := client.Insert(context.Background(), req) + reqs := InsertsRequest{} + reqs.WithDatabase(database).Insert(req) + n, err := client.Insert(context.Background(), reqs) assert.Nil(t, err) assert.Equal(t, uint32(1), n) @@ -530,9 +570,9 @@ func TestDataTypes(t *testing.T) { float64V float64 float32V float32 stringV string - byteV []byte - boolV bool - timeV time.Time + // byteV []byte + boolV bool + timeV time.Time } data := datatype{ @@ -549,9 +589,9 @@ func TestDataTypes(t *testing.T) { float64V: 64.0, float32V: 32.0, stringV: "string", - byteV: []byte("byte"), - boolV: true, - timeV: time.UnixMilli(1677728740012), + // byteV: []byte("byte"), + boolV: true, + timeV: time.UnixMilli(1677728740012), } client := newClient(t) @@ -595,9 +635,9 @@ func TestDataTypes(t *testing.T) { assert.Nil(t, series.AddStringTag("string_v_tag", data.stringV)) assert.Nil(t, series.AddStringField("string_v_field", data.stringV)) - // bytes - assert.Nil(t, series.AddBytesTag("byte_v_tag", data.byteV)) - assert.Nil(t, series.AddBytesField("byte_v_field", data.byteV)) + // TODO(yuanbohan): support []byte + // assert.Nil(t, series.AddBytesTag("byte_v_tag", data.byteV)) + // assert.Nil(t, series.AddBytesField("byte_v_field", data.byteV)) // bool assert.Nil(t, series.AddBoolTag("bool_v_tag", data.boolV)) @@ -607,9 +647,11 @@ func TestDataTypes(t *testing.T) { metric.AddSeries(series) req := InsertRequest{} - req.WithTable(table).WithMetric(metric).WithDatabase(database) + req.WithTable(table).WithMetric(metric) + reqs := InsertsRequest{} + reqs.WithDatabase(database).Insert(req) - n, err := client.Insert(context.Background(), req) + n, err := client.Insert(context.Background(), reqs) assert.Nil(t, err) assert.Equal(t, uint32(1), n) @@ -687,11 +729,10 @@ func TestDataTypes(t *testing.T) { assert.True(t, ok) // bytes - byteV, ok := series.GetBytes("byte_v_tag") - assert.True(t, ok) - - _, ok = series.GetBytes("byte_v_field") - assert.True(t, ok) + // byteV, ok := series.GetBytes("byte_v_tag") + // assert.True(t, ok) + // _, ok = series.GetBytes("byte_v_field") + // assert.True(t, ok) // bool boolV, ok := series.GetBool("bool_v_tag") @@ -716,9 +757,183 @@ func TestDataTypes(t *testing.T) { float64V: float64V, float32V: float32(float32V), stringV: stringV, - byteV: byteV, - boolV: boolV, - timeV: timeV, + // byteV: byteV, + boolV: boolV, + timeV: timeV, + } + assert.Equal(t, data, querydata) +} + +func TestCreateTableInAdvance(t *testing.T) { + table := "create_datatypes_table_in_advance" + schema := "CREATE TABLE " + table + " (" + + " id varchar," + + " i64 bigint," + + " i32 int," + + " i16 smallint," + + " i8 tinyint," + + " u64 bigint unsigned," + + " u32 int unsigned," + + " u16 smallint unsigned," + + " u8 tinyint unsigned," + + " f32 float," + + " f64 double," + + " bool boolean," + + // TODO(yuanbohan): support []byte + // " bytes varbinary," + + " times TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP," + + " TIME INDEX (times)," + + " PRIMARY KEY(id))" + createTable(t, schema) + + type datatype struct { + id string + i64 int64 + i32 int32 + i16 int16 + i8 int8 + u64 uint64 + u32 uint32 + u16 uint16 + u8 uint8 + f64 float64 + f32 float32 + bool bool + // bytes []byte + } + + now := time.Now() + data := datatype{ + id: "test", + i64: 64, + i32: 32, + i16: 16, + i8: 8, + u64: 64, + u32: 32, + u16: 16, + u8: 8, + f64: 64.0, + f32: 32.0, + // bytes: []byte("byte"), + bool: true, + } + + client := newClient(t) + + series := Series{} + + // string + assert.Nil(t, series.AddTag("id", data.id)) + + // int + assert.Nil(t, series.AddField("i64", data.i64)) + assert.Nil(t, series.AddField("i32", data.i32)) + assert.Nil(t, series.AddField("i16", data.i16)) + assert.Nil(t, series.AddField("i8", data.i8)) + + // uint + assert.Nil(t, series.AddField("u64", data.u64)) + assert.Nil(t, series.AddField("u32", data.u32)) + assert.Nil(t, series.AddField("u16", data.u16)) + assert.Nil(t, series.AddField("u8", data.u8)) + + // float + assert.Nil(t, series.AddField("f64", data.f64)) + assert.Nil(t, series.AddField("f32", data.f32)) + + // TODO(yuanbohan): support []byte + // assert.Nil(t, series.AddField("bytes", data.bytes)) + + // bool + assert.Nil(t, series.AddBoolField("bool", data.bool)) + + assert.Nil(t, series.SetTimestamp(now)) + + // Insert + metric := Metric{} + metric.SetTimestampAlias("times") + metric.SetTimePrecision(time.Second) + metric.AddSeries(series) + + req := InsertRequest{} + req.WithTable(table).WithMetric(metric) + reqs := InsertsRequest{} + reqs.WithDatabase(database).Insert(req) + + n, err := client.Insert(context.Background(), reqs) + assert.Nil(t, err) + assert.Equal(t, uint32(1), n) + + // Query with metric + queryReq := QueryRequest{} + queryReq.WithSql(fmt.Sprintf("SELECT * FROM %s", table)).WithDatabase(database) + + resMetric, err := client.Query(context.Background(), queryReq) + assert.Nil(t, err) + assert.Equal(t, 1, len(resMetric.GetSeries())) + + series = resMetric.GetSeries()[0] + + // int + int64V, ok := series.Get("i64") + assert.True(t, ok) + int32V, ok := series.Get("i32") + assert.True(t, ok) + int16V, ok := series.Get("i16") + assert.True(t, ok) + int8V, ok := series.Get("i8") + assert.True(t, ok) + + // uint + uint64V, ok := series.Get("u64") + assert.True(t, ok) + uint32V, ok := series.Get("u32") + assert.True(t, ok) + uint16V, ok := series.Get("u16") + assert.True(t, ok) + uint8V, ok := series.Get("u8") + assert.True(t, ok) + + // float + float64V, ok := series.Get("f64") + assert.True(t, ok) + float32V, ok := series.Get("f32") + assert.True(t, ok) + + // string + stringV, ok := series.Get("id") + assert.True(t, ok) + + // TODO(yuanbohan): support []byte + // byteV, ok := series.Get("bytes") + // assert.True(t, ok) + + // bool + boolV, ok := series.Get("bool") + assert.True(t, ok) + + querydata := datatype{ + id: stringV.(string), + + i64: int64V.(int64), + i32: int32V.(int32), + i16: int16V.(int16), + i8: int8V.(int8), + + u64: uint64V.(uint64), + u32: uint32V.(uint32), + u16: uint16V.(uint16), + u8: uint8V.(uint8), + + f64: float64V.(float64), + f32: float32V.(float32), + + // bytes: byteV.([]byte), + bool: boolV.(bool), } assert.Equal(t, data, querydata) + + timeV := series.GetTimestamp() + assert.Equal(t, now.Unix(), timeV.Unix()) } diff --git a/doc.go b/doc.go index 48ca986..0ec3c2a 100644 --- a/doc.go +++ b/doc.go @@ -18,7 +18,7 @@ // // You can call [NewClient] with [Config] to init a concurrent safe [Client], and // construct rows of data by [Metric] and [Series], call [Client.Insert] to insert -// [InsertRequest] into greptimedb, and call [Client.Query] to retrieve data from +// [InsertsRequest] into greptimedb, and call [Client.Query] to retrieve data from // greptimedb via [QueryRequest]. // // # Promql diff --git a/doc_test.go b/doc_test.go index 68a3d63..cf58a31 100644 --- a/doc_test.go +++ b/doc_test.go @@ -28,9 +28,9 @@ import ( var ( client *greptime.Client - // table used in this Example, you don't have to create it in advance, - // if the table not exist, it will be created automatically. - table string = "monitor" + // monitorTable used in this Example, you don't have to create it in advance, + // if the monitorTable not exist, it will be created automatically. + monitorTable string = "monitor" ) // initClient creates a client with config. @@ -65,7 +65,7 @@ func initClient() { // - series.AddField // - series.AddXxxTag // - series.AddXxxField -func insert() { +func constructInsertRequest(table string) greptime.InsertRequest { series := greptime.Series{} series.AddTag("region", "az") // type is checked automatically series.AddStringTag("host", "localhost") // type is specified by user @@ -79,11 +79,27 @@ func insert() { // Create an InsertRequest using fluent style // the specified table will be created automatically if it's not exist insertRequest := greptime.InsertRequest{} + insertRequest.WithTable(table).WithMetric(metric) + + return insertRequest +} + +func insert() { + insertsRequest := greptime.InsertsRequest{} + // You can insert data of different tables into greptimedb in one InsertsRequest. + // This insertsRequest includes two InsertRequest of two different tables + insertsRequest. + Insert(constructInsertRequest(monitorTable)). + Insert(constructInsertRequest("temperatures")) + + // if you want to insert into different table in one request, you can construct + // another InsertRequest, and include it via: insertsRequest.Insert(insertRequest) + // if you want to specify another database, you can specify it via: `WithDatabase(database)` - insertRequest.WithTable(table).WithMetric(metric) // .WithDatabase(database) + // insertsRequest.WithDatabase("your database") - // Fire the real Insert request and Get the affected number of rows - n, err := client.Insert(context.Background(), insertRequest) + // Fire the real Inserts request and Get the affected number of rows + n, err := client.Insert(context.Background(), insertsRequest) if err != nil { fmt.Printf("fail to insert, err: %+v\n", err) return @@ -109,7 +125,7 @@ func queryViaSql() { req := greptime.QueryRequest{} // if you want to specify another database, you can specify it via: `WithDatabase(database)` - req.WithSql("SELECT * FROM " + table) // .WithDatabase(database) + req.WithSql("SELECT * FROM " + monitorTable) // .WithDatabase(database) resMetric, err := client.Query(context.Background(), req) if err != nil { @@ -135,7 +151,7 @@ func queryViaSql() { // the response format is in []byte, and is absolutely the same as Prometheus func queryViaInstantPromql() { - promql := greptime.NewInstantPromql(table) + promql := greptime.NewInstantPromql(monitorTable) req := greptime.QueryRequest{} req.WithInstantPromql(promql) resp, err := client.PromqlQuery(context.Background(), req) @@ -158,7 +174,7 @@ func queryViaInstantPromql() { func queryViaRangePromql() { end := time.Now() start := end.Add(time.Duration(-15) * time.Second) // 15 seconds before - promql := greptime.NewRangePromql(table).WithStart(start).WithEnd(end).WithStep(time.Second) + promql := greptime.NewRangePromql(monitorTable).WithStart(start).WithEnd(end).WithStep(time.Second) req := greptime.QueryRequest{} req.WithRangePromql(promql) resp, err := client.PromqlQuery(context.Background(), req) diff --git a/errors.go b/errors.go index 7863ef2..bde0286 100644 --- a/errors.go +++ b/errors.go @@ -21,6 +21,7 @@ import ( var ( ErrEmptyDatabase = errors.New("name of database should not be empty") ErrEmptyTable = errors.New("name of table should not be be empty") + ErrEmptyInserts = errors.New("at least one insert is required in InsertsRequest") ErrEmptyTimestamp = errors.New("timestamp should not be empty") ErrEmptyQuery = errors.New("query should not be empty, assign Sql, InstantPromql or RangePromql") ErrEmptyKey = errors.New("key should not be empty") diff --git a/go.mod b/go.mod index 814ddee..639edd0 100644 --- a/go.mod +++ b/go.mod @@ -3,14 +3,14 @@ module github.com/GreptimeTeam/greptimedb-client-go go 1.19 require ( - github.com/GreptimeTeam/greptime-proto v0.0.0-20230424095535-a26c40c004f9 + github.com/GreptimeTeam/greptime-proto v0.0.0-20230602083745-4398d20c56d5 github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 github.com/bits-and-blooms/bitset v1.7.0 - github.com/prometheus/common v0.42.0 + github.com/prometheus/common v0.44.0 github.com/sirupsen/logrus v1.8.1 github.com/stoewer/go-strcase v1.3.0 github.com/stretchr/testify v1.8.1 - google.golang.org/grpc v1.54.0 + google.golang.org/grpc v1.55.0 google.golang.org/protobuf v1.30.0 ) @@ -27,7 +27,7 @@ require ( github.com/docker/go-units v0.4.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/google/flatbuffers v23.3.3+incompatible // indirect + github.com/google/flatbuffers v23.5.26+incompatible // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/klauspost/compress v1.16.5 // indirect @@ -44,11 +44,11 @@ require ( github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect - golang.org/x/net v0.9.0 // indirect - golang.org/x/sys v0.7.0 // indirect + golang.org/x/net v0.10.0 // indirect + golang.org/x/sys v0.8.0 // indirect golang.org/x/text v0.9.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect - google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8533421..dc2f055 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,8 @@ github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7O github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/GreptimeTeam/greptime-proto v0.0.0-20230424095535-a26c40c004f9 h1:8xbnaDg+hYGkv6Wvr7dEYW+E2X9TtbDJCtRWGMGjE+A= -github.com/GreptimeTeam/greptime-proto v0.0.0-20230424095535-a26c40c004f9/go.mod h1:jk5XBR9qIbSBiDF2Gix1KALyIMCVktcpx91AayOWxmE= +github.com/GreptimeTeam/greptime-proto v0.0.0-20230602083745-4398d20c56d5 h1:kzy6xkRbXs9NBSJiv/OPz89tAkSTx1zdihneosBxiCE= +github.com/GreptimeTeam/greptime-proto v0.0.0-20230602083745-4398d20c56d5/go.mod h1:jk5XBR9qIbSBiDF2Gix1KALyIMCVktcpx91AayOWxmE= github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA= github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= @@ -90,8 +90,8 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= -github.com/google/flatbuffers v23.3.3+incompatible h1:5PJI/WbJkaMTvpGxsHVKG/LurN/KnWXNyGpwSCDgen0= -github.com/google/flatbuffers v23.3.3+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8ioaQmyPLg1b8VwK5WJg= +github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -150,11 +150,11 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= -github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= +github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= +github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= -github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg= @@ -233,8 +233,8 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= -golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -264,8 +264,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= -golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -307,8 +307,8 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79/go.mod h1:yiaVoXHpRzHGyxV3o4DktVWY4mSUErTKaeEOq6C3t3U= -google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= -google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc h1:XSJ8Vk1SWuNr8S18z1NZSziL0CPIXLCCMDOEFtHBOFc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= @@ -317,8 +317,8 @@ google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTp google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= -google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= -google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= +google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag= +google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/insert.go b/insert.go index 371fb64..277b09e 100644 --- a/insert.go +++ b/insert.go @@ -18,9 +18,62 @@ import ( greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" ) +type InsertsRequest struct { + header header + inserts []InsertRequest +} + +// WithDatabase helps to specify different database from the default one. +func (r *InsertsRequest) WithDatabase(database string) *InsertsRequest { + r.header = header{ + database: database, + } + return r +} + +// Insert will include one insert into this InsertsRequest +func (r *InsertsRequest) Insert(insert InsertRequest) *InsertsRequest { + if r.inserts == nil { + r.inserts = make([]InsertRequest, 0) + } + + r.inserts = append(r.inserts, insert) + + return r +} + +func (r InsertsRequest) build(cfg *Config) (*greptimepb.GreptimeRequest, error) { + header, err := r.header.build(cfg) + if err != nil { + return nil, err + } + + if len(r.inserts) == 0 { + return nil, ErrEmptyInserts + } + + reqs := make([]*greptimepb.InsertRequest, 0, len(r.inserts)) + for _, insert := range r.inserts { + req, err := insert.build() + if err != nil { + return nil, err + } + reqs = append(reqs, req) + } + + req := greptimepb.GreptimeRequest_Inserts{ + Inserts: &greptimepb.InsertRequests{Inserts: reqs}, + } + + return &greptimepb.GreptimeRequest{ + Header: header, + Request: &req, + }, nil + +} + // InsertRequest insert metric to specified table. You can also specify the database in header. type InsertRequest struct { - header header table string metric Metric } @@ -35,24 +88,11 @@ func (r *InsertRequest) WithMetric(metric Metric) *InsertRequest { return r } -// WithDatabase helps to specify different database from the default one. -func (r *InsertRequest) WithDatabase(database string) *InsertRequest { - r.header = header{ - database: database, - } - return r -} - func (r *InsertRequest) RowCount() uint32 { return uint32(len(r.metric.series)) } -func (r *InsertRequest) build(cfg *Config) (*greptimepb.GreptimeRequest, error) { - header, err := r.header.build(cfg) - if err != nil { - return nil, err - } - +func (r *InsertRequest) build() (*greptimepb.InsertRequest, error) { if isEmptyString(r.table) { return nil, ErrEmptyTable } @@ -62,13 +102,10 @@ func (r *InsertRequest) build(cfg *Config) (*greptimepb.GreptimeRequest, error) return nil, err } - req := greptimepb.GreptimeRequest_Insert{ - Insert: &greptimepb.InsertRequest{ - TableName: r.table, - Columns: columns, - RowCount: r.RowCount(), - RegionNumber: 0, - }, - } - return &greptimepb.GreptimeRequest{Header: header, Request: &req}, nil + return &greptimepb.InsertRequest{ + TableName: r.table, + Columns: columns, + RowCount: r.RowCount(), + RegionNumber: 0, + }, nil } diff --git a/metric.go b/metric.go index 47e843f..c5ab507 100644 --- a/metric.go +++ b/metric.go @@ -146,16 +146,36 @@ func fromColumn(column array.Interface, idx int) (any, error) { return typedColumn.Value(idx), nil case *array.Int32: return typedColumn.Value(idx), nil + case *array.Int16: + return typedColumn.Value(idx), nil + case *array.Int8: + return typedColumn.Value(idx), nil case *array.Uint64: return typedColumn.Value(idx), nil case *array.Uint32: return typedColumn.Value(idx), nil + case *array.Uint16: + return typedColumn.Value(idx), nil + case *array.Uint8: + return typedColumn.Value(idx), nil case *array.Float64: return typedColumn.Value(idx), nil + case *array.Float32: + return typedColumn.Value(idx), nil case *array.String: return typedColumn.Value(idx), nil case *array.Boolean: return typedColumn.Value(idx), nil + case *array.Binary: + return typedColumn.Value(idx), nil + case *array.Time32: + return typedColumn.Value(idx), nil + case *array.Time64: + return typedColumn.Value(idx), nil + case *array.Date32: + return typedColumn.Value(idx), nil + case *array.Date64: + return typedColumn.Value(idx), nil case *array.Timestamp: dataType, ok := column.DataType().(*arrow.TimestampType) if !ok { diff --git a/metric_test.go b/metric_test.go index 9d08e50..af9fcc8 100644 --- a/metric_test.go +++ b/metric_test.go @@ -137,9 +137,9 @@ func TestGreptimeColumn(t *testing.T) { col5 := cols[4] assert.Equal(t, "field1", col5.ColumnName) - assert.Equal(t, greptimepb.ColumnDataType_UINT32, col5.Datatype) + assert.Equal(t, greptimepb.ColumnDataType_UINT8, col5.Datatype) assert.Equal(t, greptimepb.Column_FIELD, col5.SemanticType) - assert.Equal(t, []uint32{8, 8}, col5.Values.U32Values) + assert.Equal(t, []uint32{8, 8}, col5.Values.U8Values) assert.Empty(t, col5.NullMask) col6 := cols[5] @@ -151,16 +151,16 @@ func TestGreptimeColumn(t *testing.T) { col7 := cols[6] assert.Equal(t, "field_name3", col7.ColumnName) - assert.Equal(t, greptimepb.ColumnDataType_STRING, col7.Datatype) + assert.Equal(t, greptimepb.ColumnDataType_BINARY, col7.Datatype) assert.Equal(t, greptimepb.Column_FIELD, col7.SemanticType) - assert.Equal(t, []string{"field3"}, col7.Values.StringValues) + assert.Equal(t, [][]byte{[]byte("field3")}, col7.Values.BinaryValues) assert.Equal(t, []byte{1}, col7.NullMask) col8 := cols[7] assert.Equal(t, "field_name4", col8.ColumnName) - assert.Equal(t, greptimepb.ColumnDataType_FLOAT64, col8.Datatype) + assert.Equal(t, greptimepb.ColumnDataType_FLOAT32, col8.Datatype) assert.Equal(t, greptimepb.Column_FIELD, col8.SemanticType) - assert.Equal(t, []float64{32}, col8.Values.F64Values) + assert.Equal(t, []float32{32}, col8.Values.F32Values) assert.Equal(t, []byte{1}, col8.NullMask) col9 := cols[8] diff --git a/prom/api.go b/prom/api.go index 8cb8d69..5594c40 100644 --- a/prom/api.go +++ b/prom/api.go @@ -66,7 +66,7 @@ func (qr *QueryResult) UnmarshalJSON(b []byte) error { qr.Val = mv default: - err = fmt.Errorf("unexpected value type %q", v.Type) + err = fmt.Errorf("unexpected value type '%s' with data: '%s'", v.Type.String(), string(b)) } return err } diff --git a/query_promql_test.go b/query_promql_test.go index d03ac8a..d53513f 100644 --- a/query_promql_test.go +++ b/query_promql_test.go @@ -59,7 +59,7 @@ func TestInsertAndQueryWithRangePromQL(t *testing.T) { options := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), } - cfg := NewCfg(host).WithPort(port).WithDatabase(database).WithDialOptions(options...) + cfg := NewCfg(host).WithPort(grpcPort).WithDatabase(database).WithDialOptions(options...) client, err := NewClient(cfg) assert.Nil(t, err) @@ -78,9 +78,12 @@ func TestInsertAndQueryWithRangePromQL(t *testing.T) { } insertReq := InsertRequest{} - insertReq.WithDatabase(database).WithTable(table).WithMetric(metric) + insertReq.WithTable(table).WithMetric(metric) - n, err := client.Insert(context.Background(), insertReq) + insertsReq := InsertsRequest{} + insertsReq.WithDatabase(database).Insert(insertReq) + + n, err := client.Insert(context.Background(), insertsReq) assert.Nil(t, err) assert.Equal(t, uint32(len(insertMonitors)), n) diff --git a/request_test.go b/request_test.go index be887dd..a2265e0 100644 --- a/request_test.go +++ b/request_test.go @@ -94,22 +94,44 @@ func TestQueryBuildPromqlRequest(t *testing.T) { } func TestInsertBuilder(t *testing.T) { + cfg := &Config{} r := InsertRequest{} - // empty database - req, err := r.build(&Config{}) - assert.Equal(t, ErrEmptyDatabase, err) - assert.Nil(t, req) - // empty table - r.header = header{"public"} - req, err = r.build(&Config{}) + req, err := r.build() assert.Equal(t, ErrEmptyTable, err) assert.Nil(t, req) // empty series r.WithTable("monitor") - req, err = r.build(&Config{}) + req, err = r.build() assert.Equal(t, ErrNoSeriesInMetric, err) assert.Nil(t, req) + + series := Series{} + series.AddTag("host", "fake host") + series.AddField("memory", 2.3) + series.SetTimestamp(time.Now()) + metric := Metric{} + metric.AddSeries(series) + r.WithMetric(metric) + + rs := InsertsRequest{} + + // empty database + reqs, err := rs.build(cfg) + assert.Equal(t, ErrEmptyDatabase, err) + assert.Nil(t, reqs) + + // empty inserts + rs.WithDatabase("public") + reqs, err = rs.build(cfg) + assert.Equal(t, ErrEmptyInserts, err) + assert.Nil(t, reqs) + + // normal + rs.Insert(r) + reqs, err = rs.build(cfg) + assert.Nil(t, err) + assert.NotNil(t, reqs) } diff --git a/series.go b/series.go index e52179f..2494f6f 100644 --- a/series.go +++ b/series.go @@ -180,12 +180,13 @@ func (s *Series) GetString(key string) (string, bool) { } func (s *Series) GetBytes(key string) ([]byte, bool) { - val, exist := s.GetString(key) + val, exist := s.Get(key) if !exist { return nil, exist } - return []byte(val), true + v, ok := val.([]byte) + return v, ok } // GetTimestamp get timestamp field diff --git a/series_test.go b/series_test.go index 8fe7ad5..40c9324 100644 --- a/series_test.go +++ b/series_test.go @@ -46,11 +46,11 @@ func TestSeries(t *testing.T) { assert.Equal(t, greptimepb.Column_TAG, s.columns["tag3"].semantic) assert.Equal(t, greptimepb.ColumnDataType_FLOAT64, s.columns["tag4"].typ) assert.Equal(t, greptimepb.Column_TAG, s.columns["tag4"].semantic) - assert.Equal(t, greptimepb.ColumnDataType_STRING, s.columns["field1"].typ) + assert.Equal(t, greptimepb.ColumnDataType_BINARY, s.columns["field1"].typ) assert.Equal(t, greptimepb.Column_FIELD, s.columns["field1"].semantic) - assert.Equal(t, greptimepb.ColumnDataType_FLOAT64, s.columns["field2"].typ) + assert.Equal(t, greptimepb.ColumnDataType_FLOAT32, s.columns["field2"].typ) assert.Equal(t, greptimepb.Column_FIELD, s.columns["field2"].semantic) - assert.Equal(t, greptimepb.ColumnDataType_UINT32, s.columns["field3"].typ) + assert.Equal(t, greptimepb.ColumnDataType_UINT8, s.columns["field3"].typ) assert.Equal(t, greptimepb.Column_FIELD, s.columns["field3"].semantic) assert.Equal(t, greptimepb.ColumnDataType_UINT64, s.columns["field4"].typ) assert.Equal(t, greptimepb.Column_FIELD, s.columns["field4"].semantic) @@ -61,9 +61,9 @@ func TestSeries(t *testing.T) { assert.Equal(t, true, s.vals["tag2"]) assert.Equal(t, int32(32), s.vals["tag3"]) assert.Equal(t, float64(32.0), s.vals["tag4"]) - assert.Equal(t, "field val", s.vals["field1"]) - assert.Equal(t, float64(32.0), s.vals["field2"]) - assert.Equal(t, uint32(8), s.vals["field3"]) + assert.Equal(t, []byte("field val"), s.vals["field1"]) + assert.Equal(t, float32(32.0), s.vals["field2"]) + assert.Equal(t, uint8(8), s.vals["field3"]) assert.Equal(t, uint64(64), s.vals["field4"]) // check timestamp diff --git a/stream_client.go b/stream_client.go index eb0c9b2..e3c4865 100644 --- a/stream_client.go +++ b/stream_client.go @@ -43,7 +43,7 @@ func NewStreamClient(cfg *Config) (*StreamClient, error) { return &StreamClient{client: client, cfg: cfg}, nil } -func (c *StreamClient) Send(ctx context.Context, req InsertRequest) error { +func (c *StreamClient) Send(ctx context.Context, req InsertsRequest) error { request, err := req.build(c.cfg) if err != nil { return err diff --git a/stream_client_test.go b/stream_client_test.go index 0b08aae..3c6d422 100644 --- a/stream_client_test.go +++ b/stream_client_test.go @@ -52,7 +52,7 @@ func TestStreamInsert(t *testing.T) { options := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), } - cfg := NewCfg(host).WithPort(port).WithDatabase(database).WithDialOptions(options...).WithCallOptions() + cfg := NewCfg(host).WithPort(grpcPort).WithDatabase(database).WithDialOptions(options...).WithCallOptions() streamClient, err := NewStreamClient(cfg) assert.Nil(t, err) @@ -72,7 +72,11 @@ func TestStreamInsert(t *testing.T) { req := InsertRequest{} req.WithTable(table).WithMetric(metric) - err = streamClient.Send(context.Background(), req) + + reqs := InsertsRequest{} + reqs.Insert(req) + + err = streamClient.Send(context.Background(), reqs) assert.Nil(t, err) } diff --git a/util.go b/util.go index 8b09a6e..14e6c2b 100644 --- a/util.go +++ b/util.go @@ -39,11 +39,11 @@ func convert(v any) (*value, error) { case string: return newValue(t, greptimepb.ColumnDataType_STRING), nil case []byte: - return newValue(string(t), greptimepb.ColumnDataType_STRING), nil + return newValue(t, greptimepb.ColumnDataType_BINARY), nil case float64: return newValue(t, greptimepb.ColumnDataType_FLOAT64), nil case float32: - return newValue(float64(t), greptimepb.ColumnDataType_FLOAT64), nil + return newValue(t, greptimepb.ColumnDataType_FLOAT32), nil case uint: return newValue(uint64(t), greptimepb.ColumnDataType_UINT64), nil case uint64: @@ -51,9 +51,9 @@ func convert(v any) (*value, error) { case uint32: return newValue(t, greptimepb.ColumnDataType_UINT32), nil case uint16: - return newValue(uint32(t), greptimepb.ColumnDataType_UINT32), nil + return newValue(t, greptimepb.ColumnDataType_UINT16), nil case uint8: - return newValue(uint32(t), greptimepb.ColumnDataType_UINT32), nil + return newValue(t, greptimepb.ColumnDataType_UINT8), nil case int: return newValue(int64(t), greptimepb.ColumnDataType_INT64), nil case int64: @@ -61,9 +61,9 @@ func convert(v any) (*value, error) { case int32: return newValue(t, greptimepb.ColumnDataType_INT32), nil case int16: - return newValue(int32(t), greptimepb.ColumnDataType_INT32), nil + return newValue(t, greptimepb.ColumnDataType_INT16), nil case int8: - return newValue(int32(t), greptimepb.ColumnDataType_INT32), nil + return newValue(t, greptimepb.ColumnDataType_INT8), nil // TODO(vinland-avalon): convert with different precision case time.Time: return newValue(t.UnixMilli(), greptimepb.ColumnDataType_TIMESTAMP_MILLISECOND), nil @@ -73,11 +73,11 @@ func convert(v any) (*value, error) { case *string: return newValue(*t, greptimepb.ColumnDataType_STRING), nil case *[]byte: - return newValue(string(*t), greptimepb.ColumnDataType_STRING), nil + return newValue(*t, greptimepb.ColumnDataType_BINARY), nil case *float64: return newValue(*t, greptimepb.ColumnDataType_FLOAT64), nil case *float32: - return newValue(float64(*t), greptimepb.ColumnDataType_FLOAT64), nil + return newValue(*t, greptimepb.ColumnDataType_FLOAT32), nil case *uint: return newValue(uint64(*t), greptimepb.ColumnDataType_UINT64), nil case *uint64: @@ -85,9 +85,9 @@ func convert(v any) (*value, error) { case *uint32: return newValue(*t, greptimepb.ColumnDataType_UINT32), nil case *uint16: - return newValue(uint32(*t), greptimepb.ColumnDataType_UINT32), nil + return newValue(*t, greptimepb.ColumnDataType_UINT16), nil case *uint8: - return newValue(uint32(*t), greptimepb.ColumnDataType_UINT32), nil + return newValue(*t, greptimepb.ColumnDataType_UINT8), nil case *int: return newValue(int64(*t), greptimepb.ColumnDataType_INT64), nil case *int64: @@ -95,9 +95,9 @@ func convert(v any) (*value, error) { case *int32: return newValue(*t, greptimepb.ColumnDataType_INT32), nil case *int16: - return newValue(int32(*t), greptimepb.ColumnDataType_INT32), nil + return newValue(*t, greptimepb.ColumnDataType_INT16), nil case *int8: - return newValue(int32(*t), greptimepb.ColumnDataType_INT32), nil + return newValue(*t, greptimepb.ColumnDataType_INT8), nil // TODO(vinland-avalon): convert with different precision, as `time.Time` abovementioned case *time.Time: return newValue(t.UnixMilli(), greptimepb.ColumnDataType_TIMESTAMP_MILLISECOND), nil diff --git a/util_test.go b/util_test.go index 2e65fe5..8fd1826 100644 --- a/util_test.go +++ b/util_test.go @@ -43,8 +43,8 @@ func TestConvertValue(t *testing.T) { var expectBytes []byte = []byte("bytes") val, err = convert(expectBytes) assert.Nil(t, err) - assert.Equal(t, "bytes", val.val) - assert.Equal(t, greptime.ColumnDataType_STRING, val.typ) + assert.Equal(t, []byte("bytes"), val.val) + assert.Equal(t, greptime.ColumnDataType_BINARY, val.typ) // float64 var expectFloat64 float64 = float64(64.0) @@ -54,12 +54,11 @@ func TestConvertValue(t *testing.T) { assert.Equal(t, greptime.ColumnDataType_FLOAT64, val.typ) // float32 - var originFloat32 float32 = float32(32.0) - var expectFloat32 float64 = float64(32.0) - val, err = convert(originFloat32) + var expectFloat32 float32 = float32(32.0) + val, err = convert(expectFloat32) assert.Nil(t, err) assert.Equal(t, expectFloat32, val.val) - assert.Equal(t, greptime.ColumnDataType_FLOAT64, val.typ) + assert.Equal(t, greptime.ColumnDataType_FLOAT32, val.typ) // uint var originUint uint = uint(64) @@ -70,36 +69,32 @@ func TestConvertValue(t *testing.T) { assert.Equal(t, greptime.ColumnDataType_UINT64, val.typ) // uint64 - var originUint64 uint64 = uint64(64) var expectUint64 uint64 = uint64(64) - val, err = convert(originUint64) + val, err = convert(expectUint64) assert.Nil(t, err) assert.Equal(t, expectUint64, val.val) assert.Equal(t, greptime.ColumnDataType_UINT64, val.typ) // uint32 - var originUint32 uint32 = uint32(32) var expectUint32 uint32 = uint32(32) - val, err = convert(originUint32) + val, err = convert(expectUint32) assert.Nil(t, err) assert.Equal(t, expectUint32, val.val) assert.Equal(t, greptime.ColumnDataType_UINT32, val.typ) // uint16 - var originUint16 uint16 = uint16(16) - var expectUint16 uint32 = uint32(16) - val, err = convert(originUint16) + var expectUint16 uint16 = uint16(16) + val, err = convert(expectUint16) assert.Nil(t, err) assert.Equal(t, expectUint16, val.val) - assert.Equal(t, greptime.ColumnDataType_UINT32, val.typ) + assert.Equal(t, greptime.ColumnDataType_UINT16, val.typ) // uint8 - var originUint8 uint8 = uint8(8) - var expectUint8 uint32 = uint32(8) - val, err = convert(originUint8) + var expectUint8 uint8 = uint8(8) + val, err = convert(expectUint8) assert.Nil(t, err) assert.Equal(t, expectUint8, val.val) - assert.Equal(t, greptime.ColumnDataType_UINT32, val.typ) + assert.Equal(t, greptime.ColumnDataType_UINT8, val.typ) // int var originInt int = int(64) @@ -110,36 +105,32 @@ func TestConvertValue(t *testing.T) { assert.Equal(t, greptime.ColumnDataType_INT64, val.typ) // int64 - var originInt64 int64 = int64(64) var expectInt64 int64 = int64(64) - val, err = convert(originInt64) + val, err = convert(expectInt64) assert.Nil(t, err) assert.Equal(t, expectInt64, val.val) assert.Equal(t, greptime.ColumnDataType_INT64, val.typ) // int32 - var originInt32 int32 = int32(32) var expectInt32 int32 = int32(32) - val, err = convert(originInt32) + val, err = convert(expectInt32) assert.Nil(t, err) assert.Equal(t, expectInt32, val.val) assert.Equal(t, greptime.ColumnDataType_INT32, val.typ) // int16 - var originInt16 int16 = int16(16) - var expectInt16 int32 = int32(16) - val, err = convert(originInt16) + var expectInt16 int16 = int16(16) + val, err = convert(expectInt16) assert.Nil(t, err) assert.Equal(t, expectInt16, val.val) - assert.Equal(t, greptime.ColumnDataType_INT32, val.typ) + assert.Equal(t, greptime.ColumnDataType_INT16, val.typ) // int8 - var originInt8 int8 = int8(8) - var expectInt8 int32 = int32(8) - val, err = convert(originInt8) + var expectInt8 int8 = int8(8) + val, err = convert(expectInt8) assert.Nil(t, err) assert.Equal(t, expectInt8, val.val) - assert.Equal(t, greptime.ColumnDataType_INT32, val.typ) + assert.Equal(t, greptime.ColumnDataType_INT8, val.typ) // time.Time var originTime time.Time = time.UnixMilli(1677571339623) @@ -178,8 +169,8 @@ func TestConvertValuePtr(t *testing.T) { var expectBytes []byte = []byte("bytes") val, err = convert(&expectBytes) assert.Nil(t, err) - assert.Equal(t, "bytes", val.val) - assert.Equal(t, greptime.ColumnDataType_STRING, val.typ) + assert.Equal(t, []byte("bytes"), val.val) + assert.Equal(t, greptime.ColumnDataType_BINARY, val.typ) // float64 var expectFloat64 float64 = float64(64.0) @@ -189,12 +180,11 @@ func TestConvertValuePtr(t *testing.T) { assert.Equal(t, greptime.ColumnDataType_FLOAT64, val.typ) // float32 - var originFloat32 float32 = float32(32.0) - var expectFloat32 float64 = float64(32.0) - val, err = convert(&originFloat32) + var expectFloat32 float32 = float32(32.0) + val, err = convert(&expectFloat32) assert.Nil(t, err) assert.Equal(t, expectFloat32, val.val) - assert.Equal(t, greptime.ColumnDataType_FLOAT64, val.typ) + assert.Equal(t, greptime.ColumnDataType_FLOAT32, val.typ) // uint var originUint uint = uint(64) @@ -205,36 +195,32 @@ func TestConvertValuePtr(t *testing.T) { assert.Equal(t, greptime.ColumnDataType_UINT64, val.typ) // uint64 - var originUint64 uint64 = uint64(64) var expectUint64 uint64 = uint64(64) - val, err = convert(&originUint64) + val, err = convert(&expectUint64) assert.Nil(t, err) assert.Equal(t, expectUint64, val.val) assert.Equal(t, greptime.ColumnDataType_UINT64, val.typ) // uint32 - var originUint32 uint32 = uint32(32) var expectUint32 uint32 = uint32(32) - val, err = convert(&originUint32) + val, err = convert(&expectUint32) assert.Nil(t, err) assert.Equal(t, expectUint32, val.val) assert.Equal(t, greptime.ColumnDataType_UINT32, val.typ) // uint16 - var originUint16 uint16 = uint16(16) - var expectUint16 uint32 = uint32(16) - val, err = convert(&originUint16) + var expectUint16 uint16 = uint16(16) + val, err = convert(&expectUint16) assert.Nil(t, err) assert.Equal(t, expectUint16, val.val) - assert.Equal(t, greptime.ColumnDataType_UINT32, val.typ) + assert.Equal(t, greptime.ColumnDataType_UINT16, val.typ) // uint8 - var originUint8 uint8 = uint8(8) - var expectUint8 uint32 = uint32(8) - val, err = convert(&originUint8) + var expectUint8 uint8 = uint8(8) + val, err = convert(&expectUint8) assert.Nil(t, err) assert.Equal(t, expectUint8, val.val) - assert.Equal(t, greptime.ColumnDataType_UINT32, val.typ) + assert.Equal(t, greptime.ColumnDataType_UINT8, val.typ) // int var originInt int = int(64) @@ -245,36 +231,32 @@ func TestConvertValuePtr(t *testing.T) { assert.Equal(t, greptime.ColumnDataType_INT64, val.typ) // int64 - var originInt64 int64 = int64(64) var expectInt64 int64 = int64(64) - val, err = convert(&originInt64) + val, err = convert(&expectInt64) assert.Nil(t, err) assert.Equal(t, expectInt64, val.val) assert.Equal(t, greptime.ColumnDataType_INT64, val.typ) // int32 - var originInt32 int32 = int32(32) var expectInt32 int32 = int32(32) - val, err = convert(&originInt32) + val, err = convert(&expectInt32) assert.Nil(t, err) assert.Equal(t, expectInt32, val.val) assert.Equal(t, greptime.ColumnDataType_INT32, val.typ) // int16 - var originInt16 int16 = int16(16) - var expectInt16 int32 = int32(16) - val, err = convert(&originInt16) + var expectInt16 int16 = int16(16) + val, err = convert(&expectInt16) assert.Nil(t, err) assert.Equal(t, expectInt16, val.val) - assert.Equal(t, greptime.ColumnDataType_INT32, val.typ) + assert.Equal(t, greptime.ColumnDataType_INT16, val.typ) // int8 - var originInt8 int8 = int8(8) - var expectInt8 int32 = int32(8) - val, err = convert(&originInt8) + var expectInt8 int8 = int8(8) + val, err = convert(&expectInt8) assert.Nil(t, err) assert.Equal(t, expectInt8, val.val) - assert.Equal(t, greptime.ColumnDataType_INT32, val.typ) + assert.Equal(t, greptime.ColumnDataType_INT8, val.typ) // time.Time var originTime time.Time = time.UnixMilli(1677571339623)