diff --git a/src/cmd/services/m3dbnode/config/cache.go b/src/cmd/services/m3dbnode/config/cache.go index b9e2207015..7e29c6e040 100644 --- a/src/cmd/services/m3dbnode/config/cache.go +++ b/src/cmd/services/m3dbnode/config/cache.go @@ -47,5 +47,6 @@ type SeriesCacheConfiguration struct { // LRUSeriesCachePolicyConfiguration contains configuration for the LRU // series caching policy. type LRUSeriesCachePolicyConfiguration struct { - MaxBlocks uint `yaml:"maxBlocks" validate:"nonzero"` + MaxBlocks uint `yaml:"maxBlocks" validate:"nonzero"` + EventsChannelSize uint `yaml:"eventsChannelSize" validate:"nonzero"` } diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index c6296838ee..67ff39f038 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -298,11 +298,13 @@ func newTestSetup(t *testing.T, opts testOptions, fsOpts fs.Options) (*testSetup // Set up wired list if required if storageOpts.SeriesCachePolicy() == series.CacheLRU { - wiredList := block.NewWiredList( - runtimeOptsMgr, - storageOpts.InstrumentOptions(), - storageOpts.ClockOptions(), - ) + wiredList := block.NewWiredList(block.WiredListOptions{ + RuntimeOptionsManager: runtimeOptsMgr, + InstrumentOptions: storageOpts.InstrumentOptions(), + ClockOptions: storageOpts.ClockOptions(), + // Use a small event channel size to stress-test the implementation + EventsChannelSize: 1, + }) blockOpts := storageOpts.DatabaseBlockOptions().SetWiredList(wiredList) blockPool := block.NewDatabaseBlockPool(nil) // Have to manually set the blockpool because the default one uses a constructor diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index efb83ce84d..7d6415c552 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -326,7 +326,7 @@ func Run(runOpts RunOptions) { opts = opts.SetSeriesCachePolicy(seriesCachePolicy) // Apply pooling options - opts = withEncodingAndPoolingOptions(logger, opts, cfg.PoolingPolicy) + opts = withEncodingAndPoolingOptions(cfg, logger, opts, cfg.PoolingPolicy) // Setup the block retriever switch seriesCachePolicy { @@ -882,6 +882,7 @@ func kvWatchBootstrappers( } func withEncodingAndPoolingOptions( + cfg config.DBConfiguration, logger xlog.Logger, opts storage.Options, policy config.PoolingPolicy, @@ -1003,8 +1004,20 @@ func withEncodingAndPoolingOptions( SetBytesPool(bytesPool) if opts.SeriesCachePolicy() == series.CacheLRU { - runtimeOpts := opts.RuntimeOptionsManager() - wiredList := block.NewWiredList(runtimeOpts, iopts, opts.ClockOptions()) + var ( + runtimeOpts = opts.RuntimeOptionsManager() + wiredListOpts = block.WiredListOptions{ + RuntimeOptionsManager: runtimeOpts, + InstrumentOptions: iopts, + ClockOptions: opts.ClockOptions(), + } + lruCfg = cfg.Cache.SeriesConfiguration().LRU + ) + + if lruCfg != nil && lruCfg.EventsChannelSize > 0 { + wiredListOpts.EventsChannelSize = int(lruCfg.EventsChannelSize) + } + wiredList := block.NewWiredList(wiredListOpts) blockOpts = blockOpts.SetWiredList(wiredList) } blockPool := block.NewDatabaseBlockPool(poolOptions(policy.BlockPool, diff --git a/src/dbnode/storage/block/block.go b/src/dbnode/storage/block/block.go index dd6797f768..5d77905699 100644 --- a/src/dbnode/storage/block/block.go +++ b/src/dbnode/storage/block/block.go @@ -73,9 +73,9 @@ type dbBlock struct { } type listState struct { - next DatabaseBlock - prev DatabaseBlock - nextPrevUpdatedAtUnixNano int64 + next DatabaseBlock + prev DatabaseBlock + enteredListAtUnixNano int64 } // NewDatabaseBlock creates a 
new DatabaseBlock instance. @@ -414,19 +414,37 @@ func (b *dbBlock) resetRetrievableWithLock( } func (b *dbBlock) Discard() ts.Segment { - return b.closeAndDiscard() + seg, _ := b.closeAndDiscardConditionally(nil) + return seg } func (b *dbBlock) Close() { - segment := b.closeAndDiscard() + segment, _ := b.closeAndDiscardConditionally(nil) segment.Finalize() } -func (b *dbBlock) closeAndDiscard() ts.Segment { +func (b *dbBlock) CloseIfFromDisk() bool { + segment, ok := b.closeAndDiscardConditionally(func(b *dbBlock) bool { + return b.wasRetrievedFromDisk + }) + if !ok { + return false + } + segment.Finalize() + return true +} + +func (b *dbBlock) closeAndDiscardConditionally(condition func(b *dbBlock) bool) (ts.Segment, bool) { b.Lock() + + if condition != nil && !condition(b) { + b.Unlock() + return ts.Segment{}, false + } + if b.closed { b.Unlock() - return ts.Segment{} + return ts.Segment{}, true } segment := b.segment @@ -439,7 +457,7 @@ func (b *dbBlock) closeAndDiscard() ts.Segment { pool.Put(b) } - return segment + return segment, true } func (b *dbBlock) resetMergeTargetWithLock() { @@ -470,13 +488,13 @@ func (b *dbBlock) setPrev(value DatabaseBlock) { } // Should only be used by the WiredList. -func (b *dbBlock) nextPrevUpdatedAtUnixNano() int64 { - return b.listState.nextPrevUpdatedAtUnixNano +func (b *dbBlock) enteredListAtUnixNano() int64 { + return b.listState.enteredListAtUnixNano } // Should only be used by the WiredList. -func (b *dbBlock) setNextPrevUpdatedAtUnixNano(value int64) { - b.listState.nextPrevUpdatedAtUnixNano = value +func (b *dbBlock) setEnteredListAtUnixNano(value int64) { + b.listState.enteredListAtUnixNano = value } // wiredListEntry is a snapshot of a subset of the block's state that the WiredList diff --git a/src/dbnode/storage/block/block_mock.go b/src/dbnode/storage/block/block_mock.go index 99cc893367..e6e757a660 100644 --- a/src/dbnode/storage/block/block_mock.go +++ b/src/dbnode/storage/block/block_mock.go @@ -461,6 +461,18 @@ func (mr *MockDatabaseBlockMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockDatabaseBlock)(nil).Close)) } +// CloseIfFromDisk mocks base method +func (m *MockDatabaseBlock) CloseIfFromDisk() bool { + ret := m.ctrl.Call(m, "CloseIfFromDisk") + ret0, _ := ret[0].(bool) + return ret0 +} + +// CloseIfFromDisk indicates an expected call of CloseIfFromDisk +func (mr *MockDatabaseBlockMockRecorder) CloseIfFromDisk() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseIfFromDisk", reflect.TypeOf((*MockDatabaseBlock)(nil).CloseIfFromDisk)) +} + // SetOnEvictedFromWiredList mocks base method func (m *MockDatabaseBlock) SetOnEvictedFromWiredList(arg0 OnEvictedFromWiredList) { m.ctrl.Call(m, "SetOnEvictedFromWiredList", arg0) @@ -527,26 +539,26 @@ func (mr *MockDatabaseBlockMockRecorder) setPrev(block interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "setPrev", reflect.TypeOf((*MockDatabaseBlock)(nil).setPrev), block) } -// nextPrevUpdatedAtUnixNano mocks base method -func (m *MockDatabaseBlock) nextPrevUpdatedAtUnixNano() int64 { - ret := m.ctrl.Call(m, "nextPrevUpdatedAtUnixNano") +// enteredListAtUnixNano mocks base method +func (m *MockDatabaseBlock) enteredListAtUnixNano() int64 { + ret := m.ctrl.Call(m, "enteredListAtUnixNano") ret0, _ := ret[0].(int64) return ret0 } -// nextPrevUpdatedAtUnixNano indicates an expected call of nextPrevUpdatedAtUnixNano -func (mr *MockDatabaseBlockMockRecorder) 
nextPrevUpdatedAtUnixNano() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "nextPrevUpdatedAtUnixNano", reflect.TypeOf((*MockDatabaseBlock)(nil).nextPrevUpdatedAtUnixNano)) +// enteredListAtUnixNano indicates an expected call of enteredListAtUnixNano +func (mr *MockDatabaseBlockMockRecorder) enteredListAtUnixNano() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "enteredListAtUnixNano", reflect.TypeOf((*MockDatabaseBlock)(nil).enteredListAtUnixNano)) } -// setNextPrevUpdatedAtUnixNano mocks base method -func (m *MockDatabaseBlock) setNextPrevUpdatedAtUnixNano(value int64) { - m.ctrl.Call(m, "setNextPrevUpdatedAtUnixNano", value) +// setEnteredListAtUnixNano mocks base method +func (m *MockDatabaseBlock) setEnteredListAtUnixNano(value int64) { + m.ctrl.Call(m, "setEnteredListAtUnixNano", value) } -// setNextPrevUpdatedAtUnixNano indicates an expected call of setNextPrevUpdatedAtUnixNano -func (mr *MockDatabaseBlockMockRecorder) setNextPrevUpdatedAtUnixNano(value interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "setNextPrevUpdatedAtUnixNano", reflect.TypeOf((*MockDatabaseBlock)(nil).setNextPrevUpdatedAtUnixNano), value) +// setEnteredListAtUnixNano indicates an expected call of setEnteredListAtUnixNano +func (mr *MockDatabaseBlockMockRecorder) setEnteredListAtUnixNano(value interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "setEnteredListAtUnixNano", reflect.TypeOf((*MockDatabaseBlock)(nil).setEnteredListAtUnixNano), value) } // wiredListEntry mocks base method @@ -628,26 +640,26 @@ func (mr *MockdatabaseBlockMockRecorder) setPrev(block interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "setPrev", reflect.TypeOf((*MockdatabaseBlock)(nil).setPrev), block) } -// nextPrevUpdatedAtUnixNano mocks base method -func (m *MockdatabaseBlock) nextPrevUpdatedAtUnixNano() int64 { - ret := m.ctrl.Call(m, "nextPrevUpdatedAtUnixNano") +// enteredListAtUnixNano mocks base method +func (m *MockdatabaseBlock) enteredListAtUnixNano() int64 { + ret := m.ctrl.Call(m, "enteredListAtUnixNano") ret0, _ := ret[0].(int64) return ret0 } -// nextPrevUpdatedAtUnixNano indicates an expected call of nextPrevUpdatedAtUnixNano -func (mr *MockdatabaseBlockMockRecorder) nextPrevUpdatedAtUnixNano() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "nextPrevUpdatedAtUnixNano", reflect.TypeOf((*MockdatabaseBlock)(nil).nextPrevUpdatedAtUnixNano)) +// enteredListAtUnixNano indicates an expected call of enteredListAtUnixNano +func (mr *MockdatabaseBlockMockRecorder) enteredListAtUnixNano() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "enteredListAtUnixNano", reflect.TypeOf((*MockdatabaseBlock)(nil).enteredListAtUnixNano)) } -// setNextPrevUpdatedAtUnixNano mocks base method -func (m *MockdatabaseBlock) setNextPrevUpdatedAtUnixNano(value int64) { - m.ctrl.Call(m, "setNextPrevUpdatedAtUnixNano", value) +// setEnteredListAtUnixNano mocks base method +func (m *MockdatabaseBlock) setEnteredListAtUnixNano(value int64) { + m.ctrl.Call(m, "setEnteredListAtUnixNano", value) } -// setNextPrevUpdatedAtUnixNano indicates an expected call of setNextPrevUpdatedAtUnixNano -func (mr *MockdatabaseBlockMockRecorder) setNextPrevUpdatedAtUnixNano(value interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "setNextPrevUpdatedAtUnixNano", reflect.TypeOf((*MockdatabaseBlock)(nil).setNextPrevUpdatedAtUnixNano), value) +// 
setEnteredListAtUnixNano indicates an expected call of setEnteredListAtUnixNano +func (mr *MockdatabaseBlockMockRecorder) setEnteredListAtUnixNano(value interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "setEnteredListAtUnixNano", reflect.TypeOf((*MockdatabaseBlock)(nil).setEnteredListAtUnixNano), value) } // wiredListEntry mocks base method diff --git a/src/dbnode/storage/block/block_test.go b/src/dbnode/storage/block/block_test.go index c27718217e..84e74ab6ad 100644 --- a/src/dbnode/storage/block/block_test.go +++ b/src/dbnode/storage/block/block_test.go @@ -551,6 +551,18 @@ func TestDatabaseBlockStreamMergePerformsCopy(t *testing.T) { require.NoError(t, iter.Err()) } +func TestDatabaseBlockCloseIfFromDisk(t *testing.T) { + var ( + blockOpts = NewOptions() + blockNotFromDisk = NewDatabaseBlock(time.Time{}, time.Hour, ts.Segment{}, blockOpts).(*dbBlock) + blockFromDisk = NewDatabaseBlock(time.Time{}, time.Hour, ts.Segment{}, blockOpts).(*dbBlock) + ) + blockFromDisk.wasRetrievedFromDisk = true + + require.False(t, blockNotFromDisk.CloseIfFromDisk()) + require.True(t, blockFromDisk.CloseIfFromDisk()) +} + func TestDatabaseSeriesBlocksAddBlock(t *testing.T) { now := time.Now() blockTimes := []time.Time{now, now.Add(time.Second), now.Add(time.Minute), now.Add(-time.Second), now.Add(-time.Hour)} diff --git a/src/dbnode/storage/block/types.go b/src/dbnode/storage/block/types.go index ba62328bc3..97fa6eea7d 100644 --- a/src/dbnode/storage/block/types.go +++ b/src/dbnode/storage/block/types.go @@ -197,6 +197,11 @@ type DatabaseBlock interface { // Close closes the block. Close() + // CloseIfFromDisk atomically checks if the disk was retrieved from disk, and + // if so, closes it. It is meant as a layered protection for the WiredList + // which should only close blocks that were retrieved from disk. + CloseIfFromDisk() bool + // SetOnEvictedFromWiredList sets the owner of the block SetOnEvictedFromWiredList(OnEvictedFromWiredList) @@ -213,8 +218,8 @@ type databaseBlock interface { setNext(block DatabaseBlock) prev() DatabaseBlock setPrev(block DatabaseBlock) - nextPrevUpdatedAtUnixNano() int64 - setNextPrevUpdatedAtUnixNano(value int64) + enteredListAtUnixNano() int64 + setEnteredListAtUnixNano(value int64) wiredListEntry() wiredListEntry } diff --git a/src/dbnode/storage/block/wired_list.go b/src/dbnode/storage/block/wired_list.go index 32fbc9eac6..b9c41efa30 100644 --- a/src/dbnode/storage/block/wired_list.go +++ b/src/dbnode/storage/block/wired_list.go @@ -64,8 +64,8 @@ import ( ) const ( - wiredListEventsChannelLength = 65536 - wiredListSampleGaugesEvery = 100 + defaultWiredListEventsChannelSize = 65536 + wiredListSampleGaugesEvery = 100 ) var ( @@ -82,13 +82,14 @@ type WiredList struct { // Max wired blocks, must use atomic store and load to access. maxWired int64 - root dbBlock - length int - updatesCh chan DatabaseBlock - doneCh chan struct{} + root dbBlock + length int + updatesChSize int + updatesCh chan DatabaseBlock + doneCh chan struct{} metrics wiredListMetrics - logger xlog.Logger + iOpts instrument.Options } type wiredListMetrics struct { @@ -118,22 +119,31 @@ func newWiredListMetrics(scope tally.Scope) wiredListMetrics { } } +// WiredListOptions is the options struct for the WiredList constructor. +type WiredListOptions struct { + RuntimeOptionsManager runtime.OptionsManager + InstrumentOptions instrument.Options + ClockOptions clock.Options + EventsChannelSize int +} + // NewWiredList returns a new database block wired list. 
-func NewWiredList( - runtimeOptsMgr runtime.OptionsManager, - iopts instrument.Options, - copts clock.Options, -) *WiredList { - scope := iopts.MetricsScope(). +func NewWiredList(opts WiredListOptions) *WiredList { + scope := opts.InstrumentOptions.MetricsScope(). SubScope("wired-list") l := &WiredList{ - nowFn: copts.NowFn(), + nowFn: opts.ClockOptions.NowFn(), metrics: newWiredListMetrics(scope), - logger: iopts.Logger(), + iOpts: opts.InstrumentOptions, + } + if opts.EventsChannelSize > 0 { + l.updatesChSize = opts.EventsChannelSize + } else { + l.updatesChSize = defaultWiredListEventsChannelSize } l.root.setNext(&l.root) l.root.setPrev(&l.root) - runtimeOptsMgr.RegisterListener(l) + opts.RuntimeOptionsManager.RegisterListener(l) return l } @@ -151,7 +161,7 @@ func (l *WiredList) Start() error { return errAlreadyStarted } - l.updatesCh = make(chan DatabaseBlock, wiredListEventsChannelLength) + l.updatesCh = make(chan DatabaseBlock, l.updatesChSize) l.doneCh = make(chan struct{}, 1) go func() { i := 0 @@ -188,34 +198,39 @@ func (l *WiredList) Stop() error { return nil } -// Update places the block into the channel of blocks which are waiting to notify the +// BlockingUpdate places the block into the channel of blocks which are waiting to notify the // wired list that they were accessed. All updates must be processed through this channel // to force synchronization. // // We use a channel and a background processing goroutine to reduce blocking / lock contention. -func (l *WiredList) Update(v DatabaseBlock) { +func (l *WiredList) BlockingUpdate(v DatabaseBlock) { l.updatesCh <- v } +// NonBlockingUpdate will attempt to put the block in the events channel, but will not block +// if the channel is full. Used in cases where a blocking update could trigger deadlock with +// the WiredList itself. +func (l *WiredList) NonBlockingUpdate(v DatabaseBlock) bool { + select { + case l.updatesCh <- v: + return true + default: + return false + } +} + // processUpdateBlock inspects a block that has been modified or read recently // and determines what outcome its state should have on the wired list. func (l *WiredList) processUpdateBlock(v DatabaseBlock) { entry := v.wiredListEntry() - if !entry.wasRetrievedFromDisk { - // The WiredList should should never receive blocks that were not retrieved from disk, - // but we check for posterity. - l.logger.WithFields( - xlog.NewField("closed", entry.closed), - xlog.NewField("wasRetrievedFromDisk", entry.wasRetrievedFromDisk), - ).Errorf("wired list tried to process a block that was not unwireable") - } - // In some cases the WiredList can receive blocks that are closed. This can happen if a block is // in the updatesCh (because it was read) but also already in the WiredList, and while its still // in the updatesCh, it is evicted from the wired list to make room for some other block that is // being processed. The eviction of the block will close it, but the enqueued update is still in - // the updateCh even though its an update for a closed block. + // the updateCh even though its an update for a closed block. For the same reason, the wired list + // can receive blocks that were not retrieved from disk because the closed block was returned to + // a pool and then re-used. 
unwireable := !entry.closed && entry.wasRetrievedFromDisk // If a block is still unwireable then its worth keeping track of in the wired list @@ -239,7 +254,6 @@ func (l *WiredList) insertAfter(v, at DatabaseBlock) { at.setNext(v) v.setPrev(at) v.setNext(n) - v.setNextPrevUpdatedAtUnixNano(now.UnixNano()) n.setPrev(v) l.length++ @@ -252,24 +266,45 @@ func (l *WiredList) insertAfter(v, at DatabaseBlock) { // Try to unwire all blocks possible bl := l.root.next() for l.length > maxWired && bl != &l.root { + entry := bl.wiredListEntry() + if !entry.wasRetrievedFromDisk { + // This should never happen because processUpdateBlock performs the same + // check, and a block should never be pooled in-between those steps because + // the wired list is supposed to have sole ownership over that lifecycle and + // is single-threaded. + invariantLogger := instrument.EmitInvariantViolationAndGetLogger(l.iOpts) + invariantLogger.WithFields( + xlog.NewField("blockStart", entry.startTime), + xlog.NewField("closed", entry.closed), + xlog.NewField("wasRetrievedFromDisk", entry.wasRetrievedFromDisk), + ).Errorf("wired list tried to process a block that was not retrieved from disk") + } + // Evict the block before closing it so that callers of series.ReadEncoded() // don't get errors about trying to read from a closed block. if onEvict := bl.OnEvictedFromWiredList(); onEvict != nil { - wlEntry := bl.wiredListEntry() - onEvict.OnEvictedFromWiredList(wlEntry.retrieveID, wlEntry.startTime) + onEvict.OnEvictedFromWiredList(entry.retrieveID, entry.startTime) } - // bl.Close() will return the block to the pool. In order to avoid races - // with the pool itself, we capture the value of the next block and remove - // the block from the wired list before we close it. + // bl.CloseIfFromDisk() will return the block to the pool. In order to avoid + // races with the pool itself, we capture the value of the next block and + // remove the block from the wired list before we close it. 
nextBl := bl.next() l.remove(bl) - bl.Close() + if wasFromDisk := bl.CloseIfFromDisk(); !wasFromDisk { + // Should never happen + invariantLogger := instrument.EmitInvariantViolationAndGetLogger(l.iOpts) + invariantLogger.WithFields( + xlog.NewField("blockStart", entry.startTime), + xlog.NewField("closed", entry.closed), + xlog.NewField("wasRetrievedFromDisk", entry.wasRetrievedFromDisk), + ).Errorf("wired list tried to close a block that was not from disk") + } l.metrics.evicted.Inc(1) - lastUpdatedAt := time.Unix(0, bl.nextPrevUpdatedAtUnixNano()) - l.metrics.evictedAfterDuration.Record(now.Sub(lastUpdatedAt)) + enteredListAt := time.Unix(0, bl.enteredListAtUnixNano()) + l.metrics.evictedAfterDuration.Record(now.Sub(enteredListAt)) bl = nextBl } @@ -296,6 +331,7 @@ func (l *WiredList) pushBack(v DatabaseBlock) { l.metrics.inserted.Inc(1) l.insertAfter(v, l.root.prev()) + v.setEnteredListAtUnixNano(l.nowFn().UnixNano()) } func (l *WiredList) moveToBack(v DatabaseBlock) { diff --git a/src/dbnode/storage/block/wired_list_test.go b/src/dbnode/storage/block/wired_list_test.go index 22d46a39e0..baf65c0c30 100644 --- a/src/dbnode/storage/block/wired_list_test.go +++ b/src/dbnode/storage/block/wired_list_test.go @@ -54,7 +54,13 @@ func newTestWiredList( iopts = iopts.SetMetricsScope(overrideMetricsScope) } copts := clock.NewOptions() - return NewWiredList(runtimeOptsMgr, iopts, copts), runtimeOptsMgr + return NewWiredList(WiredListOptions{ + RuntimeOptionsManager: runtimeOptsMgr, + InstrumentOptions: iopts, + ClockOptions: copts, + // Use a small channel to stress-test the implementation + EventsChannelSize: 1, + }), runtimeOptsMgr } func newTestUnwireableBlock( @@ -92,10 +98,10 @@ func TestWiredListInsertsAndUpdatesWiredBlocks(t *testing.T) { blocks = append(blocks, bl) } - l.Update(blocks[0]) - l.Update(blocks[1]) - l.Update(blocks[2]) - l.Update(blocks[1]) + l.BlockingUpdate(blocks[0]) + l.BlockingUpdate(blocks[1]) + l.BlockingUpdate(blocks[2]) + l.BlockingUpdate(blocks[1]) l.Stop() @@ -127,9 +133,9 @@ func TestWiredListRemovesUnwiredBlocks(t *testing.T) { blocks = append(blocks, bl) } - l.Update(blocks[0]) - l.Update(blocks[1]) - l.Update(blocks[0]) + l.BlockingUpdate(blocks[0]) + l.BlockingUpdate(blocks[1]) + l.BlockingUpdate(blocks[0]) l.Stop() @@ -141,7 +147,7 @@ func TestWiredListRemovesUnwiredBlocks(t *testing.T) { blocks[1].closed = true l.Start() - l.Update(blocks[1]) + l.BlockingUpdate(blocks[1]) l.Stop() require.Equal(t, 1, l.length) @@ -152,7 +158,7 @@ func TestWiredListRemovesUnwiredBlocks(t *testing.T) { blocks[0].closed = true l.Start() - l.Update(blocks[0]) + l.BlockingUpdate(blocks[0]) l.Stop() require.Equal(t, 0, l.length) diff --git a/src/dbnode/storage/series/series.go b/src/dbnode/storage/series/series.go index a16f209802..275f0bfbde 100644 --- a/src/dbnode/storage/series/series.go +++ b/src/dbnode/storage/series/series.go @@ -34,6 +34,7 @@ import ( "github.com/m3db/m3x/context" xerrors "github.com/m3db/m3x/errors" "github.com/m3db/m3x/ident" + "github.com/m3db/m3x/instrument" xlog "github.com/m3db/m3x/log" xtime "github.com/m3db/m3x/time" ) @@ -518,14 +519,36 @@ func (s *dbSeries) OnRetrieveBlock( startTime time.Time, segment ts.Segment, ) { + var ( + b block.DatabaseBlock + list *block.WiredList + ) s.Lock() - defer s.Unlock() + defer func() { + s.Unlock() + if b != nil && list != nil { + // 1) We need to update the WiredList so that blocks that were read from disk + // can enter the list (OnReadBlock is only called for blocks that + // were read from memory, regardless 
of whether the data originated + // from disk or a buffer rotation.) + // 2) We must perform this action outside of the lock to prevent deadlock + // with the WiredList itself when it tries to call OnEvictedFromWiredList + // on the same series that is trying to perform a blocking update. + // 3) Doing this outside of the lock is safe because updating the + // wired list is asynchronous already (Update just puts the block in + // a channel to be processed later.) + // 4) We have to perform a blocking update because in this flow, the block + // is not already in the wired list so we need to make sure that the WiredList + // takes control of its lifecycle. + list.BlockingUpdate(b) + } + }() if !id.Equal(s.id) { return } - b := s.opts.DatabaseBlockOptions().DatabaseBlockPool().Get() + b = s.opts.DatabaseBlockOptions().DatabaseBlockPool().Get() metadata := block.RetrievableBlockMetadata{ ID: s.id, Length: segment.Len(), @@ -549,13 +572,7 @@ func (s *dbSeries) OnRetrieveBlock( // If we retrieved this from disk then we directly emplace it s.addBlockWithLock(b) - if list := s.opts.DatabaseBlockOptions().WiredList(); list != nil { - // Need to update the WiredList so blocks that were read from disk - // can enter the list (OnReadBlock is only called for blocks that - // were read from memory, regardless of whether the data originated - // from disk or a buffer rotation.) - list.Update(b) - } + list = s.opts.DatabaseBlockOptions().WiredList() } // OnReadBlock is only called for blocks that were read from memory, regardless of @@ -565,9 +582,16 @@ func (s *dbSeries) OnReadBlock(b block.DatabaseBlock) { // The WiredList is only responsible for managing the lifecycle of blocks // retrieved from disk. if b.WasRetrievedFromDisk() { - // Need to update the WiredList so it knows which blocks have been + // 1) Need to update the WiredList so it knows which blocks have been // most recently read. - list.Update(b) + // 2) We do a non-blocking update here to prevent deadlock with the + // WiredList calling OnEvictedFromWiredList on the same series since + // OnReadBlock is usually called within the context of a read lock + // on this series. + // 3) Its safe to do a non-blocking update because the wired list has + // already been exposed to this block, so even if the wired list drops + // this update, it will still manage this blocks lifecycle. 
+ list.NonBlockingUpdate(b) } } } @@ -585,7 +609,8 @@ func (s *dbSeries) OnEvictedFromWiredList(id ident.ID, blockStart time.Time) { if ok { if !block.WasRetrievedFromDisk() { // Should never happen - invalid application state could cause data loss - s.opts.InstrumentOptions().Logger().WithFields( + instrument.EmitInvariantViolationAndGetLogger( + s.opts.InstrumentOptions()).WithFields( xlog.NewField("id", id.String()), xlog.NewField("blockStart", blockStart), ).Errorf("tried to evict block that was not retrieved from disk") diff --git a/src/dbnode/storage/series_wired_list_interaction_test.go b/src/dbnode/storage/series_wired_list_interaction_test.go new file mode 100644 index 0000000000..46cc29cea2 --- /dev/null +++ b/src/dbnode/storage/series_wired_list_interaction_test.go @@ -0,0 +1,123 @@ +package storage + +import ( + "sync" + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/clock" + "github.com/m3db/m3/src/dbnode/runtime" + "github.com/m3db/m3/src/dbnode/storage/block" + "github.com/m3db/m3/src/dbnode/storage/series" + "github.com/m3db/m3/src/dbnode/storage/series/lookup" + "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3x/context" + "github.com/m3db/m3x/ident" + "github.com/m3db/m3x/instrument" + "github.com/m3db/m3x/pool" + + "github.com/stretchr/testify/require" +) + +// TestSeriesWiredListConcurrentInteractions was added as a regression test +// after discovering that interactions between a single series and the wired +// list could trigger a mutual dead lock. Specifically, if the wired list event +// channel was full, then the series could get blocked on a call to list.Update() +// in the OnRetrieveBlockMethod while the only goroutine pulling items off of that +// channel was stuck on the same series OnEvictedFromWiredList method. In that case, +// the OnRetrieveBlockMethod was stuck on a channel send while holding a lock that was +// required for the OnEvictedFromWiredList method that the wired list worker routine +// was calling. +func TestSeriesWiredListConcurrentInteractions(t *testing.T) { + var ( + runtimeOptsMgr = runtime.NewOptionsManager() + runtimeOpts = runtime.NewOptions().SetMaxWiredBlocks(1) + ) + runtimeOptsMgr.Update(runtimeOpts) + + runtime.NewOptions().SetMaxWiredBlocks(1) + wl := block.NewWiredList(block.WiredListOptions{ + RuntimeOptionsManager: runtimeOptsMgr, + InstrumentOptions: instrument.NewOptions(), + ClockOptions: clock.NewOptions(), + // Use a small channel to stress-test the implementation + EventsChannelSize: 1, + }) + wl.Start() + defer wl.Stop() + + var ( + blOpts = testDatabaseOptions().DatabaseBlockOptions() + blPool = block.NewDatabaseBlockPool( + // Small pool size to make any pooling issues more + // likely to manifest. + pool.NewObjectPoolOptions().SetSize(5), + ) + ) + blPool.Init(func() block.DatabaseBlock { + return block.NewDatabaseBlock(time.Time{}, 0, ts.Segment{}, blOpts) + }) + + var ( + opts = testDatabaseOptions().SetDatabaseBlockOptions( + blOpts. + SetWiredList(wl). 
+ SetDatabaseBlockPool(blPool), + ) + shard = testDatabaseShard(t, opts) + id = ident.StringID("foo") + series = series.NewDatabaseSeries(id, ident.Tags{}, shard.seriesOpts) + ) + + series.Reset(id, ident.Tags{}, nil, shard.seriesOnRetrieveBlock, shard, shard.seriesOpts) + series.Bootstrap(nil) + shard.Lock() + shard.insertNewShardEntryWithLock(lookup.NewEntry(series, 0)) + shard.Unlock() + + var ( + wg = sync.WaitGroup{} + doneCh = make(chan struct{}) + blockSize = 2 * time.Hour + ) + go func() { + // Try and trigger any pooling issues + for { + select { + case <-doneCh: + return + default: + bl := blPool.Get() + bl.ResetRetrievable(time.Time{}, blockSize, nil, block.RetrievableBlockMetadata{}) + bl.Close() + } + } + }() + + var ( + start = time.Now().Truncate(blockSize) + startLock = sync.Mutex{} + getAndIncStart = func() time.Time { + startLock.Lock() + t := start + start = start.Add(blockSize) + startLock.Unlock() + return t + } + ) + + for i := 0; i < 1000; i++ { + wg.Add(1) + go func() { + blTime := getAndIncStart() + shard.OnRetrieveBlock(id, nil, blTime, ts.Segment{}) + // Simulate concurrent reads + _, err := shard.ReadEncoded(context.NewContext(), id, blTime, blTime.Add(blockSize)) + require.NoError(t, err) + wg.Done() + }() + } + + wg.Wait() + close(doneCh) +}
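Usage sketch (not part of the diff): the snippet below shows how the pieces added above fit together. block.NewWiredList now takes a WiredListOptions struct instead of positional arguments, EventsChannelSize overrides the default channel size of 65536 (and is fed from the new eventsChannelSize field on the LRU series cache configuration), BlockingUpdate replaces the old Update, and NonBlockingUpdate is the variant that may drop an event rather than block. Package paths and constructors are copied from the test files above; in the real code paths the updates are issued by dbSeries rather than by hand, so treat this purely as an illustration of the API surface.

package main

import (
	"time"

	"github.com/m3db/m3/src/dbnode/clock"
	"github.com/m3db/m3/src/dbnode/runtime"
	"github.com/m3db/m3/src/dbnode/storage/block"
	"github.com/m3db/m3/src/dbnode/ts"

	"github.com/m3db/m3x/instrument"
)

func main() {
	// Cap the list at a single wired block via runtime options, mirroring the
	// regression test above.
	runtimeOptsMgr := runtime.NewOptionsManager()
	runtimeOptsMgr.Update(runtime.NewOptions().SetMaxWiredBlocks(1))

	// EventsChannelSize is optional; leaving it zero falls back to the default.
	wl := block.NewWiredList(block.WiredListOptions{
		RuntimeOptionsManager: runtimeOptsMgr,
		InstrumentOptions:     instrument.NewOptions(),
		ClockOptions:          clock.NewOptions(),
		EventsChannelSize:     1, // tiny channel, same as the stress tests above
	})
	if err := wl.Start(); err != nil {
		panic(err)
	}
	defer wl.Stop()

	// This block was not retrieved from disk, so the wired list will simply
	// ignore the updates below; that is fine for demonstrating the call pattern.
	bl := block.NewDatabaseBlock(time.Time{}, time.Hour, ts.Segment{}, block.NewOptions())

	// BlockingUpdate hands a block to the list's worker goroutine and must not be
	// called while holding a lock that the worker may also need (see the deferred
	// call in dbSeries.OnRetrieveBlock above).
	wl.BlockingUpdate(bl)

	// NonBlockingUpdate gives up instead of blocking when the channel is full; the
	// read path uses it because the list already tracks the block, so a dropped
	// event only costs recency information, never correctness.
	if ok := wl.NonBlockingUpdate(bl); !ok {
		// Event dropped: the wired list still manages the block's lifecycle.
	}
}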
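The series.go changes above also rely on a pattern that is easy to get wrong: never send on the wired list's events channel while holding a lock that the list's worker goroutine needs when it calls back into OnEvictedFromWiredList. The condensed sketch below shows only that lock-and-defer shape; the holder type and method names are hypothetical stand-ins for dbSeries.OnRetrieveBlock, not code from this repository.

package sketch

import (
	"sync"

	"github.com/m3db/m3/src/dbnode/storage/block"
)

// holder is a stand-in for dbSeries: it owns a mutex that the wired list's
// eviction callback will also want to acquire.
type holder struct {
	mu   sync.Mutex
	list *block.WiredList
}

func (h *holder) onRetrieve(bl block.DatabaseBlock) {
	var toUpdate block.DatabaseBlock
	h.mu.Lock()
	defer func() {
		h.mu.Unlock()
		if toUpdate != nil {
			// The send happens only after the lock is released, so the wired
			// list worker can call back into this holder (to evict some other
			// block) without the two goroutines waiting on each other.
			h.list.BlockingUpdate(toUpdate)
		}
	}()

	// ... validate and emplace the block while holding the lock ...
	toUpdate = bl
}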