From 41703b1725c42be2ea154e07355301b933f75d0b Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Thu, 15 Feb 2024 14:11:02 +0000 Subject: [PATCH 1/3] CBG-3741: Throttle concurrent revs per-replication (#6660) * Throttle concurrent revs (push and pull) to 5 for an individual replicating client * Consider proposeChanges as a changes batch for the inFlightChangesThrottle purposes * Make MaxConcurrentChangesBatches and MaxConcurrentRevs configurable * Add test and stats * Add stat descriptions * Fix config bug that the test found * Fix missing stat init for ISGR - split Connected Client rev throttle stat * PR comments for stats - add pull equivalent stat * Remove pull replication rev throttling (single-threaded) * int -> *int * Move replication throttle config to StartupConfig validation * Add test coverage for StartupConfig Validate for MaxConcurrentRevs * remove test logging --- base/stats.go | 14 ++++++++++ base/stats_descriptions.go | 0 db/blip_connected_client.go | 14 ++++++---- db/blip_handler.go | 55 +++++++++++++++++++++++++++---------- db/blip_sync_context.go | 14 ++++++++++ db/blip_sync_stats.go | 9 +++++- db/database.go | 2 ++ rest/blip_api_crud_test.go | 40 +++++++++++++++++++++++++++ rest/config.go | 17 ++++++++++++ rest/config_flags.go | 7 +++-- rest/config_startup.go | 11 ++++++-- rest/config_test.go | 55 +++++++++++++++++++++++++++++++++++++ rest/server_context.go | 2 ++ rest/utilities_testing.go | 2 ++ 14 files changed, 217 insertions(+), 25 deletions(-) create mode 100644 base/stats_descriptions.go diff --git a/base/stats.go b/base/stats.go index 60d64c7e3e..2c5a1b1297 100644 --- a/base/stats.go +++ b/base/stats.go @@ -448,6 +448,10 @@ type CBLReplicationPushStats struct { ProposeChangeTime *SgwIntStat `json:"propose_change_time"` // Total time spent processing writes. Measures complete request-to-response time for a write. WriteProcessingTime *SgwIntStat `json:"write_processing_time"` + // WriteThrottledCount is the cumulative number of writes that were throttled. + WriteThrottledCount *SgwIntStat `json:"write_throttled_count"` + // WriteThrottledTime is the cumulative time spent throttling writes. + WriteThrottledTime *SgwIntStat `json:"write_throttled_time"` } // CollectionStats are stats that are tracked on a per-collection basis. @@ -1317,6 +1321,14 @@ func (d *DbStats) initCBLReplicationPushStats() error { if err != nil { return err } + resUtil.WriteThrottledCount, err = NewIntStat(SubsystemReplicationPush, "write_throttled_count", StatUnitNoUnits, WriteThrottledCountDesc, StatAddedVersion3dot1dot4, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.CounterValue, 0) + if err != nil { + return err + } + resUtil.WriteThrottledTime, err = NewIntStat(SubsystemReplicationPush, "write_throttled_time", StatUnitNanoseconds, WriteThrottledTimeDesc, StatAddedVersion3dot1dot4, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.CounterValue, 0) + if err != nil { + return err + } d.CBLReplicationPushStats = resUtil return nil @@ -1330,6 +1342,8 @@ func (d *DbStats) unregisterCBLReplicationPushStats() { prometheus.Unregister(d.CBLReplicationPushStats.ProposeChangeCount) prometheus.Unregister(d.CBLReplicationPushStats.ProposeChangeTime) prometheus.Unregister(d.CBLReplicationPushStats.WriteProcessingTime) + prometheus.Unregister(d.CBLReplicationPushStats.WriteThrottledCount) + prometheus.Unregister(d.CBLReplicationPushStats.WriteThrottledTime) } func (d *DbStats) CBLReplicationPush() *CBLReplicationPushStats { diff --git a/base/stats_descriptions.go b/base/stats_descriptions.go new file mode 100644 index 0000000000..e69de29bb2 diff --git a/db/blip_connected_client.go b/db/blip_connected_client.go index 49d0171435..0b90f49943 100644 --- a/db/blip_connected_client.go +++ b/db/blip_connected_client.go @@ -71,12 +71,14 @@ func (bh *blipHandler) handleGetRev(rq *blip.Message) error { // Handles a Connected-Client "putRev" request. func (bh *blipHandler) handlePutRev(rq *blip.Message) error { stats := processRevStats{ - count: bh.replicationStats.HandlePutRevCount, - errorCount: bh.replicationStats.HandlePutRevErrorCount, - deltaRecvCount: bh.replicationStats.HandlePutRevDeltaRecvCount, - bytes: bh.replicationStats.HandlePutRevBytes, - processingTime: bh.replicationStats.HandlePutRevProcessingTime, - docsPurgedCount: bh.replicationStats.HandlePutRevDocsPurgedCount, + count: bh.replicationStats.HandlePutRevCount, + errorCount: bh.replicationStats.HandlePutRevErrorCount, + deltaRecvCount: bh.replicationStats.HandlePutRevDeltaRecvCount, + bytes: bh.replicationStats.HandlePutRevBytes, + processingTime: bh.replicationStats.HandlePutRevProcessingTime, + docsPurgedCount: bh.replicationStats.HandlePutRevDocsPurgedCount, + throttledRevs: bh.replicationStats.HandlePutRevThrottledCount, + throttledRevTime: bh.replicationStats.HandlePutRevThrottledTime, } return bh.processRev(rq, &stats) } diff --git a/db/blip_handler.go b/db/blip_handler.go index f429cf5891..0265703ffb 100644 --- a/db/blip_handler.go +++ b/db/blip_handler.go @@ -51,8 +51,13 @@ var kConnectedClientHandlersByProfile = map[string]blipHandlerFunc{ MessageGraphQL: userBlipHandler((*blipHandler).handleGraphQL), } -// maxInFlightChangesBatches is the maximum number of in-flight changes batches a client is allowed to send without being throttled. -const maxInFlightChangesBatches = 2 +// Replication throttling +const ( + // DefaultMaxConcurrentChangesBatches is the maximum number of in-flight changes batches a client is allowed to send concurrently without being throttled. + DefaultMaxConcurrentChangesBatches = 2 + // DefaultMaxConcurrentRevs is the maximum number of in-flight revisions a client is allowed to send or receive concurrently without being throttled. + DefaultMaxConcurrentRevs = 5 +) type blipHandler struct { *BlipSyncContext @@ -759,6 +764,11 @@ func (bh *blipHandler) handleChanges(rq *blip.Message) error { // Handles a "proposeChanges" request, similar to "changes" but in no-conflicts mode func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error { + // we don't know whether this batch of changes has completed because they look like unsolicited revs to us, + // but we can stop clients swarming us with these causing CheckProposedRev work + bh.inFlightChangesThrottle <- struct{}{} + defer func() { <-bh.inFlightChangesThrottle }() + includeConflictRev := false if val := rq.Properties[ProposeChangesConflictsIncludeRev]; val != "" { includeConflictRev = val == trueProperty @@ -905,12 +915,14 @@ func (bh *blipHandler) handleNoRev(rq *blip.Message) error { } type processRevStats struct { - count *base.SgwIntStat // Increments when rev processed successfully - errorCount *base.SgwIntStat - deltaRecvCount *base.SgwIntStat - bytes *base.SgwIntStat - processingTime *base.SgwIntStat - docsPurgedCount *base.SgwIntStat + count *base.SgwIntStat // Increments when rev processed successfully + errorCount *base.SgwIntStat + deltaRecvCount *base.SgwIntStat + bytes *base.SgwIntStat + processingTime *base.SgwIntStat + docsPurgedCount *base.SgwIntStat + throttledRevs *base.SgwIntStat + throttledRevTime *base.SgwIntStat } // Processes a "rev" request, i.e. client is pushing a revision body @@ -926,6 +938,19 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err } }() + // throttle concurrent revs + if cap(bh.inFlightRevsThrottle) > 0 { + select { + case bh.inFlightRevsThrottle <- struct{}{}: + default: + stats.throttledRevs.Add(1) + throttleStart := time.Now() + bh.inFlightRevsThrottle <- struct{}{} + stats.throttledRevTime.Add(time.Since(throttleStart).Nanoseconds()) + } + defer func() { <-bh.inFlightRevsThrottle }() + } + // addRevisionParams := newAddRevisionParams(rq) revMessage := RevMessage{Message: rq} @@ -1195,12 +1220,14 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err // Handler for when a rev is received from the client func (bh *blipHandler) handleRev(rq *blip.Message) (err error) { stats := processRevStats{ - count: bh.replicationStats.HandleRevCount, - errorCount: bh.replicationStats.HandleRevErrorCount, - deltaRecvCount: bh.replicationStats.HandleRevDeltaRecvCount, - bytes: bh.replicationStats.HandleRevBytes, - processingTime: bh.replicationStats.HandleRevProcessingTime, - docsPurgedCount: bh.replicationStats.HandleRevDocsPurgedCount, + count: bh.replicationStats.HandleRevCount, + errorCount: bh.replicationStats.HandleRevErrorCount, + deltaRecvCount: bh.replicationStats.HandleRevDeltaRecvCount, + bytes: bh.replicationStats.HandleRevBytes, + processingTime: bh.replicationStats.HandleRevProcessingTime, + docsPurgedCount: bh.replicationStats.HandleRevDocsPurgedCount, + throttledRevs: bh.replicationStats.HandleRevThrottledCount, + throttledRevTime: bh.replicationStats.HandleRevThrottledTime, } return bh.processRev(rq, &stats) } diff --git a/db/blip_sync_context.go b/db/blip_sync_context.go index ea2a62c9fb..a18a5a62a1 100644 --- a/db/blip_sync_context.go +++ b/db/blip_sync_context.go @@ -34,6 +34,15 @@ const ( var ErrClosedBLIPSender = errors.New("use of closed BLIP sender") func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, contextID string, replicationStats *BlipSyncStats) *BlipSyncContext { + maxInFlightChangesBatches := DefaultMaxConcurrentChangesBatches + if db.Options.MaxConcurrentChangesBatches != nil { + maxInFlightChangesBatches = *db.Options.MaxConcurrentChangesBatches + } + maxInFlightRevs := DefaultMaxConcurrentRevs + if db.Options.MaxConcurrentRevs != nil { + maxInFlightRevs = *db.Options.MaxConcurrentRevs + } + bsc := &BlipSyncContext{ blipContext: bc, blipContextDb: db, @@ -43,6 +52,7 @@ func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, con sgCanUseDeltas: db.DeltaSyncEnabled(), replicationStats: replicationStats, inFlightChangesThrottle: make(chan struct{}, maxInFlightChangesBatches), + inFlightRevsThrottle: make(chan struct{}, maxInFlightRevs), collections: &blipCollections{}, } if bsc.replicationStats == nil { @@ -106,6 +116,10 @@ type BlipSyncContext struct { // before they've processed the revs for previous batches. Keeping this >1 allows the client to be fed a constant supply of rev messages, // without making Sync Gateway buffer a bunch of stuff in memory too far in advance of the client being able to receive the revs. inFlightChangesThrottle chan struct{} + // inFlightRevsThrottle is a small buffered channel to limit the amount of in-flight revs for this connection. + // Couchbase Lite limits this on the client side with changes batch size (but is usually hard-coded to 200) + // This is defensive measure to ensure a single client cannot use too much memory when replicating, and forces each changes batch to have a reduced amount of parallelism. + inFlightRevsThrottle chan struct{} // fatalErrorCallback is called by the replicator code when the replicator using this blipSyncContext should be // stopped diff --git a/db/blip_sync_stats.go b/db/blip_sync_stats.go index 6286eae453..a40c0dda30 100644 --- a/db/blip_sync_stats.go +++ b/db/blip_sync_stats.go @@ -24,6 +24,8 @@ type BlipSyncStats struct { HandleRevBytes *base.SgwIntStat HandleRevProcessingTime *base.SgwIntStat HandleRevDocsPurgedCount *base.SgwIntStat + HandleRevThrottledCount *base.SgwIntStat + HandleRevThrottledTime *base.SgwIntStat HandleGetRevCount *base.SgwIntStat // Connected Client API HandlePutRevCount *base.SgwIntStat // Connected Client API HandlePutRevErrorCount *base.SgwIntStat // Connected Client API @@ -31,6 +33,8 @@ type BlipSyncStats struct { HandlePutRevBytes *base.SgwIntStat // Connected Client API HandlePutRevProcessingTime *base.SgwIntStat // Connected Client API HandlePutRevDocsPurgedCount *base.SgwIntStat // Connected Client API + HandlePutRevThrottledCount *base.SgwIntStat // Connected Client API + HandlePutRevThrottledTime *base.SgwIntStat // Connected Client API SendRevCount *base.SgwIntStat // sendRev SendRevDeltaRequestedCount *base.SgwIntStat SendRevDeltaSentCount *base.SgwIntStat @@ -72,6 +76,8 @@ func NewBlipSyncStats() *BlipSyncStats { HandleRevBytes: &base.SgwIntStat{}, HandleRevProcessingTime: &base.SgwIntStat{}, HandleRevDocsPurgedCount: &base.SgwIntStat{}, + HandleRevThrottledCount: &base.SgwIntStat{}, + HandleRevThrottledTime: &base.SgwIntStat{}, HandleGetRevCount: &base.SgwIntStat{}, HandlePutRevCount: &base.SgwIntStat{}, HandlePutRevErrorCount: &base.SgwIntStat{}, @@ -133,9 +139,10 @@ func BlipSyncStatsForCBL(dbStats *base.DbStats) *BlipSyncStats { blipStats.HandleRevBytes = dbStats.Database().DocWritesBytesBlip blipStats.HandleRevProcessingTime = dbStats.CBLReplicationPush().WriteProcessingTime - blipStats.HandleRevCount = dbStats.CBLReplicationPush().DocPushCount blipStats.HandleRevErrorCount = dbStats.CBLReplicationPush().DocPushErrorCount + blipStats.HandleRevThrottledCount = dbStats.CBLReplicationPush().WriteThrottledCount + blipStats.HandleRevThrottledTime = dbStats.CBLReplicationPush().WriteThrottledTime blipStats.HandleGetAttachment = dbStats.CBLReplicationPull().AttachmentPullCount blipStats.HandleGetAttachmentBytes = dbStats.CBLReplicationPull().AttachmentPullBytes diff --git a/db/database.go b/db/database.go index b1aa797a4a..c4c4f3cdff 100644 --- a/db/database.go +++ b/db/database.go @@ -177,6 +177,8 @@ type DatabaseContextOptions struct { ChangesRequestPlus bool // Sets the default value for request_plus, for non-continuous changes feeds ConfigPrincipals *ConfigPrincipals LoggingConfig DbLogConfig // Per-database log configuration + MaxConcurrentChangesBatches *int // Maximum number of changes batches to process concurrently per replication + MaxConcurrentRevs *int // Maximum number of revs to process concurrently per replication } // DbLogConfig can be used to customise the logging for logs associated with this database. diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index b44ea8002d..54eb031231 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -1195,6 +1195,46 @@ func TestBlipSendAndGetRev(t *testing.T) { assert.True(t, deletedValue) } +// Sends many revisions concurrently and ensures that SG limits the processing on the server-side with MaxConcurrentRevs +func TestBlipSendConcurrentRevs(t *testing.T) { + + const ( + maxConcurrentRevs = 10 + concurrentSendRevNum = 50 + ) + rt := NewRestTester(t, &RestTesterConfig{maxConcurrentRevs: base.IntPtr(maxConcurrentRevs)}) + defer rt.Close() + btSpec := BlipTesterSpec{ + connectingUsername: "user1", + connectingPassword: "1234", + } + bt, err := NewBlipTesterFromSpecWithRT(t, &btSpec, rt) + require.NoError(t, err, "Unexpected error creating BlipTester") + defer bt.Close() + + wg := sync.WaitGroup{} + wg.Add(concurrentSendRevNum) + for i := 0; i < concurrentSendRevNum; i++ { + docID := fmt.Sprintf("%s-%d", t.Name(), i) + go func() { + defer wg.Done() + _, _, _, err := bt.SendRev(docID, "1-abc", []byte(`{"key": "val", "channels": ["user1"]}`), blip.Properties{}) + require.NoError(t, err) + }() + } + + require.NoError(t, WaitWithTimeout(&wg, time.Second*30)) + + throttleCount := rt.GetDatabase().DbStats.CBLReplicationPush().WriteThrottledCount.Value() + throttleTime := rt.GetDatabase().DbStats.CBLReplicationPush().WriteThrottledTime.Value() + throttleDuration := time.Duration(throttleTime) * time.Nanosecond + + assert.Greater(t, throttleCount, int64(0), "Expected throttled revs") + assert.Greater(t, throttleTime, int64(0), "Expected non-zero throttled revs time") + + t.Logf("Throttled revs: %d, Throttled duration: %s", throttleCount, throttleDuration) +} + // Test send and retrieval of a doc with a large numeric value. Ensure proper large number handling. // // Validate deleted handling (includes check for https://github.com/couchbase/sync_gateway/issues/3341) diff --git a/rest/config.go b/rest/config.go index a247737f6f..f115c256f6 100644 --- a/rest/config.go +++ b/rest/config.go @@ -1318,6 +1318,23 @@ func (sc *StartupConfig) Validate(ctx context.Context, isEnterpriseEdition bool) } } + const ( + minConcurrentChangesBatches = 1 + maxConcurrentChangesBatches = 5 + minConcurrentRevs = 5 + maxConcurrentRevs = 200 + ) + if val := sc.Replicator.MaxConcurrentChangesBatches; val != nil { + if *val < minConcurrentChangesBatches || *val > maxConcurrentChangesBatches { + multiError = multiError.Append(fmt.Errorf("max_concurrent_changes_batches: %d outside allowed range: %d-%d", *val, minConcurrentChangesBatches, maxConcurrentChangesBatches)) + } + } + if val := sc.Replicator.MaxConcurrentRevs; val != nil { + if *val < minConcurrentRevs || *val > maxConcurrentRevs { + multiError = multiError.Append(fmt.Errorf("max_concurrent_revs: %d outside allowed range: %d-%d", *val, minConcurrentRevs, maxConcurrentRevs)) + } + } + return multiError.ErrorOrNil() } diff --git a/rest/config_flags.go b/rest/config_flags.go index 088b2d41a8..4820013b0e 100644 --- a/rest/config_flags.go +++ b/rest/config_flags.go @@ -122,8 +122,11 @@ func registerConfigFlags(config *StartupConfig, fs *flag.FlagSet) map[string]con "auth.bcrypt_cost": {&config.Auth.BcryptCost, fs.Int("auth.bcrypt_cost", 0, "Cost to use for bcrypt password hashes")}, - "replicator.max_heartbeat": {&config.Replicator.MaxHeartbeat, fs.String("replicator.max_heartbeat", "", "Max heartbeat value for _changes request")}, - "replicator.blip_compression": {&config.Replicator.BLIPCompression, fs.Int("replicator.blip_compression", 0, "BLIP data compression level (0-9)")}, + "replicator.max_heartbeat": {&config.Replicator.MaxHeartbeat, fs.String("replicator.max_heartbeat", "", "Max heartbeat value for _changes request")}, + "replicator.blip_compression": {&config.Replicator.BLIPCompression, fs.Int("replicator.blip_compression", 0, "BLIP data compression level (0-9)")}, + "replicator.max_concurrent_replications": {&config.Replicator.MaxConcurrentReplications, fs.Int("replicator.max_concurrent_replications", 0, "Maximum number of replication connections to the node")}, + "replicator.max_concurrent_changes_batches": {&config.Replicator.MaxConcurrentChangesBatches, fs.Int("replicator.max_concurrent_changes_batches", 0, "Maximum number of changes batches to process concurrently per replication")}, + "replicator.max_concurrent_revs": {&config.Replicator.MaxConcurrentRevs, fs.Int("replicator.max_concurrent_revs", 0, "Maximum number of revs to process concurrently per replication")}, "unsupported.stats_log_frequency": {&config.Unsupported.StatsLogFrequency, fs.String("unsupported.stats_log_frequency", "", "How often should stats be written to stats logs")}, "unsupported.use_stdlib_json": {&config.Unsupported.UseStdlibJSON, fs.Bool("unsupported.use_stdlib_json", false, "Bypass the jsoniter package and use Go's stdlib instead")}, diff --git a/rest/config_startup.go b/rest/config_startup.go index afb1680ff5..7e01ca739e 100644 --- a/rest/config_startup.go +++ b/rest/config_startup.go @@ -17,6 +17,7 @@ import ( "github.com/couchbase/go-couchbase" "github.com/couchbase/sync_gateway/auth" "github.com/couchbase/sync_gateway/base" + "github.com/couchbase/sync_gateway/db" ) const ( @@ -58,6 +59,10 @@ func DefaultStartupConfig(defaultLogFilePath string) StartupConfig { Auth: AuthConfig{ BcryptCost: auth.DefaultBcryptCost, }, + Replicator: ReplicatorConfig{ + MaxConcurrentChangesBatches: base.IntPtr(db.DefaultMaxConcurrentChangesBatches), + MaxConcurrentRevs: base.IntPtr(db.DefaultMaxConcurrentRevs), + }, Unsupported: UnsupportedConfig{ StatsLogFrequency: base.NewConfigDuration(time.Minute), Serverless: ServerlessConfig{ @@ -137,8 +142,10 @@ type AuthConfig struct { } type ReplicatorConfig struct { - MaxHeartbeat *base.ConfigDuration `json:"max_heartbeat,omitempty" help:"Max heartbeat value for _changes request"` - BLIPCompression *int `json:"blip_compression,omitempty" help:"BLIP data compression level (0-9)"` + MaxHeartbeat *base.ConfigDuration `json:"max_heartbeat,omitempty" help:"Max heartbeat value for _changes request"` + BLIPCompression *int `json:"blip_compression,omitempty" help:"BLIP data compression level (0-9)"` + MaxConcurrentChangesBatches *int `json:"max_concurrent_changes_batches,omitempty" help:"Maximum number of changes batches to process concurrently per replication (1-5)"` + MaxConcurrentRevs *int `json:"max_concurrent_revs,omitempty" help:"Maximum number of revs to process concurrently per replication (5-200)"` } type UnsupportedConfig struct { diff --git a/rest/config_test.go b/rest/config_test.go index 1fa6544ddb..78a8765cef 100644 --- a/rest/config_test.go +++ b/rest/config_test.go @@ -26,6 +26,7 @@ import ( "net/http/httptest" "os" "path/filepath" + "regexp" "runtime" "strings" "testing" @@ -2434,6 +2435,60 @@ func TestStartupConfigBcryptCostValidation(t *testing.T) { } } +func TestStartupConfigReplicationThrottleValidation(t *testing.T) { + errContains, err := regexp.Compile(`max_concurrent_revs: \d+ outside allowed range`) + require.NoErrorf(t, err, "regexp.Compile failed for test assertion") + + testCases := []struct { + name string + revsThrottle *int + expectError bool + }{ + { + name: "valid-nil", + revsThrottle: nil, + expectError: false, + }, + { + name: "invalid-low", + revsThrottle: base.IntPtr(4), + expectError: true, + }, + { + name: "Valid-5", + revsThrottle: base.IntPtr(5), + expectError: false, + }, + { + name: "Valid-20", + revsThrottle: base.IntPtr(20), + expectError: false, + }, + { + name: "Valid-200", + revsThrottle: base.IntPtr(200), + expectError: false, + }, + { + name: "invalid-high", + revsThrottle: base.IntPtr(201), + expectError: true, + }, + } + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + sc := StartupConfig{Replicator: ReplicatorConfig{MaxConcurrentRevs: test.revsThrottle}} + err := sc.Validate(base.TestCtx(t), base.IsEnterpriseEdition()) + if test.expectError { + require.Error(t, err) + assert.Regexp(t, errContains, err.Error()) + } else if err != nil { + assert.NotContains(t, err.Error(), errContains) + } + }) + } +} + func Test_validateJavascriptFunction(t *testing.T) { tests := []struct { name string diff --git a/rest/server_context.go b/rest/server_context.go index 83c0c22832..c7f2e7f9ee 100644 --- a/rest/server_context.go +++ b/rest/server_context.go @@ -1243,6 +1243,8 @@ func dbcOptionsFromConfig(ctx context.Context, sc *ServerContext, config *DbConf // UserQueries: config.UserQueries, // behind feature flag (see below) // UserFunctions: config.UserFunctions, // behind feature flag (see below) // GraphQL: config.GraphQL, // behind feature flag (see below) + MaxConcurrentChangesBatches: sc.Config.Replicator.MaxConcurrentChangesBatches, + MaxConcurrentRevs: sc.Config.Replicator.MaxConcurrentRevs, } // Per-database console logging config overrides diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go index 221759629d..358de1673f 100644 --- a/rest/utilities_testing.go +++ b/rest/utilities_testing.go @@ -72,6 +72,7 @@ type RestTesterConfig struct { numCollections int syncGatewayVersion *base.ComparableVersion // alternate version of Sync Gateway to use on startup allowDbConfigEnvVars *bool + maxConcurrentRevs *int } type collectionConfiguration uint8 @@ -218,6 +219,7 @@ func (rt *RestTester) Bucket() base.Bucket { sc.Bootstrap.ServerTLSSkipVerify = base.BoolPtr(base.TestTLSSkipVerify()) sc.Unsupported.Serverless.Enabled = &rt.serverless sc.Unsupported.AllowDbConfigEnvVars = rt.RestTesterConfig.allowDbConfigEnvVars + sc.Replicator.MaxConcurrentRevs = rt.RestTesterConfig.maxConcurrentRevs if rt.serverless { if !rt.PersistentConfig { rt.TB.Fatalf("Persistent config must be used when running in serverless mode") From cd3642d998cfb4e9df51ca186f83cea8b4022c42 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Fri, 16 Feb 2024 15:26:23 +0000 Subject: [PATCH 2/3] fix issues making build fail --- base/stats.go | 4 ++-- base/stats_descriptions.go | 0 rest/config_flags.go | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) delete mode 100644 base/stats_descriptions.go diff --git a/base/stats.go b/base/stats.go index 2c5a1b1297..d75d6406f2 100644 --- a/base/stats.go +++ b/base/stats.go @@ -1321,11 +1321,11 @@ func (d *DbStats) initCBLReplicationPushStats() error { if err != nil { return err } - resUtil.WriteThrottledCount, err = NewIntStat(SubsystemReplicationPush, "write_throttled_count", StatUnitNoUnits, WriteThrottledCountDesc, StatAddedVersion3dot1dot4, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.CounterValue, 0) + resUtil.WriteThrottledCount, err = NewIntStat(SubsystemReplicationPush, "write_throttled_count", labelKeys, labelVals, prometheus.CounterValue, 0) if err != nil { return err } - resUtil.WriteThrottledTime, err = NewIntStat(SubsystemReplicationPush, "write_throttled_time", StatUnitNanoseconds, WriteThrottledTimeDesc, StatAddedVersion3dot1dot4, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.CounterValue, 0) + resUtil.WriteThrottledTime, err = NewIntStat(SubsystemReplicationPush, "write_throttled_time", labelKeys, labelVals, prometheus.CounterValue, 0) if err != nil { return err } diff --git a/base/stats_descriptions.go b/base/stats_descriptions.go deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/rest/config_flags.go b/rest/config_flags.go index 4820013b0e..ea156fa0c6 100644 --- a/rest/config_flags.go +++ b/rest/config_flags.go @@ -124,7 +124,6 @@ func registerConfigFlags(config *StartupConfig, fs *flag.FlagSet) map[string]con "replicator.max_heartbeat": {&config.Replicator.MaxHeartbeat, fs.String("replicator.max_heartbeat", "", "Max heartbeat value for _changes request")}, "replicator.blip_compression": {&config.Replicator.BLIPCompression, fs.Int("replicator.blip_compression", 0, "BLIP data compression level (0-9)")}, - "replicator.max_concurrent_replications": {&config.Replicator.MaxConcurrentReplications, fs.Int("replicator.max_concurrent_replications", 0, "Maximum number of replication connections to the node")}, "replicator.max_concurrent_changes_batches": {&config.Replicator.MaxConcurrentChangesBatches, fs.Int("replicator.max_concurrent_changes_batches", 0, "Maximum number of changes batches to process concurrently per replication")}, "replicator.max_concurrent_revs": {&config.Replicator.MaxConcurrentRevs, fs.Int("replicator.max_concurrent_revs", 0, "Maximum number of revs to process concurrently per replication")}, From 8f1ed54ba444208a519439149f7b9f8f0c1f57c3 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith Date: Fri, 16 Feb 2024 15:55:44 +0000 Subject: [PATCH 3/3] updates to slow test down --- db/database.go | 4 ++-- rest/blip_api_crud_test.go | 9 ++++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/db/database.go b/db/database.go index c4c4f3cdff..bb7eb537b1 100644 --- a/db/database.go +++ b/db/database.go @@ -177,8 +177,8 @@ type DatabaseContextOptions struct { ChangesRequestPlus bool // Sets the default value for request_plus, for non-continuous changes feeds ConfigPrincipals *ConfigPrincipals LoggingConfig DbLogConfig // Per-database log configuration - MaxConcurrentChangesBatches *int // Maximum number of changes batches to process concurrently per replication - MaxConcurrentRevs *int // Maximum number of revs to process concurrently per replication + MaxConcurrentChangesBatches *int // Maximum number of changes batches to process concurrently per replication + MaxConcurrentRevs *int // Maximum number of revs to process concurrently per replication } // DbLogConfig can be used to customise the logging for logs associated with this database. diff --git a/rest/blip_api_crud_test.go b/rest/blip_api_crud_test.go index 54eb031231..c8650623a8 100644 --- a/rest/blip_api_crud_test.go +++ b/rest/blip_api_crud_test.go @@ -1202,7 +1202,14 @@ func TestBlipSendConcurrentRevs(t *testing.T) { maxConcurrentRevs = 10 concurrentSendRevNum = 50 ) - rt := NewRestTester(t, &RestTesterConfig{maxConcurrentRevs: base.IntPtr(maxConcurrentRevs)}) + rt := NewRestTester(t, &RestTesterConfig{ + leakyBucketConfig: &base.LeakyBucketConfig{ + UpdateCallback: func(_ string) { + time.Sleep(time.Millisecond * 5) // slow down Walrus - it's too quick to be throttled + }, + }, + maxConcurrentRevs: base.IntPtr(maxConcurrentRevs), + }) defer rt.Close() btSpec := BlipTesterSpec{ connectingUsername: "user1",