[dbnode] Assign shards refactoring (#3851)
* Increase `defaultExternalChannelSize` to 1024: the previous value (8) was small enough that, in theory, enqueueing mutually exclusive functions could block when many kv watch updates are received.
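  A minimal standalone sketch (not m3db code) of why the small buffer matters: once a buffered channel of functions fills up, every further enqueue blocks until the consumer drains it. The producer and consumer below are hypothetical stand-ins for the kv watch updates and the mediator's single worker.

  ```go
  package main

  import "fmt"

  func main() {
  	const channelSize = 8 // the old defaultExternalChannelSize value
  	fnCh := make(chan func(), channelSize)

  	// Producer: simulate a burst of kv watch updates, each enqueueing a fn.
  	go func() {
  		for i := 0; i < 1024; i++ {
  			i := i
  			fnCh <- func() { fmt.Println("update", i) } // blocks once the buffer is full
  		}
  		close(fnCh)
  	}()

  	// Single consumer drains the channel; with a buffer of only 8, the
  	// producer spends most of its time blocked on the send.
  	for fn := range fnCh {
  		fn()
  	}
  }
  ```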

* Refactored `AssignShardSet` so that it no longer enqueues a function on the mediator.

* Fixed linter issues.

* Release waiters.

* Unlock `assignShardSetMutex` asynchronously, once the enqueued bootstrap has completed.
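Taken together, the last three bullets describe a lock handoff: `AssignShardSet` takes a dedicated mutex, and when a bootstrap is enqueued it defers the unlock to the bootstrap's completion callback. A minimal sketch of that pattern, assuming a hypothetical `enqueueBootstrapAsync` helper (the real code uses `enqueueBootstrapAsyncWithLock`, shown in the diff below):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// assigner sketches the commit's pattern: a dedicated mutex guards shard
// assignment, and when a bootstrap is enqueued the unlock happens in the
// bootstrap's completion callback instead of at function return.
type assigner struct {
	mu sync.Mutex
}

// enqueueBootstrapAsync is a hypothetical stand-in for the mediator's
// bootstrap enqueueing; it runs asynchronously and calls onDone when done.
func enqueueBootstrapAsync(onDone func()) {
	go func() {
		time.Sleep(10 * time.Millisecond) // pretend to bootstrap
		onDone()
	}()
}

func (a *assigner) assignShardSet(added bool) {
	a.mu.Lock()
	asyncUnlock := false
	defer func() {
		if !asyncUnlock {
			a.mu.Unlock() // fast path: no bootstrap was enqueued
		}
	}()

	// ... assign shards here ...

	if added {
		// Hand the mutex over to the bootstrap: competing calls stay
		// blocked until the enqueued bootstrap has completed.
		asyncUnlock = true
		enqueueBootstrapAsync(func() {
			a.mu.Unlock()
		})
	}
}

func main() {
	a := &assigner{}
	a.assignShardSet(true)
	a.mu.Lock() // a competing caller blocks until the bootstrap finishes
	fmt.Println("bootstrap finished, mutex released")
	a.mu.Unlock()
}
```

The point of the handoff is that a second `AssignShardSet` call cannot start until the first call's enqueued bootstrap has fully finished.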
soundvibe authored Oct 26, 2021
1 parent aae5af3 commit 40bc7ff
Showing 6 changed files with 179 additions and 179 deletions.
97 changes: 67 additions & 30 deletions src/dbnode/storage/database.go
@@ -28,6 +28,7 @@ import (
"sync/atomic"
"time"

"github.com/m3db/m3/src/cluster/shard"
"github.com/m3db/m3/src/dbnode/generated/proto/annotation"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
@@ -97,8 +98,9 @@ type increasingIndex interface {

type db struct {
sync.RWMutex
opts Options
nowFn clock.NowFn
assignShardSetMutex sync.Mutex
opts Options
nowFn clock.NowFn

nsWatch namespace.NamespaceWatch
namespaces *databaseNamespacesMap
@@ -502,13 +504,28 @@ func (d *db) Options() Options {
}

func (d *db) AssignShardSet(shardSet sharding.ShardSet) {
// NB: Use assignShardSetMutex to protect from competing calls.
d.assignShardSetMutex.Lock()
asyncUnlock := false
defer func() {
if !asyncUnlock {
// Unlock only if asyncUnlock is not set. Otherwise, we will unlock asynchronously.
d.assignShardSetMutex.Unlock()
}
}()
// NB: Can hold lock since all long running tasks are enqueued to run
// async while holding the lock.
d.Lock()
defer d.Unlock()

receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet)
if receivedNewShards {
added, removed, updated := d.shardsDeltaWithLock(shardSet)

if !added && !removed && !updated {
d.log.Info("received identical shardSet, skipping shard assignment")
return
}

if added {
d.lastReceivedNewShards = d.nowFn()
}

@@ -524,38 +541,21 @@ func (d *db) AssignShardSet(shardSet sharding.ShardSet) {
return
}

if err := d.mediator.EnqueueMutuallyExclusiveFn(func() {
d.assignShardSet(shardSet)
}); err != nil {
// should not happen.
instrument.EmitAndLogInvariantViolation(d.opts.InstrumentOptions(),
func(l *zap.Logger) {
l.Error("failed to enqueue assignShardSet fn",
zap.Error(err),
zap.Uint32s("shards", shardSet.AllIDs()))
})
}
}

func (d *db) assignShardSet(shardSet sharding.ShardSet) {
d.RLock()
receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet)
d.RUnlock()

if receivedNewShards {
if added {
// Wait outside of holding lock to disable file operations.
d.Unlock()
d.disableFileOpsAndWait()
d.Lock()
}

// NB: Can hold lock since all long-running tasks are enqueued to run
// async while holding the lock.
d.Lock()
defer d.Unlock()

d.assignShardsWithLock(shardSet)

if receivedNewShards {
d.enqueueBootstrapAsyncWithLock(d.enableFileOps)
if added {
asyncUnlock = true
d.enqueueBootstrapAsyncWithLock(func() {
d.enableFileOps()
d.assignShardSetMutex.Unlock()
})
}
}

@@ -568,6 +568,43 @@ func (d *db) assignShardsWithLock(shardSet sharding.ShardSet) {
}
}

func (d *db) shardsDeltaWithLock(incoming sharding.ShardSet) (bool, bool, bool) {
var (
existing = d.shardSet
existingShards = existing.All()
incomingShards = incoming.All()
existingSet = make(map[uint32]shard.Shard, len(existingShards))
incomingSet = make(map[uint32]shard.Shard, len(incomingShards))
added bool
removed bool
updated bool
)

for _, shard := range existingShards {
existingSet[shard.ID()] = shard
}

for _, shard := range incomingShards {
incomingSet[shard.ID()] = shard
existingShard, ok := existingSet[shard.ID()]
if !ok {
added = true
} else if !existingShard.Equals(shard) {
updated = true
}
}

for shardID := range existingSet {
_, ok := incomingSet[shardID]
if !ok {
removed = true
break
}
}

return added, removed, updated
}

func (d *db) hasReceivedNewShardsWithLock(incoming sharding.ShardSet) bool {
var (
existing = d.shardSet
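For reference, the added/removed/updated classification in `shardsDeltaWithLock` boils down to two map-based set comparisons. A self-contained sketch, with a hypothetical `shardState` struct standing in for `shard.Shard` and a state comparison standing in for `Equals`:

```go
package main

import "fmt"

// shardState is a simplified stand-in for shard.Shard: an ID plus a state.
type shardState struct {
	id    uint32
	state string
}

// shardsDelta mirrors shardsDeltaWithLock: an incoming ID missing from
// existing marks "added", an existing ID missing from incoming marks
// "removed", and a shared ID whose shard differs marks "updated".
func shardsDelta(existing, incoming []shardState) (added, removed, updated bool) {
	existingSet := make(map[uint32]shardState, len(existing))
	for _, s := range existing {
		existingSet[s.id] = s
	}

	incomingSet := make(map[uint32]shardState, len(incoming))
	for _, s := range incoming {
		incomingSet[s.id] = s
		if prev, ok := existingSet[s.id]; !ok {
			added = true
		} else if prev.state != s.state {
			updated = true
		}
	}

	for id := range existingSet {
		if _, ok := incomingSet[id]; !ok {
			removed = true
			break
		}
	}
	return added, removed, updated
}

func main() {
	existing := []shardState{{0, "Available"}, {1, "Available"}, {2, "Initializing"}}
	incoming := []shardState{{1, "Available"}, {2, "Available"}, {3, "Initializing"}}
	added, removed, updated := shardsDelta(existing, incoming)
	fmt.Println(added, removed, updated) // true true true
}
```

This mirrors the "added-updated-deleted" case in the new `TestShardsDelta` below: shard 3 is added, shard 0 is removed, and shard 2 changes state.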
154 changes: 107 additions & 47 deletions src/dbnode/storage/database_test.go
@@ -477,24 +477,10 @@ func TestDatabaseAssignShardSetBehaviorNoNewShards(t *testing.T) {
close(mapCh)
}()

var ns []*MockdatabaseNamespace
ns = append(ns, dbAddNewMockNamespace(ctrl, d, "testns1"))
ns = append(ns, dbAddNewMockNamespace(ctrl, d, "testns2"))

var wg sync.WaitGroup
wg.Add(len(ns))
for _, n := range ns {
n.EXPECT().AssignShardSet(d.shardSet).Do(func(_ sharding.ShardSet) {
wg.Done()
})
}

t1 := d.lastReceivedNewShards
d.AssignShardSet(d.shardSet)
// Ensure that lastReceivedNewShards is not updated if no new shards are assigned.
require.True(t, d.lastReceivedNewShards.Equal(t1))

wg.Wait()
}

func TestDatabaseBootstrappedAssignShardSet(t *testing.T) {
@@ -512,13 +498,6 @@ func TestDatabaseBootstrappedAssignShardSet(t *testing.T) {
mediator.EXPECT().IsOpen().Return(true).AnyTimes()
mediator.EXPECT().DisableFileOpsAndWait().AnyTimes()
mediator.EXPECT().EnableFileOps().AnyTimes()
mediator.EXPECT().EnqueueMutuallyExclusiveFn(gomock.Any()).DoAndReturn(func(fn func()) error {
// Spawn async since this method is called while DB holds
// lock and expects the mediator to call it asynchronously
// (which avoids deadlocking).
go fn()
return nil
})
mediator.EXPECT().Bootstrap().DoAndReturn(func() (BootstrapResult, error) {
return BootstrapResult{}, nil
})
@@ -546,32 +525,6 @@
wg.Wait()
}

func TestDatabaseAssignShardSetShouldPanic(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

d, mapCh, _ := defaultTestDatabase(t, ctrl, Bootstrapped)
defer func() {
close(mapCh)
}()

d.bootstraps = 1
mediator := NewMockdatabaseMediator(ctrl)
mediator.EXPECT().IsOpen().Return(true)
mediator.EXPECT().EnqueueMutuallyExclusiveFn(gomock.Any()).Return(errors.New("unknown error"))
d.mediator = mediator

shards := append(sharding.NewShards([]uint32{0, 1}, shard.Available),
sharding.NewShards([]uint32{2}, shard.Initializing)...)
shardSet, err := sharding.NewShardSet(shards, nil)
require.NoError(t, err)

defer instrument.SetShouldPanicEnvironmentVariable(true)()
require.Panics(t, func() {
d.AssignShardSet(shardSet)
})
}

func TestDatabaseRemoveNamespace(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()
@@ -1748,6 +1701,113 @@ func TestNewAggregateTilesOptions(t *testing.T) {
assert.NoError(t, err)
}

func TestShardsDelta(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

ctx := context.NewBackground()
defer ctx.Close()

d, mapCh, _ := defaultTestDatabase(t, ctrl, Bootstrapped)
defer func() {
close(mapCh)
}()

shards := append(sharding.NewShards([]uint32{0, 1}, shard.Available),
sharding.NewShards([]uint32{2}, shard.Initializing)...)
shardSet, err := sharding.NewShardSet(shards, nil)
require.NoError(t, err)

d.shardSet = shardSet

t.Run("unchanged", func(t *testing.T) {
incoming := append(sharding.NewShards([]uint32{0, 1}, shard.Available),
sharding.NewShards([]uint32{2}, shard.Initializing)...)
shardSet, err = sharding.NewShardSet(incoming, nil)
require.NoError(t, err)

added, removed, updated := d.shardsDeltaWithLock(shardSet)
require.False(t, added)
require.False(t, removed)
require.False(t, updated)
})

t.Run("added-updated-deleted", func(t *testing.T) {
incomingAddedRemovedUpdated := append(sharding.NewShards([]uint32{1, 2}, shard.Available),
sharding.NewShards([]uint32{3}, shard.Initializing)...)
shardSet, err = sharding.NewShardSet(incomingAddedRemovedUpdated, nil)
require.NoError(t, err)
added, removed, updated := d.shardsDeltaWithLock(shardSet)
require.True(t, added)
require.True(t, removed)
require.True(t, updated)
})

t.Run("added", func(t *testing.T) {
incomingAdded := append(sharding.NewShards([]uint32{0, 1}, shard.Available),
sharding.NewShards([]uint32{2, 3}, shard.Initializing)...)
shardSet, err = sharding.NewShardSet(incomingAdded, nil)
require.NoError(t, err)
added, removed, updated := d.shardsDeltaWithLock(shardSet)
require.True(t, added)
require.False(t, removed)
require.False(t, updated)
})

t.Run("updated", func(t *testing.T) {
incomingUpdated := sharding.NewShards([]uint32{0, 1, 2}, shard.Available)
shardSet, err = sharding.NewShardSet(incomingUpdated, nil)
require.NoError(t, err)
added, removed, updated := d.shardsDeltaWithLock(shardSet)
require.False(t, added)
require.False(t, removed)
require.True(t, updated)
})

t.Run("removed", func(t *testing.T) {
incomingRemoved := sharding.NewShards([]uint32{0, 1}, shard.Available)
shardSet, err = sharding.NewShardSet(incomingRemoved, nil)
require.NoError(t, err)
added, removed, updated := d.shardsDeltaWithLock(shardSet)
require.False(t, added)
require.True(t, removed)
require.False(t, updated)
})

t.Run("added-updated", func(t *testing.T) {
incomingAddedUpdated := append(sharding.NewShards([]uint32{0, 1, 2}, shard.Available),
sharding.NewShards([]uint32{3}, shard.Initializing)...)
shardSet, err = sharding.NewShardSet(incomingAddedUpdated, nil)
require.NoError(t, err)
added, removed, updated := d.shardsDeltaWithLock(shardSet)
require.True(t, added)
require.False(t, removed)
require.True(t, updated)
})

t.Run("added-removed", func(t *testing.T) {
incomingAddedRemoved := append(sharding.NewShards([]uint32{1}, shard.Available),
sharding.NewShards([]uint32{3}, shard.Initializing)...)
shardSet, err = sharding.NewShardSet(incomingAddedRemoved, nil)
require.NoError(t, err)
added, removed, updated := d.shardsDeltaWithLock(shardSet)
require.True(t, added)
require.True(t, removed)
require.False(t, updated)
})

t.Run("updated-removed", func(t *testing.T) {
incomingUpdatedRemoved := append(sharding.NewShards([]uint32{0}, shard.Available),
sharding.NewShards([]uint32{1}, shard.Initializing)...)
shardSet, err = sharding.NewShardSet(incomingUpdatedRemoved, nil)
require.NoError(t, err)
added, removed, updated := d.shardsDeltaWithLock(shardSet)
require.False(t, added)
require.True(t, removed)
require.True(t, updated)
})
}

func assertFileOpsEnabled(t *testing.T, d *db) {
mediator := d.mediator.(*mediator)
coldFlushManager := mediator.databaseColdFlushManager.(*coldFlushManager)
[Diffs for the remaining 4 changed files not shown.]
