Skip to content

Commit

Permalink
CBG-3212: add api to fetch a document by its CV value (#6579)
Browse files Browse the repository at this point in the history
* CBG-3212: add api to fetch a document by its CV value

* test fix

* rebased SourceAndVersion -> Version rename

* Update currentRevChannels on CV revcache load and doc.updateChannels

* fix spelling

* Remove currentRevChannels

* Move common GetRev/GetCV work into documentRevisionForRequest function

* Pass revision.RevID into authorizeUserForChannels

* Update db/crud.go

Co-authored-by: Tor Colvin <tor.colvin@couchbase.com>

---------

Co-authored-by: Ben Brooks <ben.brooks@couchbase.com>
Co-authored-by: Tor Colvin <tor.colvin@couchbase.com>
  • Loading branch information
3 people authored and adamcfraser committed Aug 9, 2024
1 parent 6e0827c commit 2e3b5e3
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 47 deletions.
4 changes: 2 additions & 2 deletions db/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func TestDocDeletionFromChannelCoalescedRemoved(t *testing.T) {
func TestCVPopulationOnChangeEntry(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)
collection := GetSingleDatabaseCollectionWithUser(t, db)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
collectionID := collection.GetCollectionID()
bucketUUID := db.BucketUUID

Expand Down Expand Up @@ -561,7 +561,7 @@ func TestCurrentVersionPopulationOnChannelCache(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyDCP, base.KeyCache, base.KeyHTTP)
db, ctx := setupTestDB(t)
defer db.Close(ctx)
collection := GetSingleDatabaseCollectionWithUser(t, db)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
collectionID := collection.GetCollectionID()
bucketUUID := db.BucketUUID
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)
Expand Down
52 changes: 41 additions & 11 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,14 +314,29 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
// No rev ID given, so load active revision
revision, err = db.revisionCache.GetActive(ctx, docid)
}

if err != nil {
return DocumentRevision{}, err
}

return db.documentRevisionForRequest(ctx, docid, revision, &revid, nil, maxHistory, historyFrom)
}

// documentRevisionForRequest processes the given DocumentRevision and returns a version of it for a given client request, depending on access, deleted, etc.
func (db *DatabaseCollectionWithUser) documentRevisionForRequest(ctx context.Context, docID string, revision DocumentRevision, revID *string, cv *Version, maxHistory int, historyFrom []string) (DocumentRevision, error) {
// ensure only one of cv or revID is specified
if cv != nil && revID != nil {
return DocumentRevision{}, fmt.Errorf("must have one of cv or revID in documentRevisionForRequest (had cv=%v revID=%v)", cv, revID)
}
var requestedVersion string
if revID != nil {
requestedVersion = *revID
} else if cv != nil {
requestedVersion = cv.String()
}

if revision.BodyBytes == nil {
if db.ForceAPIForbiddenErrors() {
base.InfofCtx(ctx, base.KeyCRUD, "Doc: %s %s is missing", base.UD(docid), base.MD(revid))
base.InfofCtx(ctx, base.KeyCRUD, "Doc: %s %s is missing", base.UD(docID), base.MD(requestedVersion))
return DocumentRevision{}, ErrForbidden
}
return DocumentRevision{}, ErrMissing
Expand All @@ -340,16 +355,17 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
_, requestedHistory = trimEncodedRevisionsToAncestor(ctx, requestedHistory, historyFrom, maxHistory)
}

isAuthorized, redactedRev := db.authorizeUserForChannels(docid, revision.RevID, revision.Channels, revision.Deleted, requestedHistory)
isAuthorized, redactedRevision := db.authorizeUserForChannels(docID, revision.RevID, cv, revision.Channels, revision.Deleted, requestedHistory)
if !isAuthorized {
if revid == "" {
// client just wanted active revision, not a specific one
if requestedVersion == "" {
return DocumentRevision{}, ErrForbidden
}
if db.ForceAPIForbiddenErrors() {
base.InfofCtx(ctx, base.KeyCRUD, "Not authorized to view doc: %s %s", base.UD(docid), base.MD(revid))
base.InfofCtx(ctx, base.KeyCRUD, "Not authorized to view doc: %s %s", base.UD(docID), base.MD(requestedVersion))
return DocumentRevision{}, ErrForbidden
}
return redactedRev, nil
return redactedRevision, nil
}

// If the revision is a removal cache entry (no body), but the user has access to that removal, then just
Expand All @@ -358,13 +374,26 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
return DocumentRevision{}, ErrMissing
}

if revision.Deleted && revid == "" {
if revision.Deleted && requestedVersion == "" {
return DocumentRevision{}, ErrDeleted
}

return revision, nil
}

func (db *DatabaseCollectionWithUser) GetCV(ctx context.Context, docid string, cv *Version, includeBody bool) (revision DocumentRevision, err error) {
if cv != nil {
revision, err = db.revisionCache.GetWithCV(ctx, docid, cv, RevCacheOmitDelta)
} else {
revision, err = db.revisionCache.GetActive(ctx, docid)
}
if err != nil {
return DocumentRevision{}, err
}

return db.documentRevisionForRequest(ctx, docid, revision, nil, cv, 0, nil)
}

// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated,
// returns nil.
func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRevID, toRevID string) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) {
Expand Down Expand Up @@ -396,7 +425,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
if fromRevision.Delta != nil {
if fromRevision.Delta.ToRevID == toRevID {

isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, nil, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
if !isAuthorized {
return nil, &redactedBody, nil
}
Expand All @@ -419,7 +448,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
}

deleted := toRevision.Deleted
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, toRevision.Channels, deleted, toRevision.History)
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, nil, toRevision.Channels, deleted, toRevision.History)
if !isAuthorized {
return nil, &redactedBody, nil
}
Expand Down Expand Up @@ -478,7 +507,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
return nil, nil, nil
}

