Skip to content

Commit

Permalink
api: support IPROTO_WATCH_ONCE request type
Browse files Browse the repository at this point in the history
Add support of `IPROTO_WATCH_ONCE` request type.
It works only for Tarantool version >= 3.0.0-alpha1.

Part of #337
  • Loading branch information
DerekBum authored and oleg-jukovec committed Oct 30, 2023
1 parent a422262 commit 3f7860e
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Support `fetch_latest_metadata` option for crud requests with metadata (#335)
- Support `noreturn` option for data change crud requests (#335)
- Support `crud.schema` request (#336)
- Support `IPROTO_WATCH_ONCE` request type for Tarantool
version >= 3.0.0-alpha1 (#337)

### Changed

Expand Down
26 changes: 25 additions & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,12 +626,13 @@ func ExampleProtocolVersion() {
fmt.Println("Connector client protocol feature:", f)
}
// Output:
// Connector client protocol version: 4
// Connector client protocol version: 6
// Connector client protocol feature: StreamsFeature
// Connector client protocol feature: TransactionsFeature
// Connector client protocol feature: ErrorExtensionFeature
// Connector client protocol feature: WatchersFeature
// Connector client protocol feature: PaginationFeature
// Connector client protocol feature: WatchOnceFeature
}

func getTestTxnOpts() tarantool.Opts {
Expand Down Expand Up @@ -1230,3 +1231,26 @@ func ExampleConnection_CloseGraceful_force() {
// Result:
// <nil> connection closed by client (0x4001)
}

func ExampleWatchOnceRequest() {
const key = "foo"
const value = "bar"

// WatchOnce request present in Tarantool since version 3.0
isLess, err := test_helpers.IsTarantoolVersionLess(3, 0, 0)
if err != nil || isLess {
return
}

conn := exampleConnect(opts)
defer conn.Close()

conn.Do(tarantool.NewBroadcastRequest(key).Value(value)).Get()

resp, err := conn.Do(tarantool.NewWatchOnceRequest(key)).Get()
if err != nil {
fmt.Printf("Failed to execute the request: %s\n", err)
} else {
fmt.Println(resp.Data)
}
}
6 changes: 6 additions & 0 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,9 @@ func RefImplRollbackBody(enc *msgpack.Encoder) error {
func RefImplIdBody(enc *msgpack.Encoder, protocolInfo ProtocolInfo) error {
return fillId(enc, protocolInfo)
}

// RefImplWatchOnceBody is reference implementation for filling of an watchOnce
// request's body.
func RefImplWatchOnceBody(enc *msgpack.Encoder, key string) error {
return fillWatchOnce(enc, key)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/shopspring/decimal v1.3.1
github.com/stretchr/testify v1.7.1
github.com/tarantool/go-iproto v0.1.0
github.com/tarantool/go-iproto v0.1.1-0.20231025103136-cb7894473931
github.com/tarantool/go-openssl v0.0.8-0.20231004103608-336ca939d2ca
github.com/vmihailenco/msgpack/v5 v5.3.5
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tarantool/go-iproto v0.1.0 h1:zHN9AA8LDawT+JBD0/Nxgr/bIsWkkpDzpcMuaNPSIAQ=
github.com/tarantool/go-iproto v0.1.0/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7p96hUySMlIo=
github.com/tarantool/go-iproto v0.1.1-0.20231025103136-cb7894473931 h1:YrsRc1sDZ6HOZccvM2eJ3Nu2TMBq7NMZMsaT5KCu5qU=
github.com/tarantool/go-iproto v0.1.1-0.20231025103136-cb7894473931/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7p96hUySMlIo=
github.com/tarantool/go-openssl v0.0.8-0.20231004103608-336ca939d2ca h1:oOrBh73tDDyooIXajfr+0pfnM+89404ClAhJpTTHI7E=
github.com/tarantool/go-openssl v0.0.8-0.20231004103608-336ca939d2ca/go.mod h1:M7H4xYSbzqpW/ZRBMyH0eyqQBsnhAMfsYk5mv0yid7A=
github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU=
Expand Down
11 changes: 9 additions & 2 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
type ProtocolVersion uint64

// ProtocolVersion type stores a Tarantool protocol feature.
type ProtocolFeature uint64
type ProtocolFeature iproto.Feature

// ProtocolInfo type aggregates Tarantool protocol version and features info.
type ProtocolInfo struct {
Expand Down Expand Up @@ -52,6 +52,8 @@ const (
// PaginationFeature represents support of pagination
// (supported by connector).
PaginationFeature ProtocolFeature = 4
// WatchOnceFeature represents support of WatchOnce request types.
WatchOnceFeature ProtocolFeature = 6
)

// String returns the name of a Tarantool feature.
Expand All @@ -68,6 +70,8 @@ func (ftr ProtocolFeature) String() string {
return "WatchersFeature"
case PaginationFeature:
return "PaginationFeature"
case WatchOnceFeature:
return "WatchOnceFeature"
default:
return fmt.Sprintf("Unknown feature (code %d)", ftr)
}
Expand All @@ -79,7 +83,7 @@ var clientProtocolInfo ProtocolInfo = ProtocolInfo{
// introduced in master 948e5cd (possible 2.10.5 or 2.11.0).
// Support of protocol version on connector side was introduced in
// 1.10.0.
Version: ProtocolVersion(4),
Version: ProtocolVersion(6),
// Streams and transactions were introduced in protocol version 1
// (Tarantool 2.10.0), in connector since 1.7.0.
// Error extension type was introduced in protocol
Expand All @@ -88,12 +92,15 @@ var clientProtocolInfo ProtocolInfo = ProtocolInfo{
// connector since 1.10.0.
// Pagination were introduced in protocol version 4 (Tarantool 2.11.0), in
// connector since 1.11.0.
// WatchOnce request type was introduces in protocol version 6
// (Tarantool 3.0.0), in connector since 2.0.0.
Features: []ProtocolFeature{
StreamsFeature,
TransactionsFeature,
ErrorExtensionFeature,
WatchersFeature,
PaginationFeature,
WatchOnceFeature,
},
}

Expand Down
1 change: 1 addition & 0 deletions protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestFeatureStringRepresentation(t *testing.T) {
require.Equal(t, ErrorExtensionFeature.String(), "ErrorExtensionFeature")
require.Equal(t, WatchersFeature.String(), "WatchersFeature")
require.Equal(t, PaginationFeature.String(), "PaginationFeature")
require.Equal(t, WatchOnceFeature.String(), "WatchOnceFeature")

require.Equal(t, ProtocolFeature(15532).String(), "Unknown feature (code 15532)")
}
36 changes: 36 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,16 @@ func fillPing(enc *msgpack.Encoder) error {
return enc.EncodeMapLen(0)
}

func fillWatchOnce(enc *msgpack.Encoder, key string) error {
if err := enc.EncodeMapLen(1); err != nil {
return err
}
if err := enc.EncodeUint(uint64(iproto.IPROTO_EVENT_KEY)); err != nil {
return err
}
return enc.EncodeString(key)
}

// Ping sends empty request to Tarantool to check connection.
//
// Deprecated: the method will be removed in the next major version,
Expand Down Expand Up @@ -1354,3 +1364,29 @@ func (req *ExecuteRequest) Context(ctx context.Context) *ExecuteRequest {
req.ctx = ctx
return req
}

// WatchOnceRequest synchronously fetches the value currently associated with a
// specified notification key without subscribing to changes.
type WatchOnceRequest struct {
baseRequest
key string
}

// NewWatchOnceRequest returns a new watchOnceRequest.
func NewWatchOnceRequest(key string) *WatchOnceRequest {
req := new(WatchOnceRequest)
req.rtype = iproto.IPROTO_WATCH_ONCE
req.key = key
return req
}

// Body fills an msgpack.Encoder with the watchOnce request body.
func (req *WatchOnceRequest) Body(res SchemaResolver, enc *msgpack.Encoder) error {
return fillWatchOnce(enc, req.key)
}

// Context sets a passed context to the request.
func (req *WatchOnceRequest) Context(ctx context.Context) *WatchOnceRequest {
req.ctx = ctx
return req
}
18 changes: 18 additions & 0 deletions request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func TestRequestsTypes(t *testing.T) {
{req: NewRollbackRequest(), rtype: iproto.IPROTO_ROLLBACK},
{req: NewIdRequest(validProtocolInfo), rtype: iproto.IPROTO_ID},
{req: NewBroadcastRequest(validKey), rtype: iproto.IPROTO_CALL},
{req: NewWatchOnceRequest(validKey), rtype: iproto.IPROTO_WATCH_ONCE},
}

for _, test := range tests {
Expand Down Expand Up @@ -231,6 +232,7 @@ func TestRequestsAsync(t *testing.T) {
{req: NewRollbackRequest(), async: false},
{req: NewIdRequest(validProtocolInfo), async: false},
{req: NewBroadcastRequest(validKey), async: false},
{req: NewWatchOnceRequest(validKey), async: false},
}

for _, test := range tests {
Expand Down Expand Up @@ -265,6 +267,7 @@ func TestRequestsCtx_default(t *testing.T) {
{req: NewRollbackRequest(), expected: nil},
{req: NewIdRequest(validProtocolInfo), expected: nil},
{req: NewBroadcastRequest(validKey), expected: nil},
{req: NewWatchOnceRequest(validKey), expected: nil},
}

for _, test := range tests {
Expand Down Expand Up @@ -300,6 +303,7 @@ func TestRequestsCtx_setter(t *testing.T) {
{req: NewRollbackRequest().Context(ctx), expected: ctx},
{req: NewIdRequest(validProtocolInfo).Context(ctx), expected: ctx},
{req: NewBroadcastRequest(validKey).Context(ctx), expected: ctx},
{req: NewWatchOnceRequest(validKey).Context(ctx), expected: ctx},
}

for _, test := range tests {
Expand Down Expand Up @@ -823,3 +827,17 @@ func TestBroadcastRequestSetters(t *testing.T) {
req := NewBroadcastRequest(validKey).Value(value)
assertBodyEqual(t, refBuf.Bytes(), req)
}

func TestWatchOnceRequestDefaultValues(t *testing.T) {
var refBuf bytes.Buffer

refEnc := msgpack.NewEncoder(&refBuf)
err := RefImplWatchOnceBody(refEnc, validKey)
if err != nil {
t.Errorf("An unexpected RefImplCallBody() error: %q", err.Error())
return
}

req := NewWatchOnceRequest(validKey)
assertBodyEqual(t, refBuf.Bytes(), req)
}
53 changes: 51 additions & 2 deletions tarantool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2606,6 +2606,53 @@ func TestConnectionDoSelectRequest(t *testing.T) {
testConnectionDoSelectRequestCheck(t, resp, err, false, 10, 1010)
}

func TestConnectionDoWatchOnceRequest(t *testing.T) {
test_helpers.SkipIfWatchOnceUnsupported(t)

conn := test_helpers.ConnectWithValidation(t, server, opts)
defer conn.Close()

_, err := conn.Do(NewBroadcastRequest("hello").Value("world")).Get()
if err != nil {
t.Fatalf("Failed to create a broadcast : %s", err.Error())
}

resp, err := conn.Do(NewWatchOnceRequest("hello")).Get()
if err != nil {
t.Fatalf("Failed to WatchOnce: %s", err.Error())
}
if resp.Code != OkCode {
t.Errorf("Failed to WatchOnce: wrong code returned %d", resp.Code)
}
if len(resp.Data) < 1 || resp.Data[0] != "world" {
t.Errorf("Failed to WatchOnce: wrong value returned %v", resp.Data)
}
}

func TestConnectionDoWatchOnceOnEmptyKey(t *testing.T) {
watchOnceNotSupported, err := test_helpers.IsTarantoolVersionLess(3, 0, 0)
if err != nil {
log.Fatalf("Could not check the Tarantool version")
}
if watchOnceNotSupported {
return
}

conn := test_helpers.ConnectWithValidation(t, server, opts)
defer conn.Close()

resp, err := conn.Do(NewWatchOnceRequest("notexists!")).Get()
if err != nil {
t.Fatalf("Failed to WatchOnce: %s", err.Error())
}
if resp.Code != OkCode {
t.Errorf("Failed to WatchOnce: wrong code returned %d", resp.Code)
}
if len(resp.Data) > 0 {
t.Errorf("Failed to WatchOnce: wrong value returned %v", resp.Data)
}
}

func TestConnectionDoSelectRequest_fetch_pos(t *testing.T) {
test_helpers.SkipIfPaginationUnsupported(t)

Expand Down Expand Up @@ -3247,13 +3294,14 @@ func TestConnectionProtocolInfoSupported(t *testing.T) {
require.Equal(t,
clientProtocolInfo,
ProtocolInfo{
Version: ProtocolVersion(4),
Version: ProtocolVersion(6),
Features: []ProtocolFeature{
StreamsFeature,
TransactionsFeature,
ErrorExtensionFeature,
WatchersFeature,
PaginationFeature,
WatchOnceFeature,
},
})

Expand Down Expand Up @@ -3364,13 +3412,14 @@ func TestConnectionProtocolInfoUnsupported(t *testing.T) {
require.Equal(t,
clientProtocolInfo,
ProtocolInfo{
Version: ProtocolVersion(4),
Version: ProtocolVersion(6),
Features: []ProtocolFeature{
StreamsFeature,
TransactionsFeature,
ErrorExtensionFeature,
WatchersFeature,
PaginationFeature,
WatchOnceFeature,
},
})

Expand Down
8 changes: 8 additions & 0 deletions test_helpers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ func SkipIfPaginationUnsupported(t *testing.T) {
SkipIfFeatureUnsupported(t, "pagination", 2, 11, 0)
}

// SkipIfWatchOnceUnsupported skips test run if Tarantool without WatchOnce
// request type is used.
func SkipIfWatchOnceUnsupported(t *testing.T) {
t.Helper()

SkipIfFeatureUnsupported(t, "watch once", 3, 0, 0)
}

// CheckEqualBoxErrors checks equivalence of tarantool.BoxError objects.
//
// Tarantool errors are not comparable by nature:
Expand Down

0 comments on commit 3f7860e

Please sign in to comment.