[dbnode] Do not unnecessarily sort index entries in bootstrap paths (#…
notbdu authored Aug 11, 2020
1 parent cdb8248 commit 22f0ea0
Showing 5 changed files with 187 additions and 72 deletions.
21 changes: 18 additions & 3 deletions src/dbnode/persist/fs/read.go
@@ -50,6 +50,9 @@ var (

// errReadNotExpectedSize returned when the size of the next read does not match size specified by the index
errReadNotExpectedSize = errors.New("next read not expected size")

// errReadMetadataOptimizedForRead returned when the reader is optimized for reading only metadata but a regular read is attempted
errReadMetadataOptimizedForRead = errors.New("read metadata optimized for regular read")
)

const (
@@ -99,6 +102,10 @@ type reader struct {
shard uint32
volume int
open bool
// NB(bodu): Informs whether or not we optimize for only reading
// metadata. We don't need to sort for reading metadata but sorting is
// required if we are performing regular reads.
optimizedReadMetadataOnly bool
}

// NewReader returns a new reader and expects all files to exist. Will read the
@@ -271,6 +278,7 @@ func (r *reader) Open(opts DataReaderOpenOptions) error {
r.open = true
r.namespace = namespace
r.shard = shard
r.optimizedReadMetadataOnly = opts.OptimizedReadMetadataOnly

return nil
}
@@ -337,13 +345,20 @@ func (r *reader) readIndexAndSortByOffsetAsc() error {
}
r.indexEntriesByOffsetAsc = append(r.indexEntriesByOffsetAsc, entry)
}
// NB(r): As we decode each block we need access to each index entry
// in the order we decode the data
sort.Sort(indexEntriesByOffsetAsc(r.indexEntriesByOffsetAsc))
// This is false by default so we always sort unless otherwise specified.
if !r.optimizedReadMetadataOnly {
// NB(r): As we decode each block we need access to each index entry
// in the order we decode the data. This is only required for regular reads.
sort.Sort(indexEntriesByOffsetAsc(r.indexEntriesByOffsetAsc))
}
return nil
}

func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
// NB(bodu): We cannot perform regular reads if we're optimizing for only reading metadata.
if r.optimizedReadMetadataOnly {
return nil, nil, nil, 0, errReadMetadataOptimizedForRead
}
if r.entries > 0 && len(r.indexEntriesByOffsetAsc) < r.entries {
// Have not read the index yet, this is required when reading
// data as we need each index entry in order by the offset ascending
4 changes: 4 additions & 0 deletions src/dbnode/persist/fs/types.go
@@ -122,6 +122,10 @@ type DataFileSetReaderStatus struct {
type DataReaderOpenOptions struct {
Identifier FileSetFileIdentifier
FileSetType persist.FileSetType
// NB(bodu): This option can inform the reader to optimize for reading
// only metadata by not sorting index entries. Setting this option will
// cause a regular `Read()` to return an error.
OptimizedReadMetadataOnly bool
}

// DataFileSetReader provides an unsynchronized reader for a TSDB file set
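The new option is consumed as shown above: readers opened for metadata only skip the offset sort, and a regular Read() fails fast. A minimal caller sketch of that contract follows (not part of this commit); the OptimizedReadMetadataOnly field is the one added here, while the surrounding fs reader API (NewReader, Open, ReadMetadata and their signatures) is assumed to keep its existing shape.

package example

import (
	"io"
	"time"

	"github.com/m3db/m3/src/dbnode/persist"
	"github.com/m3db/m3/src/dbnode/persist/fs"
	"github.com/m3db/m3/src/x/ident"
	"github.com/m3db/m3/src/x/pool"
)

// readMetadataOnly opens a single fileset in metadata-only mode and walks
// its series metadata without ever needing the index entries sorted.
func readMetadataOnly(
	bytesPool pool.CheckedBytesPool,
	fsOpts fs.Options,
	nsID ident.ID,
	shard uint32,
	blockStart time.Time,
) error {
	reader, err := fs.NewReader(bytesPool, fsOpts)
	if err != nil {
		return err
	}
	openOpts := fs.DataReaderOpenOptions{
		Identifier: fs.FileSetFileIdentifier{
			Namespace:  nsID,
			Shard:      shard,
			BlockStart: blockStart,
		},
		FileSetType: persist.FileSetFlushType,
		// Index entries are left unsorted; only metadata reads are allowed.
		OptimizedReadMetadataOnly: true,
	}
	if err := reader.Open(openOpts); err != nil {
		return err
	}
	for {
		id, tags, _, _, err := reader.ReadMetadata()
		if err == io.EOF {
			break
		}
		if err != nil {
			reader.Close()
			return err
		}
		// Consume the metadata, then release it.
		id.Finalize()
		tags.Close()
	}
	// A regular reader.Read() at this point would return an error because
	// the reader was opened with OptimizedReadMetadataOnly set.
	return reader.Close()
}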
85 changes: 56 additions & 29 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
@@ -25,6 +25,7 @@ import (
"sync"
"time"

"github.com/m3db/m3/src/dbnode/clock"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
@@ -48,6 +49,8 @@ import (
"github.com/m3db/m3/src/x/pool"
xtime "github.com/m3db/m3/src/x/time"

"github.com/opentracing/opentracing-go"
opentracinglog "github.com/opentracing/opentracing-go/log"
"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -69,6 +72,7 @@ type fileSystemSource struct {
opts Options
fsopts fs.Options
log *zap.Logger
nowFn clock.NowFn
idPool ident.Pool
newReaderFn newDataFileSetReaderFn
newReaderPoolOpts bootstrapper.NewReaderPoolOptions
Expand Down Expand Up @@ -96,6 +100,7 @@ func newFileSystemSource(opts Options) (bootstrap.Source, error) {
opts: opts,
fsopts: opts.FilesystemOptions(),
log: iopts.Logger().With(zap.String("bootstrapper", "filesystem")),
nowFn: opts.ResultOptions().ClockOptions().NowFn(),
idPool: opts.IdentifierPool(),
newReaderFn: fs.NewReader,
persistManager: &bootstrapper.SharedPersistManager{
@@ -116,18 +121,18 @@ func newFileSystemSource(opts Options) (bootstrap.Source, error) {

func (s *fileSystemSource) AvailableData(
md namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
shardTimeRanges result.ShardTimeRanges,
runOpts bootstrap.RunOptions,
) (result.ShardTimeRanges, error) {
return s.availability(md, shardsTimeRanges)
return s.availability(md, shardTimeRanges)
}

func (s *fileSystemSource) AvailableIndex(
md namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
shardTimeRanges result.ShardTimeRanges,
runOpts bootstrap.RunOptions,
) (result.ShardTimeRanges, error) {
return s.availability(md, shardsTimeRanges)
return s.availability(md, shardTimeRanges)
}

func (s *fileSystemSource) Read(
@@ -150,8 +155,7 @@ func (s *fileSystemSource) Read(

// NB(r): Perform all data bootstrapping first then index bootstrapping
// to more clearly delineate which process is slower than the other.
nowFn := s.opts.ResultOptions().ClockOptions().NowFn()
start := nowFn()
start := s.nowFn()
dataLogFields := []zapcore.Field{
zap.Stringer("cachePolicy", s.opts.ResultOptions().SeriesCachePolicy()),
}
@@ -164,7 +168,7 @@ func (s *fileSystemSource) Read(

r, err := s.read(bootstrapDataRunType, md, namespace.DataAccumulator,
namespace.DataRunOptions.ShardTimeRanges,
namespace.DataRunOptions.RunOptions, builder)
namespace.DataRunOptions.RunOptions, builder, span)
if err != nil {
return bootstrap.NamespaceResults{}, err
}
@@ -176,10 +180,10 @@ func (s *fileSystemSource) Read(
})
}
s.log.Info("bootstrapping time series data success",
append(dataLogFields, zap.Duration("took", nowFn().Sub(start)))...)
append(dataLogFields, zap.Duration("took", s.nowFn().Sub(start)))...)
span.LogEvent("bootstrap_data_done")

start = nowFn()
start = s.nowFn()
s.log.Info("bootstrapping index metadata start")
span.LogEvent("bootstrap_index_start")
for _, elem := range namespaces.Namespaces.Iter() {
@@ -194,7 +198,7 @@ func (s *fileSystemSource) Read(

r, err := s.read(bootstrapIndexRunType, md, namespace.DataAccumulator,
namespace.IndexRunOptions.ShardTimeRanges,
namespace.IndexRunOptions.RunOptions, builder)
namespace.IndexRunOptions.RunOptions, builder, span)
if err != nil {
return bootstrap.NamespaceResults{}, err
}
@@ -210,18 +214,18 @@ func (s *fileSystemSource) Read(
results.Results.Set(md.ID(), result)
}
s.log.Info("bootstrapping index metadata success",
zap.Duration("took", nowFn().Sub(start)))
zap.Duration("took", s.nowFn().Sub(start)))
span.LogEvent("bootstrap_index_done")

return results, nil
}

func (s *fileSystemSource) availability(
md namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
shardTimeRanges result.ShardTimeRanges,
) (result.ShardTimeRanges, error) {
result := result.NewShardTimeRangesFromSize(shardsTimeRanges.Len())
for shard, ranges := range shardsTimeRanges.Iter() {
result := result.NewShardTimeRangesFromSize(shardTimeRanges.Len())
for shard, ranges := range shardTimeRanges.Iter() {
result.Set(shard, s.shardAvailability(md.ID(), shard, ranges))
}
return result, nil
@@ -459,9 +463,8 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(
var (
indexBlockSize = ns.Options().IndexOptions().BlockSize()
retentionPeriod = ns.Options().RetentionOptions().RetentionPeriod()
nowFn = s.opts.ResultOptions().ClockOptions().NowFn()
beginningOfIndexRetention = retention.FlushTimeStartForRetentionPeriod(
retentionPeriod, indexBlockSize, nowFn())
retentionPeriod, indexBlockSize, s.nowFn())
initialIndexRange = xtime.Range{
Start: beginningOfIndexRetention,
End: beginningOfIndexRetention.Add(indexBlockSize),
@@ -674,15 +677,16 @@ func (s *fileSystemSource) read(
run runType,
md namespace.Metadata,
accumulator bootstrap.NamespaceDataAccumulator,
shardsTimeRanges result.ShardTimeRanges,
shardTimeRanges result.ShardTimeRanges,
runOpts bootstrap.RunOptions,
builder *result.IndexBuilder,
span opentracing.Span,
) (*runResult, error) {
var (
seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy()
res *runResult
)
if shardsTimeRanges.IsEmpty() {
if shardTimeRanges.IsEmpty() {
return newRunResult(), nil
}

@@ -701,25 +705,34 @@ if seriesCachePolicy != series.CacheAll {
if seriesCachePolicy != series.CacheAll {
// Unless we're caching all series (or all series metadata) in memory, we
// return just the availability of the files we have.
return s.bootstrapDataRunResultFromAvailability(md, shardsTimeRanges), nil
return s.bootstrapDataRunResultFromAvailability(md, shardTimeRanges), nil
}
}

logSpan := func(event string) {
span.LogFields(
opentracinglog.String("event", event),
opentracinglog.String("nsID", md.ID().String()),
opentracinglog.String("shardTimeRanges", shardTimeRanges.SummaryString()),
)
}
if run == bootstrapIndexRunType {
logSpan("bootstrap_from_index_persisted_blocks_start")
// NB(r): First read all the FSTs and add to runResult index results,
// subtract the shard + time ranges from what we intend to bootstrap
// for those we found.
r, err := s.bootstrapFromIndexPersistedBlocks(md,
shardsTimeRanges)
shardTimeRanges)
if err != nil {
s.log.Warn("filesystem bootstrapped failed to read persisted index blocks")
} else {
// We may have less we need to read
shardsTimeRanges = shardsTimeRanges.Copy()
shardsTimeRanges.Subtract(r.fulfilled)
shardTimeRanges = shardTimeRanges.Copy()
shardTimeRanges.Subtract(r.fulfilled)
// Set or merge result.
setOrMergeResult(r.result)
}
logSpan("bootstrap_from_index_persisted_blocks_done")
}

// Create a reader pool once per bootstrap as we don't really want to
@@ -737,8 +750,22 @@ func (s *fileSystemSource) read(
panic(fmt.Errorf("unrecognized run type: %d", run))
}
runtimeOpts := s.opts.RuntimeOptionsManager().Get()
go bootstrapper.EnqueueReaders(md, runOpts, runtimeOpts, s.fsopts, shardsTimeRanges,
readerPool, readersCh, blockSize, s.log)
go bootstrapper.EnqueueReaders(bootstrapper.EnqueueReadersOptions{
NsMD: md,
RunOpts: runOpts,
RuntimeOpts: runtimeOpts,
FsOpts: s.fsopts,
ShardTimeRanges: shardTimeRanges,
ReaderPool: readerPool,
ReadersCh: readersCh,
BlockSize: blockSize,
// NB(bodu): We only read metadata when bootstrapping the index
// so we do not need to sort the data fileset reader.
OptimizedReadMetadataOnly: run == bootstrapIndexRunType,
Logger: s.log,
Span: span,
NowFn: s.nowFn,
})
bootstrapFromDataReadersResult := s.bootstrapFromReaders(run, md,
accumulator, runOpts, readerPool, readersCh, builder)

@@ -755,11 +782,11 @@ func (s *fileSystemSource) newReader() (fs.DataFileSetReader, error) {

func (s *fileSystemSource) bootstrapDataRunResultFromAvailability(
md namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
shardTimeRanges result.ShardTimeRanges,
) *runResult {
runResult := newRunResult()
unfulfilled := runResult.data.Unfulfilled()
for shard, ranges := range shardsTimeRanges.Iter() {
for shard, ranges := range shardTimeRanges.Iter() {
if ranges.IsEmpty() {
continue
}
@@ -784,7 +811,7 @@ type bootstrapFromIndexPersistedBlocksResult struct {

func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks(
ns namespace.Metadata,
shardsTimeRanges result.ShardTimeRanges,
shardTimeRanges result.ShardTimeRanges,
) (bootstrapFromIndexPersistedBlocksResult, error) {
res := bootstrapFromIndexPersistedBlocksResult{
fulfilled: result.NewShardTimeRanges(),
@@ -799,7 +826,7 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks(
s.log.Error("unable to read index info file",
zap.Stringer("namespace", ns.ID()),
zap.Error(err),
zap.Stringer("shardsTimeRanges", shardsTimeRanges),
zap.Stringer("shardTimeRanges", shardTimeRanges),
zap.String("filepath", infoFile.Err.Filepath()),
)
continue
@@ -813,7 +840,7 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks(
}
willFulfill := result.NewShardTimeRanges()
for _, shard := range info.Shards {
tr, ok := shardsTimeRanges.Get(shard)
tr, ok := shardTimeRanges.Get(shard)
if !ok {
// No ranges match for this shard.
continue
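For reference, EnqueueReaders now takes a single options struct instead of a long positional argument list, which keeps the call site above readable as new knobs such as OptimizedReadMetadataOnly, Span, and NowFn are added. The sketch below shows roughly how that struct might be declared in the bootstrapper package; the field names come from the call site in this diff, but the field types and import set are inferred assumptions rather than copies of the actual definition.

package bootstrapper

import (
	"time"

	"github.com/m3db/m3/src/dbnode/clock"
	"github.com/m3db/m3/src/dbnode/namespace"
	"github.com/m3db/m3/src/dbnode/persist/fs"
	"github.com/m3db/m3/src/dbnode/runtime"
	"github.com/m3db/m3/src/dbnode/storage/bootstrap"
	"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"

	"github.com/opentracing/opentracing-go"
	"go.uber.org/zap"
)

// EnqueueReadersOptions groups the arguments to EnqueueReaders. Field names
// match the call site above; the types here are inferred and may differ
// from the real declaration.
type EnqueueReadersOptions struct {
	NsMD            namespace.Metadata
	RunOpts         bootstrap.RunOptions
	RuntimeOpts     runtime.Options
	FsOpts          fs.Options
	ShardTimeRanges result.ShardTimeRanges
	ReaderPool      *ReaderPool
	ReadersCh       chan<- TimeWindowReaders
	BlockSize       time.Duration
	// When true, filesets are opened for metadata only and index entries
	// are left unsorted (the fs reader change earlier in this commit).
	OptimizedReadMetadataOnly bool
	Logger                    *zap.Logger
	Span                      opentracing.Span
	NowFn                     clock.NowFn
}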