Skip to content

Commit

Permalink
CBG-3715 add RevNo to FeedEvent, and populate for DCPClient (#6818)
Browse files Browse the repository at this point in the history
  • Loading branch information
torcolvin authored May 14, 2024
1 parent d5afc6c commit 1140f36
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 4 deletions.
4 changes: 4 additions & 0 deletions base/dcp_client_stream_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type mutationEvent struct {
value []byte
seq uint64
cas uint64
revNo uint64
flags uint32
expiry uint32
collection uint32
Expand All @@ -63,6 +64,7 @@ func (e mutationEvent) asFeedEvent() sgbucket.FeedEvent {
Value: e.value,
DataType: e.datatype,
Cas: e.cas,
RevNo: e.revNo,
VbNo: e.vbID,
TimeReceived: time.Now(),
}
Expand All @@ -74,6 +76,7 @@ type deletionEvent struct {
value []byte
seq uint64
cas uint64
revNo uint64
collection uint32
datatype uint8
streamEventCommon
Expand All @@ -88,6 +91,7 @@ func (e deletionEvent) asFeedEvent() sgbucket.FeedEvent {
Value: e.value,
DataType: e.datatype,
Cas: e.cas,
RevNo: e.revNo,
VbNo: e.vbID,
TimeReceived: time.Now(),
}
Expand Down
3 changes: 2 additions & 1 deletion base/dcp_client_stream_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (dc *DCPClient) Mutation(mutation gocbcore.DcpMutation) {
streamID: mutation.StreamID,
},
seq: mutation.SeqNo,
revNo: mutation.RevNo,
flags: mutation.Flags,
expiry: mutation.Expiry,
cas: mutation.Cas,
Expand All @@ -58,14 +59,14 @@ func (dc *DCPClient) Deletion(deletion gocbcore.DcpDeletion) {
if dc.filteredKey(deletion.Key) {
return
}

e := deletionEvent{
streamEventCommon: streamEventCommon{
vbID: deletion.VbID,
streamID: deletion.StreamID,
},
seq: deletion.SeqNo,
cas: deletion.Cas,
revNo: deletion.RevNo,
datatype: deletion.Datatype,
collection: deletion.CollectionID,
key: deletion.Key,
Expand Down
90 changes: 90 additions & 0 deletions base/dcp_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,3 +666,93 @@ func getCollectionIDs(t *testing.T, bucket *TestBucket) []uint32 {
return collectionIDs

}

func TestDCPFeedEventTypes(t *testing.T) {
TestRequiresGocbDCPClient(t)

ctx := TestCtx(t)
bucket := GetTestBucket(t)
defer bucket.Close(ctx)

collection := bucket.GetSingleDataStore()

// start one shot feed
var collectionIDs []uint32
if collection.IsSupported(sgbucket.BucketStoreFeatureCollections) {
collectionIDs = append(collectionIDs, collection.GetCollectionID())
}

clientOptions := DCPClientOptions{
CollectionIDs: collectionIDs,
CheckpointPrefix: DefaultMetadataKeys.DCPCheckpointPrefix(t.Name()),
}

gocbv2Bucket, err := AsGocbV2Bucket(bucket.Bucket)
require.NoError(t, err)

testOver := make(chan struct{})
docID := t.Name()
var dcpMutationCas uint64
var dcpMutationRevNo uint64
var dcpDeletionCas uint64
var dcpDeletionRevNo uint64
// create callback
callback := func(event sgbucket.FeedEvent) bool {
fmt.Printf("!! event: %+v\n", event)
// other doc events can happen from previous tests
if docID != string(event.Key) {
return true
}
switch event.Opcode {
case sgbucket.FeedOpMutation:
dcpMutationCas = event.Cas
dcpMutationRevNo = event.RevNo
require.NotEqual(t, uint64(0), dcpMutationCas)
require.NotEqual(t, uint64(0), dcpMutationRevNo)
case sgbucket.FeedOpDeletion:
defer close(testOver)

dcpDeletionCas = event.Cas
dcpDeletionRevNo = event.RevNo
// FIXME: I am surprised these values are zero
require.NotEqual(t, uint64(0), dcpDeletionCas)
require.NotEqual(t, uint64(0), dcpDeletionRevNo)
}
return true
}

dcpClient, err := NewDCPClient(ctx, t.Name(), callback, clientOptions, gocbv2Bucket)
require.NoError(t, err)

doneChan, startErr := dcpClient.Start()
require.NoError(t, startErr)

defer func() {
_ = dcpClient.Close() // extra close in case of early exit
}()
xattrName := "_xattr1"
xattrBody := []byte(`{"an": "xattr"}`)
writeMutationCas, err := collection.WriteWithXattrs(ctx, docID, 0, 0, []byte(`{"foo":"bar"}`), map[string][]byte{xattrName: xattrBody}, nil)
require.NoError(t, err)

deleteMutationCas, err := collection.Remove(docID, writeMutationCas)
require.NoError(t, err)

timeout := time.After(time.Second * 5)
select {
case <-testOver:
require.NoError(t, dcpClient.Close())
case <-timeout:
t.Fatalf("timeout waiting for doc write/deletion to complete")
}
require.NoError(t, <-doneChan)

xattrs, _, err := collection.GetXattrs(ctx, docID, []string{"_xattr1"})
require.NoError(t, err)
require.JSONEq(t, string(xattrBody), string(xattrs[xattrName]))

require.Equal(t, writeMutationCas, dcpMutationCas)
require.Equal(t, deleteMutationCas, dcpDeletionCas)
require.NotEqual(t, dcpMutationRevNo, dcpDeletionRevNo)

}
7 changes: 7 additions & 0 deletions base/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,13 @@ func TestRequiresDCPResync(t testing.TB) {
}
}

// TestRequiresGocbDCPClient will skip the current test if using rosmar.
func TestRequiresGocbDCPClient(t testing.TB) {
if UnitTestUrlIsWalrus() {
t.Skip("rosmar doesn't support base.DCPClient")
}
}

// RequireDocNotFoundError asserts that the given error represents a document not found error.
func RequireDocNotFoundError(t testing.TB, e error) {
require.True(t, IsDocNotFoundError(e), fmt.Sprintf("Expected error to be a doc not found error, but was: %v", e))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/couchbase/gocbcore/v10 v10.4.1
github.com/couchbase/gomemcached v0.2.1
github.com/couchbase/goutils v0.1.2
github.com/couchbase/sg-bucket v0.0.0-20240510005938-766555de45c4
github.com/couchbase/sg-bucket v0.0.0-20240514135815-5f5e7aa8625c
github.com/couchbaselabs/go-fleecedelta v0.0.0-20220909152808-6d09efa7a338
github.com/couchbaselabs/gocbconnstr v1.0.5
github.com/couchbaselabs/rosmar v0.0.0-20240417141520-4127f7d4c389
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ github.com/couchbase/goprotostellar v1.0.2 h1:yoPbAL9sCtcyZ5e/DcU5PRMOEFaJrF9awX
github.com/couchbase/goprotostellar v1.0.2/go.mod h1:5/yqVnZlW2/NSbAWu1hPJCFBEwjxgpe0PFFOlRixnp4=
github.com/couchbase/goutils v0.1.2 h1:gWr8B6XNWPIhfalHNog3qQKfGiYyh4K4VhO3P2o9BCs=
github.com/couchbase/goutils v0.1.2/go.mod h1:h89Ek/tiOxxqjz30nPPlwZdQbdB8BwgnuBxeoUe/ViE=
github.com/couchbase/sg-bucket v0.0.0-20240510005938-766555de45c4 h1:UzmQ2oaCUEPkD/7l1e3s7U9OA4BFptp/Oi5UgaAVwBg=
github.com/couchbase/sg-bucket v0.0.0-20240510005938-766555de45c4/go.mod h1:5me3TJLTPfR0s3aMJZcPoTu5FT8oelaINz5l7Q3cApE=
github.com/couchbase/sg-bucket v0.0.0-20240514135815-5f5e7aa8625c h1:ftZeu97TaEWxBAINW2AaGNk3Icyg+/6XHs7OHnR23qQ=
github.com/couchbase/sg-bucket v0.0.0-20240514135815-5f5e7aa8625c/go.mod h1:IQisEdcLRfS/pjSgmqG/8gerVm0Q7GrvpQtMIZ7oYt4=
github.com/couchbase/tools-common/cloud v1.0.0 h1:SQZIccXoedbrThehc/r9BJbpi/JhwJ8X00PDjZ2gEBE=
github.com/couchbase/tools-common/cloud v1.0.0/go.mod h1:6KVlRpbcnDWrvickUJ+xpqCWx1vgYYlEli/zL4xmZAg=
github.com/couchbase/tools-common/fs v1.0.0 h1:HFA4xCF/r3BtZShFJUxzVvGuXtDkqGnaPzYJP3Kp1mw=
Expand Down

0 comments on commit 1140f36

Please sign in to comment.