[m3db] Check bloom filter before stream request allocation (#3103)
* [m3db] Check bloom filter before stream request allocation

* Add test assertions for bloom filter misses metric

* Remove redundant series-read metric

wesleyk authored Jan 19, 2021
1 parent cdf5f11 commit 142d35b
Showing 2 changed files with 91 additions and 69 deletions.
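
The change in retriever.go below hinges on one property: the seeker manager's bloom filter can return false positives but never false negatives, so a miss proves the series is not on disk and the pooled retrieve request never needs to be allocated. What follows is a minimal, self-contained sketch of that pre-check pattern; bloomFilter, request, requestPool, and stream are illustrative stand-ins, not the actual m3db API.

package main

import (
	"errors"
	"fmt"
)

// bloomFilter is an illustrative stand-in for the seeker manager's Test()
// method. A negative answer is authoritative: bloom filters have no false
// negatives, so on a miss the series cannot be on disk.
type bloomFilter interface {
	Test(id string) (bool, error)
}

// request and requestPool stand in for the pooled retrieveRequest.
type request struct{ id string }

type requestPool struct{ free []*request }

func (p *requestPool) Get() *request {
	if n := len(p.free); n > 0 {
		req := p.free[n-1]
		p.free = p.free[:n-1]
		return req
	}
	return &request{}
}

func (p *requestPool) Put(req *request) { p.free = append(p.free, req) }

var errNotOpen = errors.New("retriever not open")

// stream mirrors the shape of this commit: test the bloom filter first and
// return an empty result on a miss, so a pooled request is only taken for
// series that may actually exist on disk.
func stream(bf bloomFilter, pool *requestPool, id string) (string, error) {
	if bf == nil {
		return "", errNotOpen
	}
	mayExist, err := bf.Test(id)
	if err != nil {
		return "", err
	}
	if !mayExist {
		// Definitely not on disk; skip request allocation entirely.
		return "", nil
	}
	req := pool.Get()
	defer pool.Put(req)
	req.id = id
	return "data-for-" + id, nil // placeholder for the real disk read
}

// setFilter is an exact set, which trivially satisfies the bloom filter
// contract (no false negatives) for demonstration purposes.
type setFilter map[string]struct{}

func (s setFilter) Test(id string) (bool, error) {
	_, ok := s[id]
	return ok, nil
}

func main() {
	bf := setFilter{"present-id": {}}
	pool := &requestPool{}
	for _, id := range []string{"present-id", "not-exists"} {
		data, err := stream(bf, pool, id)
		fmt.Printf("id=%q data=%q err=%v\n", id, data, err)
	}
}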
128 changes: 71 additions & 57 deletions src/dbnode/persist/fs/retriever.go
@@ -89,12 +89,12 @@ const (
 type blockRetriever struct {
 	sync.RWMutex
 
-	opts            BlockRetrieverOptions
-	fsOpts          Options
-	logger          *zap.Logger
-	queryLimits     limits.QueryLimits
-	bytesReadLimit  limits.LookbackLimit
-	seriesReadCount tally.Counter
+	opts                    BlockRetrieverOptions
+	fsOpts                  Options
+	logger                  *zap.Logger
+	queryLimits             limits.QueryLimits
+	bytesReadLimit          limits.LookbackLimit
+	seriesBloomFilterMisses tally.Counter
 
 	newSeekerMgrFn newSeekerMgrFn
 
@@ -126,18 +126,18 @@ func NewBlockRetriever(
 	scope := fsOpts.InstrumentOptions().MetricsScope().SubScope("retriever")
 
 	return &blockRetriever{
-		opts:            opts,
-		fsOpts:          fsOpts,
-		logger:          fsOpts.InstrumentOptions().Logger(),
-		queryLimits:     opts.QueryLimits(),
-		bytesReadLimit:  opts.QueryLimits().BytesReadLimit(),
-		seriesReadCount: scope.Counter("series-read"),
-		newSeekerMgrFn:  NewSeekerManager,
-		reqPool:         opts.RetrieveRequestPool(),
-		bytesPool:       opts.BytesPool(),
-		idPool:          opts.IdentifierPool(),
-		status:          blockRetrieverNotOpen,
-		notifyFetch:     make(chan struct{}, 1),
+		opts:                    opts,
+		fsOpts:                  fsOpts,
+		logger:                  fsOpts.InstrumentOptions().Logger(),
+		queryLimits:             opts.QueryLimits(),
+		bytesReadLimit:          opts.QueryLimits().BytesReadLimit(),
+		seriesBloomFilterMisses: scope.Counter("series-bloom-filter-misses"),
+		newSeekerMgrFn:          NewSeekerManager,
+		reqPool:                 opts.RetrieveRequestPool(),
+		bytesPool:               opts.BytesPool(),
+		idPool:                  opts.IdentifierPool(),
+		status:                  blockRetrieverNotOpen,
+		notifyFetch:             make(chan struct{}, 1),
 		// We just close this channel when the fetchLoops should shutdown, so no
 		// buffering is required
 		fetchLoopsShouldShutdownCh: make(chan struct{}),
@@ -560,6 +560,33 @@ func (r *blockRetriever) fetchBatch(
 	}
 }
 
+func (r *blockRetriever) seriesPresentInBloomFilter(
+	id ident.ID,
+	shard uint32,
+	startTime time.Time,
+) (bool, error) {
+	// Capture variable and RLock() because this slice can be modified in the
+	// Open() method
+	r.RLock()
+	// This should never happen unless caller tries to use Stream() before Open()
+	if r.seekerMgr == nil {
+		r.RUnlock()
+		return false, errNoSeekerMgr
+	}
+	r.RUnlock()
+
+	idExists, err := r.seekerMgr.Test(id, shard, startTime)
+	if err != nil {
+		return false, err
+	}
+
+	if !idExists {
+		r.seriesBloomFilterMisses.Inc(1)
+	}
+
+	return idExists, nil
+}
+
 // streamRequest returns a bool indicating if the ID was found, and any errors.
 func (r *blockRetriever) streamRequest(
 	ctx context.Context,
@@ -568,11 +595,10 @@ func (r *blockRetriever) streamRequest(
 	id ident.ID,
 	startTime time.Time,
 	nsCtx namespace.Context,
-) (bool, error) {
+) error {
 	req.resultWg.Add(1)
-	r.seriesReadCount.Inc(1)
 	if err := r.queryLimits.DiskSeriesReadLimit().Inc(1, req.source); err != nil {
-		return false, err
+		return err
 	}
 	req.shard = shard
 
@@ -592,29 +618,9 @@ func (r *blockRetriever) streamRequest(
 	// Ensure to finalize at the end of request
 	ctx.RegisterFinalizer(req)
 
-	// Capture variable and RLock() because this slice can be modified in the
-	// Open() method
-	r.RLock()
-	// This should never happen unless caller tries to use Stream() before Open()
-	if r.seekerMgr == nil {
-		r.RUnlock()
-		return false, errNoSeekerMgr
-	}
-	r.RUnlock()
-
-	idExists, err := r.seekerMgr.Test(id, shard, startTime)
-	if err != nil {
-		return false, err
-	}
-
-	// If the ID is not in the seeker's bloom filter, then it's definitely not on
-	// disk and we can return immediately.
-	if !idExists {
-		return false, nil
-	}
 	reqs, err := r.shardRequests(shard)
 	if err != nil {
-		return false, err
+		return err
 	}
 
 	reqs.Lock()
@@ -633,7 +639,7 @@
 	// the data. This means that even though we're returning nil for error
 	// here, the caller may still encounter an error when they attempt to
 	// read the data.
-	return true, nil
+	return nil
 }
 
 func (r *blockRetriever) Stream(
@@ -644,6 +650,16 @@ func (r *blockRetriever) Stream(
 	onRetrieve block.OnRetrieveBlock,
 	nsCtx namespace.Context,
 ) (xio.BlockReader, error) {
+	found, err := r.seriesPresentInBloomFilter(id, shard, startTime)
+	if err != nil {
+		return xio.EmptyBlockReader, err
+	}
+	// If the ID is not in the seeker's bloom filter, then it's definitely not on
+	// disk and we can return immediately.
+	if !found {
+		return xio.EmptyBlockReader, nil
+	}
+
 	req := r.reqPool.Get()
 	req.onRetrieve = onRetrieve
 	req.streamReqType = streamDataReq
@@ -655,18 +671,12 @@
 		}
 	}
 
-	found, err := r.streamRequest(ctx, req, shard, id, startTime, nsCtx)
+	err = r.streamRequest(ctx, req, shard, id, startTime, nsCtx)
 	if err != nil {
 		req.resultWg.Done()
 		return xio.EmptyBlockReader, err
 	}
 
-	if !found {
-		req.onRetrieved(ts.Segment{}, namespace.Context{})
-		req.success = true
-		req.onDone()
-	}
-
 	// The request may not have completed yet, but it has an internal
 	// waitgroup which the caller will have to wait for before retrieving
 	// the data. This means that even though we're returning nil for error
@@ -683,22 +693,26 @@ func (r *blockRetriever) StreamWideEntry(
 	filter schema.WideEntryFilter,
 	nsCtx namespace.Context,
 ) (block.StreamedWideEntry, error) {
+	found, err := r.seriesPresentInBloomFilter(id, shard, startTime)
+	if err != nil {
+		return block.EmptyStreamedWideEntry, err
+	}
+	// If the ID is not in the seeker's bloom filter, then it's definitely not on
+	// disk and we can return immediately.
+	if !found {
+		return block.EmptyStreamedWideEntry, nil
+	}
+
 	req := r.reqPool.Get()
 	req.streamReqType = streamWideEntryReq
 	req.wideFilter = filter
 
-	found, err := r.streamRequest(ctx, req, shard, id, startTime, nsCtx)
+	err = r.streamRequest(ctx, req, shard, id, startTime, nsCtx)
 	if err != nil {
 		req.resultWg.Done()
 		return block.EmptyStreamedWideEntry, err
 	}
 
-	if !found {
-		req.wideEntry = xio.WideEntry{}
-		req.success = true
-		req.onDone()
-	}
-
 	// The request may not have completed yet, but it has an internal
 	// waitgroup which the caller will have to wait for before retrieving
 	// the data. This means that even though we're returning nil for error
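
With this change, a bloom filter miss surfaces to Stream() and StreamWideEntry() callers as an empty result (xio.EmptyBlockReader or block.EmptyStreamedWideEntry) with a nil error, rather than being fulfilled through onRetrieved with an empty segment. The sketch below models that caller-side contract; blockReader and streamFn are illustrative stand-ins, not the real xio API.

package main

import "fmt"

// blockReader is an illustrative stand-in for xio.BlockReader; only the
// IsEmpty behavior relied on by the updated tests is modeled here.
type blockReader struct{ data []byte }

func (b blockReader) IsEmpty() bool { return len(b.data) == 0 }

// streamFn stands in for the retriever's Stream method after this change:
// a bloom filter miss yields an empty reader and a nil error.
type streamFn func(id string) (blockReader, error)

func readSeries(stream streamFn, id string) {
	reader, err := stream(id)
	if err != nil {
		fmt.Println("stream error:", err)
		return
	}
	if reader.IsEmpty() {
		// Definitely not on disk; no retrieve request was allocated or
		// enqueued for this ID.
		fmt.Printf("id=%q not found\n", id)
		return
	}
	fmt.Printf("id=%q found %d bytes\n", id, len(reader.data))
}

func main() {
	stream := func(id string) (blockReader, error) {
		if id == "present-id" {
			return blockReader{data: []byte("series-bytes")}, nil
		}
		return blockReader{}, nil // bloom miss: empty reader, nil error
	}
	readSeries(stream, "present-id")
	readSeries(stream, "not-exists")
}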
32 changes: 20 additions & 12 deletions src/dbnode/persist/fs/retriever_test.go
@@ -127,7 +127,7 @@ type streamResult struct {
 	shard      uint32
 	id         string
 	blockStart time.Time
-	stream     xio.SegmentReader
+	stream     xio.BlockReader
 }
 
 // TestBlockRetrieverHighConcurrentSeeks tests the retriever with high
@@ -395,6 +395,14 @@ func testBlockRetrieverHighConcurrentSeeks(t *testing.T, shouldCacheShardIndices
 	}
 
 	for _, r := range results {
+		compare.Head = shardData[r.shard][r.id][xtime.ToUnixNano(r.blockStart)]
+
+		// If the stream is empty, assert that the expected result is also nil
+		if r.stream.IsEmpty() {
+			require.Nil(t, compare.Head)
+			continue
+		}
+
 		seg, err := r.stream.Segment()
 		if err != nil {
 			fmt.Printf("\nstream seg err: %v\n", err)
@@ -404,7 +412,6 @@ func testBlockRetrieverHighConcurrentSeeks(t *testing.T, shouldCacheShardIndices
 		}
 
 		require.NoError(t, err)
-		compare.Head = shardData[r.shard][r.id][xtime.ToUnixNano(r.blockStart)]
 		require.True(
 			t,
 			seg.Equal(&compare),
@@ -538,6 +545,8 @@ func testBlockRetrieverHighConcurrentSeeks(t *testing.T, shouldCacheShardIndices
 // on the retriever in the case where the requested ID does not exist. In that
 // case, Stream() should return an empty segment.
 func TestBlockRetrieverIDDoesNotExist(t *testing.T) {
+	scope := tally.NewTestScope("test", nil)
+
 	// Make sure reader/writer are looking at the same test directory
 	dir, err := ioutil.TempDir("", "testdb")
 	require.NoError(t, err)
@@ -555,7 +564,7 @@ func TestBlockRetrieverIDDoesNotExist(t *testing.T) {
 	// Setup the reader
 	opts := testBlockRetrieverOptions{
 		retrieverOpts: defaultTestBlockRetrieverOptions,
-		fsOpts:        fsOpts,
+		fsOpts:        fsOpts.SetInstrumentOptions(instrument.NewOptions().SetMetricsScope(scope)),
 		shards:        []uint32{shard},
 	}
 	retriever, cleanup := newOpenTestBlockRetriever(t, testNs1Metadata(t), opts)
@@ -572,17 +581,18 @@ func TestBlockRetrieverIDDoesNotExist(t *testing.T) {
 	assert.NoError(t, err)
 	closer()
 
 	// Make sure we return the correct error if the ID does not exist
 	ctx := context.NewContext()
 	defer ctx.Close()
 	segmentReader, err := retriever.Stream(ctx, shard,
 		ident.StringID("not-exists"), blockStart, nil, nsCtx)
 	assert.NoError(t, err)
 
-	segment, err := segmentReader.Segment()
-	assert.NoError(t, err)
-	assert.Equal(t, nil, segment.Head)
-	assert.Equal(t, nil, segment.Tail)
+	assert.True(t, segmentReader.IsEmpty())
+
+	// Check that the bloom filter miss metric was incremented
+	snapshot := scope.Snapshot()
+	seriesRead := snapshot.Counters()["test.retriever.series-bloom-filter-misses+"]
+	require.Equal(t, int64(1), seriesRead.Value())
 }
 
 // TestBlockRetrieverOnlyCreatesTagItersIfTagsExists verifies that the block retriever
@@ -823,14 +833,12 @@ func TestLimitSeriesReadFromDisk(t *testing.T) {
 	require.NoError(t, err)
 	req := &retrieveRequest{}
 	retriever := publicRetriever.(*blockRetriever)
-	_, _ = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{})
-	_, err = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{})
+	_ = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{})
+	err = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{})
 	require.Error(t, err)
 	require.Contains(t, err.Error(), "query aborted due to limit")
 
 	snapshot := scope.Snapshot()
-	seriesRead := snapshot.Counters()["test.retriever.series-read+"]
-	require.Equal(t, int64(2), seriesRead.Value())
 	seriesLimit := snapshot.Counters()["test.query-limit.exceeded+limit=disk-series-read"]
 	require.Equal(t, int64(1), seriesLimit.Value())
 }
