Skip to content

Commit

Permalink
CBG-3213 Version support for channel removals (#6650)
Browse files Browse the repository at this point in the history
* CBG-3213 Version support for channel removals

Adds cv (source and version) to removals in _sync.channels (ChannelMap).  Uses RevAndVersion to support query (the same approached used for _sync.rev).

Required moving RevAndVersion to channels package for usage within ChannelMap.

Changes in crud.go required to support the case where the removal version needs to be set via macro expansion.

* Use standard function to update testBackingStore document channels
  • Loading branch information
adamcfraser authored Jan 26, 2024
1 parent d86ba4f commit 41f4d64
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 102 deletions.
53 changes: 47 additions & 6 deletions channels/log_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package channels
import (
"fmt"
"time"

"github.com/couchbase/sync_gateway/base"
)

// LogEntry
Expand Down Expand Up @@ -74,18 +76,18 @@ func (l LogEntry) String() string {

type ChannelMap map[string]*ChannelRemoval
type ChannelRemoval struct {
Seq uint64 `json:"seq,omitempty"`
RevID string `json:"rev"`
Deleted bool `json:"del,omitempty"`
Seq uint64 `json:"seq,omitempty"`
Rev RevAndVersion `json:"rev"`
Deleted bool `json:"del,omitempty"`
}

func (channelMap ChannelMap) ChannelsRemovedAtSequence(seq uint64) (ChannelMap, string) {
func (channelMap ChannelMap) ChannelsRemovedAtSequence(seq uint64) (ChannelMap, RevAndVersion) {
var channelsRemoved = make(ChannelMap)
var revIdRemoved string
var revIdRemoved RevAndVersion
for channel, removal := range channelMap {
if removal != nil && removal.Seq == seq {
channelsRemoved[channel] = removal
revIdRemoved = removal.RevID // Will be the same RevID for each removal
revIdRemoved = removal.Rev // Will be the same Rev for each removal
}
}
return channelsRemoved, revIdRemoved
Expand All @@ -100,3 +102,42 @@ func (channelMap ChannelMap) KeySet() []string {
}
return result
}

// RevAndVersion is used to store both revTreeID and currentVersion in a single property, for backwards compatibility
// with existing indexes using rev. When only RevTreeID is specified, is marshalled/unmarshalled as a string. Otherwise
// marshalled normally.
type RevAndVersion struct {
RevTreeID string `json:"rev,omitempty"`
CurrentSource string `json:"src,omitempty"`
CurrentVersion string `json:"vrs,omitempty"` // String representation of version
}

// RevAndVersionJSON aliases RevAndVersion to support conditional unmarshalling from either string (revTreeID) or
// map (RevAndVersion) representations
type RevAndVersionJSON RevAndVersion

// Marshals RevAndVersion as simple string when only RevTreeID is specified - otherwise performs standard
// marshalling
func (rv RevAndVersion) MarshalJSON() (data []byte, err error) {

if rv.CurrentSource == "" {
return base.JSONMarshal(rv.RevTreeID)
}
return base.JSONMarshal(RevAndVersionJSON(rv))
}

// Unmarshals either from string (legacy, revID only) or standard RevAndVersion unmarshalling.
func (rv *RevAndVersion) UnmarshalJSON(data []byte) error {

if len(data) == 0 {
return nil
}
switch data[0] {
case '"':
return base.JSONUnmarshal(data, &rv.RevTreeID)
case '{':
return base.JSONUnmarshal(data, (*RevAndVersionJSON)(rv))
default:
return fmt.Errorf("unrecognized JSON format for RevAndVersion: %s", data)
}
}
8 changes: 5 additions & 3 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (entry *LogEntry) SetDeleted() {
entry.Flags |= channels.Deleted
}

func (entry *LogEntry) SetRevAndVersion(rv RevAndVersion) {
func (entry *LogEntry) SetRevAndVersion(rv channels.RevAndVersion) {
entry.RevID = rv.RevTreeID
if rv.CurrentSource != "" {
entry.SourceID = rv.CurrentSource
Expand Down Expand Up @@ -491,9 +491,11 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {

// if the doc was removed from one or more channels at this sequence
// Set the removed flag and removed channel set on the LogEntry
if channelRemovals, atRevId := syncData.Channels.ChannelsRemovedAtSequence(seq); len(channelRemovals) > 0 {
if channelRemovals, atRev := syncData.Channels.ChannelsRemovedAtSequence(seq); len(channelRemovals) > 0 {
change.DocID = docID
change.RevID = atRevId
change.RevID = atRev.RevTreeID
change.SourceID = atRev.CurrentSource
change.Version = base.HexCasToUint64(atRev.CurrentVersion)
change.Channels = channelRemovals
}

Expand Down
2 changes: 1 addition & 1 deletion db/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestDocDeletionFromChannelCoalescedRemoved(t *testing.T) {
sync["recent_sequences"] = []uint64{1, 2, 3}

cm := make(channels.ChannelMap)
cm["A"] = &channels.ChannelRemoval{Seq: 2, RevID: "2-e99405a23fa102238fa8c3fd499b15bc"}
cm["A"] = &channels.ChannelRemoval{Seq: 2, Rev: channels.RevAndVersion{RevTreeID: "2-e99405a23fa102238fa8c3fd499b15bc"}}
sync["channels"] = cm

history := sync["history"].(map[string]interface{})
Expand Down
9 changes: 6 additions & 3 deletions db/changes_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ import (

sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/channels"
)

// One "changes" row in a channelsViewResult
type channelsViewRow struct {
ID string
Key []interface{} // Actually [channelName, sequence]
Value struct {
Rev RevAndVersion
Rev channels.RevAndVersion
Flags uint8
}
}
Expand Down Expand Up @@ -66,8 +67,10 @@ func nextChannelQueryEntry(ctx context.Context, results sgbucket.QueryResultIter
}
entry.SetRevAndVersion(queryRow.Rev)

if queryRow.RemovalRev != "" {
entry.RevID = queryRow.RemovalRev
if queryRow.RemovalRev != nil {
entry.RevID = queryRow.RemovalRev.RevTreeID
entry.Version = base.HexCasToUint64(queryRow.RemovalRev.CurrentVersion)
entry.SourceID = queryRow.RemovalRev.CurrentSource
if queryRow.RemovalDel {
entry.SetDeleted()
}
Expand Down
50 changes: 40 additions & 10 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -1914,7 +1914,27 @@ func (db *DatabaseCollectionWithUser) IsIllegalConflict(ctx context.Context, doc
return true
}

func (col *DatabaseCollectionWithUser) documentUpdateFunc(ctx context.Context, docExists bool, doc *Document, allowImport bool, previousDocSequenceIn uint64, unusedSequences []uint64, callback updateAndReturnDocCallback, expiry *uint32) (retSyncFuncExpiry *uint32, retNewRevID string, retStoredDoc *Document, retOldBodyJSON string, retUnusedSequences []uint64, changedAccessPrincipals []string, changedRoleAccessUsers []string, createNewRevIDSkipped bool, err error) {
func (col *DatabaseCollectionWithUser) documentUpdateFunc(
ctx context.Context,
docExists bool,
doc *Document,
allowImport bool,
previousDocSequenceIn uint64,
unusedSequences []uint64,
callback updateAndReturnDocCallback,
expiry *uint32,
docUpdateEvent DocUpdateType,
) (
retSyncFuncExpiry *uint32,
retNewRevID string,
retStoredDoc *Document,
retOldBodyJSON string,
retUnusedSequences []uint64,
changedAccessPrincipals []string,
changedRoleAccessUsers []string,
createNewRevIDSkipped bool,
revokedChannelsRequiringExpansion []string,
err error) {

err = validateExistingDoc(doc, allowImport, docExists)
if err != nil {
Expand Down Expand Up @@ -1963,6 +1983,14 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc(ctx context.Context, d
return
}

// The callback has updated the HLV for mutations coming from CBL. Update the HLV so that the current version is set before
// we call updateChannels, which needs to set the current version for removals
// update the HLV values
doc, err = col.updateHLV(doc, docUpdateEvent)
if err != nil {
return
}

if doc.CurrentRev != prevCurrentRev || createNewRevIDSkipped {
// Most of the time this update will change the doc's current rev. (The exception is
// if the new rev is a conflict that doesn't win the revid comparison.) If so, we
Expand All @@ -1974,7 +2002,7 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc(ctx context.Context, d
return
}
}
_, err = doc.updateChannels(ctx, channelSet)
_, revokedChannelsRequiringExpansion, err = doc.updateChannels(ctx, channelSet)
if err != nil {
return
}
Expand All @@ -1999,7 +2027,7 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc(ctx context.Context, d

doc.ClusterUUID = col.serverUUID()
doc.TimeSaved = time.Now()
return updatedExpiry, newRevID, newDoc, oldBodyJSON, unusedSequences, changedAccessPrincipals, changedRoleAccessUsers, createNewRevIDSkipped, err
return updatedExpiry, newRevID, newDoc, oldBodyJSON, unusedSequences, changedAccessPrincipals, changedRoleAccessUsers, createNewRevIDSkipped, revokedChannelsRequiringExpansion, err
}

// Function type for the callback passed into updateAndReturnDoc
Expand Down Expand Up @@ -2049,7 +2077,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
}
prevCurrentRev = doc.CurrentRev
docExists := currentValue != nil
syncFuncExpiry, newRevID, storedDoc, oldBodyJSON, unusedSequences, changedAccessPrincipals, changedRoleAccessUsers, createNewRevIDSkipped, err = db.documentUpdateFunc(ctx, docExists, doc, allowImport, docSequence, unusedSequences, callback, expiry)
syncFuncExpiry, newRevID, storedDoc, oldBodyJSON, unusedSequences, changedAccessPrincipals, changedRoleAccessUsers, createNewRevIDSkipped, _, err = db.documentUpdateFunc(ctx, docExists, doc, allowImport, docSequence, unusedSequences, callback, expiry, docUpdateEvent)
if err != nil {
return
}
Expand Down Expand Up @@ -2103,7 +2131,8 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
}

docExists := currentValue != nil
syncFuncExpiry, newRevID, storedDoc, oldBodyJSON, unusedSequences, changedAccessPrincipals, changedRoleAccessUsers, createNewRevIDSkipped, err = db.documentUpdateFunc(ctx, docExists, doc, allowImport, docSequence, unusedSequences, callback, expiry)
var revokedChannelsRequiringExpansion []string
syncFuncExpiry, newRevID, storedDoc, oldBodyJSON, unusedSequences, changedAccessPrincipals, changedRoleAccessUsers, createNewRevIDSkipped, revokedChannelsRequiringExpansion, err = db.documentUpdateFunc(ctx, docExists, doc, allowImport, docSequence, unusedSequences, callback, expiry, docUpdateEvent)
if err != nil {
return
}
Expand All @@ -2119,14 +2148,11 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
return
}

// update the HLV values
doc, err = db.updateHLV(doc, docUpdateEvent)
if err != nil {
return
}
// update the mutate in options based on the above logic
updatedSpec = doc.SyncData.HLV.computeMacroExpansions()

updatedSpec = appendRevocationMacroExpansions(updatedSpec, revokedChannelsRequiringExpansion)

deleteDoc = currentRevFromHistory.Deleted

// Return the new raw document value for the bucket to store.
Expand Down Expand Up @@ -2828,3 +2854,7 @@ func xattrCurrentVersionPath(xattrKey string) string {
func xattrCurrentVersionCASPath(xattrKey string) string {
return xattrKey + "." + versionVectorCVCASMacro
}

func xattrRevokedChannelVersionPath(xattrKey string, channelName string) string {
return xattrKey + ".channels." + channelName + "." + xattrMacroCurrentRevVersion
}
2 changes: 1 addition & 1 deletion db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1761,7 +1761,7 @@ func (db *DatabaseCollectionWithUser) getResyncedDocument(ctx context.Context, d
forceUpdate = true
}

changedChannels, err := doc.updateChannels(ctx, channels)
changedChannels, _, err := doc.updateChannels(ctx, channels)
changed = len(doc.Access.updateAccess(ctx, doc, access)) +
len(doc.RoleAccess.updateAccess(ctx, doc, roles)) +
len(changedChannels)
Expand Down
Loading

0 comments on commit 41f4d64

Please sign in to comment.