func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID string, channels base.Set, isDeleted bool, history Revisions) (isAuthorized bool, redactedRev DocumentRevision) {
func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID string, cv *Version, channels base.Set, isDeleted bool, history Revisions) (isAuthorized bool, redactedRev DocumentRevision) {

if col.user != nil {
if err := col.user.AuthorizeAnyCollectionChannel(col.ScopeName, col.Name, channels); err != nil {
Expand All @@ -490,6 +519,7 @@ func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID str
RevID: revID,
History: history,
Deleted: isDeleted,
CV: cv,
}
if isDeleted {
// Deletions are denoted by the deleted message property during 2.x replication
Expand Down Expand Up @@ -1045,7 +1075,7 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
if existingDoc != nil {
doc, unmarshalErr := db.unmarshalDocumentWithXattrs(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattrs, existingDoc.Cas, DocUnmarshalRev)
if unmarshalErr != nil {
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling exsiting doc")
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling existing doc")
}
matchRev = doc.CurrentRev
}
Expand Down
179 changes: 179 additions & 0 deletions db/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/channels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -1957,3 +1958,181 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv))
assert.Equal(t, "1-3a208ea66e84121b528f05b5457d1134", syncData.CurrentRev)
}

// TestGetCVWithDocResidentInCache:
// - Two test cases, one with doc a user will have access to, one without
// - Purpose is to have a doc that is resident in rev cache and use the GetCV function to retrieve these docs
// - Assert that the doc the user has access to is corrected fetched
// - Assert the doc the user doesn't have access to is fetched but correctly redacted
func TestGetCVWithDocResidentInCache(t *testing.T) {
const docID = "doc1"

testCases := []struct {
name string
docChannels []string
access bool
}{
{
name: "getCVWithUserAccess",
docChannels: []string{"A"},
access: true,
},
{
name: "getCVWithoutUserAccess",
docChannels: []string{"B"},
access: false,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)

// Create a user with access to channel A
authenticator := db.Authenticator(base.TestCtx(t))
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
require.NoError(t, err)
require.NoError(t, authenticator.Save(user))
collection.user, err = authenticator.GetUser("alice")
require.NoError(t, err)

// create doc with the channels for the test case
docBody := Body{"channels": testCase.docChannels}
rev, doc, err := collection.Put(ctx, docID, docBody)
require.NoError(t, err)

vrs := doc.HLV.Version
src := doc.HLV.SourceID
sv := &Version{Value: vrs, SourceID: src}
revision, err := collection.GetCV(ctx, docID, sv, true)
require.NoError(t, err)
if testCase.access {
assert.Equal(t, rev, revision.RevID)
assert.Equal(t, sv, revision.CV)
assert.Equal(t, docID, revision.DocID)
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
} else {
assert.Equal(t, rev, revision.RevID)
assert.Equal(t, sv, revision.CV)
assert.Equal(t, docID, revision.DocID)
assert.Equal(t, []byte(RemovedRedactedDocument), revision.BodyBytes)
}
})
}
}

