Skip to content

Commit

Permalink
CBG-4174 Force disconnection of blip clients on database close (#7166)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamcfraser authored Oct 22, 2024
1 parent a6de9a4 commit 9326dbd
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 15 deletions.
3 changes: 2 additions & 1 deletion db/active_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ func connect(arc *activeReplicatorCommon, idSuffix string) (blipSender *blip.Sen
arc.replicationStats.NumConnectAttempts.Add(1)

var originPatterns []string // no origin headers for ISGR
blipContext, err := NewSGBlipContext(arc.ctx, arc.config.ID+idSuffix, originPatterns)
// NewSGBlipContext doesn't set cancellation context - active replication cancellation on db close is handled independently
blipContext, err := NewSGBlipContext(arc.ctx, arc.config.ID+idSuffix, originPatterns, nil)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion db/active_replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestBlipSyncErrorUserinfo(t *testing.T) {
srvURL.Path = "/db1"
t.Logf("srvURL: %v", srvURL.String())

blipContext, err := NewSGBlipContext(base.TestCtx(t), t.Name(), nil)
blipContext, err := NewSGBlipContext(base.TestCtx(t), t.Name(), nil, nil)
require.NoError(t, err)

_, err = blipSync(*srvURL, blipContext, false)
Expand Down
7 changes: 4 additions & 3 deletions db/blip.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ var (
)

// NewSGBlipContext returns a go-blip context with the given ID, initialized for use in Sync Gateway.
func NewSGBlipContext(ctx context.Context, id string, origin []string) (bc *blip.Context, err error) {
return NewSGBlipContextWithProtocols(ctx, id, origin, supportedSubprotocols())
func NewSGBlipContext(ctx context.Context, id string, origin []string, cancelCtx context.Context) (bc *blip.Context, err error) {
return NewSGBlipContextWithProtocols(ctx, id, origin, supportedSubprotocols(), cancelCtx)
}

func NewSGBlipContextWithProtocols(ctx context.Context, id string, origin []string, protocols []string) (bc *blip.Context, err error) {
func NewSGBlipContextWithProtocols(ctx context.Context, id string, origin []string, protocols []string, cancelCtx context.Context) (bc *blip.Context, err error) {
opts := blip.ContextOptions{
Origin: origin,
ProtocolIds: protocols,
CancelCtx: cancelCtx,
}
if id == "" {
bc, err = blip.NewContext(opts)
Expand Down
20 changes: 16 additions & 4 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,11 @@ type DatabaseContext struct {
LocalJWTProviders auth.LocalJWTProviderMap
ServerUUID string // UUID of the server, if available

DbStats *base.DbStats // stats that correspond to this database context
CompactState uint32 // Status of database compaction
terminator chan bool // Signal termination of background goroutines

DbStats *base.DbStats // stats that correspond to this database context
CompactState uint32 // Status of database compaction
terminator chan bool // Signal termination of background goroutines
CancelContext context.Context // Cancelled when the database is closed - used to notify associated processes (e.g. blipContext)
cancelContextFunc context.CancelFunc // Cancel function for cancelContext
backgroundTasks []BackgroundTask // List of background tasks that are initiated.
activeChannels *channels.ActiveChannels // Tracks active replications by channel
CfgSG cbgt.Cfg // Sync Gateway cluster shared config
Expand Down Expand Up @@ -417,6 +418,14 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket,
UserFunctionTimeout: defaultUserFunctionTimeout,
}

// set up cancellable context based on the background context (context lifecycle for the database
// must be distinct from the request context associated with the db create/update). Used to trigger
// teardown of connected replications on database close.
dbContext.CancelContext, dbContext.cancelContextFunc = context.WithCancel(context.Background())
cleanupFunctions = append(cleanupFunctions, func() {
dbContext.cancelContextFunc()
})

// Check if server version supports multi-xattr operations, required for mou handling
dbContext.EnableMou = bucket.IsSupported(sgbucket.BucketStoreFeatureMultiXattrSubdocOperations)

Expand Down Expand Up @@ -591,6 +600,9 @@ func (context *DatabaseContext) Close(ctx context.Context) {

context.OIDCProviders.Stop()
close(context.terminator)
if context.cancelContextFunc != nil {
context.cancelContextFunc()
}

// Stop All background processors
bgManagers := context.stopBackgroundManagers()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/couchbase/cbgt v1.3.9
github.com/couchbase/clog v0.1.0
github.com/couchbase/go-blip v0.0.0-20231212195435-3490e96d30e3
github.com/couchbase/go-blip v0.0.0-20241014142134-cc8d8ebf1949
github.com/couchbase/gocb/v2 v2.9.1
github.com/couchbase/gocbcore/v10 v10.5.1
github.com/couchbase/gomemcached v0.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ github.com/couchbase/cbgt v1.3.9 h1:MAT3FwD1ctekxuFe0yau0H1BCTvgLXvh1ipbZ3nZhBE=
github.com/couchbase/cbgt v1.3.9/go.mod h1:MImhtmvk0qjJit5HbmA34tnYThZoNtvgjL7jJH/kCAE=
github.com/couchbase/clog v0.1.0 h1:4Kh/YHkhRjMCbdQuvRVsm39XZh4FtL1d8fAwJsHrEPY=
github.com/couchbase/clog v0.1.0/go.mod h1:7tzUpEOsE+fgU81yfcjy5N1H6XtbVC8SgOz/3mCjmd4=
github.com/couchbase/go-blip v0.0.0-20231212195435-3490e96d30e3 h1:MeikDkvUMHZLpS57pfzhu2E+disqUVulUTb/r3aqUck=
github.com/couchbase/go-blip v0.0.0-20231212195435-3490e96d30e3/go.mod h1:Dz8Keu17/4cjF7hvKYqOjH4pRXOh1CCnzsKlBOJaoJE=
github.com/couchbase/go-blip v0.0.0-20241014142134-cc8d8ebf1949 h1:jwFj/GtyaoACmwnGfan/XW38TBTG1kYboXLZfAqd2VE=
github.com/couchbase/go-blip v0.0.0-20241014142134-cc8d8ebf1949/go.mod h1:Dz8Keu17/4cjF7hvKYqOjH4pRXOh1CCnzsKlBOJaoJE=
github.com/couchbase/go-couchbase v0.1.1 h1:ClFXELcKj/ojyoTYbsY34QUrrYCBi/1G749sXSCkdhk=
github.com/couchbase/go-couchbase v0.1.1/go.mod h1:+/bddYDxXsf9qt0xpDUtRR47A2GjaXmGGAqQ/k3GJ8A=
github.com/couchbase/gocb/v2 v2.9.1 h1:yB2ZhRLk782Y9sZlATaUwglZe9+2QpvFmItJXTX4stQ=
Expand Down
34 changes: 34 additions & 0 deletions rest/blip_api_crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3167,3 +3167,37 @@ func TestOnDemandImportBlipFailure(t *testing.T) {
}
})
}

// TestBlipDatabaseClose verifies that the client connection is closed when the database is closed.
// Starts a continuous pull replication then updates the db to trigger a close.
func TestBlipDatabaseClose(t *testing.T) {

base.SetUpTestLogging(t, base.LevelInfo, base.KeyHTTP, base.KeySync, base.KeySyncMsg, base.KeyChanges, base.KeyCache)
btcRunner := NewBlipTesterClientRunner(t)
btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) {
rt := NewRestTesterPersistentConfig(t)
defer rt.Close()
const username = "alice"
rt.CreateUser(username, []string{"*"})
btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, &BlipTesterClientOpts{Username: username})
var blipContextClosed atomic.Bool
btcRunner.clients[btc.id].pullReplication.bt.blipContext.OnExitCallback = func() {
log.Printf("on exit callback invoked")
blipContextClosed.Store(true)
}

// put a doc, and make sure blip connection is established
markerDoc := "markerDoc"
markerDocVersion := rt.CreateTestDoc(markerDoc)
require.NoError(t, rt.WaitForPendingChanges())
require.NoError(t, btcRunner.StartPull(btc.id))

btcRunner.WaitForVersion(btc.id, markerDoc, markerDocVersion)

RequireStatus(t, rt.SendAdminRequest(http.MethodDelete, "/{{.db}}/", ""), http.StatusOK)

require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.True(c, blipContextClosed.Load())
}, time.Second*10, time.Millisecond*100)
})
}
2 changes: 1 addition & 1 deletion rest/blip_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (h *handler) handleBLIPSync() error {
originPatterns, _ := hostOnlyCORS(h.db.CORS.Origin)

// Create a BLIP context:
blipContext, err := db.NewSGBlipContext(h.ctx(), "", originPatterns)
blipContext, err := db.NewSGBlipContext(h.ctx(), "", originPatterns, h.db.DatabaseContext.CancelContext)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions rest/utilities_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -1522,8 +1522,9 @@ func createBlipTesterWithSpec(tb testing.TB, spec BlipTesterSpec, rt *RestTester
if err != nil {
return nil, err
}
// Make BLIP/Websocket connection
bt.blipContext, err = db.NewSGBlipContextWithProtocols(base.TestCtx(tb), "", origin, protocols)
// Make BLIP/Websocket connection. Not specifying cancellation context here as this is a
// client blip context that doesn't require cancellation-based close
bt.blipContext, err = db.NewSGBlipContextWithProtocols(base.TestCtx(tb), "", origin, protocols, nil)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 9326dbd

Please sign in to comment.