From ad0e2e3a3eda65fe57a8af284a195fe1a5a6923a Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Wed, 22 May 2024 10:17:53 +0100 Subject: [PATCH 1/3] CBG-3843: change to add collection names resync is running against in the status of resync --- db/background_mgr_resync.go | 17 +++-- db/background_mgr_resync_dcp.go | 38 ++++++---- rest/adminapitest/admin_api_test.go | 107 ++++++++++++++++++++++++++++ rest/utilities_testing.go | 15 ++++ 4 files changed, 158 insertions(+), 19 deletions(-) diff --git a/db/background_mgr_resync.go b/db/background_mgr_resync.go index 2e2baf86e1..8f27e921bf 100644 --- a/db/background_mgr_resync.go +++ b/db/background_mgr_resync.go @@ -21,9 +21,10 @@ import ( // ====================================================== type ResyncManager struct { - DocsProcessed int - DocsChanged int - lock sync.Mutex + DocsProcessed int + DocsChanged int + ResyncedCollections []string + lock sync.Mutex } var _ BackgroundManagerProcessI = &ResyncManager{} @@ -64,10 +65,12 @@ func (r *ResyncManager) Run(ctx context.Context, options map[string]interface{}, persistClusterStatus() } - collectionIDs, hasAllCollections, err := getCollectionIds(database, resyncCollections) + collectionIDs, hasAllCollections, collectionNames, err := getCollectionIdsAndNames(database, resyncCollections) if err != nil { return err } + // add collection list to manager for use in status call + r.ResyncedCollections = collectionNames if hasAllCollections { base.InfofCtx(ctx, base.KeyAll, "running resync against all collections") } else { @@ -111,8 +114,9 @@ func (r *ResyncManager) SetStats(docsProcessed, docsChanged int) { type ResyncManagerResponse struct { BackgroundManagerStatus - DocsChanged int `json:"docs_changed"` - DocsProcessed int `json:"docs_processed"` + DocsChanged int `json:"docs_changed"` + DocsProcessed int `json:"docs_processed"` + CollectionsProcessing []string `json:"collections_processing,omitempty"` } func (r *ResyncManager) GetProcessStatus(backgroundManagerStatus BackgroundManagerStatus) ([]byte, []byte, error) { @@ -123,6 +127,7 @@ func (r *ResyncManager) GetProcessStatus(backgroundManagerStatus BackgroundManag BackgroundManagerStatus: backgroundManagerStatus, DocsChanged: r.DocsChanged, DocsProcessed: r.DocsProcessed, + CollectionsProcessing: r.ResyncedCollections, } statusJSON, err := base.JSONMarshal(retStatus) diff --git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go index ee9001573f..fb4e7fc0fd 100644 --- a/db/background_mgr_resync_dcp.go +++ b/db/background_mgr_resync_dcp.go @@ -25,12 +25,13 @@ import ( // ===================================================================== type ResyncManagerDCP struct { - DocsProcessed base.AtomicInt - DocsChanged base.AtomicInt - ResyncID string - VBUUIDs []uint64 - useXattrs bool - lock sync.RWMutex + DocsProcessed base.AtomicInt + DocsChanged base.AtomicInt + ResyncID string + VBUUIDs []uint64 + useXattrs bool + ResyncedCollections []string + lock sync.RWMutex } // ResyncCollections contains map of scope names with collection names against which resync needs to run @@ -147,10 +148,12 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]interface } // Get collectionIds - collectionIDs, hasAllCollections, err := getCollectionIds(db, resyncCollections) + collectionIDs, hasAllCollections, collectionNames, err := getCollectionIdsAndNames(db, resyncCollections) if err != nil { return err } + // add collection list to manager for use in status call + r.ResyncedCollections = collectionNames if hasAllCollections { base.InfofCtx(ctx, base.KeyAll, "[%s] running resync against all collections", resyncLoggingID) } else { @@ -242,15 +245,21 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]interface return nil } -func getCollectionIds(db *Database, resyncCollections ResyncCollections) ([]uint32, bool, error) { +func getCollectionIdsAndNames(db *Database, resyncCollections ResyncCollections) ([]uint32, bool, []string, error) { collectionIDs := make([]uint32, 0) var hasAllCollections bool + var resyncCollectionNames []string if len(resyncCollections) == 0 { hasAllCollections = true for collectionID := range db.CollectionByID { collectionIDs = append(collectionIDs, collectionID) } + for _, collectionNames := range db.CollectionNames { + for collName := range collectionNames { + resyncCollectionNames = append(resyncCollectionNames, collName) + } + } } else { hasAllCollections = false @@ -258,13 +267,14 @@ func getCollectionIds(db *Database, resyncCollections ResyncCollections) ([]uint for _, collectionName := range collectionsName { collection, err := db.GetDatabaseCollection(scopeName, collectionName) if err != nil { - return nil, hasAllCollections, fmt.Errorf("failed to find ID for collection %s.%s", base.MD(scopeName).Redact(), base.MD(collectionName).Redact()) + return nil, hasAllCollections, nil, fmt.Errorf("failed to find ID for collection %s.%s", base.MD(scopeName).Redact(), base.MD(collectionName).Redact()) } collectionIDs = append(collectionIDs, collection.GetCollectionID()) + resyncCollectionNames = append(resyncCollectionNames, collectionName) } } } - return collectionIDs, hasAllCollections, nil + return collectionIDs, hasAllCollections, resyncCollectionNames, nil } func (r *ResyncManagerDCP) ResetStatus() { @@ -285,9 +295,10 @@ func (r *ResyncManagerDCP) SetStatus(docChanged, docProcessed int64) { type ResyncManagerResponseDCP struct { BackgroundManagerStatus - ResyncID string `json:"resync_id"` - DocsChanged int64 `json:"docs_changed"` - DocsProcessed int64 `json:"docs_processed"` + ResyncID string `json:"resync_id"` + DocsChanged int64 `json:"docs_changed"` + DocsProcessed int64 `json:"docs_processed"` + CollectionsProcessing []string `json:"collections_processing,omitempty"` } func (r *ResyncManagerDCP) GetProcessStatus(status BackgroundManagerStatus) ([]byte, []byte, error) { @@ -299,6 +310,7 @@ func (r *ResyncManagerDCP) GetProcessStatus(status BackgroundManagerStatus) ([]b ResyncID: r.ResyncID, DocsChanged: r.DocsChanged.Value(), DocsProcessed: r.DocsProcessed.Value(), + CollectionsProcessing: r.ResyncedCollections, } meta := AttachmentManagerMeta{ diff --git a/rest/adminapitest/admin_api_test.go b/rest/adminapitest/admin_api_test.go index 6f63ea8fc6..c70a1cc3bd 100644 --- a/rest/adminapitest/admin_api_test.go +++ b/rest/adminapitest/admin_api_test.go @@ -691,6 +691,113 @@ func TestDBOfflineSingleResyncUsingDCPStream(t *testing.T) { assert.Equal(t, int64(2000), rt.GetDatabase().DbStats.Database().SyncFunctionCount.Value()) } +func TestDCPResyncCollectionsStatus(t *testing.T) { + if base.UnitTestUrlIsWalrus() { + t.Skip("This test doesn't works with walrus") + } + base.TestRequiresCollections(t) + + testCases := []struct { + name string + collectionNames []string + expectedResult []string + specifyCollection bool + }{ + { + name: "collections_specified", + collectionNames: []string{"sg_test_0"}, + expectedResult: []string{"sg_test_0"}, + specifyCollection: true, + }, + { + name: "collections_not_specified", + expectedResult: []string{"sg_test_0", "sg_test_1", "sg_test_2"}, + collectionNames: nil, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + rt := rest.NewRestTesterMultipleCollections(t, nil, 3) + defer rt.Close() + + _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManagerDCP) + require.True(t, ok) + + rt.TakeDbOffline() + + if !testCase.specifyCollection { + resp := rt.SendAdminRequest("POST", "/db/_resync?action=start", "") + rest.RequireStatus(t, resp, http.StatusOK) + } else { + payload := `{"scopes": {"sg_test_0":["sg_test_0"]}}` + resp := rt.SendAdminRequest("POST", "/db/_resync?action=start", payload) + rest.RequireStatus(t, resp, http.StatusOK) + } + + statusResponse := rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted) + + rest.RequireCollections(t, statusResponse.CollectionsProcessing, testCase.expectedResult) + }) + } +} + +func TestQueryResyncCollectionsStatus(t *testing.T) { + if base.UnitTestUrlIsWalrus() { + t.Skip("This test doesn't works with walrus") + } + base.TestRequiresCollections(t) + + testCases := []struct { + name string + collectionNames []string + expectedResult []string + specifyCollection bool + }{ + { + name: "collections_specified", + collectionNames: []string{"sg_test_0"}, + expectedResult: []string{"sg_test_0"}, + specifyCollection: true, + }, + { + name: "collections_not_specified", + expectedResult: []string{"sg_test_0", "sg_test_1", "sg_test_2"}, + collectionNames: nil, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + rt := rest.NewRestTesterMultipleCollections(t, &rest.RestTesterConfig{ + DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{ + Unsupported: &db.UnsupportedOptions{ + UseQueryBasedResyncManager: true, + }, + }, + }, + }, 3) + defer rt.Close() + + _, ok := (rt.GetDatabase().ResyncManager.Process).(*db.ResyncManager) + require.True(t, ok) + + rt.TakeDbOffline() + + if !testCase.specifyCollection { + resp := rt.SendAdminRequest("POST", "/db/_resync?action=start", "") + rest.RequireStatus(t, resp, http.StatusOK) + } else { + payload := `{"scopes": {"sg_test_0":["sg_test_0"]}}` + resp := rt.SendAdminRequest("POST", "/db/_resync?action=start", payload) + rest.RequireStatus(t, resp, http.StatusOK) + } + + statusResponse := rt.WaitForResyncStatus(db.BackgroundProcessStateCompleted) + + rest.RequireCollections(t, statusResponse.CollectionsProcessing, testCase.expectedResult) + }) + } +} + func TestResyncQueryBased(t *testing.T) { base.LongRunningTest(t) diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go index 0cec7f0c3d..ab75b23a85 100644 --- a/rest/utilities_testing.go +++ b/rest/utilities_testing.go @@ -2670,3 +2670,18 @@ func RequireGocbDCPResync(t *testing.T) { t.Skip("This test only works against Couchbase Server since rosmar has no support for DCP resync") } } + +// RequireCollections Iterates through two lists of collections and asserts they both contain the same collections +func RequireCollections(t testing.TB, collectionNamesFromStatus, collectionSpecified []string) { + require.Equal(t, len(collectionSpecified), len(collectionNamesFromStatus)) + for _, collName := range collectionNamesFromStatus { + var found bool + for _, name := range collectionSpecified { + if name == collName { + found = true + break + } + } + require.True(t, found) + } +} From d8029c7a6385dac5f75d0022032c3a12f4a70075 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Wed, 22 May 2024 10:52:51 +0100 Subject: [PATCH 2/3] fix for data race --- db/background_mgr_resync.go | 10 +++++++++- db/background_mgr_resync_dcp.go | 10 +++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/db/background_mgr_resync.go b/db/background_mgr_resync.go index 8f27e921bf..eeadadea68 100644 --- a/db/background_mgr_resync.go +++ b/db/background_mgr_resync.go @@ -70,7 +70,7 @@ func (r *ResyncManager) Run(ctx context.Context, options map[string]interface{}, return err } // add collection list to manager for use in status call - r.ResyncedCollections = collectionNames + r.SetCollectionStatus(collectionNames) if hasAllCollections { base.InfofCtx(ctx, base.KeyAll, "running resync against all collections") } else { @@ -102,6 +102,7 @@ func (r *ResyncManager) ResetStatus() { r.DocsProcessed = 0 r.DocsChanged = 0 + r.ResyncedCollections = nil } func (r *ResyncManager) SetStats(docsProcessed, docsChanged int) { @@ -112,6 +113,13 @@ func (r *ResyncManager) SetStats(docsProcessed, docsChanged int) { r.DocsChanged = docsChanged } +func (r *ResyncManager) SetCollectionStatus(collectionNames []string) { + r.lock.Lock() + defer r.lock.Unlock() + + r.ResyncedCollections = collectionNames +} + type ResyncManagerResponse struct { BackgroundManagerStatus DocsChanged int `json:"docs_changed"` diff --git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go index fb4e7fc0fd..7f4bb29c44 100644 --- a/db/background_mgr_resync_dcp.go +++ b/db/background_mgr_resync_dcp.go @@ -153,7 +153,7 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]interface return err } // add collection list to manager for use in status call - r.ResyncedCollections = collectionNames + r.SetCollectionStatus(collectionNames) if hasAllCollections { base.InfofCtx(ctx, base.KeyAll, "[%s] running resync against all collections", resyncLoggingID) } else { @@ -283,6 +283,7 @@ func (r *ResyncManagerDCP) ResetStatus() { r.DocsProcessed.Set(0) r.DocsChanged.Set(0) + r.ResyncedCollections = nil } func (r *ResyncManagerDCP) SetStatus(docChanged, docProcessed int64) { @@ -293,6 +294,13 @@ func (r *ResyncManagerDCP) SetStatus(docChanged, docProcessed int64) { r.DocsProcessed.Set(docProcessed) } +func (r *ResyncManagerDCP) SetCollectionStatus(collectionNames []string) { + r.lock.Lock() + defer r.lock.Unlock() + + r.ResyncedCollections = collectionNames +} + type ResyncManagerResponseDCP struct { BackgroundManagerStatus ResyncID string `json:"resync_id"` From a36085a1a79a4f680e5cfb58362e6af0f77bca9b Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Fri, 24 May 2024 15:16:18 +0100 Subject: [PATCH 3/3] use assert.elements match --- rest/adminapitest/admin_api_test.go | 4 ++-- rest/utilities_testing.go | 15 --------------- 2 files changed, 2 insertions(+), 17 deletions(-) diff --git a/rest/adminapitest/admin_api_test.go b/rest/adminapitest/admin_api_test.go index c70a1cc3bd..be08e6b9ce 100644 --- a/rest/adminapitest/admin_api_test.go +++ b/rest/adminapitest/admin_api_test.go @@ -736,7 +736,7 @@ func TestDCPResyncCollectionsStatus(t *testing.T) { statusResponse := rt.WaitForResyncDCPStatus(db.BackgroundProcessStateCompleted) - rest.RequireCollections(t, statusResponse.CollectionsProcessing, testCase.expectedResult) + assert.ElementsMatch(t, statusResponse.CollectionsProcessing, testCase.expectedResult) }) } } @@ -793,7 +793,7 @@ func TestQueryResyncCollectionsStatus(t *testing.T) { statusResponse := rt.WaitForResyncStatus(db.BackgroundProcessStateCompleted) - rest.RequireCollections(t, statusResponse.CollectionsProcessing, testCase.expectedResult) + assert.ElementsMatch(t, statusResponse.CollectionsProcessing, testCase.expectedResult) }) } } diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go index ab75b23a85..0cec7f0c3d 100644 --- a/rest/utilities_testing.go +++ b/rest/utilities_testing.go @@ -2670,18 +2670,3 @@ func RequireGocbDCPResync(t *testing.T) { t.Skip("This test only works against Couchbase Server since rosmar has no support for DCP resync") } } - -// RequireCollections Iterates through two lists of collections and asserts they both contain the same collections -func RequireCollections(t testing.TB, collectionNamesFromStatus, collectionSpecified []string) { - require.Equal(t, len(collectionSpecified), len(collectionNamesFromStatus)) - for _, collName := range collectionNamesFromStatus { - var found bool - for _, name := range collectionSpecified { - if name == collName { - found = true - break - } - } - require.True(t, found) - } -}