CBG-3877 Persist HLV to _vv xattr (#6843)
Modifies document marshal and unmarshal to support a set of xattrs (_sync, _vv), and does the same for parsing DCP stream events (including user xattr).
adamcfraser authored May 25, 2024
1 parent 72b0624 commit c5b4ee4
Showing 20 changed files with 329 additions and 216 deletions.
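
At a glance, the commit replaces the old pattern of threading separate _sync and user-xattr arguments through the unmarshal/import call sites with a single map of raw xattrs keyed by xattr name, now including _vv. The standalone Go sketch below is not part of the commit: the constants and the pickXattrs helper are local stand-ins for base.SyncXattrName, base.VvXattrName and the map-based unmarshal helpers seen in the diff, and the HLV payload is purely illustrative.

package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins for the xattr names defined in base/constants.go.
const (
	syncXattrName = "_sync"
	vvXattrName   = "_vv"
)

// pickXattrs mirrors the pattern the commit adopts: callers fetch all xattrs
// into a map, and the unmarshal helper pulls out the ones it needs instead of
// receiving the sync and user xattrs as separate arguments.
func pickXattrs(xattrs map[string][]byte, userXattrKey string) (syncXattr, vvXattr, userXattr []byte) {
	return xattrs[syncXattrName], xattrs[vvXattrName], xattrs[userXattrKey]
}

func main() {
	xattrs := map[string][]byte{
		syncXattrName: []byte(`{"sequence":1}`),
		vvXattrName:   []byte(`{"cvCas":"0x1","ver":"0x1"}`), // illustrative HLV payload
	}
	syncXattr, vvXattr, userXattr := pickXattrs(xattrs, "myUserXattr")

	var syncData map[string]any
	if err := json.Unmarshal(syncXattr, &syncData); err != nil {
		panic(err)
	}
	fmt.Println(syncData, string(vvXattr), userXattr == nil)
}
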
1 change: 1 addition & 0 deletions base/constants.go
@@ -136,6 +136,7 @@ const (
SyncPropertyName = "_sync"
// SyncXattrName is used when storing sync data in a document's xattrs.
SyncXattrName = "_sync"
VvXattrName = "_vv"

// Intended to be used in Meta Map and related tests
MetaMapXattrsKey = "xattrs"
3 changes: 2 additions & 1 deletion db/change_cache.go
@@ -396,7 +396,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
}

// First unmarshal the doc (just its metadata, to save time/memory):
syncData, rawBody, _, rawUserXattr, err := UnmarshalDocumentSyncDataFromFeed(docJSON, event.DataType, collection.userXattrKey(), false)
syncData, rawBody, rawXattrs, err := UnmarshalDocumentSyncDataFromFeed(docJSON, event.DataType, collection.userXattrKey(), false)
if err != nil {
// Avoid log noise related to failed unmarshaling of binary documents.
if event.DataType != base.MemcachedDataTypeRaw {
@@ -409,6 +409,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
}

// If using xattrs and this isn't an SG write, we shouldn't attempt to cache.
rawUserXattr := rawXattrs[collection.userXattrKey()]
if collection.UseXattrs() {
if syncData == nil {
return
59 changes: 33 additions & 26 deletions db/crud.go
@@ -60,7 +60,7 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin
return nil, nil, base.HTTPErrorf(400, "Invalid doc ID")
}
if c.UseXattrs() {
doc, rawBucketDoc, err = c.GetDocWithXattr(ctx, key, unmarshalLevel)
doc, rawBucketDoc, err = c.GetDocWithXattrs(ctx, key, unmarshalLevel)
if err != nil {
return nil, nil, err
}
@@ -73,7 +73,7 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin
// If existing doc wasn't an SG Write, import the doc.
if !isSgWrite {
var importErr error
doc, importErr = c.OnDemandImportForGet(ctx, docid, rawBucketDoc.Body, rawBucketDoc.Xattrs[base.SyncXattrName], rawBucketDoc.Xattrs[c.userXattrKey()], rawBucketDoc.Cas)
doc, importErr = c.OnDemandImportForGet(ctx, docid, rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas)
if importErr != nil {
return nil, nil, importErr
}
@@ -114,7 +114,7 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin
return doc, rawBucketDoc, nil
}

func (c *DatabaseCollection) GetDocWithXattr(ctx context.Context, key string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) {
func (c *DatabaseCollection) GetDocWithXattrs(ctx context.Context, key string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) {
rawBucketDoc = &sgbucket.BucketDocument{}
var getErr error
rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas, getErr = c.dataStore.GetWithXattrs(ctx, key, c.syncAndUserXattrKeys())
@@ -123,7 +123,7 @@ func (c *DatabaseCollection) GetDocWithXattr(ctx context.Context, key string, un
}

var unmarshalErr error
doc, unmarshalErr = unmarshalDocumentWithXattr(ctx, key, rawBucketDoc.Body, rawBucketDoc.Xattrs[base.SyncXattrName], rawBucketDoc.Xattrs[c.userXattrKey()], rawBucketDoc.Cas, unmarshalLevel)
doc, unmarshalErr = c.unmarshalDocumentWithXattrs(ctx, key, rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas, unmarshalLevel)
if unmarshalErr != nil {
return nil, nil, unmarshalErr
}
@@ -148,11 +148,8 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) (
return emptySyncData, getErr
}

rawXattr := xattrs[base.SyncXattrName]
rawUserXattr := xattrs[c.userXattrKey()]

// Unmarshal xattr only
doc, unmarshalErr := unmarshalDocumentWithXattr(ctx, docid, nil, rawXattr, rawUserXattr, cas, DocUnmarshalSync)
doc, unmarshalErr := c.unmarshalDocumentWithXattrs(ctx, docid, nil, xattrs, cas, DocUnmarshalSync)
if unmarshalErr != nil {
return emptySyncData, unmarshalErr
}
@@ -166,7 +163,7 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) (
if !isSgWrite {
var importErr error

doc, importErr = c.OnDemandImportForGet(ctx, docid, rawDoc, rawXattr, rawUserXattr, cas)
doc, importErr = c.OnDemandImportForGet(ctx, docid, rawDoc, xattrs, cas)
if importErr != nil {
return emptySyncData, importErr
}
@@ -193,17 +190,23 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) (

}

// unmarshalDocumentWithXattrs unmarshals a document, picking the individual sync, vv and user xattr values out of the provided xattrs map
func (db *DatabaseCollection) unmarshalDocumentWithXattrs(ctx context.Context, docid string, data []byte, xattrs map[string][]byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
return unmarshalDocumentWithXattrs(ctx, docid, data, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[db.userXattrKey()], cas, unmarshalLevel)

}

// This gets *just* the Sync Metadata (_sync field) rather than the entire doc, for efficiency
// reasons. Unlike GetDocSyncData it does not check for on-demand import; this means it does not
// need to read the doc body from the bucket.
func (db *DatabaseCollection) GetDocSyncDataNoImport(ctx context.Context, docid string, level DocumentUnmarshalLevel) (syncData SyncData, err error) {
if db.UseXattrs() {
var xattrs map[string][]byte
var cas uint64
xattrs, cas, err = db.dataStore.GetXattrs(ctx, docid, []string{base.SyncXattrName})
xattrs, cas, err = db.dataStore.GetXattrs(ctx, docid, []string{base.SyncXattrName, base.VvXattrName})
if err == nil {
var doc *Document
doc, err = unmarshalDocumentWithXattr(ctx, docid, nil, xattrs[base.SyncXattrName], nil, cas, level)
doc, err = db.unmarshalDocumentWithXattrs(ctx, docid, nil, xattrs, cas, level)
if err == nil {
syncData = doc.SyncData
}
@@ -235,14 +238,14 @@ func (db *DatabaseCollection) GetDocSyncDataNoImport(ctx context.Context, docid
return
}

// OnDemandImportForGet. Attempts to import the doc based on the provided id, contents and cas. ImportDocRaw does cas retry handling
// OnDemandImportForGet. Attempts to import the doc based on the provided id, contents and cas. ImportDocRaw does cas retry handling
// if the document gets updated after the initial retrieval attempt that triggered this.
func (c *DatabaseCollection) OnDemandImportForGet(ctx context.Context, docid string, rawDoc []byte, rawXattr []byte, rawUserXattr []byte, cas uint64) (docOut *Document, err error) {
func (c *DatabaseCollection) OnDemandImportForGet(ctx context.Context, docid string, rawDoc []byte, xattrs map[string][]byte, cas uint64) (docOut *Document, err error) {
isDelete := rawDoc == nil
importDb := DatabaseCollectionWithUser{DatabaseCollection: c, user: nil}
var importErr error

docOut, importErr = importDb.ImportDocRaw(ctx, docid, rawDoc, rawXattr, rawUserXattr, isDelete, cas, nil, ImportOnDemand)
docOut, importErr = importDb.ImportDocRaw(ctx, docid, rawDoc, xattrs, isDelete, cas, nil, ImportOnDemand)
if importErr == base.ErrImportCancelledFilter {
// If the import was cancelled due to filter, treat as not found
return nil, base.HTTPErrorf(404, "Not imported")
@@ -1076,7 +1079,7 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, newDocHLV HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *Version, newRevID string, err error) {
var matchRev string
if existingDoc != nil {
doc, unmarshalErr := unmarshalDocumentWithXattr(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattrs[base.SyncXattrName], existingDoc.Xattrs[db.userXattrKey()], existingDoc.Cas, DocUnmarshalRev)
doc, unmarshalErr := db.unmarshalDocumentWithXattrs(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattrs, existingDoc.Cas, DocUnmarshalRev)
if unmarshalErr != nil {
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling existing doc")
}
@@ -2204,15 +2207,15 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
}
casOut, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncAndUserXattrKeys(), initialExpiry, existingDoc, opts, func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (updatedDoc sgbucket.UpdatedDoc, err error) {
// Be careful: this block can be invoked multiple times if there are races!
currentXattr := currentXattrs[base.SyncXattrName]
currentUserXattr := currentXattrs[db.userXattrKey()]
if doc, err = unmarshalDocumentWithXattr(ctx, docid, currentValue, currentXattr, currentUserXattr, cas, DocUnmarshalAll); err != nil {

if doc, err = db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll); err != nil {
return
}
prevCurrentRev = doc.CurrentRev

// Check whether Sync Data originated in body
if currentXattr == nil && doc.Sequence > 0 {
currentSyncXattr := currentXattrs[base.SyncXattrName]
if currentSyncXattr == nil && doc.Sequence > 0 {
doc.inlineSyncData = true
}

@@ -2249,18 +2252,22 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do

// Return the new raw document value for the bucket to store.
doc.SetCrc32cUserXattrHash()
var rawXattr, rawDocBody []byte
rawDocBody, rawXattr, err = doc.MarshalWithXattr()

var rawSyncXattr, rawVvXattr, rawDocBody []byte
rawDocBody, rawSyncXattr, rawVvXattr, err = doc.MarshalWithXattrs()
if err != nil {
return updatedDoc, err
}
if !isImport {
updatedDoc.Doc = rawDocBody
docBytes = len(updatedDoc.Doc)
}
updatedDoc.Xattrs = map[string][]byte{base.SyncXattrName: rawXattr}
updatedDoc.Xattrs = map[string][]byte{base.SyncXattrName: rawSyncXattr, base.VvXattrName: rawVvXattr}

// Warn when sync data is larger than a configured threshold
if db.unsupportedOptions() != nil && db.unsupportedOptions().WarningThresholds != nil {
if xattrBytesThreshold := db.unsupportedOptions().WarningThresholds.XattrSize; xattrBytesThreshold != nil {
xattrBytes = len(rawXattr)
xattrBytes = len(rawSyncXattr)
if uint32(xattrBytes) >= *xattrBytesThreshold {
db.dbStats().Database().WarnXattrSizeCount.Add(1)
base.WarnfCtx(ctx, "Doc id: %v sync metadata size: %d bytes exceeds %d bytes for sync metadata warning threshold", base.UD(doc.ID), xattrBytes, *xattrBytesThreshold)
@@ -2815,7 +2822,7 @@ func (c *DatabaseCollection) checkForUpgrade(ctx context.Context, key string, un
return nil, nil
}

doc, rawDocument, err := c.GetDocWithXattr(ctx, key, unmarshalLevel)
doc, rawDocument, err := c.GetDocWithXattrs(ctx, key, unmarshalLevel)
if err != nil || doc == nil || !doc.HasValidSyncData() {
return nil, nil
}
@@ -2968,8 +2975,8 @@ const (
xattrMacroCas = "cas" // SyncData.Cas
xattrMacroValueCrc32c = "value_crc32c" // SyncData.Crc32c
xattrMacroCurrentRevVersion = "rev.ver" // SyncDataJSON.RevAndVersion.CurrentVersion
versionVectorVrsMacro = "_vv.ver" // PersistedHybridLogicalVector.Version
versionVectorCVCASMacro = "_vv.cvCas" // PersistedHybridLogicalVector.CurrentVersionCAS
versionVectorVrsMacro = "ver" // PersistedHybridLogicalVector.Version
versionVectorCVCASMacro = "cvCas" // PersistedHybridLogicalVector.CurrentVersionCAS
)

func macroExpandSpec(xattrName string) []sgbucket.MacroExpansionSpec {
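
Note the macro constants in the hunk above: they drop the leading "_vv." because the version vector now lives in its own xattr, so the relative field names can be combined with whichever xattr name macroExpandSpec receives. The runnable sketch below illustrates that composition; the constants are local copies of the values in the diff, and xattrFieldPath is a hypothetical helper, not Sync Gateway code.

package main

import "fmt"

// Local copies of the constants shown in the diff above.
const (
	vvXattrName             = "_vv"
	versionVectorVrsMacro   = "ver"   // PersistedHybridLogicalVector.Version, relative to the xattr
	versionVectorCVCASMacro = "cvCas" // PersistedHybridLogicalVector.CurrentVersionCAS, relative to the xattr
)

// xattrFieldPath composes the absolute macro-expansion path for a field inside
// a given xattr, e.g. "_vv.ver".
func xattrFieldPath(xattrName, field string) string {
	return xattrName + "." + field
}

func main() {
	fmt.Println(xattrFieldPath(vvXattrName, versionVectorVrsMacro))   // _vv.ver
	fmt.Println(xattrFieldPath(vvXattrName, versionVectorCVCASMacro)) // _vv.cvCas
}
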
2 changes: 1 addition & 1 deletion db/crud_test.go
@@ -245,7 +245,7 @@ func TestHasAttachmentsFlagForLegacyAttachments(t *testing.T) {
require.NoError(t, err)

// Get the existing bucket doc
_, existingBucketDoc, err := collection.GetDocWithXattr(ctx, docID, DocUnmarshalAll)
_, existingBucketDoc, err := collection.GetDocWithXattrs(ctx, docID, DocUnmarshalAll)
require.NoError(t, err)

// Migrate document metadata from document body to system xattr.
7 changes: 4 additions & 3 deletions db/database.go
@@ -1792,7 +1792,7 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid,
if currentValue == nil || len(currentValue) == 0 {
return sgbucket.UpdatedDoc{}, base.ErrUpdateCancel
}
doc, err := unmarshalDocumentWithXattr(ctx, docid, currentValue, currentXattrs[base.SyncXattrName], currentXattrs[db.userXattrKey()], cas, DocUnmarshalAll)
doc, err := db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll)
if err != nil {
return sgbucket.UpdatedDoc{}, err
}
@@ -1808,11 +1808,12 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid,
updatedDoc.UpdateExpiry(*updatedExpiry)
}
doc.SetCrc32cUserXattrHash()
_, rawXattr, err := updatedDoc.MarshalWithXattr()
_, rawSyncXattr, rawVvXattr, err := updatedDoc.MarshalWithXattrs()
return sgbucket.UpdatedDoc{
Doc: nil, // Resync does not require document body update
Xattrs: map[string][]byte{
base.SyncXattrName: rawXattr,
base.SyncXattrName: rawSyncXattr,
base.VvXattrName: rawVvXattr,
},
Expiry: updatedExpiry,
}, err
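
On the write side, MarshalWithXattrs (replacing MarshalWithXattr) returns separate _sync and _vv payloads, and callers such as resyncDocument above and updateAndReturnDoc in db/crud.go store both in the updated document's xattr map. The standalone sketch below shows that shape with stand-in types; only the Doc and Xattrs field names and the xattr names come from the diff, everything else is illustrative.

package main

import (
	"encoding/json"
	"fmt"
)

const (
	syncXattrName = "_sync"
	vvXattrName   = "_vv"
)

// UpdatedDoc is a stand-in for the sgbucket.UpdatedDoc fields used in the diff.
type UpdatedDoc struct {
	Doc    []byte
	Xattrs map[string][]byte
}

// marshalWithXattrs sketches the new split: body, sync metadata and HLV are
// marshalled to three separate byte slices.
func marshalWithXattrs(body, syncData, hlv any) (rawBody, rawSync, rawVv []byte, err error) {
	if rawBody, err = json.Marshal(body); err != nil {
		return
	}
	if rawSync, err = json.Marshal(syncData); err != nil {
		return
	}
	rawVv, err = json.Marshal(hlv)
	return
}

func main() {
	rawBody, rawSync, rawVv, err := marshalWithXattrs(
		map[string]any{"k": "v"},
		map[string]any{"sequence": 1},
		map[string]any{"cvCas": "0x1", "ver": "0x1"},
	)
	if err != nil {
		panic(err)
	}
	updated := UpdatedDoc{
		Doc:    rawBody,
		Xattrs: map[string][]byte{syncXattrName: rawSync, vvXattrName: rawVv},
	}
	fmt.Println(string(updated.Xattrs[vvXattrName]))
}
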
2 changes: 1 addition & 1 deletion db/database_collection.go
@@ -253,7 +253,7 @@ func (c *DatabaseCollection) unsupportedOptions() *UnsupportedOptions {

// syncAndUserXattrKeys returns the xattr keys for the user and sync xattrs.
func (c *DatabaseCollection) syncAndUserXattrKeys() []string {
xattrKeys := []string{base.SyncXattrName}
xattrKeys := []string{base.SyncXattrName, base.VvXattrName}
userXattrKey := c.userXattrKey()
if userXattrKey != "" {
xattrKeys = append(xattrKeys, userXattrKey)