Skip to content

Commit

Permalink
Fix LRU deadlock + regression test and improve comments / logging (#862)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardartoul authored Aug 29, 2018
1 parent 65b3024 commit 399d904
Show file tree
Hide file tree
Showing 11 changed files with 360 additions and 107 deletions.
3 changes: 2 additions & 1 deletion src/cmd/services/m3dbnode/config/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,6 @@ type SeriesCacheConfiguration struct {
// LRUSeriesCachePolicyConfiguration contains configuration for the LRU
// series caching policy.
type LRUSeriesCachePolicyConfiguration struct {
MaxBlocks uint `yaml:"maxBlocks" validate:"nonzero"`
MaxBlocks uint `yaml:"maxBlocks" validate:"nonzero"`
EventsChannelSize uint `yaml:"eventsChannelSize" validate:"nonzero"`
}
12 changes: 7 additions & 5 deletions src/dbnode/integration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,13 @@ func newTestSetup(t *testing.T, opts testOptions, fsOpts fs.Options) (*testSetup

// Set up wired list if required
if storageOpts.SeriesCachePolicy() == series.CacheLRU {
wiredList := block.NewWiredList(
runtimeOptsMgr,
storageOpts.InstrumentOptions(),
storageOpts.ClockOptions(),
)
wiredList := block.NewWiredList(block.WiredListOptions{
RuntimeOptionsManager: runtimeOptsMgr,
InstrumentOptions: storageOpts.InstrumentOptions(),
ClockOptions: storageOpts.ClockOptions(),
// Use a small event channel size to stress-test the implementation
EventsChannelSize: 1,
})
blockOpts := storageOpts.DatabaseBlockOptions().SetWiredList(wiredList)
blockPool := block.NewDatabaseBlockPool(nil)
// Have to manually set the blockpool because the default one uses a constructor
Expand Down
19 changes: 16 additions & 3 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func Run(runOpts RunOptions) {
opts = opts.SetSeriesCachePolicy(seriesCachePolicy)

// Apply pooling options
opts = withEncodingAndPoolingOptions(logger, opts, cfg.PoolingPolicy)
opts = withEncodingAndPoolingOptions(cfg, logger, opts, cfg.PoolingPolicy)

// Setup the block retriever
switch seriesCachePolicy {
Expand Down Expand Up @@ -882,6 +882,7 @@ func kvWatchBootstrappers(
}

func withEncodingAndPoolingOptions(
cfg config.DBConfiguration,
logger xlog.Logger,
opts storage.Options,
policy config.PoolingPolicy,
Expand Down Expand Up @@ -1003,8 +1004,20 @@ func withEncodingAndPoolingOptions(
SetBytesPool(bytesPool)

if opts.SeriesCachePolicy() == series.CacheLRU {
runtimeOpts := opts.RuntimeOptionsManager()
wiredList := block.NewWiredList(runtimeOpts, iopts, opts.ClockOptions())
var (
runtimeOpts = opts.RuntimeOptionsManager()
wiredListOpts = block.WiredListOptions{
RuntimeOptionsManager: runtimeOpts,
InstrumentOptions: iopts,
ClockOptions: opts.ClockOptions(),
}
lruCfg = cfg.Cache.SeriesConfiguration().LRU
)

if lruCfg != nil && lruCfg.EventsChannelSize > 0 {
wiredListOpts.EventsChannelSize = int(lruCfg.EventsChannelSize)
}
wiredList := block.NewWiredList(wiredListOpts)
blockOpts = blockOpts.SetWiredList(wiredList)
}
blockPool := block.NewDatabaseBlockPool(poolOptions(policy.BlockPool,
Expand Down
42 changes: 30 additions & 12 deletions src/dbnode/storage/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ type dbBlock struct {
}

type listState struct {
next DatabaseBlock
prev DatabaseBlock
nextPrevUpdatedAtUnixNano int64
next DatabaseBlock
prev DatabaseBlock
enteredListAtUnixNano int64
}

// NewDatabaseBlock creates a new DatabaseBlock instance.
Expand Down Expand Up @@ -414,19 +414,37 @@ func (b *dbBlock) resetRetrievableWithLock(
}

func (b *dbBlock) Discard() ts.Segment {
return b.closeAndDiscard()
seg, _ := b.closeAndDiscardConditionally(nil)
return seg
}

func (b *dbBlock) Close() {
segment := b.closeAndDiscard()
segment, _ := b.closeAndDiscardConditionally(nil)
segment.Finalize()
}

func (b *dbBlock) closeAndDiscard() ts.Segment {
func (b *dbBlock) CloseIfFromDisk() bool {
segment, ok := b.closeAndDiscardConditionally(func(b *dbBlock) bool {
return b.wasRetrievedFromDisk
})
if !ok {
return false
}
segment.Finalize()
return true
}

func (b *dbBlock) closeAndDiscardConditionally(condition func(b *dbBlock) bool) (ts.Segment, bool) {
b.Lock()

if condition != nil && !condition(b) {
b.Unlock()
return ts.Segment{}, false
}

if b.closed {
b.Unlock()
return ts.Segment{}
return ts.Segment{}, true
}

segment := b.segment
Expand All @@ -439,7 +457,7 @@ func (b *dbBlock) closeAndDiscard() ts.Segment {
pool.Put(b)
}

return segment
return segment, true
}

func (b *dbBlock) resetMergeTargetWithLock() {
Expand Down Expand Up @@ -470,13 +488,13 @@ func (b *dbBlock) setPrev(value DatabaseBlock) {
}

// Should only be used by the WiredList.
func (b *dbBlock) nextPrevUpdatedAtUnixNano() int64 {
return b.listState.nextPrevUpdatedAtUnixNano
func (b *dbBlock) enteredListAtUnixNano() int64 {
return b.listState.enteredListAtUnixNano
}

// Should only be used by the WiredList.
func (b *dbBlock) setNextPrevUpdatedAtUnixNano(value int64) {
b.listState.nextPrevUpdatedAtUnixNano = value
func (b *dbBlock) setEnteredListAtUnixNano(value int64) {
b.listState.enteredListAtUnixNano = value
}

// wiredListEntry is a snapshot of a subset of the block's state that the WiredList
Expand Down
60 changes: 36 additions & 24 deletions src/dbnode/storage/block/block_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions src/dbnode/storage/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,18 @@ func TestDatabaseBlockStreamMergePerformsCopy(t *testing.T) {
require.NoError(t, iter.Err())
}

func TestDatabaseBlockCloseIfFromDisk(t *testing.T) {
var (
blockOpts = NewOptions()
blockNotFromDisk = NewDatabaseBlock(time.Time{}, time.Hour, ts.Segment{}, blockOpts).(*dbBlock)
blockFromDisk = NewDatabaseBlock(time.Time{}, time.Hour, ts.Segment{}, blockOpts).(*dbBlock)
)
blockFromDisk.wasRetrievedFromDisk = true

require.False(t, blockNotFromDisk.CloseIfFromDisk())
require.True(t, blockFromDisk.CloseIfFromDisk())
}

func TestDatabaseSeriesBlocksAddBlock(t *testing.T) {
now := time.Now()
blockTimes := []time.Time{now, now.Add(time.Second), now.Add(time.Minute), now.Add(-time.Second), now.Add(-time.Hour)}
Expand Down
9 changes: 7 additions & 2 deletions src/dbnode/storage/block/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ type DatabaseBlock interface {
// Close closes the block.
Close()

// CloseIfFromDisk atomically checks if the disk was retrieved from disk, and
// if so, closes it. It is meant as a layered protection for the WiredList
// which should only close blocks that were retrieved from disk.
CloseIfFromDisk() bool

// SetOnEvictedFromWiredList sets the owner of the block
SetOnEvictedFromWiredList(OnEvictedFromWiredList)

Expand All @@ -213,8 +218,8 @@ type databaseBlock interface {
setNext(block DatabaseBlock)
prev() DatabaseBlock
setPrev(block DatabaseBlock)
nextPrevUpdatedAtUnixNano() int64
setNextPrevUpdatedAtUnixNano(value int64)
enteredListAtUnixNano() int64
setEnteredListAtUnixNano(value int64)
wiredListEntry() wiredListEntry
}

Expand Down
Loading

0 comments on commit 399d904

Please sign in to comment.