Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3843: include collection names in status of resync #6839

Merged
merged 3 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions db/background_mgr_resync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
// ======================================================

type ResyncManager struct {
DocsProcessed int
DocsChanged int
lock sync.Mutex
DocsProcessed int
DocsChanged int
ResyncedCollections []string
lock sync.Mutex
}

var _ BackgroundManagerProcessI = &ResyncManager{}
Expand Down Expand Up @@ -64,10 +65,12 @@ func (r *ResyncManager) Run(ctx context.Context, options map[string]interface{},
persistClusterStatus()
}

collectionIDs, hasAllCollections, err := getCollectionIds(database, resyncCollections)
collectionIDs, hasAllCollections, collectionNames, err := getCollectionIdsAndNames(database, resyncCollections)
if err != nil {
return err
}
// add collection list to manager for use in status call
r.SetCollectionStatus(collectionNames)
if hasAllCollections {
base.InfofCtx(ctx, base.KeyAll, "running resync against all collections")
} else {
Expand Down Expand Up @@ -99,6 +102,7 @@ func (r *ResyncManager) ResetStatus() {

r.DocsProcessed = 0
r.DocsChanged = 0
r.ResyncedCollections = nil
}

func (r *ResyncManager) SetStats(docsProcessed, docsChanged int) {
Expand All @@ -109,10 +113,18 @@ func (r *ResyncManager) SetStats(docsProcessed, docsChanged int) {
r.DocsChanged = docsChanged
}

func (r *ResyncManager) SetCollectionStatus(collectionNames []string) {
r.lock.Lock()
defer r.lock.Unlock()

r.ResyncedCollections = collectionNames
}

type ResyncManagerResponse struct {
BackgroundManagerStatus
DocsChanged int `json:"docs_changed"`
DocsProcessed int `json:"docs_processed"`
DocsChanged int `json:"docs_changed"`
DocsProcessed int `json:"docs_processed"`
CollectionsProcessing []string `json:"collections_processing,omitempty"`
}

func (r *ResyncManager) GetProcessStatus(backgroundManagerStatus BackgroundManagerStatus) ([]byte, []byte, error) {
Expand All @@ -123,6 +135,7 @@ func (r *ResyncManager) GetProcessStatus(backgroundManagerStatus BackgroundManag
BackgroundManagerStatus: backgroundManagerStatus,
DocsChanged: r.DocsChanged,
DocsProcessed: r.DocsProcessed,
CollectionsProcessing: r.ResyncedCollections,
}

statusJSON, err := base.JSONMarshal(retStatus)
Expand Down
46 changes: 33 additions & 13 deletions db/background_mgr_resync_dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ import (
// =====================================================================

type ResyncManagerDCP struct {
DocsProcessed base.AtomicInt
DocsChanged base.AtomicInt
ResyncID string
VBUUIDs []uint64
useXattrs bool
lock sync.RWMutex
DocsProcessed base.AtomicInt
DocsChanged base.AtomicInt
ResyncID string
VBUUIDs []uint64
useXattrs bool
ResyncedCollections []string
lock sync.RWMutex
}

// ResyncCollections contains map of scope names with collection names against which resync needs to run
Expand Down Expand Up @@ -147,10 +148,12 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]interface
}

// Get collectionIds
collectionIDs, hasAllCollections, err := getCollectionIds(db, resyncCollections)
collectionIDs, hasAllCollections, collectionNames, err := getCollectionIdsAndNames(db, resyncCollections)
if err != nil {
return err
}
// add collection list to manager for use in status call
r.SetCollectionStatus(collectionNames)
if hasAllCollections {
base.InfofCtx(ctx, base.KeyAll, "[%s] running resync against all collections", resyncLoggingID)
} else {
Expand Down Expand Up @@ -242,29 +245,36 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]interface
return nil
}

func getCollectionIds(db *Database, resyncCollections ResyncCollections) ([]uint32, bool, error) {
func getCollectionIdsAndNames(db *Database, resyncCollections ResyncCollections) ([]uint32, bool, []string, error) {
collectionIDs := make([]uint32, 0)
var hasAllCollections bool
var resyncCollectionNames []string

if len(resyncCollections) == 0 {
hasAllCollections = true
for collectionID := range db.CollectionByID {
collectionIDs = append(collectionIDs, collectionID)
}
for _, collectionNames := range db.CollectionNames {
for collName := range collectionNames {
resyncCollectionNames = append(resyncCollectionNames, collName)
}
}
} else {
hasAllCollections = false

for scopeName, collectionsName := range resyncCollections {
for _, collectionName := range collectionsName {
collection, err := db.GetDatabaseCollection(scopeName, collectionName)
if err != nil {
return nil, hasAllCollections, fmt.Errorf("failed to find ID for collection %s.%s", base.MD(scopeName).Redact(), base.MD(collectionName).Redact())
return nil, hasAllCollections, nil, fmt.Errorf("failed to find ID for collection %s.%s", base.MD(scopeName).Redact(), base.MD(collectionName).Redact())
}
collectionIDs = append(collectionIDs, collection.GetCollectionID())
resyncCollectionNames = append(resyncCollectionNames, collectionName)
}
}
}
return collectionIDs, hasAllCollections, nil
return collectionIDs, hasAllCollections, resyncCollectionNames, nil
}

func (r *ResyncManagerDCP) ResetStatus() {
Expand All @@ -273,6 +283,7 @@ func (r *ResyncManagerDCP) ResetStatus() {

r.DocsProcessed.Set(0)
r.DocsChanged.Set(0)
r.ResyncedCollections = nil
}

func (r *ResyncManagerDCP) SetStatus(docChanged, docProcessed int64) {
Expand All @@ -283,11 +294,19 @@ func (r *ResyncManagerDCP) SetStatus(docChanged, docProcessed int64) {
r.DocsProcessed.Set(docProcessed)
}

func (r *ResyncManagerDCP) SetCollectionStatus(collectionNames []string) {
r.lock.Lock()
defer r.lock.Unlock()

r.ResyncedCollections = collectionNames
}

type ResyncManagerResponseDCP struct {
BackgroundManagerStatus
ResyncID string `json:"resync_id"`
DocsChanged int64 `json:"docs_changed"`
DocsProcessed int64 `json:"docs_processed"`
ResyncID string `json:"resync_id"`
DocsChanged int64 `json:"docs_changed"`
DocsProcessed int64 `json:"docs_processed"`
CollectionsProcessing []string `json:"collections_processing,omitempty"`
}

func (r *ResyncManagerDCP) GetProcessStatus(status BackgroundManagerStatus) ([]byte, []byte, error) {
Expand All @@ -299,6 +318,7 @@ func (r *ResyncManagerDCP) GetProcessStatus(status BackgroundManagerStatus) ([]b
ResyncID: r.ResyncID,
DocsChanged: r.DocsChanged.Value(),
DocsProcessed: r.DocsProcessed.Value(),
CollectionsProcessing: r.ResyncedCollections,
}

meta := AttachmentManagerMeta{
Expand Down
107 changes: 107 additions & 0 deletions rest/adminapitest/admin_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,113 @@ func TestDBOfflineSingleResyncUsingDCPStream(t *testing.T) {
assert.Equal(t, int64(2000), rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value())
}

func TestDCPResyncCollectionsStatus(t *testing.T) {
if base.UnitTestUrlIsWalrus() {
t.Skip("This test doesn't works with walrus")
}
base.TestRequiresCollections(t)

testCases := []struct {
name string
collectionNames []string
expectedResult []string
specifyCollection bool
}{
{
name: "collections_specified",
collectionNames: []string{"sg_test_0"},
expectedResult: []string{"sg_test_0"},
specifyCollection: true,
},
{
name: "collections_not_specified",
expectedResult: []string{"sg_test_0", "sg_test_1", "sg_test_2"},
collectionNames: nil,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
rt := rest.NewRestTesterMultipleCollections(t, nil, 3)
defer rt.Close()

_, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManagerDCP)
require.True(t, ok)

rt.TakeDbOffline()

if !testCase.specifyCollection {
resp := rt.SendAdminRequest("POST", "/db/_resync?action=start", "")
rest.RequireStatus(t, resp, http.StatusOK)
} else {
payload := `{"scopes": {"sg_test_0":["sg_test_0"]}}`
resp := rt.SendAdminRequest("POST", "/db/_resync?action=start", payload)
rest.RequireStatus(t, resp, http.StatusOK)
}

statusResponse := rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted)

assert.ElementsMatch(t, statusResponse.CollectionsProcessing, testCase.expectedResult)
})
}
}

func TestQueryResyncCollectionsStatus(t *testing.T) {
if base.UnitTestUrlIsWalrus() {
t.Skip("This test doesn't works with walrus")
}
base.TestRequiresCollections(t)

testCases := []struct {
name string
collectionNames []string
expectedResult []string
specifyCollection bool
}{
{
name: "collections_specified",
collectionNames: []string{"sg_test_0"},
expectedResult: []string{"sg_test_0"},
specifyCollection: true,
},
{
name: "collections_not_specified",
expectedResult: []string{"sg_test_0", "sg_test_1", "sg_test_2"},
collectionNames: nil,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
rt := rest.NewRestTesterMultipleCollections(t, &rest.RestTesterConfig{
DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{
Unsupported: &db.UnsupportedOptions{
UseQueryBasedResyncManager: true,
},
},
},
}, 3)
defer rt.Close()

_, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManager)
require.True(t, ok)

rt.TakeDbOffline()

if !testCase.specifyCollection {
resp := rt.SendAdminRequest("POST", "/db/_resync?action=start", "")
rest.RequireStatus(t, resp, http.StatusOK)
} else {
payload := `{"scopes": {"sg_test_0":["sg_test_0"]}}`
resp := rt.SendAdminRequest("POST", "/db/_resync?action=start", payload)
rest.RequireStatus(t, resp, http.StatusOK)
}

statusResponse := rt.WaitForResyncStatus(db.BackgroundProcessStateCompleted)

assert.ElementsMatch(t, statusResponse.CollectionsProcessing, testCase.expectedResult)
})
}
}

func TestResyncQueryBased(t *testing.T) {
base.LongRunningTest(t)

Expand Down
Loading