Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3776: [3.1.4 Backport] Configurable revs parallelism limit #6692

Merged
merged 3 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions base/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,10 @@ type CBLReplicationPushStats struct {
ProposeChangeTime *SgwIntStat `json:"propose_change_time"`
// Total time spent processing writes. Measures complete request-to-response time for a write.
WriteProcessingTime *SgwIntStat `json:"write_processing_time"`
// WriteThrottledCount is the cumulative number of writes that were throttled.
WriteThrottledCount *SgwIntStat `json:"write_throttled_count"`
// WriteThrottledTime is the cumulative time spent throttling writes.
WriteThrottledTime *SgwIntStat `json:"write_throttled_time"`
}

// CollectionStats are stats that are tracked on a per-collection basis.
Expand Down Expand Up @@ -1317,6 +1321,14 @@ func (d *DbStats) initCBLReplicationPushStats() error {
if err != nil {
return err
}
resUtil.WriteThrottledCount, err = NewIntStat(SubsystemReplicationPush, "write_throttled_count", labelKeys, labelVals, prometheus.CounterValue, 0)
if err != nil {
return err
}
resUtil.WriteThrottledTime, err = NewIntStat(SubsystemReplicationPush, "write_throttled_time", labelKeys, labelVals, prometheus.CounterValue, 0)
if err != nil {
return err
}

d.CBLReplicationPushStats = resUtil
return nil
Expand All @@ -1330,6 +1342,8 @@ func (d *DbStats) unregisterCBLReplicationPushStats() {
prometheus.Unregister(d.CBLReplicationPushStats.ProposeChangeCount)
prometheus.Unregister(d.CBLReplicationPushStats.ProposeChangeTime)
prometheus.Unregister(d.CBLReplicationPushStats.WriteProcessingTime)
prometheus.Unregister(d.CBLReplicationPushStats.WriteThrottledCount)
prometheus.Unregister(d.CBLReplicationPushStats.WriteThrottledTime)
}

func (d *DbStats) CBLReplicationPush() *CBLReplicationPushStats {
Expand Down
14 changes: 8 additions & 6 deletions db/blip_connected_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,14 @@ func (bh *blipHandler) handleGetRev(rq *blip.Message) error {
// handlePutRev handles a Connected-Client "putRev" request.
// It bundles the HandlePutRev* counters from bh.replicationStats into a processRevStats and delegates to
// processRev, so putRev messages are recorded under their own stats rather than the HandleRev* ones.
func (bh *blipHandler) handlePutRev(rq *blip.Message) error {
stats := processRevStats{
count: bh.replicationStats.HandlePutRevCount,
errorCount: bh.replicationStats.HandlePutRevErrorCount,
deltaRecvCount: bh.replicationStats.HandlePutRevDeltaRecvCount,
bytes: bh.replicationStats.HandlePutRevBytes,
processingTime: bh.replicationStats.HandlePutRevProcessingTime,
docsPurgedCount: bh.replicationStats.HandlePutRevDocsPurgedCount,
count: bh.replicationStats.HandlePutRevCount,
errorCount: bh.replicationStats.HandlePutRevErrorCount,
deltaRecvCount: bh.replicationStats.HandlePutRevDeltaRecvCount,
bytes: bh.replicationStats.HandlePutRevBytes,
processingTime: bh.replicationStats.HandlePutRevProcessingTime,
docsPurgedCount: bh.replicationStats.HandlePutRevDocsPurgedCount,
// throttle stats are updated by processRev when the rev has to wait for a free in-flight slot
throttledRevs: bh.replicationStats.HandlePutRevThrottledCount,
throttledRevTime: bh.replicationStats.HandlePutRevThrottledTime,
}
return bh.processRev(rq, &stats)
}
Expand Down
55 changes: 41 additions & 14 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,13 @@ var kConnectedClientHandlersByProfile = map[string]blipHandlerFunc{
MessageGraphQL: userBlipHandler((*blipHandler).handleGraphQL),
}

// maxInFlightChangesBatches is the maximum number of in-flight changes batches a client is allowed to send without being throttled.
const maxInFlightChangesBatches = 2
// Replication throttling
const (
// DefaultMaxConcurrentChangesBatches is the maximum number of in-flight changes batches a client is allowed to send concurrently without being throttled.
// NewBlipSyncContext uses it as the capacity of inFlightChangesThrottle when the database options do not set MaxConcurrentChangesBatches.
DefaultMaxConcurrentChangesBatches = 2
// DefaultMaxConcurrentRevs is the maximum number of in-flight revisions a client is allowed to send or receive concurrently without being throttled.
// NewBlipSyncContext uses it as the capacity of inFlightRevsThrottle when the database options do not set MaxConcurrentRevs.
DefaultMaxConcurrentRevs = 5
)

type blipHandler struct {
*BlipSyncContext
Expand Down Expand Up @@ -759,6 +764,11 @@ func (bh *blipHandler) handleChanges(rq *blip.Message) error {
// Handles a "proposeChanges" request, similar to "changes" but in no-conflicts mode
func (bh *blipHandler) handleProposeChanges(rq *blip.Message) error {

// we don't know whether this batch of changes has completed because they look like unsolicited revs to us,
// but we can stop clients swarming us with these causing CheckProposedRev work
bh.inFlightChangesThrottle <- struct{}{}
defer func() { <-bh.inFlightChangesThrottle }()

includeConflictRev := false
if val := rq.Properties[ProposeChangesConflictsIncludeRev]; val != "" {
includeConflictRev = val == trueProperty
Expand Down Expand Up @@ -905,12 +915,14 @@ func (bh *blipHandler) handleNoRev(rq *blip.Message) error {
}

// processRevStats is the set of stats that processRev updates while handling a single incoming rev message.
// handleRev and handlePutRev each pass their own counters, so the two message types are tracked independently.
type processRevStats struct {
count *base.SgwIntStat // Increments when rev processed successfully
errorCount *base.SgwIntStat
deltaRecvCount *base.SgwIntStat
bytes *base.SgwIntStat
processingTime *base.SgwIntStat
docsPurgedCount *base.SgwIntStat
count *base.SgwIntStat // Increments when rev processed successfully
errorCount *base.SgwIntStat
deltaRecvCount *base.SgwIntStat
bytes *base.SgwIntStat
processingTime *base.SgwIntStat
docsPurgedCount *base.SgwIntStat
// throttledRevs is incremented once for each rev that had to wait for a free slot in inFlightRevsThrottle.
throttledRevs *base.SgwIntStat
// throttledRevTime accumulates the time, in nanoseconds, that revs spent waiting for a free slot.
throttledRevTime *base.SgwIntStat
}

// Processes a "rev" request, i.e. client is pushing a revision body
Expand All @@ -926,6 +938,19 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
}
}()

// throttle concurrent revs
if cap(bh.inFlightRevsThrottle) > 0 {
select {
case bh.inFlightRevsThrottle <- struct{}{}:
default:
stats.throttledRevs.Add(1)
throttleStart := time.Now()
bh.inFlightRevsThrottle <- struct{}{}
stats.throttledRevTime.Add(time.Since(throttleStart).Nanoseconds())
}
defer func() { <-bh.inFlightRevsThrottle }()
}

// addRevisionParams := newAddRevisionParams(rq)
revMessage := RevMessage{Message: rq}

Expand Down Expand Up @@ -1195,12 +1220,14 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
// handleRev is the handler for when a rev is received from the client.
// It bundles the HandleRev* counters from bh.replicationStats into a processRevStats and delegates to processRev.
func (bh *blipHandler) handleRev(rq *blip.Message) (err error) {
stats := processRevStats{
count: bh.replicationStats.HandleRevCount,
errorCount: bh.replicationStats.HandleRevErrorCount,
deltaRecvCount: bh.replicationStats.HandleRevDeltaRecvCount,
bytes: bh.replicationStats.HandleRevBytes,
processingTime: bh.replicationStats.HandleRevProcessingTime,
docsPurgedCount: bh.replicationStats.HandleRevDocsPurgedCount,
count: bh.replicationStats.HandleRevCount,
errorCount: bh.replicationStats.HandleRevErrorCount,
deltaRecvCount: bh.replicationStats.HandleRevDeltaRecvCount,
bytes: bh.replicationStats.HandleRevBytes,
processingTime: bh.replicationStats.HandleRevProcessingTime,
docsPurgedCount: bh.replicationStats.HandleRevDocsPurgedCount,
// throttle stats are updated by processRev when the rev has to wait for a free in-flight slot
throttledRevs: bh.replicationStats.HandleRevThrottledCount,
throttledRevTime: bh.replicationStats.HandleRevThrottledTime,
}
return bh.processRev(rq, &stats)
}
Expand Down
14 changes: 14 additions & 0 deletions db/blip_sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ const (
var ErrClosedBLIPSender = errors.New("use of closed BLIP sender")

func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, contextID string, replicationStats *BlipSyncStats) *BlipSyncContext {
maxInFlightChangesBatches := DefaultMaxConcurrentChangesBatches
if db.Options.MaxConcurrentChangesBatches != nil {
maxInFlightChangesBatches = *db.Options.MaxConcurrentChangesBatches
}
maxInFlightRevs := DefaultMaxConcurrentRevs
if db.Options.MaxConcurrentRevs != nil {
maxInFlightRevs = *db.Options.MaxConcurrentRevs
}

bsc := &BlipSyncContext{
blipContext: bc,
blipContextDb: db,
Expand All @@ -43,6 +52,7 @@ func NewBlipSyncContext(ctx context.Context, bc *blip.Context, db *Database, con
sgCanUseDeltas: db.DeltaSyncEnabled(),
replicationStats: replicationStats,
inFlightChangesThrottle: make(chan struct{}, maxInFlightChangesBatches),
inFlightRevsThrottle: make(chan struct{}, maxInFlightRevs),
collections: &blipCollections{},
}
if bsc.replicationStats == nil {
Expand Down Expand Up @@ -106,6 +116,10 @@ type BlipSyncContext struct {
// before they've processed the revs for previous batches. Keeping this >1 allows the client to be fed a constant supply of rev messages,
// without making Sync Gateway buffer a bunch of stuff in memory too far in advance of the client being able to receive the revs.
inFlightChangesThrottle chan struct{}
// inFlightRevsThrottle is a small buffered channel that limits the number of in-flight revs for this connection.
// Couchbase Lite limits this on the client side via the changes batch size (which is usually hard-coded to 200).
// This is a defensive measure to ensure a single client cannot use too much memory when replicating, and it forces each changes batch to be processed with a reduced amount of parallelism.
inFlightRevsThrottle chan struct{}

// fatalErrorCallback is called by the replicator code when the replicator using this blipSyncContext should be
// stopped
Expand Down
9 changes: 8 additions & 1 deletion db/blip_sync_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ type BlipSyncStats struct {
HandleRevBytes *base.SgwIntStat
HandleRevProcessingTime *base.SgwIntStat
HandleRevDocsPurgedCount *base.SgwIntStat
HandleRevThrottledCount *base.SgwIntStat
HandleRevThrottledTime *base.SgwIntStat
HandleGetRevCount *base.SgwIntStat // Connected Client API
HandlePutRevCount *base.SgwIntStat // Connected Client API
HandlePutRevErrorCount *base.SgwIntStat // Connected Client API
HandlePutRevDeltaRecvCount *base.SgwIntStat // Connected Client API
HandlePutRevBytes *base.SgwIntStat // Connected Client API
HandlePutRevProcessingTime *base.SgwIntStat // Connected Client API
HandlePutRevDocsPurgedCount *base.SgwIntStat // Connected Client API
HandlePutRevThrottledCount *base.SgwIntStat // Connected Client API
HandlePutRevThrottledTime *base.SgwIntStat // Connected Client API
SendRevCount *base.SgwIntStat // sendRev
SendRevDeltaRequestedCount *base.SgwIntStat
SendRevDeltaSentCount *base.SgwIntStat
Expand Down Expand Up @@ -72,6 +76,8 @@ func NewBlipSyncStats() *BlipSyncStats {
HandleRevBytes: &base.SgwIntStat{},
HandleRevProcessingTime: &base.SgwIntStat{},
HandleRevDocsPurgedCount: &base.SgwIntStat{},
HandleRevThrottledCount: &base.SgwIntStat{},
HandleRevThrottledTime: &base.SgwIntStat{},
HandleGetRevCount: &base.SgwIntStat{},
HandlePutRevCount: &base.SgwIntStat{},
HandlePutRevErrorCount: &base.SgwIntStat{},
Expand Down Expand Up @@ -133,9 +139,10 @@ func BlipSyncStatsForCBL(dbStats *base.DbStats) *BlipSyncStats {

blipStats.HandleRevBytes = dbStats.Database().DocWritesBytesBlip
blipStats.HandleRevProcessingTime = dbStats.CBLReplicationPush().WriteProcessingTime

blipStats.HandleRevCount = dbStats.CBLReplicationPush().DocPushCount
blipStats.HandleRevErrorCount = dbStats.CBLReplicationPush().DocPushErrorCount
blipStats.HandleRevThrottledCount = dbStats.CBLReplicationPush().WriteThrottledCount
blipStats.HandleRevThrottledTime = dbStats.CBLReplicationPush().WriteThrottledTime

blipStats.HandleGetAttachment = dbStats.CBLReplicationPull().AttachmentPullCount
blipStats.HandleGetAttachmentBytes = dbStats.CBLReplicationPull().AttachmentPullBytes
Expand Down
2 changes: 2 additions & 0 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ type DatabaseContextOptions struct {
ChangesRequestPlus bool // Sets the default value for request_plus, for non-continuous changes feeds
ConfigPrincipals *ConfigPrincipals
LoggingConfig DbLogConfig // Per-database log configuration
MaxConcurrentChangesBatches *int // Maximum number of changes batches to process concurrently per replication
MaxConcurrentRevs *int // Maximum number of revs to process concurrently per replication
}

// DbLogConfig can be used to customise the logging for logs associated with this database.
Expand Down
47 changes: 47 additions & 0 deletions rest/blip_api_crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,53 @@ func TestBlipSendAndGetRev(t *testing.T) {
assert.True(t, deletedValue)
}

// TestBlipSendConcurrentRevs sends many revisions concurrently over a single BLIP connection and ensures that
// SG limits the processing on the server-side with MaxConcurrentRevs, by checking that the push replication
// throttling stats (count and time) were incremented.
func TestBlipSendConcurrentRevs(t *testing.T) {

	const (
		maxConcurrentRevs    = 10 // server-side limit under test
		concurrentSendRevNum = 50 // well above the limit so that some revs must be throttled
	)
	rt := NewRestTester(t, &RestTesterConfig{
		leakyBucketConfig: &base.LeakyBucketConfig{
			UpdateCallback: func(_ string) {
				time.Sleep(time.Millisecond * 5) // slow down Walrus - it's too quick to be throttled
			},
		},
		maxConcurrentRevs: base.IntPtr(maxConcurrentRevs),
	})
	defer rt.Close()
	btSpec := BlipTesterSpec{
		connectingUsername: "user1",
		connectingPassword: "1234",
	}
	bt, err := NewBlipTesterFromSpecWithRT(t, &btSpec, rt)
	require.NoError(t, err, "Unexpected error creating BlipTester")
	defer bt.Close()

	wg := sync.WaitGroup{}
	wg.Add(concurrentSendRevNum)
	for i := 0; i < concurrentSendRevNum; i++ {
		docID := fmt.Sprintf("%s-%d", t.Name(), i)
		go func() {
			defer wg.Done()
			_, _, _, err := bt.SendRev(docID, "1-abc", []byte(`{"key": "val", "channels": ["user1"]}`), blip.Properties{})
			// Use assert rather than require here: require ends in t.FailNow, which must only be
			// called from the goroutine running the test function, not from goroutines it spawns.
			assert.NoError(t, err)
		}()
	}

	require.NoError(t, WaitWithTimeout(&wg, time.Second*30))

	throttleCount := rt.GetDatabase().DbStats.CBLReplicationPush().WriteThrottledCount.Value()
	throttleTime := rt.GetDatabase().DbStats.CBLReplicationPush().WriteThrottledTime.Value()
	throttleDuration := time.Duration(throttleTime) * time.Nanosecond

	assert.Greater(t, throttleCount, int64(0), "Expected throttled revs")
	assert.Greater(t, throttleTime, int64(0), "Expected non-zero throttled revs time")

	t.Logf("Throttled revs: %d, Throttled duration: %s", throttleCount, throttleDuration)
}

// Test send and retrieval of a doc with a large numeric value. Ensure proper large number handling.
//
// Validate deleted handling (includes check for https://github.com/couchbase/sync_gateway/issues/3341)
Expand Down
17 changes: 17 additions & 0 deletions rest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1318,6 +1318,23 @@ func (sc *StartupConfig) Validate(ctx context.Context, isEnterpriseEdition bool)
}
}

const (
minConcurrentChangesBatches = 1
maxConcurrentChangesBatches = 5
minConcurrentRevs = 5
maxConcurrentRevs = 200
)
if val := sc.Replicator.MaxConcurrentChangesBatches; val != nil {
if *val < minConcurrentChangesBatches || *val > maxConcurrentChangesBatches {
multiError = multiError.Append(fmt.Errorf("max_concurrent_changes_batches: %d outside allowed range: %d-%d", *val, minConcurrentChangesBatches, maxConcurrentChangesBatches))
}
}
if val := sc.Replicator.MaxConcurrentRevs; val != nil {
if *val < minConcurrentRevs || *val > maxConcurrentRevs {
multiError = multiError.Append(fmt.Errorf("max_concurrent_revs: %d outside allowed range: %d-%d", *val, minConcurrentRevs, maxConcurrentRevs))
}
}

return multiError.ErrorOrNil()
}

Expand Down
6 changes: 4 additions & 2 deletions rest/config_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,10 @@ func registerConfigFlags(config *StartupConfig, fs *flag.FlagSet) map[string]con

"auth.bcrypt_cost": {&config.Auth.BcryptCost, fs.Int("auth.bcrypt_cost", 0, "Cost to use for bcrypt password hashes")},

"replicator.max_heartbeat": {&config.Replicator.MaxHeartbeat, fs.String("replicator.max_heartbeat", "", "Max heartbeat value for _changes request")},
"replicator.blip_compression": {&config.Replicator.BLIPCompression, fs.Int("replicator.blip_compression", 0, "BLIP data compression level (0-9)")},
"replicator.max_heartbeat": {&config.Replicator.MaxHeartbeat, fs.String("replicator.max_heartbeat", "", "Max heartbeat value for _changes request")},
"replicator.blip_compression": {&config.Replicator.BLIPCompression, fs.Int("replicator.blip_compression", 0, "BLIP data compression level (0-9)")},
"replicator.max_concurrent_changes_batches": {&config.Replicator.MaxConcurrentChangesBatches, fs.Int("replicator.max_concurrent_changes_batches", 0, "Maximum number of changes batches to process concurrently per replication")},
"replicator.max_concurrent_revs": {&config.Replicator.MaxConcurrentRevs, fs.Int("replicator.max_concurrent_revs", 0, "Maximum number of revs to process concurrently per replication")},

"unsupported.stats_log_frequency": {&config.Unsupported.StatsLogFrequency, fs.String("unsupported.stats_log_frequency", "", "How often should stats be written to stats logs")},
"unsupported.use_stdlib_json": {&config.Unsupported.UseStdlibJSON, fs.Bool("unsupported.use_stdlib_json", false, "Bypass the jsoniter package and use Go's stdlib instead")},
Expand Down
11 changes: 9 additions & 2 deletions rest/config_startup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/couchbase/go-couchbase"
"github.com/couchbase/sync_gateway/auth"
"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/db"
)

const (
Expand Down Expand Up @@ -58,6 +59,10 @@ func DefaultStartupConfig(defaultLogFilePath string) StartupConfig {
Auth: AuthConfig{
BcryptCost: auth.DefaultBcryptCost,
},
Replicator: ReplicatorConfig{
MaxConcurrentChangesBatches: base.IntPtr(db.DefaultMaxConcurrentChangesBatches),
MaxConcurrentRevs: base.IntPtr(db.DefaultMaxConcurrentRevs),
},
Unsupported: UnsupportedConfig{
StatsLogFrequency: base.NewConfigDuration(time.Minute),
Serverless: ServerlessConfig{
Expand Down Expand Up @@ -137,8 +142,10 @@ type AuthConfig struct {
}

// ReplicatorConfig holds the replicator options of the startup config. Each field can also be set through the
// matching "replicator.*" command line flag. When set, MaxConcurrentChangesBatches and MaxConcurrentRevs are
// range-checked by StartupConfig.Validate.
type ReplicatorConfig struct {
MaxHeartbeat *base.ConfigDuration `json:"max_heartbeat,omitempty" help:"Max heartbeat value for _changes request"`
BLIPCompression *int `json:"blip_compression,omitempty" help:"BLIP data compression level (0-9)"`
MaxHeartbeat *base.ConfigDuration `json:"max_heartbeat,omitempty" help:"Max heartbeat value for _changes request"`
BLIPCompression *int `json:"blip_compression,omitempty" help:"BLIP data compression level (0-9)"`
MaxConcurrentChangesBatches *int `json:"max_concurrent_changes_batches,omitempty" help:"Maximum number of changes batches to process concurrently per replication (1-5)"`
MaxConcurrentRevs *int `json:"max_concurrent_revs,omitempty" help:"Maximum number of revs to process concurrently per replication (5-200)"`
}

type UnsupportedConfig struct {
Expand Down
Loading
Loading