// TestGetByCVForDocNotResidentInCache:
// - Setup db with rev cache size of 1
// - Put two docs forcing eviction of the first doc
// - Use GetCV function to fetch the first doc, forcing the rev cache to load the doc from bucket
// - Assert the doc revision fetched is correct to the first doc we created
func TestGetByCVForDocNotResidentInCache(t *testing.T) {
db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{
RevisionCacheOptions: &RevisionCacheOptions{
Size: 1,
},
})
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)

// Create a user with access to channel A
authenticator := db.Authenticator(base.TestCtx(t))
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
require.NoError(t, err)
require.NoError(t, authenticator.Save(user))
collection.user, err = authenticator.GetUser("alice")
require.NoError(t, err)

const (
doc1ID = "doc1"
doc2ID = "doc2"
)

revBody := Body{"channels": []string{"A"}}
rev, doc, err := collection.Put(ctx, doc1ID, revBody)
require.NoError(t, err)

// put another doc that should evict first doc from cache
_, _, err = collection.Put(ctx, doc2ID, revBody)
require.NoError(t, err)

// get by CV should force a load from bucket and have a cache miss
vrs := doc.HLV.Version
src := doc.HLV.SourceID
sv := &Version{Value: vrs, SourceID: src}
revision, err := collection.GetCV(ctx, doc1ID, sv, true)
require.NoError(t, err)

// assert the fetched doc is the first doc we added and assert that we did in fact get cache miss
assert.Equal(t, int64(1), db.DbStats.Cache().RevisionCacheMisses.Value())
assert.Equal(t, rev, revision.RevID)
assert.Equal(t, sv, revision.CV)
assert.Equal(t, doc1ID, revision.DocID)
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
}

// TestGetCVActivePathway:
// - Two test cases, one with doc a user will have access to, one without
// - Purpose is top specify nil CV to the GetCV function to force the GetActive code pathway
// - Assert doc that is created is fetched correctly when user has access to doc
// - Assert that correct error is returned when user has no access to the doc
func TestGetCVActivePathway(t *testing.T) {
const docID = "doc1"

testCases := []struct {
name string
docChannels []string
access bool
}{
{
name: "activeFetchWithUserAccess",
docChannels: []string{"A"},
access: true,
},
{
name: "activeFetchWithoutUserAccess",
docChannels: []string{"B"},
access: false,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)

// Create a user with access to channel A
authenticator := db.Authenticator(base.TestCtx(t))
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
require.NoError(t, err)
require.NoError(t, authenticator.Save(user))
collection.user, err = authenticator.GetUser("alice")
require.NoError(t, err)

// test get active path by specifying nil cv
revBody := Body{"channels": testCase.docChannels}
rev, doc, err := collection.Put(ctx, docID, revBody)
require.NoError(t, err)
revision, err := collection.GetCV(ctx, docID, nil, true)

if testCase.access == true {
require.NoError(t, err)
vrs := doc.HLV.Version
src := doc.HLV.SourceID
sv := &Version{Value: vrs, SourceID: src}
assert.Equal(t, rev, revision.RevID)
assert.Equal(t, sv, revision.CV)
assert.Equal(t, docID, revision.DocID)
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
} else {
require.Error(t, err)
assert.ErrorContains(t, err, ErrForbidden.Error())
assert.Equal(t, DocumentRevision{}, revision)
}
})
}
}
2 changes: 1 addition & 1 deletion db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1839,7 +1839,7 @@ func TestChannelQuery(t *testing.T) {

db, ctx := setupTestDB(t)
defer db.Close(ctx)
collection := GetSingleDatabaseCollectionWithUser(t, db)
collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)
_, err := collection.UpdateSyncFun(ctx, `function(doc, oldDoc) {
channel(doc.channels);
}`)
Expand Down
Loading

0 comments on commit 2e3b5e3

Please sign in to comment.