Skip to content

Commit

Permalink
feat: support response header (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanbohan authored Jul 25, 2023
1 parent 2ccbdac commit 841e5a5
Show file tree
Hide file tree
Showing 12 changed files with 209 additions and 158 deletions.
25 changes: 6 additions & 19 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,12 @@ func NewClient(cfg *Config) (*Client, error) {
}

// Insert helps to insert multiple rows of multiple tables into greptimedb
func (c *Client) Insert(ctx context.Context, req InsertsRequest) (uint32, error) {
func (c *Client) Insert(ctx context.Context, req InsertsRequest) (*greptimepb.GreptimeResponse, error) {
request, err := req.build(c.cfg)
if err != nil {
return 0, err
}

resp, err := c.greptimeClient.Handle(ctx, request)
if err != nil {
return 0, err
return nil, err
}

return resp.GetAffectedRows().Value, nil
return c.greptimeClient.Handle(ctx, request, c.cfg.CallOptions...)
}

// Query helps to retrieve data from greptimedb
Expand All @@ -87,8 +81,7 @@ func (c *Client) Query(ctx context.Context, req QueryRequest) (*Metric, error) {
if err != nil {
return nil, err
}

sr, err := c.flightClient.DoGet(ctx, &flight.Ticket{Ticket: b})
sr, err := c.flightClient.DoGet(ctx, &flight.Ticket{Ticket: b}, c.cfg.CallOptions...)
if err != nil {
return nil, err
}
Expand All @@ -102,16 +95,10 @@ func (c *Client) Query(ctx context.Context, req QueryRequest) (*Metric, error) {
}

// PromqlQuery helps to retrieve data from greptimedb via InstantQuery or RangeQuery
func (c *Client) PromqlQuery(ctx context.Context, req QueryRequest) ([]byte, error) {
func (c *Client) PromqlQuery(ctx context.Context, req QueryRequest) (*greptimepb.PromqlResponse, error) {
request, err := req.buildPromqlRequest(c.cfg)
if err != nil {
return nil, err
}

resp, err := c.promqlClient.Handle(ctx, request)
if err != nil {
return nil, err
}

return resp.GetBody(), nil
return c.promqlClient.Handle(ctx, request, c.cfg.CallOptions...)
}
90 changes: 55 additions & 35 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (

func init() {
repo := "greptime/greptimedb"
tag := "0.3.0-alpha"
tag := "0.3.2"

var err error
pool, err := dockertest.NewPool("")
Expand Down Expand Up @@ -186,11 +186,13 @@ func TestInsertAndQueryWithSql(t *testing.T) {
req := InsertRequest{}
req.WithTable(table).WithMetric(metric)
reqs := InsertsRequest{}
reqs.WithDatabase(database).Insert(req)
reqs.WithDatabase(database).Append(req)

n, err := client.Insert(context.Background(), reqs)
resp, err := client.Insert(context.Background(), reqs)
assert.Nil(t, err)
assert.Equal(t, uint32(len(insertMonitors)), n)
assert.True(t, ParseRespHeader(resp).IsSuccess())
assert.False(t, ParseRespHeader(resp).IsRateLimited())
assert.Equal(t, uint32(len(insertMonitors)), resp.GetAffectedRows().GetValue())

// Query with metric
queryReq := QueryRequest{}
Expand Down Expand Up @@ -252,11 +254,13 @@ func TestPrecisionSecond(t *testing.T) {
req := InsertRequest{}
req.WithTable(table).WithMetric(metric)
reqs := InsertsRequest{}
reqs.WithDatabase(database).Insert(req)
reqs.WithDatabase(database).Append(req)

n, err := client.Insert(context.Background(), reqs)
resp, err := client.Insert(context.Background(), reqs)
assert.Nil(t, err)
assert.Equal(t, uint32(1), n)
assert.True(t, ParseRespHeader(resp).IsSuccess())
assert.False(t, ParseRespHeader(resp).IsRateLimited())
assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue())

queryReq := QueryRequest{}
queryReq.WithSql(fmt.Sprintf("SELECT * FROM %s", table)).WithDatabase(database)
Expand Down Expand Up @@ -305,11 +309,13 @@ func TestNilInColumn(t *testing.T) {
req := InsertRequest{}
req.WithTable(table).WithMetric(metric)
reqs := InsertsRequest{}
reqs.WithDatabase(database).Insert(req)
reqs.WithDatabase(database).Append(req)

n, err := client.Insert(context.Background(), reqs)
resp, err := client.Insert(context.Background(), reqs)
assert.Nil(t, err)
assert.Equal(t, uint32(len(insertMonitors)), n)
assert.True(t, ParseRespHeader(resp).IsSuccess())
assert.False(t, ParseRespHeader(resp).IsRateLimited())
assert.Equal(t, uint32(len(insertMonitors)), resp.GetAffectedRows().GetValue())

// Query with metric
queryReq := QueryRequest{}
Expand Down Expand Up @@ -359,10 +365,12 @@ func TestNoNeedAuth(t *testing.T) {
req := InsertRequest{}
req.WithTable(table).WithMetric(metric)
reqs := InsertsRequest{}
reqs.WithDatabase(database).Insert(req)
n, err := client.Insert(context.Background(), reqs)
reqs.WithDatabase(database).Append(req)
resp, err := client.Insert(context.Background(), reqs)
assert.Nil(t, err)
assert.Equal(t, uint32(1), n)
assert.True(t, ParseRespHeader(resp).IsSuccess())
assert.False(t, ParseRespHeader(resp).IsRateLimited())
assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue())

queryReq := QueryRequest{}
queryReq.WithSql(fmt.Sprintf("SELECT * FROM %s", table)).WithDatabase(database)
Expand All @@ -389,10 +397,12 @@ func TestInsertSameColumnWithDifferentType(t *testing.T) {
req := InsertRequest{}
req.WithTable(table).WithMetric(metric)
reqs := InsertsRequest{}
reqs.WithDatabase(database).Insert(req)
n, err := client.Insert(context.Background(), reqs)
reqs.WithDatabase(database).Append(req)
resp, err := client.Insert(context.Background(), reqs)
assert.Nil(t, err)
assert.Equal(t, uint32(1), n)
assert.True(t, ParseRespHeader(resp).IsSuccess())
assert.False(t, ParseRespHeader(resp).IsRateLimited())
assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue())

// insert again but with different type
series = Series{}
Expand All @@ -404,8 +414,8 @@ func TestInsertSameColumnWithDifferentType(t *testing.T) {
req = InsertRequest{}
req.WithTable(table).WithMetric(metric)
reqs = InsertsRequest{}
reqs.WithDatabase(database).Insert(req)
n, err = client.Insert(context.Background(), reqs)
reqs.WithDatabase(database).Append(req)
_, err = client.Insert(context.Background(), reqs)
assert.NotNil(t, err)
assert.ErrorContains(t, err, "Type of column count does not match type in schema, expect Int64(Int64Type), given Float64(Float64Type)")
}
Expand All @@ -425,10 +435,12 @@ func TestInsertTimestampWithDifferentPrecision(t *testing.T) {
req := InsertRequest{}
req.WithTable(table).WithMetric(metric)
reqs := InsertsRequest{}
reqs.WithDatabase(database).Insert(req)
n, err := client.Insert(context.Background(), reqs)
reqs.WithDatabase(database).Append(req)
resp, err := client.Insert(context.Background(), reqs)
assert.Nil(t, err)
assert.Equal(t, uint32(1), n)
assert.True(t, ParseRespHeader(resp).IsSuccess())
assert.False(t, ParseRespHeader(resp).IsRateLimited())
assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue())

// insert again but with different type
series = Series{}
Expand All @@ -441,8 +453,8 @@ func TestInsertTimestampWithDifferentPrecision(t *testing.T) {
req = InsertRequest{}
req.WithTable(table).WithMetric(metric)
reqs = InsertsRequest{}
reqs.WithDatabase(database).Insert(req)
n, err = client.Insert(context.Background(), reqs)
reqs.WithDatabase(database).Append(req)
_, err = client.Insert(context.Background(), reqs)
assert.NotNil(t, err)
assert.ErrorContains(t, err, "Type of column ts does not match type in schema, expect Timestamp(Second(TimestampSecondType)), given Timestamp(Millisecond(TimestampMillisecondType))")
}
Expand All @@ -463,10 +475,12 @@ func TestGetNonMatchedTypeColumn(t *testing.T) {
req := InsertRequest{}
req.WithTable(table).WithMetric(metric)
reqs := InsertsRequest{}
reqs.WithDatabase(database).Insert(req)
n, err := client.Insert(context.Background(), reqs)
reqs.WithDatabase(database).Append(req)
resp, err := client.Insert(context.Background(), reqs)
assert.Nil(t, err)
assert.Equal(t, uint32(1), n)
assert.True(t, ParseRespHeader(resp).IsSuccess())
assert.False(t, ParseRespHeader(resp).IsRateLimited())
assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue())

// Query with metric
queryReq := QueryRequest{}
Expand Down Expand Up @@ -517,10 +531,12 @@ func TestGetNotExistColumn(t *testing.T) {
req := InsertRequest{}
req.WithTable(table).WithMetric(metric)
reqs := InsertsRequest{}
reqs.WithDatabase(database).Insert(req)
n, err := client.Insert(context.Background(), reqs)
reqs.WithDatabase(database).Append(req)
resp, err := client.Insert(context.Background(), reqs)
assert.Nil(t, err)
assert.Equal(t, uint32(1), n)
assert.True(t, ParseRespHeader(resp).IsSuccess())
assert.False(t, ParseRespHeader(resp).IsRateLimited())
assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue())

// Query with metric
queryReq := QueryRequest{}
Expand Down Expand Up @@ -648,11 +664,13 @@ func TestDataTypes(t *testing.T) {
req := InsertRequest{}
req.WithTable(table).WithMetric(metric)
reqs := InsertsRequest{}
reqs.WithDatabase(database).Insert(req)
reqs.WithDatabase(database).Append(req)

n, err := client.Insert(context.Background(), reqs)
resp, err := client.Insert(context.Background(), reqs)
assert.Nil(t, err)
assert.Equal(t, uint32(1), n)
assert.True(t, ParseRespHeader(resp).IsSuccess())
assert.False(t, ParseRespHeader(resp).IsRateLimited())
assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue())

// Query with metric
queryReq := QueryRequest{}
Expand Down Expand Up @@ -857,11 +875,13 @@ func TestCreateTableInAdvance(t *testing.T) {
req := InsertRequest{}
req.WithTable(table).WithMetric(metric)
reqs := InsertsRequest{}
reqs.WithDatabase(database).Insert(req)
reqs.WithDatabase(database).Append(req)

n, err := client.Insert(context.Background(), reqs)
resp, err := client.Insert(context.Background(), reqs)
assert.Nil(t, err)
assert.Equal(t, uint32(1), n)
assert.True(t, ParseRespHeader(resp).IsSuccess())
assert.False(t, ParseRespHeader(resp).IsRateLimited())
assert.Equal(t, uint32(1), resp.GetAffectedRows().GetValue())

// Query with metric
queryReq := QueryRequest{}
Expand Down
16 changes: 8 additions & 8 deletions doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func insert() {
// You can insert data of different tables into greptimedb in one InsertsRequest.
// This insertsRequest includes two InsertRequest of two different tables
insertsRequest.
Insert(constructInsertRequest(monitorTable)).
Insert(constructInsertRequest("temperatures"))
Append(constructInsertRequest(monitorTable)).
Append(constructInsertRequest("temperatures"))

// if you want to insert into different table in one request, you can construct
// another InsertRequest, and include it via: insertsRequest.Insert(insertRequest)
Expand All @@ -99,12 +99,12 @@ func insert() {
// insertsRequest.WithDatabase("your database")

// Fire the real Inserts request and Get the affected number of rows
n, err := client.Insert(context.Background(), insertsRequest)
resp, err := client.Insert(context.Background(), insertsRequest)
if err != nil {
fmt.Printf("fail to insert, err: %+v\n", err)
return
}
fmt.Printf("AffectedRows: %d\n", n)
fmt.Printf("AffectedRows: %d\n", resp.GetAffectedRows().GetValue())
}

// queryViaSql via Sql
Expand Down Expand Up @@ -161,9 +161,9 @@ func queryViaInstantPromql() {
}

// you can use prom package to unmarshal the response as you want
result, err := prom.UnmarshalApiResponse(resp)
result, err := prom.UnmarshalApiResponse(resp.GetBody())
if err != nil {
fmt.Printf("failed to unmarshal instant promql, body: %s, err: %+v", string(resp), err)
fmt.Printf("failed to unmarshal instant promql, body: %s, err: %+v", string(resp.GetBody()), err)
return
}
fmt.Printf("%s:\n%+v\n", result.Type, result.Val)
Expand All @@ -184,9 +184,9 @@ func queryViaRangePromql() {
}

// you can use prom package to unmarshal the response as you want
result, err := prom.UnmarshalApiResponse(resp)
result, err := prom.UnmarshalApiResponse(resp.GetBody())
if err != nil {
fmt.Printf("failed to unmarshal instant promql, body: %s, err: %+v", string(resp), err)
fmt.Printf("failed to unmarshal instant promql, body: %s, err: %+v", string(resp.GetBody()), err)
return
}
fmt.Printf("%s:\n%+v\n", result.Type, result.Val)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/GreptimeTeam/greptimedb-client-go
go 1.20

require (
github.com/GreptimeTeam/greptime-proto v0.0.0-20230602083745-4398d20c56d5
github.com/GreptimeTeam/greptime-proto v0.3.0
github.com/apache/arrow/go/v13 v13.0.0-20230606035815-e2ae492b6324
github.com/bits-and-blooms/bitset v1.7.0
github.com/ory/dockertest/v3 v3.10.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/GreptimeTeam/greptime-proto v0.0.0-20230602083745-4398d20c56d5 h1:kzy6xkRbXs9NBSJiv/OPz89tAkSTx1zdihneosBxiCE=
github.com/GreptimeTeam/greptime-proto v0.0.0-20230602083745-4398d20c56d5/go.mod h1:jk5XBR9qIbSBiDF2Gix1KALyIMCVktcpx91AayOWxmE=
github.com/GreptimeTeam/greptime-proto v0.3.0 h1:VLCNFW0yI12HMHSH8OucL153Cx1x1QUJKIzoSuBa07U=
github.com/GreptimeTeam/greptime-proto v0.3.0/go.mod h1:jk5XBR9qIbSBiDF2Gix1KALyIMCVktcpx91AayOWxmE=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU=
github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2yDvg=
github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE=
Expand Down
38 changes: 35 additions & 3 deletions header.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

package greptime

import greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1"
import (
greptimepb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1"
)

type header struct {
type reqHeader struct {
database string
}

func (h *header) build(cfg *Config) (*greptimepb.RequestHeader, error) {
func (h *reqHeader) build(cfg *Config) (*greptimepb.RequestHeader, error) {
if isEmptyString(h.database) {
h.database = cfg.Database
}
Expand All @@ -36,3 +38,33 @@ func (h *header) build(cfg *Config) (*greptimepb.RequestHeader, error) {

return header, nil
}

type RespHeader struct {
Code uint32
Msg string
}

func (h RespHeader) IsSuccess() bool {
return h.Code == 0
}

func (h RespHeader) IsRateLimited() bool {
return h.Code == 6001
}

func (h RespHeader) IsNil() bool {
return h.Code == 0 && isEmptyString(h.Msg)
}

type getRespHeader interface {
GetHeader() *greptimepb.ResponseHeader
}

func ParseRespHeader[T getRespHeader](r T) RespHeader {
header := &RespHeader{}
if r.GetHeader() != nil && r.GetHeader().Status != nil {
header.Code = r.GetHeader().Status.StatusCode
header.Msg = r.GetHeader().Status.ErrMsg
}
return *header
}
2 changes: 1 addition & 1 deletion header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

func TestHeaderBuild(t *testing.T) {
h := &header{}
h := &reqHeader{}

gh, err := h.build(&Config{})
assert.ErrorIs(t, err, ErrEmptyDatabase)
Expand Down
Loading

0 comments on commit 841e5a5

Please sign in to comment.