diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index a154e09493..59cba42206 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -70,23 +70,142 @@ const ( type bootstrapFn func() error -type bootstrapManager struct { - sync.RWMutex - - database database - mediator databaseMediator +type instrumentation struct { opts Options log *zap.Logger - bootstrapFn bootstrapFn nowFn clock.NowFn sleepFn sleepFn - processProvider bootstrap.ProcessProvider - state BootstrapState - hasPending bool status tally.Gauge bootstrapDuration tally.Timer + bootstrapNamespacesDuration tally.Timer durableStatus tally.Gauge lastBootstrapCompletionTime xtime.UnixNano + start time.Time + startNamespaces time.Time + logFields []zapcore.Field +} + +func (i *instrumentation) bootstrapFnFailed(retry int) { + i.log.Warn("retrying bootstrap after backoff", + zap.Duration("backoff", bootstrapRetryInterval), + zap.Int("numRetries", retry+1)) + i.sleepFn(bootstrapRetryInterval) +} + +func (i *instrumentation) bootstrapPreparing() { + i.start = i.nowFn() + i.log.Info("bootstrap prepare") +} + +func (i *instrumentation) bootstrapPrepareFailed(err error) { + i.log.Error("bootstrap prepare failed", zap.Error(err)) +} + +func (i *instrumentation) bootstrapStarted(shards int) { + i.logFields = []zapcore.Field{ + zap.Int("numShards", shards), + } + i.log.Info("bootstrap started", i.logFields...) +} + +func (i *instrumentation) bootstrapSucceeded() { + bootstrapDuration := i.nowFn().Sub(i.start) + i.bootstrapDuration.Record(bootstrapDuration) + i.logFields = append(i.logFields, zap.Duration("bootstrapDuration", bootstrapDuration)) + i.log.Info("bootstrap succeeded, marking namespaces complete", i.logFields...) 
+} + +func (i *instrumentation) bootstrapFailed(err error) { + bootstrapDuration := i.nowFn().Sub(i.start) + i.bootstrapDuration.Record(bootstrapDuration) + i.logFields = append(i.logFields, zap.Duration("bootstrapDuration", bootstrapDuration)) + i.log.Error("bootstrap failed", append(i.logFields, zap.Error(err))...) +} + +func (i *instrumentation) bootstrapNamespaceFailed(err error, namespaceId string) { + i.log.Info("bootstrap namespace error", append(i.logFields, []zapcore.Field{ + zap.String("namespace", namespaceId), + zap.Error(err), + }...)...) +} + +func (i *instrumentation) bootstrapNamespacesFailed(err error) { + duration := i.nowFn().Sub(i.startNamespaces) + i.bootstrapNamespacesDuration.Record(duration) + i.logFields = append(i.logFields, zap.Duration("bootstrapNamespacesDuration", duration)) + i.log.Info("bootstrap namespaces failed", append(i.logFields, zap.Error(err))...) +} + +func (i *instrumentation) bootstrapNamespacesStarted() { + i.startNamespaces = i.nowFn() + i.log.Info("bootstrap namespaces start", i.logFields...) +} + +func (i *instrumentation) bootstrapNamespacesSucceeded() { + duration := i.nowFn().Sub(i.startNamespaces) + i.bootstrapNamespacesDuration.Record(duration) + i.logFields = append(i.logFields, zap.Duration("bootstrapNamespacesDuration", duration)) + i.log.Info("bootstrap namespaces success", i.logFields...) 
+} + +func (i *instrumentation) bootstrapCompletion() { + i.lastBootstrapCompletionTime = xtime.ToUnixNano(i.nowFn()) +} + +func (i *instrumentation) setIsBootstrapped(isBootstrapped bool) { + if isBootstrapped { + i.status.Update(1) + } else { + i.status.Update(0) + } +} + +func (i *instrumentation) setIsBootstrappedAndDurable(isBootstrappedAndDurable bool) { + if isBootstrappedAndDurable { + i.durableStatus.Update(1) + } else { + i.durableStatus.Update(0) + } +} + +func (i *instrumentation) missingNamespaceFromResult(err error) { + instrument.EmitAndLogInvariantViolation(i.opts.InstrumentOptions(), func(l *zap.Logger) { + l.Error("bootstrap failed", append(i.logFields, zap.Error(err))...) + }) +} + +func (i *instrumentation) bootstrapDataAccumulatorCloseFailed(err error) { + instrument.EmitAndLogInvariantViolation(i.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("could not close bootstrap data accumulator", + zap.Error(err)) + }) +} + +func newInstrumentation(opts Options) *instrumentation { + scope := opts.InstrumentOptions().MetricsScope() + return &instrumentation{ + opts: opts, + log: opts.InstrumentOptions().Logger(), + nowFn: opts.ClockOptions().NowFn(), + sleepFn: time.Sleep, + status: scope.Gauge("bootstrapped"), + bootstrapDuration: scope.Timer("bootstrap-duration"), + bootstrapNamespacesDuration: scope.Timer("bootstrap-namespaces-duration"), + durableStatus: scope.Gauge("bootstrapped-durable"), + } +} + +type bootstrapManager struct { + sync.RWMutex + + database database + mediator databaseMediator + bootstrapFn bootstrapFn + processProvider bootstrap.ProcessProvider + state BootstrapState + hasPending bool + instrumentation *instrumentation } func newBootstrapManager( @@ -94,18 +213,11 @@ func newBootstrapManager( mediator databaseMediator, opts Options, ) databaseBootstrapManager { - scope := opts.InstrumentOptions().MetricsScope() m := &bootstrapManager{ - database: database, - mediator: mediator, - opts: opts, - log: 
opts.InstrumentOptions().Logger(), - nowFn: opts.ClockOptions().NowFn(), - sleepFn: time.Sleep, - processProvider: opts.BootstrapProcessProvider(), - status: scope.Gauge("bootstrapped"), - bootstrapDuration: scope.Timer("bootstrap-duration"), - durableStatus: scope.Gauge("bootstrapped-durable"), + database: database, + mediator: mediator, + processProvider: opts.BootstrapProcessProvider(), + instrumentation: newInstrumentation(opts), } m.bootstrapFn = m.bootstrap return m @@ -120,7 +232,7 @@ func (m *bootstrapManager) IsBootstrapped() bool { func (m *bootstrapManager) LastBootstrapCompletionTime() (xtime.UnixNano, bool) { m.RLock() - bsTime := m.lastBootstrapCompletionTime + bsTime := m.instrumentation.lastBootstrapCompletionTime m.RUnlock() return bsTime, bsTime > 0 } @@ -176,10 +288,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { // NB(r): Last bootstrap failed, since this could be due to transient // failure we retry the bootstrap again. This is to avoid operators // needing to manually intervene for cases where failures are transient. - m.log.Warn("retrying bootstrap after backoff", - zap.Duration("backoff", bootstrapRetryInterval), - zap.Int("numRetries", i+1)) - m.sleepFn(bootstrapRetryInterval) + m.instrumentation.bootstrapFnFailed(i) continue } @@ -196,23 +305,14 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { // on its own course so that the load of ticking and flushing is more spread out // across the cluster.
m.Lock() - m.lastBootstrapCompletionTime = xtime.ToUnixNano(m.nowFn()) + m.instrumentation.bootstrapCompletion() m.Unlock() return result, nil } func (m *bootstrapManager) Report() { - if m.IsBootstrapped() { - m.status.Update(1) - } else { - m.status.Update(0) - } - - if m.database.IsBootstrappedAndDurable() { - m.durableStatus.Update(1) - } else { - m.durableStatus.Update(0) - } + m.instrumentation.setIsBootstrapped(m.IsBootstrapped()) + m.instrumentation.setIsBootstrappedAndDurable(m.database.IsBootstrappedAndDurable()) } type bootstrapNamespace struct { @@ -243,18 +343,12 @@ func (m *bootstrapManager) bootstrap() error { // an error returned. for _, accumulator := range accmulators { if err := accumulator.Close(); err != nil { - instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), - func(l *zap.Logger) { - l.Error("could not close bootstrap data accumulator", - zap.Error(err)) - }) + m.instrumentation.bootstrapDataAccumulatorCloseFailed(err) } } }() - start := m.nowFn() - m.log.Info("bootstrap prepare") - + m.instrumentation.bootstrapPreparing() var ( bootstrapNamespaces = make([]bootstrapNamespace, len(namespaces)) prepareWg sync.WaitGroup @@ -288,7 +382,7 @@ func (m *bootstrapManager) bootstrap() error { prepareWg.Wait() if err := prepareMultiErr.FinalError(); err != nil { - m.log.Error("bootstrap prepare failed", zap.Error(err)) + m.instrumentation.bootstrapPrepareFailed(err) return err } @@ -329,28 +423,19 @@ func (m *bootstrapManager) bootstrap() error { }) } - logFields := []zapcore.Field{ - zap.Int("numShards", len(uniqueShards)), - } - m.log.Info("bootstrap started", logFields...) - + m.instrumentation.bootstrapStarted(len(uniqueShards)) // Run the bootstrap. 
- bootstrapResult, err := process.Run(ctx, start, targets) - - bootstrapDuration := m.nowFn().Sub(start) - m.bootstrapDuration.Record(bootstrapDuration) - logFields = append(logFields, - zap.Duration("bootstrapDuration", bootstrapDuration)) - + bootstrapResult, err := process.Run(ctx, m.instrumentation.start, targets) if err != nil { - m.log.Error("bootstrap failed", - append(logFields, zap.Error(err))...) + m.instrumentation.bootstrapFailed(err) return err } - m.log.Info("bootstrap succeeded, marking namespaces complete", logFields...) + m.instrumentation.bootstrapSucceeded() // Use a multi-error here because we want to at least bootstrap // as many of the namespaces as possible. + + m.instrumentation.bootstrapNamespacesStarted() multiErr := xerrors.NewMultiError() for _, namespace := range namespaces { id := namespace.ID() @@ -358,29 +443,21 @@ func (m *bootstrapManager) bootstrap() error { if !ok { err := fmt.Errorf("missing namespace from bootstrap result: %v", id.String()) - i := m.opts.InstrumentOptions() - instrument.EmitAndLogInvariantViolation(i, func(l *zap.Logger) { - l.Error("bootstrap failed", - append(logFields, zap.Error(err))...) - }) + m.instrumentation.missingNamespaceFromResult(err) return err } if err := namespace.Bootstrap(ctx, result); err != nil { - m.log.Info("bootstrap error", append(logFields, []zapcore.Field{ - zap.String("namespace", id.String()), - zap.Error(err), - }...)...) + m.instrumentation.bootstrapNamespaceFailed(err, id.String()) multiErr = multiErr.Add(err) } } if err := multiErr.FinalError(); err != nil { - m.log.Info("bootstrap namespaces failed", - append(logFields, zap.Error(err))...) + m.instrumentation.bootstrapNamespacesFailed(err) return err } - m.log.Info("bootstrap success", logFields...) 
+ m.instrumentation.bootstrapNamespacesSucceeded() return nil } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index a0bf66d87b..f5bc15c9b6 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -59,15 +59,12 @@ import ( ) type peersSource struct { - opts Options - log *zap.Logger + opts Options + // log *zap.Logger newPersistManager func() (persist.Manager, error) - nowFn clock.NowFn - metrics peersSourceMetrics -} - -type peersSourceMetrics struct { - persistedIndexBlocksOutOfRetention tally.Counter + // nowFn clock.NowFn + // metrics peersSourceMetrics + instrumentation *instrumentation } type persistenceFlush struct { @@ -77,24 +74,209 @@ type persistenceFlush struct { timeRange xtime.Range } +type instrumentation struct { + opts Options + log *zap.Logger + nowFn clock.NowFn + bootstrapDataDuration tally.Timer + bootstrapIndexDuration tally.Timer + bootstrapShardsDuration tally.Timer + persistedIndexBlocksOutOfRetention tally.Counter + start time.Time + startShards time.Time + logFields []zapcore.Field +} + +func (i *instrumentation) bootstrapDataStarted(span opentracing.Span) { + i.start = i.nowFn() + i.log.Info("bootstrapping time series data start") + span.LogEvent("bootstrap_data_start") +} + +func (i *instrumentation) bootstrapDataCompleted(span opentracing.Span) { + duration := i.nowFn().Sub(i.start) + i.bootstrapDataDuration.Record(duration) + i.log.Info("bootstrapping time series data success", zap.Duration("took", duration)) + span.LogEvent("bootstrap_data_done") +} + +func (i *instrumentation) bootstrapIndexStarted(span opentracing.Span) { + i.start = i.nowFn() + i.log.Info("bootstrapping index metadata start") + span.LogEvent("bootstrap_index_start") +} + +func (i *instrumentation) bootstrapIndexCompleted(span opentracing.Span) { + duration := i.nowFn().Sub(i.start) + 
i.bootstrapIndexDuration.Record(duration) + i.log.Info("bootstrapping index metadata success", zap.Duration("took", duration)) + span.LogEvent("bootstrap_index_done") +} + +func (i *instrumentation) bootstrapIndexSkipped(namespaceId ident.ID) { + i.log.Info("skipping bootstrap for namespace based on options", + zap.Stringer("namespace", namespaceId)) +} + +func (i *instrumentation) getDefaultAdminSessionFailed(err error) { + i.log.Error("peers bootstrapper cannot get default admin session", zap.Error(err)) +} + +func (i *instrumentation) bootstrapShardsStarted(count int, concurrency int, shouldPersist bool) { + i.startShards = i.nowFn() + i.log.Info("peers bootstrapper bootstrapping shards for ranges", + zap.Int("shards", count), + zap.Int("concurrency", concurrency), + zap.Bool("shouldPersist", shouldPersist)) +} + +func (i *instrumentation) bootstrapShardsCompleted() { + duration := i.nowFn().Sub(i.startShards) + i.bootstrapShardsDuration.Record(duration) + i.log.Info("bootstrapping shards success", zap.Duration("took", duration)) +} + +func (i *instrumentation) persistenceFlushFailed(err error) { + i.log.Error("peers bootstrapper bootstrap with persistence flush encountered error", + zap.Error(err)) +} + +func (i *instrumentation) seriesCheckoutFailed(err error) { + i.log.Error("could not checkout series", zap.Error(err)) +} + +func (i *instrumentation) seriesLoadFailed(err error) { + i.log.Error("could not load series block", zap.Error(err)) +} + +func (i *instrumentation) shardBootstrapped(shard uint32, numSeries int64, blockTime time.Time) { + i.log.Info("peer bootstrapped shard", + zap.Uint32("shard", shard), + zap.Int64("numSeries", numSeries), + zap.Time("blockStart", blockTime), + ) +} + +func (i *instrumentation) fetchBootstrapBlocksFailed(err error, shard uint32) { + i.log.Error("error fetching bootstrap blocks", + zap.Uint32("shard", shard), + zap.Error(err), + ) +} + +func (i *instrumentation) peersBootstrapperIndexForRanges(count int) { + 
i.log.Info("peers bootstrapper bootstrapping index for ranges", + zap.Int("shards", count), + ) +} + +func (i *instrumentation) processingReadersFailed(err error, start time.Time) { + i.log.Error("error processing readers", zap.Error(err), + zap.Time("timeRange.start", start)) +} + +func (i *instrumentation) buildingFileSetIndexSegmentStarted(fields []zapcore.Field) { + i.log.Debug("building file set index segment", fields...) +} + +func (i *instrumentation) outOfRetentionIndexSegmentSkipped(fields []zapcore.Field) { + i.log.Debug("skipping out of retention index segment", fields...) + i.persistedIndexBlocksOutOfRetention.Inc(1) +} + +func (i *instrumentation) buildingInMemoryIndexSegmentStarted(fields []zapcore.Field) { + i.log.Info("building in-memory index segment", fields...) +} + +func (i *instrumentation) errorsForRangeEncountered(summaryString string, errorsString []string) { + i.log.Info("encountered errors for range", + zap.String("requestedRanges", summaryString), + zap.Strings("timesWithErrors", errorsString)) +} + +func (i *instrumentation) noPeersAvailable(total int, shardIDUint uint32) { + i.log.Debug( + "0 available peers, unable to peer bootstrap", + zap.Int("total", total), zap.Uint32("shard", shardIDUint)) +} + +func (i *instrumentation) readConsistencyNotAchieved( + bootstrapConsistencyLevel topology.ReadConsistencyLevel, + majorityReplicas int, + total int, + available int, +) { + i.log.Debug( + "read consistency not achieved, unable to peer bootstrap", + zap.Any("level", bootstrapConsistencyLevel), + zap.Int("replicas", majorityReplicas), + zap.Int("total", total), + zap.Int("available", available)) +} + +func (i *instrumentation) peersBootstrapperSourceReadStarted(ctx context.Context) ( + context.Context, opentracing.Span, bool) { + return ctx.StartSampledTraceSpan(tracepoint.BootstrapperPeersSourceRead) +} + +func (i *instrumentation) persistFsIndexBootstrapFailed(err error, + iopts instrument.Options, + id ident.ID, + ranges 
result.ShardTimeRanges) { + instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { + l.Error("persist fs index bootstrap failed", + zap.Stringer("namespace", id), + zap.Stringer("requestedRanges", ranges), + zap.Error(err)) + }) +} + +func (i *instrumentation) buildFsIndexBootstrapFailed(err error, + iopts instrument.Options, + id ident.ID, + ranges result.ShardTimeRanges) { + instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { + l.Error("build fs index bootstrap failed", + zap.Stringer("namespace", id), + zap.Stringer("requestedRanges", ranges), + zap.Error(err)) + }) +} + +func newInstrumentation(opts Options) *instrumentation { + instrumentOptions := opts.ResultOptions().InstrumentOptions() + scope := instrumentOptions.MetricsScope().SubScope("peers-bootstrapper") + instrumentOptions = instrumentOptions.SetMetricsScope(scope) + return &instrumentation{ + opts: opts, + log: instrumentOptions.Logger().With(zap.String("bootstrapper", "peers")), + nowFn: opts.ResultOptions().ClockOptions().NowFn(), + bootstrapDataDuration: scope.Timer("peer-bootstrap-data-duration"), + bootstrapIndexDuration: scope.Timer("peer-bootstrap-index-duration"), + bootstrapShardsDuration: scope.Timer("peer-bootstrap-shards-duration"), + persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"), + } +} + func newPeersSource(opts Options) (bootstrap.Source, error) { if err := opts.Validate(); err != nil { return nil, err } - iopts := opts.ResultOptions().InstrumentOptions() - scope := iopts.MetricsScope().SubScope("peers-bootstrapper") - iopts = iopts.SetMetricsScope(scope) + // iopts := opts.ResultOptions().InstrumentOptions() + // scope := iopts.MetricsScope().SubScope("peers-bootstrapper") + // iopts = iopts.SetMetricsScope(scope) return &peersSource{ opts: opts, - log: iopts.Logger().With(zap.String("bootstrapper", "peers")), + // log: iopts.Logger().With(zap.String("bootstrapper", "peers")), newPersistManager: func() 
(persist.Manager, error) { return fs.NewPersistManager(opts.FilesystemOptions()) }, - nowFn: opts.ResultOptions().ClockOptions().NowFn(), - metrics: peersSourceMetrics{ - persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"), - }, + /* nowFn: opts.ResultOptions().ClockOptions().NowFn(), + metrics: peersSourceMetrics{ + persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"), + },*/ + instrumentation: newInstrumentation(opts), }, nil } @@ -132,7 +314,7 @@ func (s *peersSource) Read( namespaces bootstrap.Namespaces, cache bootstrap.Cache, ) (bootstrap.NamespaceResults, error) { - ctx, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperPeersSourceRead) + ctx, span, _ := s.instrumentation.peersBootstrapperSourceReadStarted(ctx) defer span.Finish() timeRangesEmpty := true @@ -157,10 +339,8 @@ func (s *peersSource) Read( } // NB(r): Perform all data bootstrapping first then index bootstrapping - // to more clearly deliniate which process is slower than the other. - start := s.nowFn() - s.log.Info("bootstrapping time series data start") - span.LogEvent("bootstrap_data_start") + // to more clearly delineate which process is slower than the other. + s.instrumentation.bootstrapDataStarted(span) for _, elem := range namespaces.Namespaces.Iter() { namespace := elem.Value() md := namespace.Metadata @@ -178,24 +358,17 @@ func (s *peersSource) Read( DataResult: r, }) } - s.log.Info("bootstrapping time series data success", - zap.Duration("took", s.nowFn().Sub(start))) - span.LogEvent("bootstrap_data_done") - + s.instrumentation.bootstrapDataCompleted(span) // NB(bodu): We need to evict the info file cache before reading index data since we've // maybe fetched blocks from peers so the cached info file state is now stale. 
cache.Evict() - start = s.nowFn() - s.log.Info("bootstrapping index metadata start") - span.LogEvent("bootstrap_index_start") + + s.instrumentation.bootstrapIndexStarted(span) for _, elem := range namespaces.Namespaces.Iter() { namespace := elem.Value() md := namespace.Metadata if !md.Options().IndexOptions().Enabled() { - s.log.Info("skipping bootstrap for namespace based on options", - zap.Stringer("namespace", md.ID())) - - // Not bootstrapping for index. + s.instrumentation.bootstrapIndexSkipped(md.ID()) continue } @@ -220,9 +393,7 @@ func (s *peersSource) Read( results.Results.Set(md.ID(), result) } - s.log.Info("bootstrapping index metadata success", - zap.Duration("took", s.nowFn().Sub(start))) - span.LogEvent("bootstrap_index_done") + s.instrumentation.bootstrapIndexCompleted(span) return results, nil } @@ -257,7 +428,7 @@ func (s *peersSource) readData( result := result.NewDataBootstrapResult() session, err := s.opts.AdminClient().DefaultAdminSession() if err != nil { - s.log.Error("peers bootstrapper cannot get default admin session", zap.Error(err)) + s.instrumentation.getDefaultAdminSessionFailed(err) result.SetUnfulfilled(shardTimeRanges) return nil, err } @@ -277,10 +448,7 @@ func (s *peersSource) readData( concurrency = s.opts.ShardPersistenceConcurrency() } - s.log.Info("peers bootstrapper bootstrapping shards for ranges", - zap.Int("shards", count), - zap.Int("concurrency", concurrency), - zap.Bool("shouldPersist", shouldPersist)) + s.instrumentation.bootstrapShardsStarted(count, concurrency, shouldPersist) if shouldPersist { // Spin up persist workers. for i := 0; i < s.opts.ShardPersistenceFlushConcurrency(); i++ { @@ -324,6 +492,7 @@ func (s *peersSource) readData( } } + s.instrumentation.bootstrapShardsCompleted() return result, nil } @@ -382,8 +551,7 @@ func (s *peersSource) runPersistenceQueueWorkerLoop( } // Remove results and make unfulfilled if an error occurred. 
- s.log.Error("peers bootstrapper bootstrap with persistence flush encountered error", - zap.Error(err)) + s.instrumentation.persistenceFlushFailed(err) // Make unfulfilled. lock.Lock() @@ -461,14 +629,14 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers( continue } unfulfill(currRange) - s.log.Error("could not checkout series", zap.Error(err)) + s.instrumentation.seriesCheckoutFailed(err) continue } for _, block := range entry.Blocks.AllBlocks() { if err := ref.Series.LoadBlock(block, series.WarmWrite); err != nil { unfulfill(currRange) - s.log.Error("could not load series block", zap.Error(err)) + s.instrumentation.seriesLoadFailed(err) } } @@ -485,27 +653,21 @@ func (s *peersSource) logFetchBootstrapBlocksFromPeersOutcome( shardResult result.ShardResult, err error, ) { - if err == nil { - shardBlockSeriesCounter := map[xtime.UnixNano]int64{} - for _, entry := range shardResult.AllSeries().Iter() { - series := entry.Value() - for blockStart := range series.Blocks.AllBlocks() { - shardBlockSeriesCounter[blockStart]++ - } - } + if err != nil { + s.instrumentation.fetchBootstrapBlocksFailed(err, shard) + return + } - for block, numSeries := range shardBlockSeriesCounter { - s.log.Info("peer bootstrapped shard", - zap.Uint32("shard", shard), - zap.Int64("numSeries", numSeries), - zap.Time("blockStart", block.ToTime()), - ) + shardBlockSeriesCounter := map[xtime.UnixNano]int64{} + for _, entry := range shardResult.AllSeries().Iter() { + series := entry.Value() + for blockStart := range series.Blocks.AllBlocks() { + shardBlockSeriesCounter[blockStart]++ } - } else { - s.log.Error("error fetching bootstrap blocks", - zap.Uint32("shard", shard), - zap.Error(err), - ) + } + + for block, numSeries := range shardBlockSeriesCounter { + s.instrumentation.shardBootstrapped(shard, numSeries, block.ToTime()) } } @@ -717,9 +879,7 @@ func (s *peersSource) readIndex( indexSegmentConcurrency = s.opts.IndexSegmentConcurrency() readersCh = make(chan 
bootstrapper.TimeWindowReaders, indexSegmentConcurrency) ) - s.log.Info("peers bootstrapper bootstrapping index for ranges", - zap.Int("shards", count), - ) + s.instrumentation.peersBootstrapperIndexForRanges(count) go bootstrapper.EnqueueReaders(bootstrapper.EnqueueReadersOptions{ NsMD: ns, @@ -733,9 +893,9 @@ func (s *peersSource) readIndex( // NB(bodu): We only read metadata when performing a peers bootstrap // so we do not need to sort the data fileset reader. OptimizedReadMetadataOnly: true, - Logger: s.log, + Logger: s.instrumentation.log, Span: span, - NowFn: s.nowFn, + NowFn: s.instrumentation.nowFn, Cache: cache, }) @@ -899,8 +1059,7 @@ func (s *peersSource) processReaders( xtime.NewRanges(timeRange), )) } else { - s.log.Error("error processing readers", zap.Error(err), - zap.Time("timeRange.start", start)) + s.instrumentation.processingReadersFailed(err, start) timesWithErrors = append(timesWithErrors, timeRange.Start) } } @@ -944,7 +1103,7 @@ func (s *peersSource) processReaders( zap.String("remainingRanges", remainingRanges.SummaryString()), } if shouldPersist { - s.log.Debug("building file set index segment", buildIndexLogFields...) + s.instrumentation.buildingFileSetIndexSegmentStarted(buildIndexLogFields) indexBlock, err = bootstrapper.PersistBootstrapIndexSegment( ns, requestedRanges, @@ -960,19 +1119,13 @@ func (s *peersSource) processReaders( // Bail early if the index segment is already out of retention. // This can happen when the edge of requested ranges at time of data bootstrap // is now out of retention. - s.log.Debug("skipping out of retention index segment", buildIndexLogFields...) 
- s.metrics.persistedIndexBlocksOutOfRetention.Inc(1) + s.instrumentation.outOfRetentionIndexSegmentSkipped(buildIndexLogFields) return remainingRanges, timesWithErrors } else if err != nil { - instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { - l.Error("persist fs index bootstrap failed", - zap.Stringer("namespace", ns.ID()), - zap.Stringer("requestedRanges", requestedRanges), - zap.Error(err)) - }) + s.instrumentation.persistFsIndexBootstrapFailed(err, iopts, ns.ID(), requestedRanges) } } else { - s.log.Info("building in-memory index segment", buildIndexLogFields...) + s.instrumentation.buildingInMemoryIndexSegmentStarted(buildIndexLogFields) indexBlock, err = bootstrapper.BuildBootstrapIndexSegment( ns, requestedRanges, @@ -984,13 +1137,7 @@ func (s *peersSource) processReaders( blockEnd, ) if err != nil { - iopts := s.opts.ResultOptions().InstrumentOptions() - instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { - l.Error("build fs index bootstrap failed", - zap.Stringer("namespace", ns.ID()), - zap.Stringer("requestedRanges", requestedRanges), - zap.Error(err)) - }) + s.instrumentation.buildFsIndexBootstrapFailed(err, iopts, ns.ID(), requestedRanges) } } @@ -1060,9 +1207,9 @@ func (s *peersSource) markRunResultErrorsAndUnfulfilled( for i := range timesWithErrors { timesWithErrorsString[i] = timesWithErrors[i].String() } - s.log.Info("encounted errors for range", - zap.String("requestedRanges", requestedRanges.SummaryString()), - zap.Strings("timesWithErrors", timesWithErrorsString)) + s.instrumentation.errorsForRangeEncountered( + requestedRanges.SummaryString(), + timesWithErrorsString) } if !remainingRanges.IsEmpty() { @@ -1140,20 +1287,17 @@ func (s *peersSource) peerAvailability( if available == 0 { // Can't peer bootstrap if there are no available peers.
- s.log.Debug( - "0 available peers, unable to peer bootstrap", - zap.Int("total", total), zap.Uint32("shard", shardIDUint)) + s.instrumentation.noPeersAvailable(total, shardIDUint) continue } if !topology.ReadConsistencyAchieved( bootstrapConsistencyLevel, majorityReplicas, total, available) { - s.log.Debug( - "read consistency not achieved, unable to peer bootstrap", - zap.Any("level", bootstrapConsistencyLevel), - zap.Int("replicas", majorityReplicas), - zap.Int("total", total), - zap.Int("available", available)) + s.instrumentation.readConsistencyNotAchieved( + bootstrapConsistencyLevel, + majorityReplicas, + total, + available) continue } diff --git a/src/dbnode/storage/bootstrap_test.go b/src/dbnode/storage/bootstrap_test.go index 95337e996f..1faebed605 100644 --- a/src/dbnode/storage/bootstrap_test.go +++ b/src/dbnode/storage/bootstrap_test.go @@ -63,7 +63,7 @@ func TestDatabaseBootstrapWithBootstrapError(t *testing.T) { bsm := newBootstrapManager(db, m, opts).(*bootstrapManager) // Don't sleep. - bsm.sleepFn = func(time.Duration) {} + bsm.instrumentation.sleepFn = func(time.Duration) {} gomock.InOrder( ns.EXPECT().PrepareBootstrap(gomock.Any()).Return([]databaseShard{}, nil),