From 107b0dc2f7882fae28e4284e6db651f638047970 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Fri, 13 Dec 2024 15:44:11 -0500 Subject: [PATCH] CBG-4417 construct missing CV entry from HLV if not present --- topologytest/couchbase_lite_mock_peer_test.go | 6 +- topologytest/couchbase_server_peer_test.go | 89 +++++++++++-------- topologytest/hlv_test.go | 14 --- topologytest/multi_actor_conflict_test.go | 5 +- topologytest/multi_actor_no_conflict_test.go | 6 +- topologytest/peer_test.go | 16 ++-- topologytest/sync_gateway_peer_test.go | 2 +- topologytest/version_test.go | 45 +++++----- 8 files changed, 90 insertions(+), 93 deletions(-) diff --git a/topologytest/couchbase_lite_mock_peer_test.go b/topologytest/couchbase_lite_mock_peer_test.go index 446a0c8a47..1a918cc42f 100644 --- a/topologytest/couchbase_lite_mock_peer_test.go +++ b/topologytest/couchbase_lite_mock_peer_test.go @@ -70,12 +70,14 @@ func (p *CouchbaseLiteMockPeer) CreateDocument(dsName sgbucket.DataStoreName, do // WriteDocument writes a document to the peer. The test will fail if the write does not succeed. func (p *CouchbaseLiteMockPeer) WriteDocument(_ sgbucket.DataStoreName, docID string, body []byte) BodyAndVersion { + p.TB().Logf("%s: Writing document %s", p, docID) // this isn't yet collection aware, using single default collection client := p.getSingleBlipClient() // set an HLV here. docVersion, err := client.btcRunner.PushRev(client.ID(), docID, rest.EmptyDocVersion(), body) require.NoError(client.btcRunner.TB(), err) - docMetadata := DocMetadataFromDocVersion(docID, docVersion) + // FIXME: CBG-4257, this should read the existing HLV on doc, until this happens, pv is always missing + docMetadata := DocMetadataFromDocVersion(client.btc.TB(), docID, docVersion) return BodyAndVersion{ docMeta: docMetadata, body: body, @@ -95,7 +97,7 @@ func (p *CouchbaseLiteMockPeer) WaitForDocVersion(_ sgbucket.DataStoreName, docI var data []byte require.EventuallyWithT(p.TB(), func(c *assert.CollectT) { var found bool - data, found = client.btcRunner.GetVersion(client.ID(), docID, rest.DocVersion{CV: docVersion.CV()}) + data, found = client.btcRunner.GetVersion(client.ID(), docID, rest.DocVersion{CV: docVersion.CV(c)}) if !assert.True(c, found, "Could not find docID:%+v on %p\nVersion %#v", docID, p, docVersion) { return } diff --git a/topologytest/couchbase_server_peer_test.go b/topologytest/couchbase_server_peer_test.go index ec59be37ef..23d8dc4b48 100644 --- a/topologytest/couchbase_server_peer_test.go +++ b/topologytest/couchbase_server_peer_test.go @@ -22,6 +22,11 @@ import ( "github.com/stretchr/testify/require" ) +// dummySystemXattr is created for XDCR testing. This prevents a document echo after an initial write. The dummy xattr also means that the document will always have xattrs when deleting it, which is necessary for WriteUpdateWithXattrs. +const dummySystemXattr = "_dummysystemxattr" + +var metadataXattrNames = []string{base.VvXattrName, base.MouXattrName, base.SyncXattrName, dummySystemXattr} + // CouchbaseServerPeer represents an instance of a backing server (bucket). This is rosmar unless SG_TEST_BACKING_STORE=couchbase is set. type CouchbaseServerPeer struct { tb testing.TB @@ -96,20 +101,19 @@ func (p *CouchbaseServerPeer) GetDocument(dsName sgbucket.DataStoreName, docID s // CreateDocument creates a document on the peer. The test will fail if the document already exists. func (p *CouchbaseServerPeer) CreateDocument(dsName sgbucket.DataStoreName, docID string, body []byte) BodyAndVersion { - p.tb.Logf("%s: Creating document %s in bucket %s", p, docID, p.bucket.GetName()) + p.tb.Logf("%s: Creating document %s", p, docID) // create document with xattrs to prevent XDCR from doing a round trip replication in this scenario: // CBS1: write document (cas1, no _vv) // CBS1->CBS2: XDCR replication // CBS2->CBS1: XDCR replication, creates a new _vv - cas, err := p.getCollection(dsName).WriteWithXattrs(p.Context(), docID, 0, 0, body, map[string][]byte{"userxattr": []byte(`{"dummy": "xattr"}`)}, nil, nil) + cas, err := p.getCollection(dsName).WriteWithXattrs(p.Context(), docID, 0, 0, body, map[string][]byte{dummySystemXattr: []byte(`{"dummy": "xattr"}`)}, nil, nil) require.NoError(p.tb, err) + implicitHLV := db.NewHybridLogicalVector() + require.NoError(p.tb, implicitHLV.AddVersion(db.Version{SourceID: p.SourceID(), Value: cas})) docMetadata := DocMetadata{ - DocID: docID, - Cas: cas, - ImplicitCV: &db.Version{ - SourceID: p.SourceID(), - Value: cas, - }, + DocID: docID, + Cas: cas, + ImplicitHLV: implicitHLV, } return BodyAndVersion{ docMeta: docMetadata, @@ -121,23 +125,16 @@ func (p *CouchbaseServerPeer) CreateDocument(dsName sgbucket.DataStoreName, docI // WriteDocument writes a document to the peer. The test will fail if the write does not succeed. func (p *CouchbaseServerPeer) WriteDocument(dsName sgbucket.DataStoreName, docID string, body []byte) BodyAndVersion { p.tb.Logf("%s: Writing document %s", p, docID) + var lastXattrs map[string][]byte // write the document LWW, ignoring any in progress writes - callback := func(_ []byte) (updated []byte, expiry *uint32, shouldDelete bool, err error) { - return body, nil, false, nil + callback := func(_ []byte, xattrs map[string][]byte, _ uint64) (sgbucket.UpdatedDoc, error) { + lastXattrs = xattrs + return sgbucket.UpdatedDoc{Doc: body}, nil } - cas, err := p.getCollection(dsName).Update(docID, 0, callback) + cas, err := p.getCollection(dsName).WriteUpdateWithXattrs(p.Context(), docID, metadataXattrNames, 0, nil, nil, callback) require.NoError(p.tb, err) - docMetadata := DocMetadata{ - DocID: docID, - // FIXME: this should actually probably show the HLV persisted, and then also the implicit CV - Cas: cas, - ImplicitCV: &db.Version{ - SourceID: p.SourceID(), - Value: cas, - }, - } return BodyAndVersion{ - docMeta: docMetadata, + docMeta: getDocVersion(docID, p, cas, lastXattrs), body: body, updatePeer: p.name, } @@ -146,19 +143,15 @@ func (p *CouchbaseServerPeer) WriteDocument(dsName sgbucket.DataStoreName, docID // DeleteDocument deletes a document on the peer. The test will fail if the document does not exist. func (p *CouchbaseServerPeer) DeleteDocument(dsName sgbucket.DataStoreName, docID string) DocMetadata { // delete the document, ignoring any in progress writes. We are allowed to delete a document that does not exist. - callback := func(_ []byte) (updated []byte, expiry *uint32, shouldDelete bool, err error) { - return nil, nil, true, nil + var lastXattrs map[string][]byte + // write the document LWW, ignoring any in progress writes + callback := func(_ []byte, xattrs map[string][]byte, _ uint64) (sgbucket.UpdatedDoc, error) { + lastXattrs = xattrs + return sgbucket.UpdatedDoc{Doc: nil, IsTombstone: true, Xattrs: xattrs}, nil } - cas, err := p.getCollection(dsName).Update(docID, 0, callback) + cas, err := p.getCollection(dsName).WriteUpdateWithXattrs(p.Context(), docID, metadataXattrNames, 0, nil, nil, callback) require.NoError(p.tb, err) - return DocMetadata{ - DocID: docID, - Cas: cas, - ImplicitCV: &db.Version{ - SourceID: p.SourceID(), - Value: cas, - }, - } + return getDocVersion(docID, p, cas, lastXattrs) } // WaitForDocVersion waits for a document to reach a specific version. The test will fail if the document does not reach the expected version in 20s. @@ -191,14 +184,14 @@ func (p *CouchbaseServerPeer) waitForDocVersion(dsName sgbucket.DataStoreName, d var err error var xattrs map[string][]byte var cas uint64 - docBytes, xattrs, cas, err = p.getCollection(dsName).GetWithXattrs(p.Context(), docID, []string{base.VvXattrName}) + docBytes, xattrs, cas, err = p.getCollection(dsName).GetWithXattrs(p.Context(), docID, metadataXattrNames) if !assert.NoError(c, err) { return } // have to use p.tb instead of c because of the assert.CollectT doesn't implement TB version = getDocVersion(docID, p, cas, xattrs) - assert.Equal(c, expected.CV(), version.CV(), "Could not find matching CV on %s for peer %s\nexpected: %#v\nactual: %#v\n body: %#v\n", docID, p, expected, version, string(docBytes)) + assert.Equal(c, expected.CV(c), version.CV(c), "Could not find matching CV on %s for peer %s\nexpected: %#v\nactual: %#v\n body: %#v\n", docID, p, expected, version, string(docBytes)) }, totalWaitTime, pollInterval) return docBytes @@ -285,6 +278,20 @@ func (p *CouchbaseServerPeer) UpdateTB(tb *testing.T) { p.tb = tb } +// useImplicitHLV returns true if the document's HLV is not up to date and an HLV should be composed of current sourceID and cas. +func useImplicitHLV(doc DocMetadata) bool { + if doc.HLV == nil { + return true + } + if doc.HLV.CurrentVersionCAS == doc.Cas { + return false + } + if doc.Mou == nil { + return true + } + return doc.Mou.CAS() != doc.Cas +} + // getDocVersion returns a DocVersion from a cas and xattrs with _vv (hlv) and _sync (RevTreeID). func getDocVersion(docID string, peer Peer, cas uint64, xattrs map[string][]byte) DocMetadata { docVersion := DocMetadata{ @@ -298,11 +305,15 @@ func getDocVersion(docID string, peer Peer, cas uint64, xattrs map[string][]byte hlvBytes, ok := xattrs[base.VvXattrName] if ok { require.NoError(peer.TB(), json.Unmarshal(hlvBytes, &docVersion.HLV)) - } else { - docVersion.ImplicitCV = &db.Version{ - SourceID: peer.SourceID(), - Value: cas, + } + if useImplicitHLV(docVersion) { + if docVersion.HLV == nil { + docVersion.ImplicitHLV = db.NewHybridLogicalVector() + } else { + require.NoError(peer.TB(), json.Unmarshal(hlvBytes, &docVersion.ImplicitHLV)) + docVersion.ImplicitHLV = docVersion.HLV } + require.NoError(peer.TB(), docVersion.ImplicitHLV.AddVersion(db.Version{SourceID: peer.SourceID(), Value: cas})) } sync, ok := xattrs[base.SyncXattrName] if ok { @@ -315,7 +326,7 @@ func getDocVersion(docID string, peer Peer, cas uint64, xattrs map[string][]byte // getBodyAndVersion returns the body and version of a document from a sgbucket.DataStore. func getBodyAndVersion(peer Peer, collection sgbucket.DataStore, docID string) (DocMetadata, db.Body) { - docBytes, xattrs, cas, err := collection.GetWithXattrs(peer.Context(), docID, []string{base.VvXattrName}) + docBytes, xattrs, cas, err := collection.GetWithXattrs(peer.Context(), docID, metadataXattrNames) require.NoError(peer.TB(), err) // get hlv to construct DocVersion var body db.Body diff --git a/topologytest/hlv_test.go b/topologytest/hlv_test.go index 3fd89b28d2..63172ef770 100644 --- a/topologytest/hlv_test.go +++ b/topologytest/hlv_test.go @@ -58,20 +58,6 @@ func waitForVersionAndBody(t *testing.T, dsName base.ScopeAndCollectionName, pee } } -// waitForVersionAndBodyOnNonActivePeers waits for a document to reach a specific version on all non-active peers. This is stub until CBG-4417 is implemented. -func waitForVersionAndBodyOnNonActivePeers(t *testing.T, dsName base.ScopeAndCollectionName, docID string, peers Peers, expectedVersion BodyAndVersion) { - for peerName := range peers.SortedPeers() { - if peerName == expectedVersion.updatePeer { - // skip peer the write came from - continue - } - peer := peers[peerName] - t.Logf("waiting for doc version %#v on %s, update written from %s", expectedVersion, peer, expectedVersion.updatePeer) - body := peer.WaitForDocVersion(dsName, docID, expectedVersion.docMeta) - requireBodyEqual(t, expectedVersion.body, body) - } -} - func waitForDeletion(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string, deleteActor string) { for peerName, peer := range peers { if peer.Type() == PeerTypeCouchbaseLite { diff --git a/topologytest/multi_actor_conflict_test.go b/topologytest/multi_actor_conflict_test.go index 9ef4f2522f..b30fca941f 100644 --- a/topologytest/multi_actor_conflict_test.go +++ b/topologytest/multi_actor_conflict_test.go @@ -69,8 +69,7 @@ func TestMultiActorConflictUpdate(t *testing.T) { docVersion = updateConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - // FIXME: CBG-4417 this can be replaced with waitForVersionAndBody when implicit HLV exists - waitForVersionAndBodyOnNonActivePeers(t, collectionName, docID, peers, docVersion) + waitForVersionAndBody(t, collectionName, peers, docID, docVersion) }) } } @@ -155,7 +154,7 @@ func TestMultiActorConflictResurrect(t *testing.T) { lastWriteVersion := updateConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBodyOnNonActivePeers(t, collectionName, docID, peers, lastWriteVersion) + waitForVersionAndBody(t, collectionName, peers, docID, lastWriteVersion) }) } } diff --git a/topologytest/multi_actor_no_conflict_test.go b/topologytest/multi_actor_no_conflict_test.go index 96be958b1e..2b41ff7591 100644 --- a/topologytest/multi_actor_no_conflict_test.go +++ b/topologytest/multi_actor_no_conflict_test.go @@ -58,8 +58,7 @@ func TestMultiActorUpdate(t *testing.T) { for peerName := range peers.SortedPeers() { docID := getDocID(t) + "_" + peerName docBodyAndVersion := docVersionList[peerName] - // FIXME: CBG-4417 this can be replaced with waitForVersionAndBody when implicit HLV exists - waitForVersionAndBodyOnNonActivePeers(t, collectionName, docID, peers, docBodyAndVersion) + waitForVersionAndBody(t, collectionName, peers, docID, docBodyAndVersion) } }) @@ -126,8 +125,7 @@ func TestMultiActorResurrect(t *testing.T) { for updatePeerName := range peers { docID := getDocID(t) + "_" + updatePeerName docVersion := docVersionList[updatePeerName] - // FIXME: CBG-4417 this can be replaced with waitForVersionAndBody when implicit HLV exists - waitForVersionAndBodyOnNonActivePeers(t, collectionName, docID, peers, docVersion) + waitForVersionAndBody(t, collectionName, peers, docID, docVersion) } }) } diff --git a/topologytest/peer_test.go b/topologytest/peer_test.go index e270651c37..ca0cc48eb8 100644 --- a/topologytest/peer_test.go +++ b/topologytest/peer_test.go @@ -358,7 +358,7 @@ func TestPeerImplementation(t *testing.T) { updateBody := []byte(`{"op": "update"}`) updateVersion := peer.WriteDocument(collectionName, docID, updateBody) require.NotEmpty(t, updateVersion.docMeta.CV) - require.NotEqual(t, updateVersion.docMeta.CV(), createVersion.docMeta.CV()) + require.NotEqual(t, updateVersion.docMeta.CV(t), createVersion.docMeta.CV(t)) if tc.peerOption.Type == PeerTypeCouchbaseServer { require.Empty(t, updateVersion.docMeta.RevTreeID) } else { @@ -374,9 +374,9 @@ func TestPeerImplementation(t *testing.T) { // Delete deleteVersion := peer.DeleteDocument(collectionName, docID) - require.NotEmpty(t, deleteVersion.CV()) - require.NotEqual(t, deleteVersion.CV(), updateVersion.docMeta.CV()) - require.NotEqual(t, deleteVersion.CV(), createVersion.docMeta.CV()) + require.NotEmpty(t, deleteVersion.CV(t)) + require.NotEqual(t, deleteVersion.CV(t), updateVersion.docMeta.CV(t)) + require.NotEqual(t, deleteVersion.CV(t), createVersion.docMeta.CV(t)) if tc.peerOption.Type == PeerTypeCouchbaseServer { require.Empty(t, deleteVersion.RevTreeID) } else { @@ -390,10 +390,10 @@ func TestPeerImplementation(t *testing.T) { resurrectionBody := []byte(`{"op": "resurrection"}`) resurrectionVersion := peer.WriteDocument(collectionName, docID, resurrectionBody) - require.NotEmpty(t, resurrectionVersion.docMeta.CV()) - require.NotEqual(t, resurrectionVersion.docMeta.CV(), deleteVersion.CV()) - require.NotEqual(t, resurrectionVersion.docMeta.CV(), updateVersion.docMeta.CV()) - require.NotEqual(t, resurrectionVersion.docMeta.CV(), createVersion.docMeta.CV()) + require.NotEmpty(t, resurrectionVersion.docMeta.CV(t)) + require.NotEqual(t, resurrectionVersion.docMeta.CV(t), deleteVersion.CV(t)) + require.NotEqual(t, resurrectionVersion.docMeta.CV(t), updateVersion.docMeta.CV(t)) + require.NotEqual(t, resurrectionVersion.docMeta.CV(t), createVersion.docMeta.CV(t)) if tc.peerOption.Type == PeerTypeCouchbaseServer { require.Empty(t, resurrectionVersion.docMeta.RevTreeID) } else { diff --git a/topologytest/sync_gateway_peer_test.go b/topologytest/sync_gateway_peer_test.go index 3ebbf0f48e..42d43fceb8 100644 --- a/topologytest/sync_gateway_peer_test.go +++ b/topologytest/sync_gateway_peer_test.go @@ -139,7 +139,7 @@ func (p *SyncGatewayPeer) WaitForDocVersion(dsName sgbucket.DataStoreName, docID // Only assert on CV since RevTreeID might not be present if this was a Couchbase Server write bodyBytes, err := doc.BodyBytes(ctx) assert.NoError(c, err) - assert.Equal(c, expected.CV(), version.CV(), "Could not find matching CV on %s for peer %s (sourceID:%s)\nexpected: %#v\nactual: %#v\n body: %+v\n", docID, p, p.SourceID(), expected, version, string(bodyBytes)) + assert.Equal(c, expected.CV(c), version.CV(c), "Could not find matching CV on %s for peer %s (sourceID:%s)\nexpected: %#v\nactual: %#v\n body: %+v\n", docID, p, p.SourceID(), expected, version, string(bodyBytes)) }, totalWaitTime, pollInterval) return doc.Body(ctx) } diff --git a/topologytest/version_test.go b/topologytest/version_test.go index 4da7040c33..0ae24fa70a 100644 --- a/topologytest/version_test.go +++ b/topologytest/version_test.go @@ -10,34 +10,32 @@ package topologytest import ( "fmt" + "testing" "github.com/couchbase/sync_gateway/db" "github.com/couchbase/sync_gateway/rest" + "github.com/stretchr/testify/require" ) // DocMetadata is a struct that contains metadata about a document. It contains the relevant information for testing versions of documents, as well as debugging information. type DocMetadata struct { - DocID string // DocID is the document ID - RevTreeID string // RevTreeID is the rev treee ID of a document, may be empty not present - HLV *db.HybridLogicalVector // HLV is the hybrid logical vector of the document, may not be present - Mou *db.MetadataOnlyUpdate // Mou is the metadata only update of the document, may not be present - Cas uint64 // Cas is the cas value of the document - ImplicitCV *db.Version // ImplicitCV is the version of the document, if there was no HLV + DocID string // DocID is the document ID + RevTreeID string // RevTreeID is the rev treee ID of a document, may be empty not present + HLV *db.HybridLogicalVector // HLV is the hybrid logical vector of the document, may not be present + Mou *db.MetadataOnlyUpdate // Mou is the metadata only update of the document, may not be present + Cas uint64 // Cas is the cas value of the document + ImplicitHLV *db.HybridLogicalVector // ImplicitHLV is the version of the document, if there was no HLV } // CV returns the current version of the document. -func (v DocMetadata) CV() db.Version { - if v.HLV == nil { - // If there is no HLV, then the version is implicit from the current ver@sourceID - if v.ImplicitCV == nil { - return db.Version{} - } - return *v.ImplicitCV - } - return db.Version{ - SourceID: v.HLV.SourceID, - Value: v.HLV.Version, +func (v DocMetadata) CV(t require.TestingT) db.Version { + if v.ImplicitHLV != nil { + return *v.ImplicitHLV.ExtractCurrentVersionFromHLV() + } else if v.HLV != nil { + return *v.HLV.ExtractCurrentVersionFromHLV() } + require.FailNow(t, "no hlv available %#v", v) + return db.Version{} } // DocMetadataFromDocument returns a DocVersion from the given document. @@ -52,14 +50,17 @@ func DocMetadataFromDocument(doc *db.Document) DocMetadata { } func (v DocMetadata) GoString() string { - return fmt.Sprintf("DocMetadata{\nDocID:%s\n\tRevTreeID:%s\n\tHLV:%+v\n\tMou:%+v\n\tCas:%d\n\tImplicitCV:%+v\n}", v.DocID, v.RevTreeID, v.HLV, v.Mou, v.Cas, v.ImplicitCV) + return fmt.Sprintf("DocMetadata{\nDocID:%s\n\tRevTreeID:%s\n\tHLV:%+v\n\tMou:%+v\n\tCas:%d\n\tImplicitHLV:%+v\n}", v.DocID, v.RevTreeID, v.HLV, v.Mou, v.Cas, v.ImplicitHLV) } // DocMetadataFromDocVersion returns metadata DocVersion from the given document and version. -func DocMetadataFromDocVersion(docID string, version rest.DocVersion) DocMetadata { +func DocMetadataFromDocVersion(t testing.TB, docID string, version rest.DocVersion) DocMetadata { + // FIXME: CBG-4257, this should read the existing HLV on doc, until this happens, pv is always missing + hlv := db.NewHybridLogicalVector() + require.NoError(t, hlv.AddVersion(version.CV)) return DocMetadata{ - DocID: docID, - RevTreeID: version.RevTreeID, - ImplicitCV: &version.CV, + DocID: docID, + RevTreeID: version.RevTreeID, + ImplicitHLV: hlv, } }