From 3dec55e95804d9a7f8c2711bf94221d6120f41ed Mon Sep 17 00:00:00 2001 From: DerekBum Date: Tue, 24 Oct 2023 19:59:23 +0300 Subject: [PATCH] api: support `IPROTO_WATCH_ONCE` request type Add support of `IPROTO_WATCH_ONCE` request type. It works only for Tarantool version >= 3.0.0. Part of #337 --- CHANGELOG.md | 1 + example_test.go | 26 ++++++++++++++++++++- export_test.go | 6 +++++ go.mod | 2 +- go.sum | 4 ++-- protocol.go | 11 +++++++-- protocol_test.go | 1 + request.go | 36 +++++++++++++++++++++++++++++ request_test.go | 18 +++++++++++++++ tarantool_test.go | 53 +++++++++++++++++++++++++++++++++++++++++-- test_helpers/utils.go | 8 +++++++ 11 files changed, 158 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ea6c49f40..106143d1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Support `fetch_latest_metadata` option for crud requests with metadata (#335) - Support `noreturn` option for data change crud requests (#335) - Support `crud.schema` request (#336) +- Support `IPROTO_WATCH_ONCE` request type for Tarantool version >= 3.0.0 (#337) ### Changed diff --git a/example_test.go b/example_test.go index 77b2cff24..5557e7a4c 100644 --- a/example_test.go +++ b/example_test.go @@ -626,12 +626,13 @@ func ExampleProtocolVersion() { fmt.Println("Connector client protocol feature:", f) } // Output: - // Connector client protocol version: 4 + // Connector client protocol version: 6 // Connector client protocol feature: StreamsFeature // Connector client protocol feature: TransactionsFeature // Connector client protocol feature: ErrorExtensionFeature // Connector client protocol feature: WatchersFeature // Connector client protocol feature: PaginationFeature + // Connector client protocol feature: WatchOnceFeature } func getTestTxnOpts() tarantool.Opts { @@ -1230,3 +1231,26 @@ func ExampleConnection_CloseGraceful_force() { // Result: // connection closed by client (0x4001) } + +func ExampleWatchOnceRequest() { + const key = "foo" + const value = "bar" + + // WatchOnce request present in Tarantool since version 3.0 + isLess, err := test_helpers.IsTarantoolVersionLess(3, 0, 0) + if err != nil || isLess { + return + } + + conn := exampleConnect(opts) + defer conn.Close() + + conn.Do(tarantool.NewBroadcastRequest(key).Value(value)).Get() + + resp, err := conn.Do(tarantool.NewWatchOnceRequest(key)).Get() + if err != nil { + fmt.Printf("Failed to execute the request: %s\n", err) + } else { + fmt.Println(resp.Data) + } +} diff --git a/export_test.go b/export_test.go index fc5d90c34..38211a4fc 100644 --- a/export_test.go +++ b/export_test.go @@ -120,3 +120,9 @@ func RefImplRollbackBody(enc *msgpack.Encoder) error { func RefImplIdBody(enc *msgpack.Encoder, protocolInfo ProtocolInfo) error { return fillId(enc, protocolInfo) } + +// RefImplWatchOnceBody is reference implementation for filling of an watchOnce +// request's body. +func RefImplWatchOnceBody(enc *msgpack.Encoder, key string) error { + return fillWatchOnce(enc, key) +} diff --git a/go.mod b/go.mod index 22cb7aee3..440597972 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/shopspring/decimal v1.3.1 github.com/stretchr/testify v1.7.1 - github.com/tarantool/go-iproto v0.1.0 + github.com/tarantool/go-iproto v0.1.1-0.20231025103136-cb7894473931 github.com/tarantool/go-openssl v0.0.8-0.20231004103608-336ca939d2ca github.com/vmihailenco/msgpack/v5 v5.3.5 golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect diff --git a/go.sum b/go.sum index 44d00984f..038e8af34 100644 --- a/go.sum +++ b/go.sum @@ -19,8 +19,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/tarantool/go-iproto v0.1.0 h1:zHN9AA8LDawT+JBD0/Nxgr/bIsWkkpDzpcMuaNPSIAQ= -github.com/tarantool/go-iproto v0.1.0/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7p96hUySMlIo= +github.com/tarantool/go-iproto v0.1.1-0.20231025103136-cb7894473931 h1:YrsRc1sDZ6HOZccvM2eJ3Nu2TMBq7NMZMsaT5KCu5qU= +github.com/tarantool/go-iproto v0.1.1-0.20231025103136-cb7894473931/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7p96hUySMlIo= github.com/tarantool/go-openssl v0.0.8-0.20231004103608-336ca939d2ca h1:oOrBh73tDDyooIXajfr+0pfnM+89404ClAhJpTTHI7E= github.com/tarantool/go-openssl v0.0.8-0.20231004103608-336ca939d2ca/go.mod h1:M7H4xYSbzqpW/ZRBMyH0eyqQBsnhAMfsYk5mv0yid7A= github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU= diff --git a/protocol.go b/protocol.go index 63424283b..b947c6cbb 100644 --- a/protocol.go +++ b/protocol.go @@ -12,7 +12,7 @@ import ( type ProtocolVersion uint64 // ProtocolVersion type stores a Tarantool protocol feature. -type ProtocolFeature uint64 +type ProtocolFeature iproto.Feature // ProtocolInfo type aggregates Tarantool protocol version and features info. type ProtocolInfo struct { @@ -52,6 +52,8 @@ const ( // PaginationFeature represents support of pagination // (supported by connector). PaginationFeature ProtocolFeature = 4 + // WatchOnceFeature represents support of WatchOnce request types. + WatchOnceFeature ProtocolFeature = 6 ) // String returns the name of a Tarantool feature. @@ -68,6 +70,8 @@ func (ftr ProtocolFeature) String() string { return "WatchersFeature" case PaginationFeature: return "PaginationFeature" + case WatchOnceFeature: + return "WatchOnceFeature" default: return fmt.Sprintf("Unknown feature (code %d)", ftr) } @@ -79,7 +83,7 @@ var clientProtocolInfo ProtocolInfo = ProtocolInfo{ // introduced in master 948e5cd (possible 2.10.5 or 2.11.0). // Support of protocol version on connector side was introduced in // 1.10.0. - Version: ProtocolVersion(4), + Version: ProtocolVersion(6), // Streams and transactions were introduced in protocol version 1 // (Tarantool 2.10.0), in connector since 1.7.0. // Error extension type was introduced in protocol @@ -88,12 +92,15 @@ var clientProtocolInfo ProtocolInfo = ProtocolInfo{ // connector since 1.10.0. // Pagination were introduced in protocol version 4 (Tarantool 2.11.0), in // connector since 1.11.0. + // WatchOnce request type was introduces in protocol version 6 + // (Tarantool 3.0.0), in connector since 2.0.0. Features: []ProtocolFeature{ StreamsFeature, TransactionsFeature, ErrorExtensionFeature, WatchersFeature, PaginationFeature, + WatchOnceFeature, }, } diff --git a/protocol_test.go b/protocol_test.go index 6aa94463b..2ec4b0bc3 100644 --- a/protocol_test.go +++ b/protocol_test.go @@ -32,6 +32,7 @@ func TestFeatureStringRepresentation(t *testing.T) { require.Equal(t, ErrorExtensionFeature.String(), "ErrorExtensionFeature") require.Equal(t, WatchersFeature.String(), "WatchersFeature") require.Equal(t, PaginationFeature.String(), "PaginationFeature") + require.Equal(t, WatchOnceFeature.String(), "WatchOnceFeature") require.Equal(t, ProtocolFeature(15532).String(), "Unknown feature (code 15532)") } diff --git a/request.go b/request.go index 917c1fb6a..3332486f4 100644 --- a/request.go +++ b/request.go @@ -166,6 +166,16 @@ func fillPing(enc *msgpack.Encoder) error { return enc.EncodeMapLen(0) } +func fillWatchOnce(enc *msgpack.Encoder, key string) error { + if err := enc.EncodeMapLen(1); err != nil { + return err + } + if err := enc.EncodeUint(uint64(iproto.IPROTO_EVENT_KEY)); err != nil { + return err + } + return enc.EncodeString(key) +} + // Ping sends empty request to Tarantool to check connection. // // Deprecated: the method will be removed in the next major version, @@ -1354,3 +1364,29 @@ func (req *ExecuteRequest) Context(ctx context.Context) *ExecuteRequest { req.ctx = ctx return req } + +// WatchOnceRequest synchronously fetches the value currently associated with a +// specified notification key without subscribing to changes. +type WatchOnceRequest struct { + baseRequest + key string +} + +// NewWatchOnceRequest returns a new watchOnceRequest. +func NewWatchOnceRequest(key string) *WatchOnceRequest { + req := new(WatchOnceRequest) + req.rtype = iproto.IPROTO_WATCH_ONCE + req.key = key + return req +} + +// Body fills an msgpack.Encoder with the watchOnce request body. +func (req *WatchOnceRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error { + return fillWatchOnce(enc, req.key) +} + +// Context sets a passed context to the request. +func (req *WatchOnceRequest) Context(ctx context.Context) *WatchOnceRequest { + req.ctx = ctx + return req +} diff --git a/request_test.go b/request_test.go index f44b12d52..da825b707 100644 --- a/request_test.go +++ b/request_test.go @@ -196,6 +196,7 @@ func TestRequestsTypes(t *testing.T) { {req: NewRollbackRequest(), rtype: iproto.IPROTO_ROLLBACK}, {req: NewIdRequest(validProtocolInfo), rtype: iproto.IPROTO_ID}, {req: NewBroadcastRequest(validKey), rtype: iproto.IPROTO_CALL}, + {req: NewWatchOnceRequest(validKey), rtype: iproto.IPROTO_WATCH_ONCE}, } for _, test := range tests { @@ -231,6 +232,7 @@ func TestRequestsAsync(t *testing.T) { {req: NewRollbackRequest(), async: false}, {req: NewIdRequest(validProtocolInfo), async: false}, {req: NewBroadcastRequest(validKey), async: false}, + {req: NewWatchOnceRequest(validKey), async: false}, } for _, test := range tests { @@ -265,6 +267,7 @@ func TestRequestsCtx_default(t *testing.T) { {req: NewRollbackRequest(), expected: nil}, {req: NewIdRequest(validProtocolInfo), expected: nil}, {req: NewBroadcastRequest(validKey), expected: nil}, + {req: NewWatchOnceRequest(validKey), expected: nil}, } for _, test := range tests { @@ -300,6 +303,7 @@ func TestRequestsCtx_setter(t *testing.T) { {req: NewRollbackRequest().Context(ctx), expected: ctx}, {req: NewIdRequest(validProtocolInfo).Context(ctx), expected: ctx}, {req: NewBroadcastRequest(validKey).Context(ctx), expected: ctx}, + {req: NewWatchOnceRequest(validKey).Context(ctx), expected: ctx}, } for _, test := range tests { @@ -823,3 +827,17 @@ func TestBroadcastRequestSetters(t *testing.T) { req := NewBroadcastRequest(validKey).Value(value) assertBodyEqual(t, refBuf.Bytes(), req) } + +func TestWatchOnceRequestDefaultValues(t *testing.T) { + var refBuf bytes.Buffer + + refEnc := msgpack.NewEncoder(&refBuf) + err := RefImplWatchOnceBody(refEnc, validKey) + if err != nil { + t.Errorf("An unexpected RefImplCallBody() error: %q", err.Error()) + return + } + + req := NewWatchOnceRequest(validKey) + assertBodyEqual(t, refBuf.Bytes(), req) +} diff --git a/tarantool_test.go b/tarantool_test.go index bde3060c7..abb454eaa 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -2606,6 +2606,53 @@ func TestConnectionDoSelectRequest(t *testing.T) { testConnectionDoSelectRequestCheck(t, resp, err, false, 10, 1010) } +func TestConnectionDoWatchOnceRequest(t *testing.T) { + test_helpers.SkipIfWatchOnceUnsupported(t) + + conn := test_helpers.ConnectWithValidation(t, server, opts) + defer conn.Close() + + _, err := conn.Do(NewBroadcastRequest("hello").Value("world")).Get() + if err != nil { + t.Fatalf("Failed to create a broadcast : %s", err.Error()) + } + + resp, err := conn.Do(NewWatchOnceRequest("hello")).Get() + if err != nil { + t.Fatalf("Failed to WatchOnce: %s", err.Error()) + } + if resp.Code != OkCode { + t.Errorf("Failed to WatchOnce: wrong code returned %d", resp.Code) + } + if len(resp.Data) < 1 || resp.Data[0] != "world" { + t.Errorf("Failed to WatchOnce: wrong value returned %v", resp.Data) + } +} + +func TestConnectionDoWatchOnceOnEmptyKey(t *testing.T) { + watchOnceNotSupported, err := test_helpers.IsTarantoolVersionLess(3, 0, 0) + if err != nil { + log.Fatalf("Could not check the Tarantool version") + } + if watchOnceNotSupported { + return + } + + conn := test_helpers.ConnectWithValidation(t, server, opts) + defer conn.Close() + + resp, err := conn.Do(NewWatchOnceRequest("notexists!")).Get() + if err != nil { + t.Fatalf("Failed to WatchOnce: %s", err.Error()) + } + if resp.Code != OkCode { + t.Errorf("Failed to WatchOnce: wrong code returned %d", resp.Code) + } + if len(resp.Data) > 0 { + t.Errorf("Failed to WatchOnce: wrong value returned %v", resp.Data) + } +} + func TestConnectionDoSelectRequest_fetch_pos(t *testing.T) { test_helpers.SkipIfPaginationUnsupported(t) @@ -3247,13 +3294,14 @@ func TestConnectionProtocolInfoSupported(t *testing.T) { require.Equal(t, clientProtocolInfo, ProtocolInfo{ - Version: ProtocolVersion(4), + Version: ProtocolVersion(6), Features: []ProtocolFeature{ StreamsFeature, TransactionsFeature, ErrorExtensionFeature, WatchersFeature, PaginationFeature, + WatchOnceFeature, }, }) @@ -3364,13 +3412,14 @@ func TestConnectionProtocolInfoUnsupported(t *testing.T) { require.Equal(t, clientProtocolInfo, ProtocolInfo{ - Version: ProtocolVersion(4), + Version: ProtocolVersion(6), Features: []ProtocolFeature{ StreamsFeature, TransactionsFeature, ErrorExtensionFeature, WatchersFeature, PaginationFeature, + WatchOnceFeature, }, }) diff --git a/test_helpers/utils.go b/test_helpers/utils.go index 898ae84e3..d2a941775 100644 --- a/test_helpers/utils.go +++ b/test_helpers/utils.go @@ -192,6 +192,14 @@ func SkipIfPaginationUnsupported(t *testing.T) { SkipIfFeatureUnsupported(t, "pagination", 2, 11, 0) } +// SkipIfWatchOnceUnsupported skips test run if Tarantool without WatchOnce +// request type is used. +func SkipIfWatchOnceUnsupported(t *testing.T) { + t.Helper() + + SkipIfFeatureUnsupported(t, "watch once", 3, 0, 0) +} + // CheckEqualBoxErrors checks equivalence of tarantool.BoxError objects. // // Tarantool errors are not comparable by nature: