Skip to content

Commit

Permalink
CBG-3877 Persist HLV to _vv xattr (#6843)
Browse files Browse the repository at this point in the history
Modifies document marshal and unmarshal to support a set of xattrs (_sync, _vv), and does the same for parsing DCP stream events (including user xattr).
  • Loading branch information
adamcfraser committed Nov 27, 2024
1 parent 59b292c commit 3df720e
Show file tree
Hide file tree
Showing 19 changed files with 299 additions and 160 deletions.
1 change: 1 addition & 0 deletions base/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ const (
SyncPropertyName = "_sync"
// SyncXattrName is used when storing sync data in a document's xattrs.
SyncXattrName = "_sync"
VvXattrName = "_vv"

// MouXattrName is used when storing metadata-only update information in a document's xattrs.
MouXattrName = "_mou"
Expand Down
4 changes: 2 additions & 2 deletions db/change_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1457,7 +1457,7 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) {
}
var doc1DCPBytes []byte
if base.TestUseXattrs() {
body, syncXattr, _, err := doc1.MarshalWithXattrs()
body, syncXattr, _, _, err := doc1.MarshalWithXattrs()
require.NoError(t, err)
doc1DCPBytes = sgbucket.EncodeValueWithXattrs(body, sgbucket.Xattr{Name: base.SyncXattrName, Value: syncXattr})
} else {
Expand All @@ -1482,7 +1482,7 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) {
var dataType sgbucket.FeedDataType = base.MemcachedDataTypeJSON
if base.TestUseXattrs() {
dataType |= base.MemcachedDataTypeXattr
body, syncXattr, _, err := doc2.MarshalWithXattrs()
body, syncXattr, _, _, err := doc2.MarshalWithXattrs()
require.NoError(t, err)
doc2DCPBytes = sgbucket.EncodeValueWithXattrs(body, sgbucket.Xattr{Name: base.SyncXattrName, Value: syncXattr})
} else {
Expand Down
32 changes: 22 additions & 10 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin
return nil, nil, base.HTTPErrorf(400, "Invalid doc ID")
}
if c.UseXattrs() {
doc, rawBucketDoc, err = c.GetDocWithXattr(ctx, key, unmarshalLevel)
doc, rawBucketDoc, err = c.GetDocWithXattrs(ctx, key, unmarshalLevel)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin
return doc, rawBucketDoc, nil
}

func (c *DatabaseCollection) GetDocWithXattr(ctx context.Context, key string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) {
func (c *DatabaseCollection) GetDocWithXattrs(ctx context.Context, key string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) {
rawBucketDoc = &sgbucket.BucketDocument{}
var getErr error
rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas, getErr = c.dataStore.GetWithXattrs(ctx, key, c.syncAndUserXattrKeys())
Expand Down Expand Up @@ -190,14 +190,20 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) (

}

// unmarshalDocumentWithXattrs populates individual xattrs on unmarshalDocumentWithXattrs from a provided xattrs map
func (db *DatabaseCollection) unmarshalDocumentWithXattrs(ctx context.Context, docid string, data []byte, xattrs map[string][]byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
return unmarshalDocumentWithXattrs(ctx, docid, data, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[db.userXattrKey()], cas, unmarshalLevel)

}

// This gets *just* the Sync Metadata (_sync field) rather than the entire doc, for efficiency
// reasons. Unlike GetDocSyncData it does not check for on-demand import; this means it does not
// need to read the doc body from the bucket.
func (db *DatabaseCollection) GetDocSyncDataNoImport(ctx context.Context, docid string, level DocumentUnmarshalLevel) (syncData SyncData, err error) {
if db.UseXattrs() {
var xattrs map[string][]byte
var cas uint64
xattrs, cas, err = db.dataStore.GetXattrs(ctx, docid, []string{base.SyncXattrName})
xattrs, cas, err = db.dataStore.GetXattrs(ctx, docid, []string{base.SyncXattrName, base.VvXattrName})
if err == nil {
var doc *Document
doc, err = db.unmarshalDocumentWithXattrs(ctx, docid, nil, xattrs, cas, level)
Expand Down Expand Up @@ -232,7 +238,7 @@ func (db *DatabaseCollection) GetDocSyncDataNoImport(ctx context.Context, docid
return
}

// OnDemandImportForGet. Attempts to import the doc based on the provided id, contents and cas. ImportDocRaw does cas retry handling
// OnDemandImportForGet. Attempts to import the doc based on the provided id, contents and cas. ImportDocRaw does cas retry handling
// if the document gets updated after the initial retrieval attempt that triggered this.
func (c *DatabaseCollection) OnDemandImportForGet(ctx context.Context, docid string, rawDoc []byte, xattrs map[string][]byte, cas uint64) (docOut *Document, err error) {
isDelete := rawDoc == nil
Expand Down Expand Up @@ -2274,13 +2280,19 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do

// Return the new raw document value for the bucket to store.
doc.SetCrc32cUserXattrHash()
var rawSyncXattr, rawMouXattr, rawDocBody []byte
rawDocBody, rawSyncXattr, rawMouXattr, err = doc.MarshalWithXattrs()

var rawSyncXattr, rawMouXattr, rawVvXattr, rawDocBody []byte
rawDocBody, rawSyncXattr, rawVvXattr, rawMouXattr, err = doc.MarshalWithXattrs()
if err != nil {
return updatedDoc, err
}

if len(rawDocBody) > 0 {
updatedDoc.Doc = rawDocBody
docBytes = len(updatedDoc.Doc)
}
updatedDoc.Xattrs = map[string][]byte{base.SyncXattrName: rawSyncXattr}

updatedDoc.Xattrs = map[string][]byte{base.SyncXattrName: rawSyncXattr, base.VvXattrName: rawVvXattr}
if rawMouXattr != nil && db.useMou() {
updatedDoc.Xattrs[base.MouXattrName] = rawMouXattr
}
Expand Down Expand Up @@ -2898,7 +2910,7 @@ func (c *DatabaseCollection) checkForUpgrade(ctx context.Context, key string, un
return nil, nil
}

doc, rawDocument, err := c.GetDocWithXattr(ctx, key, unmarshalLevel)
doc, rawDocument, err := c.GetDocWithXattrs(ctx, key, unmarshalLevel)
if err != nil || doc == nil || !doc.HasValidSyncData() {
return nil, nil
}
Expand Down Expand Up @@ -3051,8 +3063,8 @@ const (
xattrMacroCas = "cas" // SyncData.Cas
xattrMacroValueCrc32c = "value_crc32c" // SyncData.Crc32c
xattrMacroCurrentRevVersion = "rev.ver" // SyncDataJSON.RevAndVersion.CurrentVersion
versionVectorVrsMacro = "_vv.ver" // PersistedHybridLogicalVector.Version
versionVectorCVCASMacro = "_vv.cvCas" // PersistedHybridLogicalVector.CurrentVersionCAS
versionVectorVrsMacro = "ver" // PersistedHybridLogicalVector.Version
versionVectorCVCASMacro = "cvCas" // PersistedHybridLogicalVector.CurrentVersionCAS

expandMacroCASValue = "expand" // static value that indicates that a CAS macro expansion should be applied to a property
)
Expand Down
2 changes: 1 addition & 1 deletion db/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func TestHasAttachmentsFlagForLegacyAttachments(t *testing.T) {
require.NoError(t, err)

// Get the existing bucket doc
_, existingBucketDoc, err := collection.GetDocWithXattr(ctx, docID, DocUnmarshalAll)
_, existingBucketDoc, err := collection.GetDocWithXattrs(ctx, docID, DocUnmarshalAll)
require.NoError(t, err)

// Migrate document metadata from document body to system xattr.
Expand Down
5 changes: 3 additions & 2 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1874,11 +1874,12 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid,
doc.metadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.metadataOnlyUpdate)
}

_, rawXattr, rawMouXattr, err := updatedDoc.MarshalWithXattrs()
_, rawSyncXattr, rawVvXattr, rawMouXattr, err := updatedDoc.MarshalWithXattrs()
updatedDoc := sgbucket.UpdatedDoc{
Doc: nil, // Resync does not require document body update
Xattrs: map[string][]byte{
base.SyncXattrName: rawXattr,
base.SyncXattrName: rawSyncXattr,
base.VvXattrName: rawVvXattr,
},
Expiry: updatedExpiry,
}
Expand Down
4 changes: 2 additions & 2 deletions db/database_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (c *DatabaseCollection) unsupportedOptions() *UnsupportedOptions {

// syncAndUserXattrKeys returns the xattr keys for the user and sync xattrs.
func (c *DatabaseCollection) syncAndUserXattrKeys() []string {
xattrKeys := []string{base.SyncXattrName}
xattrKeys := []string{base.SyncXattrName, base.VvXattrName}
userXattrKey := c.userXattrKey()
if userXattrKey != "" {
xattrKeys = append(xattrKeys, userXattrKey)
Expand All @@ -249,7 +249,7 @@ func (c *DatabaseCollection) syncAndUserXattrKeys() []string {

// syncMouAndUserXattrKeys returns the xattr keys for the user, mou and sync xattrs.
func (c *DatabaseCollection) syncMouAndUserXattrKeys() []string {
xattrKeys := []string{base.SyncXattrName}
xattrKeys := []string{base.SyncXattrName, base.VvXattrName}
if c.useMou() {
xattrKeys = append(xattrKeys, base.MouXattrName)
}
Expand Down
Loading

0 comments on commit 3df720e

Please sign in to comment.