Skip to content

Commit

Permalink
[dbnode] Get rid of excessive locking when adding new namespaces (#3765)
Browse files Browse the repository at this point in the history
* Set `Bootstrapping` state when waiting for fileOps to be disabled completed, because it can take a while.

* Release database lock before enqueueing bootstrap because it could take a while.

* Rollback bootstrapping state change.

* disable/enable fileOps when adding a new namespace.

* removed comments added for testing.

* fixed linter issues.

* fixed potential issue when competing bootstraps could be started and `onCompleteFn` might be invoked for the second bootstrap while another bootstrap is still in progress. This is bad because bootstrap should work when fileOps are disabled and `onCompleteFn` might be enabling fileOps.

* fixed typo in comment.

* implemented similar enqueue bootstrap logics to `assignShardSet` as well.

* Disable both file ops first and then wait for them to stop (should reduce waiting times)

* New namespace adds can now be enqueued and be non-blocking.
If db was not yet bootstrapped and new shardSet is assigned, do this immediately (no need to enqueue).
New test for add new namespace using enqueue.

* small cleanup

* reverse commit

* We can enqueue bootstrap with lock to avoid potential race now because enqueueing and waiting is fully async.

* unlock right after `namespaceDeltaWithLock()`

* Simplify callback logic by having the mediator own the lifecycle of the callback functions (#3810)

* Simplify callback logic by having the mediator own the lifecycle of the callback functions

* Remove no longer required BootstrapAsyncResult

* Remove unused require.NoError

* Fix lint

* Added test when bootstrap is enqueued in case new namespaces are added.
Fixed possible race in `context.StartSampledTraceSpan()`.
Use zap error logging where needed.
Make sure fileOps are disabled/enabled only when really needed (bootstraps > 0 and mediator is opened), otherwise just use simpler logics.

* Simplify some code paths further and use lock/defer unlock in more places (#3818)

* Simplify some code paths further and use lock/defer unlock in more places

* Fix test

* More test coverage.

* Fixed possible race in unit test assertion.

Co-authored-by: Rob Skillington <rob.skillington@gmail.com>
  • Loading branch information
soundvibe and robskillington authored Oct 6, 2021
1 parent 8dd5622 commit 5f351fb
Show file tree
Hide file tree
Showing 8 changed files with 433 additions and 141 deletions.
48 changes: 30 additions & 18 deletions src/dbnode/storage/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type bootstrapManager struct {
processProvider bootstrap.ProcessProvider
state BootstrapState
hasPending bool
pendingOnCompleteFns []BootstrapCompleteFn
sleepFn sleepFn
nowFn clock.NowFn
lastBootstrapCompletionTime xtime.UnixNano
Expand Down Expand Up @@ -116,23 +117,29 @@ func (m *bootstrapManager) LastBootstrapCompletionTime() (xtime.UnixNano, bool)
return bsTime, bsTime > 0
}

func (m *bootstrapManager) BootstrapEnqueue() *BootstrapAsyncResult {
bootstrapAsyncResult := newBootstrapAsyncResult()
go func(r *BootstrapAsyncResult) {
if result, err := m.startBootstrap(r); err != nil && !result.AlreadyBootstrapping {
func (m *bootstrapManager) BootstrapEnqueue(
opts BootstrapEnqueueOptions,
) {
go func() {
result, err := m.startBootstrap(opts.OnCompleteFn)
if err != nil && !result.AlreadyBootstrapping {
m.instrumentation.emitAndLogInvariantViolation(err, "error bootstrapping")
}
}(bootstrapAsyncResult)
return bootstrapAsyncResult
}()
}

func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
bootstrapAsyncResult := newBootstrapAsyncResult()
return m.startBootstrap(bootstrapAsyncResult)
return m.startBootstrap(nil)
}

func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (BootstrapResult, error) {
func (m *bootstrapManager) startBootstrap(
onCompleteFn BootstrapCompleteFn,
) (BootstrapResult, error) {
m.Lock()
if onCompleteFn != nil {
// Append completion fn if specified.
m.pendingOnCompleteFns = append(m.pendingOnCompleteFns, onCompleteFn)
}
switch m.state {
case Bootstrapping:
// NB(r): Already bootstrapping, now a consequent bootstrap
Expand All @@ -144,26 +151,19 @@ func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (Bo
m.hasPending = true
m.Unlock()
result := BootstrapResult{AlreadyBootstrapping: true}
asyncResult.bootstrapResult = result
asyncResult.bootstrapStarted.Done()
asyncResult.bootstrapCompleted.Done()
return result, errBootstrapEnqueued
default:
m.state = Bootstrapping
}
m.Unlock()
// NB(xichen): disable filesystem manager before we bootstrap to minimize
// the impact of file operations on bootstrapping performance
m.instrumentation.log.Info("disable fileOps and wait")
m.mediator.DisableFileOpsAndWait()
defer m.mediator.EnableFileOps()
m.instrumentation.log.Info("fileOps disabled")

var result BootstrapResult
asyncResult.bootstrapStarted.Done()
defer func() {
asyncResult.bootstrapResult = result
asyncResult.bootstrapCompleted.Done()
}()

// Keep performing bootstraps until none pending and no error returned.
for i := 0; true; i++ {
// NB(r): Decouple implementation of bootstrap so can override in tests.
Expand Down Expand Up @@ -209,7 +209,19 @@ func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (Bo
m.Lock()
m.lastBootstrapCompletionTime = xtime.ToUnixNano(m.nowFn())
m.state = Bootstrapped
// NB(r): Clear out the pending completion functions and execute them if
// needed.
pendingOnCompleteFns := m.pendingOnCompleteFns
m.pendingOnCompleteFns = nil
m.Unlock()

if len(pendingOnCompleteFns) > 0 {
// Execute any on complete functions that were queued.
for _, fn := range pendingOnCompleteFns {
fn(result)
}
}

return result, nil
}

Expand Down
12 changes: 9 additions & 3 deletions src/dbnode/storage/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,15 @@ func testDatabaseBootstrapWithBootstrapError(t *testing.T, async bool) {

var result BootstrapResult
if async {
asyncResult := bsm.BootstrapEnqueue()
asyncResult.WaitForStart()
result = asyncResult.Result()
var wg sync.WaitGroup
wg.Add(1)
bsm.BootstrapEnqueue(BootstrapEnqueueOptions{
OnCompleteFn: func(r BootstrapResult) {
result = r
wg.Done()
},
})
wg.Wait()
} else {
result, err = bsm.Bootstrap()
require.NoError(t, err)
Expand Down
140 changes: 107 additions & 33 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,20 +292,14 @@ func (d *db) UpdateOwnedNamespaces(newNamespaces namespace.Map) error {
}
}

// NB: Can hold lock since all long-running tasks are enqueued to run
// async while holding the lock.
d.Lock()
defer d.Unlock()

removes, adds, updates := d.namespaceDeltaWithLock(newNamespaces)
if err := d.logNamespaceUpdate(removes, adds, updates); err != nil {
enrichedErr := fmt.Errorf("unable to log namespace updates: %v", err)
d.log.Error(enrichedErr.Error())
return enrichedErr
}

// add any namespaces marked for addition
if err := d.addNamespacesWithLock(adds); err != nil {
enrichedErr := fmt.Errorf("unable to add namespaces: %v", err)
d.log.Error(enrichedErr.Error())
d.log.Error("unable to log namespace updates", zap.Error(err))
return err
}

Expand All @@ -317,14 +311,68 @@ func (d *db) UpdateOwnedNamespaces(newNamespaces namespace.Map) error {
"restart the process if you want changes to take effect")
}

// enqueue bootstraps if new namespaces
if len(adds) > 0 {
d.queueBootstrapWithLock()
}
if d.bootstraps == 0 || !d.mediatorIsOpenWithLock() {
// If no bootstraps yet or mediator is not open we can just
// add the namespaces and optionally enqueue bootstrap (which is
// async) since no file operations can be in place since
// no bootstrap and/or mediator is not open.
if err := d.addNamespacesWithLock(adds); err != nil {
d.log.Error("unable to add namespaces", zap.Error(err))
return err
}

if d.bootstraps > 0 {
// If already bootstrapped before, enqueue another
// bootstrap (asynchronously, ok to trigger holding lock).
d.enqueueBootstrapAsync()
}

return nil
}

// NB: mediator is opened, we need to disable fileOps and wait for all the background processes to complete
// so that we could update namespaces safely. Otherwise, there is a high chance in getting
// invariant violation panic because cold/warm flush will receive new namespaces
// in the middle of their operations.
d.Unlock() // Don't hold the lock while we wait for file ops.
d.disableFileOpsAndWait()
d.Lock() // Reacquire lock after waiting.

// Add any namespaces marked for addition.
if err := d.addNamespacesWithLock(adds); err != nil {
d.log.Error("unable to add namespaces", zap.Error(err))
d.enableFileOps()
return err
}

// Enqueue bootstrap and enable file ops when bootstrap is completed.
d.enqueueBootstrapAsyncWithLock(d.enableFileOps)
}
return nil
}

func (d *db) mediatorIsOpenWithLock() bool {
if d.mediator == nil {
return false
}
return d.mediator.IsOpen()
}

func (d *db) disableFileOpsAndWait() {
if mediator := d.mediator; mediator != nil && mediator.IsOpen() {
d.log.Info("waiting for file ops to be disabled")
mediator.DisableFileOpsAndWait()
}
}

func (d *db) enableFileOps() {
if mediator := d.mediator; mediator != nil && mediator.IsOpen() {
d.log.Info("enabling file ops")
mediator.EnableFileOps()
}
}

func (d *db) namespaceDeltaWithLock(newNamespaces namespace.Map) ([]ident.ID, []namespace.Metadata, []namespace.Metadata) {
var (
existing = d.namespaces
Expand Down Expand Up @@ -454,15 +502,25 @@ func (d *db) Options() Options {
}

func (d *db) AssignShardSet(shardSet sharding.ShardSet) {
// NB: Can hold lock since all long running tasks are enqueued to run
// async while holding the lock.
d.Lock()
defer d.Unlock()

receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet)
if receivedNewShards {
d.lastReceivedNewShards = d.nowFn()
}
d.Unlock()

if !d.mediator.IsOpen() {
d.assignShardSet(shardSet)
if d.bootstraps == 0 || !d.mediatorIsOpenWithLock() {
// If not bootstrapped before or mediator is not open then can just
// immediately assign shards.
d.assignShardsWithLock(shardSet)
if d.bootstraps > 0 {
// If already bootstrapped before, enqueue another
// bootstrap (asynchronously, ok to trigger holding lock).
d.enqueueBootstrapAsync()
}
return
}

Expand All @@ -480,28 +538,34 @@ func (d *db) AssignShardSet(shardSet sharding.ShardSet) {
}

func (d *db) assignShardSet(shardSet sharding.ShardSet) {
d.RLock()
receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet)
d.RUnlock()

if receivedNewShards {
// Wait outside of holding lock to disable file operations.
d.disableFileOpsAndWait()
}

// NB: Can hold lock since all long-running tasks are enqueued to run
// async while holding the lock.
d.Lock()
defer d.Unlock()

d.log.Info("assigning shards", zap.Uint32s("shards", shardSet.AllIDs()))
d.assignShardsWithLock(shardSet)

receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet)
d.shardSet = shardSet
if receivedNewShards {
d.enqueueBootstrapAsyncWithLock(d.enableFileOps)
}
}

func (d *db) assignShardsWithLock(shardSet sharding.ShardSet) {
d.log.Info("assigning shards", zap.Uint32s("shards", shardSet.AllIDs()))
d.shardSet = shardSet
for _, elem := range d.namespaces.Iter() {
ns := elem.Value()
ns.AssignShardSet(shardSet)
}

if receivedNewShards {
// Only trigger a bootstrap if the node received new shards otherwise
// the nodes will perform lots of small bootstraps (that accomplish nothing)
// during topology changes as other nodes mark their shards as available.
//
// These small bootstraps can significantly delay topology changes as they prevent
// the nodes from marking themselves as bootstrapped and durable, for example.
d.queueBootstrapWithLock()
}
}

func (d *db) hasReceivedNewShardsWithLock(incoming sharding.ShardSet) bool {
Expand Down Expand Up @@ -533,7 +597,12 @@ func (d *db) ShardSet() sharding.ShardSet {
return shardSet
}

func (d *db) queueBootstrapWithLock() {
func (d *db) enqueueBootstrapAsync() {
d.log.Info("enqueuing bootstrap")
d.mediator.BootstrapEnqueue(BootstrapEnqueueOptions{})
}

func (d *db) enqueueBootstrapAsyncWithLock(onCompleteFn func()) {
// Only perform a bootstrap if at least one bootstrap has already occurred. This enables
// the ability to open the clustered database and assign shardsets to the non-clustered
// database when it receives an initial topology (as well as topology changes) without
Expand All @@ -542,11 +611,16 @@ func (d *db) queueBootstrapWithLock() {
// the non-clustered database bootstrapped by assigning it shardsets which will trigger new
// bootstraps since d.bootstraps > 0 will be true.
if d.bootstraps > 0 {
bootstrapAsyncResult := d.mediator.BootstrapEnqueue()
// NB(linasn): We need to wait for the bootstrap to start and set it's state to Bootstrapping in order
// to safely run fileOps in mediator later.
bootstrapAsyncResult.WaitForStart()
d.log.Info("enqueuing bootstrap with onComplete function")
d.mediator.BootstrapEnqueue(BootstrapEnqueueOptions{
OnCompleteFn: func(_ BootstrapResult) {
onCompleteFn()
},
})
return
}

onCompleteFn()
}

func (d *db) Namespace(id ident.ID) (Namespace, bool) {
Expand Down
Loading

0 comments on commit 5f351fb

Please sign in to comment.