Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-4206: read/write attachments to global sync xattr #7107

Merged
merged 5 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions base/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,9 @@ const (
// SyncPropertyName is used when storing sync data inline in a document.
SyncPropertyName = "_sync"
// SyncXattrName is used when storing sync data in a document's xattrs.
SyncXattrName = "_sync"
VvXattrName = "_vv"
SyncXattrName = "_sync"
VvXattrName = "_vv"
GlobalXattrName = "_globalSync"

// MouXattrName is used when storing metadata-only update information in a document's xattrs.
MouXattrName = "_mou"
Expand Down
4 changes: 2 additions & 2 deletions db/change_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1462,7 +1462,7 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) {
}
var doc1DCPBytes []byte
if base.TestUseXattrs() {
body, syncXattr, _, _, err := doc1.MarshalWithXattrs()
body, syncXattr, _, _, _, err := doc1.MarshalWithXattrs()
require.NoError(t, err)
doc1DCPBytes = sgbucket.EncodeValueWithXattrs(body, sgbucket.Xattr{Name: base.SyncXattrName, Value: syncXattr})
} else {
Expand All @@ -1487,7 +1487,7 @@ func TestLateArrivingSequenceTriggersOnChange(t *testing.T) {
var dataType sgbucket.FeedDataType = base.MemcachedDataTypeJSON
if base.TestUseXattrs() {
dataType |= base.MemcachedDataTypeXattr
body, syncXattr, _, _, err := doc2.MarshalWithXattrs()
body, syncXattr, _, _, _, err := doc2.MarshalWithXattrs()
require.NoError(t, err)
doc2DCPBytes = sgbucket.EncodeValueWithXattrs(body, sgbucket.Xattr{Name: base.SyncXattrName, Value: syncXattr})
} else {
Expand Down
19 changes: 13 additions & 6 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (c *DatabaseCollection) GetDocumentWithRaw(ctx context.Context, docid strin
func (c *DatabaseCollection) GetDocWithXattrs(ctx context.Context, key string, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, rawBucketDoc *sgbucket.BucketDocument, err error) {
rawBucketDoc = &sgbucket.BucketDocument{}
var getErr error
rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas, getErr = c.dataStore.GetWithXattrs(ctx, key, c.syncAndUserXattrKeys())
rawBucketDoc.Body, rawBucketDoc.Xattrs, rawBucketDoc.Cas, getErr = c.dataStore.GetWithXattrs(ctx, key, c.syncGlobalSyncAndUserXattrKeys())
if getErr != nil {
return nil, nil, getErr
}
Expand All @@ -143,7 +143,7 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) (
if c.UseXattrs() {
// Retrieve doc and xattr from bucket, unmarshal only xattr.
// Triggers on-demand import when document xattr doesn't match cas.
rawDoc, xattrs, cas, getErr := c.dataStore.GetWithXattrs(ctx, key, c.syncAndUserXattrKeys())
rawDoc, xattrs, cas, getErr := c.dataStore.GetWithXattrs(ctx, key, c.syncGlobalSyncAndUserXattrKeys())
if getErr != nil {
return emptySyncData, getErr
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func (c *DatabaseCollection) GetDocSyncData(ctx context.Context, docid string) (

// unmarshalDocumentWithXattrs populates individual xattrs on unmarshalDocumentWithXattrs from a provided xattrs map
func (db *DatabaseCollection) unmarshalDocumentWithXattrs(ctx context.Context, docid string, data []byte, xattrs map[string][]byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
return unmarshalDocumentWithXattrs(ctx, docid, data, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[db.userXattrKey()], xattrs[base.VirtualXattrRevSeqNo], cas, unmarshalLevel)
return unmarshalDocumentWithXattrs(ctx, docid, data, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[db.userXattrKey()], xattrs[base.VirtualXattrRevSeqNo], xattrs[base.GlobalXattrName], cas, unmarshalLevel)

}

Expand Down Expand Up @@ -2239,7 +2239,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
if expiry != nil {
initialExpiry = *expiry
}
casOut, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncMouRevSeqNoAndUserXattrKeys(), initialExpiry, existingDoc, opts, func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (updatedDoc sgbucket.UpdatedDoc, err error) {
casOut, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), initialExpiry, existingDoc, opts, func(currentValue []byte, currentXattrs map[string][]byte, cas uint64) (updatedDoc sgbucket.UpdatedDoc, err error) {
// Be careful: this block can be invoked multiple times if there are races!
if doc, err = db.unmarshalDocumentWithXattrs(ctx, docid, currentValue, currentXattrs, cas, DocUnmarshalAll); err != nil {
return
Expand Down Expand Up @@ -2295,8 +2295,8 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
// Return the new raw document value for the bucket to store.
doc.SetCrc32cUserXattrHash()

var rawSyncXattr, rawMouXattr, rawVvXattr, rawDocBody []byte
rawDocBody, rawSyncXattr, rawVvXattr, rawMouXattr, err = doc.MarshalWithXattrs()
var rawSyncXattr, rawMouXattr, rawVvXattr, rawGlobalSync, rawDocBody []byte
rawDocBody, rawSyncXattr, rawVvXattr, rawMouXattr, rawGlobalSync, err = doc.MarshalWithXattrs()
if err != nil {
return updatedDoc, err
}
Expand All @@ -2310,6 +2310,13 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
if rawMouXattr != nil && db.useMou() {
updatedDoc.Xattrs[base.MouXattrName] = rawMouXattr
}
if rawGlobalSync != nil {
updatedDoc.Xattrs[base.GlobalXattrName] = rawGlobalSync
} else {
if currentXattrs[base.GlobalXattrName] != nil && !isNewDocCreation {
updatedDoc.XattrsToDelete = append(updatedDoc.XattrsToDelete, base.GlobalXattrName)
}
}

// Warn when sync data is larger than a configured threshold
if db.unsupportedOptions() != nil && db.unsupportedOptions().WarningThresholds != nil {
Expand Down
7 changes: 5 additions & 2 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1859,7 +1859,7 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid,
doc.metadataOnlyUpdate = computeMetadataOnlyUpdate(doc.Cas, doc.RevSeqNo, doc.metadataOnlyUpdate)
}

_, rawSyncXattr, rawVvXattr, rawMouXattr, err := updatedDoc.MarshalWithXattrs()
_, rawSyncXattr, rawVvXattr, rawMouXattr, rawGlobalXattr, err := updatedDoc.MarshalWithXattrs()
updatedDoc := sgbucket.UpdatedDoc{
Doc: nil, // Resync does not require document body update
Xattrs: map[string][]byte{
Expand All @@ -1871,12 +1871,15 @@ func (db *DatabaseCollectionWithUser) resyncDocument(ctx context.Context, docid,
if db.useMou() {
updatedDoc.Xattrs[base.MouXattrName] = rawMouXattr
}
if rawGlobalXattr != nil {
updatedDoc.Xattrs[base.GlobalXattrName] = rawGlobalXattr
}
return updatedDoc, err
}
opts := &sgbucket.MutateInOptions{
MacroExpansion: macroExpandSpec(base.SyncXattrName),
}
_, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncMouRevSeqNoAndUserXattrKeys(), 0, nil, opts, writeUpdateFunc)
_, err = db.dataStore.WriteUpdateWithXattrs(ctx, key, db.syncGlobalSyncMouRevSeqNoAndUserXattrKeys(), 0, nil, opts, writeUpdateFunc)
} else {
_, err = db.dataStore.Update(key, 0, func(currentValue []byte) ([]byte, *uint32, bool, error) {
// Be careful: this block can be invoked multiple times if there are races!
Expand Down
12 changes: 6 additions & 6 deletions db/database_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,21 +237,21 @@ func (c *DatabaseCollection) unsupportedOptions() *UnsupportedOptions {
return c.dbCtx.Options.UnsupportedOptions
}

// syncAndUserXattrKeys returns the xattr keys for the user and sync xattrs.
func (c *DatabaseCollection) syncAndUserXattrKeys() []string {
xattrKeys := []string{base.SyncXattrName, base.VvXattrName}
// syncGlobalSyncAndUserXattrKeys returns the xattr keys for the user and sync xattrs.
func (c *DatabaseCollection) syncGlobalSyncAndUserXattrKeys() []string {
xattrKeys := []string{base.SyncXattrName, base.VvXattrName, base.GlobalXattrName}
userXattrKey := c.userXattrKey()
if userXattrKey != "" {
xattrKeys = append(xattrKeys, userXattrKey)
}
return xattrKeys
}

// syncMouRevSeqNoAndUserXattrKeys returns the xattr keys for the user, mou, revSeqNo and sync xattrs.
func (c *DatabaseCollection) syncMouRevSeqNoAndUserXattrKeys() []string {
// syncGlobalSyncMouRevSeqNoAndUserXattrKeys returns the xattr keys for the user, mou, revSeqNo and sync xattrs.
func (c *DatabaseCollection) syncGlobalSyncMouRevSeqNoAndUserXattrKeys() []string {
xattrKeys := []string{base.SyncXattrName, base.VvXattrName}
if c.useMou() {
xattrKeys = append(xattrKeys, base.MouXattrName, base.VirtualXattrRevSeqNo)
xattrKeys = append(xattrKeys, base.MouXattrName, base.VirtualXattrRevSeqNo, base.GlobalXattrName)
}
userXattrKey := c.userXattrKey()
if userXattrKey != "" {
Expand Down
57 changes: 44 additions & 13 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func (sd *SyncData) HashRedact(salt string) SyncData {
// Document doesn't do any locking - document instances aren't intended to be shared across multiple goroutines.
type Document struct {
SyncData // Sync metadata
GlobalSyncData // Global sync metadata, this will hold non cluster specific sync metadata to be copied by XDCR
_body Body // Marshalled document body. Unmarshalled lazily - should be accessed using Body()
_rawBody []byte // Raw document body, as retrieved from the bucket. Marshaled lazily - should be accessed using BodyBytes()
ID string `json:"-"` // Doc id. (We're already using a custom MarshalJSON for *document that's based on body, so the json:"-" probably isn't needed here)
Expand All @@ -203,6 +204,10 @@ type Document struct {
RevSeqNo uint64 // Server rev seq no for a document
}

type GlobalSyncData struct {
GlobalAttachments AttachmentsMeta `json:"attachments_meta,omitempty"`
}

type historyOnlySyncData struct {
revOnlySyncData
History RevTree `json:"history"`
Expand Down Expand Up @@ -410,14 +415,14 @@ func unmarshalDocument(docid string, data []byte) (*Document, error) {
return doc, nil
}

func unmarshalDocumentWithXattrs(ctx context.Context, docid string, data, syncXattrData, hlvXattrData, mouXattrData, userXattrData, documentXattr []byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {
func unmarshalDocumentWithXattrs(ctx context.Context, docid string, data, syncXattrData, hlvXattrData, mouXattrData, userXattrData, virtualXattr []byte, globalSyncData []byte, cas uint64, unmarshalLevel DocumentUnmarshalLevel) (doc *Document, err error) {

if len(syncXattrData) == 0 && len(hlvXattrData) == 0 {
// If no xattr data, unmarshal as standard doc
doc, err = unmarshalDocument(docid, data)
} else {
doc = NewDocument(docid)
err = doc.UnmarshalWithXattrs(ctx, data, syncXattrData, hlvXattrData, documentXattr, unmarshalLevel)
err = doc.UnmarshalWithXattrs(ctx, data, syncXattrData, hlvXattrData, virtualXattr, globalSyncData, unmarshalLevel)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -466,7 +471,7 @@ func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey
var xattrValues map[string][]byte
var hlv *HybridLogicalVector
if dataType&base.MemcachedDataTypeXattr != 0 {
xattrKeys := []string{base.SyncXattrName, base.MouXattrName, base.VvXattrName}
xattrKeys := []string{base.SyncXattrName, base.MouXattrName, base.VvXattrName, base.GlobalXattrName}
if userXattrKey != "" {
xattrKeys = append(xattrKeys, userXattrKey)
}
Expand Down Expand Up @@ -532,7 +537,7 @@ func UnmarshalDocumentFromFeed(ctx context.Context, docid string, cas uint64, da
if err != nil {
return nil, err
}
return unmarshalDocumentWithXattrs(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[userXattrKey], xattrs[base.VirtualXattrRevSeqNo], cas, DocUnmarshalAll)
return unmarshalDocumentWithXattrs(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[base.VvXattrName], xattrs[base.MouXattrName], xattrs[userXattrKey], xattrs[base.VirtualXattrRevSeqNo], nil, cas, DocUnmarshalAll)
}

func (doc *SyncData) HasValidSyncData() bool {
Expand Down Expand Up @@ -1095,7 +1100,7 @@ func (doc *Document) MarshalJSON() (data []byte, err error) {
// unmarshalLevel is anything less than the full document + metadata, the raw data is retained for subsequent
// lazy unmarshalling as needed.
// Must handle cases where document body and hlvXattrData are present without syncXattrData for all DocumentUnmarshalLevel
func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrData, hlvXattrData, documentXattr []byte, unmarshalLevel DocumentUnmarshalLevel) error {
func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrData, hlvXattrData, virtualXattr []byte, globalSyncData []byte, unmarshalLevel DocumentUnmarshalLevel) error {
if doc.ID == "" {
base.WarnfCtx(ctx, "Attempted to unmarshal document without ID set")
return errors.New("Document was unmarshalled without ID set")
Expand All @@ -1117,9 +1122,9 @@ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrDat
return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal HLV during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), err))
}
}
if documentXattr != nil {
if virtualXattr != nil {
var revSeqNo string
err := base.JSONUnmarshal(documentXattr, &revSeqNo)
err := base.JSONUnmarshal(virtualXattr, &revSeqNo)
if err != nil {
return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal doc virtual revSeqNo xattr during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalAll/Sync). Error: %v", base.UD(doc.ID), err))
}
Expand All @@ -1131,6 +1136,12 @@ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrDat
doc.RevSeqNo = revNo
}
}
if len(globalSyncData) > 0 {
if err := base.JSONUnmarshal(globalSyncData, &doc.GlobalSyncData); err != nil {
base.WarnfCtx(ctx, "Failed to unmarshal globalSync xattr for key %v, globalSync will be ignored. Err: %v globalSync:%s", base.UD(doc.ID), err, globalSyncData)
}
doc.SyncData.Attachments = doc.GlobalSyncData.GlobalAttachments
}
doc._rawBody = data
// Unmarshal body if requested and present
if unmarshalLevel == DocUnmarshalAll && len(data) > 0 {
Expand All @@ -1151,6 +1162,12 @@ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrDat
return pkgerrors.WithStack(base.RedactErrorf("Failed to unmarshal HLV during UnmarshalWithXattrs() doc with id: %s (DocUnmarshalNoHistory). Error: %v", base.UD(doc.ID), err))
}
}
if len(globalSyncData) > 0 {
if err := base.JSONUnmarshal(globalSyncData, &doc.GlobalSyncData); err != nil {
base.WarnfCtx(ctx, "Failed to unmarshal globalSync xattr for key %v, globalSync will be ignored. Err: %v globalSync:%s", base.UD(doc.ID), err, globalSyncData)
}
doc.SyncData.Attachments = doc.GlobalSyncData.GlobalAttachments
}
doc._rawBody = data
case DocUnmarshalHistory:
if syncXattrData != nil {
Expand Down Expand Up @@ -1211,7 +1228,7 @@ func (doc *Document) UnmarshalWithXattrs(ctx context.Context, data, syncXattrDat
}

// MarshalWithXattrs marshals the Document into body, and sync, vv and mou xattrs for persistence.
func (doc *Document) MarshalWithXattrs() (data []byte, syncXattr, vvXattr, mouXattr []byte, err error) {
func (doc *Document) MarshalWithXattrs() (data, syncXattr, vvXattr, mouXattr, globalXattr []byte, err error) {
// Grab the rawBody if it's already marshalled, otherwise unmarshal the body
if doc._rawBody != nil {
if !doc.IsDeleted() {
Expand All @@ -1228,31 +1245,45 @@ func (doc *Document) MarshalWithXattrs() (data []byte, syncXattr, vvXattr, mouXa
if !deleted {
data, err = base.JSONMarshal(body)
if err != nil {
return nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc body with id: %s. Error: %v", base.UD(doc.ID), err))
return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc body with id: %s. Error: %v", base.UD(doc.ID), err))
}
}
}
}
if doc.SyncData.HLV != nil {
vvXattr, err = base.JSONMarshal(&doc.SyncData.HLV)
if err != nil {
return nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc vv with id: %s. Error: %v", base.UD(doc.ID), err))
return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc vv with id: %s. Error: %v", base.UD(doc.ID), err))
}
}
// assign any attachments we have stored in document sync data to global sync data
// then nil the sync data attachments to prevent marshalling of it
doc.GlobalSyncData.GlobalAttachments = doc.Attachments
doc.Attachments = nil

syncXattr, err = base.JSONMarshal(doc.SyncData)
if err != nil {
return nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc SyncData with id: %s. Error: %v", base.UD(doc.ID), err))
return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc SyncData with id: %s. Error: %v", base.UD(doc.ID), err))
}

if doc.metadataOnlyUpdate != nil {
mouXattr, err = base.JSONMarshal(doc.metadataOnlyUpdate)
if err != nil {
return nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc MouData with id: %s. Error: %v", base.UD(doc.ID), err))
return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc MouData with id: %s. Error: %v", base.UD(doc.ID), err))
}
}
// marshal global xattrs if there are attachments defined
if len(doc.GlobalSyncData.GlobalAttachments) > 0 {
globalXattr, err = base.JSONMarshal(doc.GlobalSyncData)
if err != nil {
return nil, nil, nil, nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattrs() doc GlobalXattr with id: %s. Error: %v", base.UD(doc.ID), err))
}
// restore attachment meta to sync data post global xattr construction
doc.Attachments = make(AttachmentsMeta)
doc.Attachments = doc.GlobalSyncData.GlobalAttachments
}

return data, syncXattr, vvXattr, mouXattr, nil
return data, syncXattr, vvXattr, mouXattr, globalXattr, nil
}

// computeMetadataOnlyUpdate computes a new metadataOnlyUpdate based on the existing document's CAS and metadataOnlyUpdate
Expand Down
Loading
Loading