Skip to content

Commit

Permalink
refactored instrumentation code for peers bootstrapper.
Browse files Browse the repository at this point in the history
added additional timers for peers bootstrapper to track individual steps.
  • Loading branch information
soundvibe committed Jan 4, 2021
1 parent 84ec433 commit a548480
Show file tree
Hide file tree
Showing 3 changed files with 394 additions and 173 deletions.
223 changes: 150 additions & 73 deletions src/dbnode/storage/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,42 +70,154 @@ const (

type bootstrapFn func() error

type bootstrapManager struct {
sync.RWMutex

database database
mediator databaseMediator
type instrumentation struct {
opts Options
log *zap.Logger
bootstrapFn bootstrapFn
nowFn clock.NowFn
sleepFn sleepFn
processProvider bootstrap.ProcessProvider
state BootstrapState
hasPending bool
status tally.Gauge
bootstrapDuration tally.Timer
bootstrapNamespacesDuration tally.Timer
durableStatus tally.Gauge
lastBootstrapCompletionTime xtime.UnixNano
start time.Time
startNamespaces time.Time
logFields []zapcore.Field
}

func (i *instrumentation) bootstrapFnFailed(retry int) {
i.log.Warn("retrying bootstrap after backoff",
zap.Duration("backoff", bootstrapRetryInterval),
zap.Int("numRetries", retry+1))
i.sleepFn(bootstrapRetryInterval)
}

func (i *instrumentation) bootstrapPreparing() {
i.start = i.nowFn()
i.log.Info("bootstrap prepare")
}

func (i *instrumentation) bootstrapPrepareFailed(err error) {
i.log.Error("bootstrap prepare failed", zap.Error(err))
}

func (i *instrumentation) bootstrapStarted(shards int) {
i.logFields = []zapcore.Field{
zap.Int("numShards", shards),
}
i.log.Info("bootstrap started", i.logFields...)
}

func (i *instrumentation) bootstrapSucceeded() {
bootstrapDuration := i.nowFn().Sub(i.start)
i.bootstrapDuration.Record(bootstrapDuration)
i.logFields = append(i.logFields, zap.Duration("bootstrapDuration", bootstrapDuration))
i.log.Info("bootstrap succeeded, marking namespaces complete", i.logFields...)
}

func (i *instrumentation) bootstrapFailed(err error) {
bootstrapDuration := i.nowFn().Sub(i.start)
i.bootstrapDuration.Record(bootstrapDuration)
i.logFields = append(i.logFields, zap.Duration("bootstrapDuration", bootstrapDuration))
i.log.Error("bootstrap failed", append(i.logFields, zap.Error(err))...)
}

func (i *instrumentation) bootstrapNamespaceFailed(err error, namespaceId string) {
i.log.Info("bootstrap namespace error", append(i.logFields, []zapcore.Field{
zap.String("namespace", namespaceId),
zap.Error(err),
}...)...)
}

func (i *instrumentation) bootstrapNamespacesFailed(err error) {
duration := i.nowFn().Sub(i.startNamespaces)
i.bootstrapNamespacesDuration.Record(duration)
i.logFields = append(i.logFields, zap.Duration("bootstrapNamespacesDuration", duration))
i.log.Info("bootstrap namespaces failed", append(i.logFields, zap.Error(err))...)
}

func (i *instrumentation) bootstrapNamespacesStarted() {
i.startNamespaces = i.nowFn()
i.log.Info("bootstrap namespaces start", i.logFields...)
}

func (i *instrumentation) bootstrapNamespacesSucceeded() {
duration := i.nowFn().Sub(i.startNamespaces)
i.bootstrapNamespacesDuration.Record(duration)
i.logFields = append(i.logFields, zap.Duration("bootstrapNamespacesDuration", duration))
i.log.Info("bootstrap namespaces success", i.logFields...)
}

func (i *instrumentation) bootstrapCompletion() {
i.lastBootstrapCompletionTime = xtime.ToUnixNano(i.nowFn())
}

func (i *instrumentation) setIsBootstrapped(isBootstrapped bool) {
if isBootstrapped {
i.status.Update(1)
} else {
i.status.Update(0)
}
}

func (i *instrumentation) setIsBootstrappedAndDurable(isBootstrappedAndDurable bool) {
if isBootstrappedAndDurable {
i.durableStatus.Update(1)
} else {
i.durableStatus.Update(0)
}
}

func (i *instrumentation) missingNamespaceFromResult(err error) {
instrument.EmitAndLogInvariantViolation(i.opts.InstrumentOptions(), func(l *zap.Logger) {
l.Error("bootstrap failed", append(i.logFields, zap.Error(err))...)
})
}

func (i *instrumentation) bootstrapDataAccumulatorCloseFailed(err error) {
instrument.EmitAndLogInvariantViolation(i.opts.InstrumentOptions(),
func(l *zap.Logger) {
l.Error("could not close bootstrap data accumulator",
zap.Error(err))
})
}

func newInstrumentation(opts Options) *instrumentation {
scope := opts.InstrumentOptions().MetricsScope()
return &instrumentation{
opts: opts,
log: opts.InstrumentOptions().Logger(),
nowFn: opts.ClockOptions().NowFn(),
sleepFn: time.Sleep,
status: scope.Gauge("bootstrapped"),
bootstrapDuration: scope.Timer("bootstrap-duration"),
bootstrapNamespacesDuration: scope.Timer("bootstrap-namespaces-duration"),
durableStatus: scope.Gauge("bootstrapped-durable"),
}
}

type bootstrapManager struct {
sync.RWMutex

database database
mediator databaseMediator
bootstrapFn bootstrapFn
processProvider bootstrap.ProcessProvider
state BootstrapState
hasPending bool
instrumentation *instrumentation
}

func newBootstrapManager(
database database,
mediator databaseMediator,
opts Options,
) databaseBootstrapManager {
scope := opts.InstrumentOptions().MetricsScope()
m := &bootstrapManager{
database: database,
mediator: mediator,
opts: opts,
log: opts.InstrumentOptions().Logger(),
nowFn: opts.ClockOptions().NowFn(),
sleepFn: time.Sleep,
processProvider: opts.BootstrapProcessProvider(),
status: scope.Gauge("bootstrapped"),
bootstrapDuration: scope.Timer("bootstrap-duration"),
durableStatus: scope.Gauge("bootstrapped-durable"),
database: database,
mediator: mediator,
processProvider: opts.BootstrapProcessProvider(),
instrumentation: newInstrumentation(opts),
}
m.bootstrapFn = m.bootstrap
return m
Expand All @@ -120,7 +232,7 @@ func (m *bootstrapManager) IsBootstrapped() bool {

func (m *bootstrapManager) LastBootstrapCompletionTime() (xtime.UnixNano, bool) {
m.RLock()
bsTime := m.lastBootstrapCompletionTime
bsTime := m.instrumentation.lastBootstrapCompletionTime
m.RUnlock()
return bsTime, bsTime > 0
}
Expand Down Expand Up @@ -176,10 +288,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
// NB(r): Last bootstrap failed, since this could be due to transient
// failure we retry the bootstrap again. This is to avoid operators
// needing to manually intervene for cases where failures are transient.
m.log.Warn("retrying bootstrap after backoff",
zap.Duration("backoff", bootstrapRetryInterval),
zap.Int("numRetries", i+1))
m.sleepFn(bootstrapRetryInterval)
m.instrumentation.bootstrapFnFailed(i + 1)
continue
}

Expand All @@ -196,23 +305,14 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
// on its own course so that the load of ticking and flushing is more spread out
// across the cluster.
m.Lock()
m.lastBootstrapCompletionTime = xtime.ToUnixNano(m.nowFn())
m.instrumentation.bootstrapCompletion()
m.Unlock()
return result, nil
}

func (m *bootstrapManager) Report() {
if m.IsBootstrapped() {
m.status.Update(1)
} else {
m.status.Update(0)
}

if m.database.IsBootstrappedAndDurable() {
m.durableStatus.Update(1)
} else {
m.durableStatus.Update(0)
}
m.instrumentation.setIsBootstrapped(m.IsBootstrapped())
m.instrumentation.setIsBootstrappedAndDurable(m.database.IsBootstrappedAndDurable())
}

type bootstrapNamespace struct {
Expand Down Expand Up @@ -243,18 +343,12 @@ func (m *bootstrapManager) bootstrap() error {
// an error returned.
for _, accumulator := range accmulators {
if err := accumulator.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(),
func(l *zap.Logger) {
l.Error("could not close bootstrap data accumulator",
zap.Error(err))
})
m.instrumentation.bootstrapDataAccumulatorCloseFailed(err)
}
}
}()

start := m.nowFn()
m.log.Info("bootstrap prepare")

m.instrumentation.bootstrapPreparing()
var (
bootstrapNamespaces = make([]bootstrapNamespace, len(namespaces))
prepareWg sync.WaitGroup
Expand Down Expand Up @@ -288,7 +382,7 @@ func (m *bootstrapManager) bootstrap() error {
prepareWg.Wait()

if err := prepareMultiErr.FinalError(); err != nil {
m.log.Error("bootstrap prepare failed", zap.Error(err))
m.instrumentation.bootstrapPrepareFailed(err)
return err
}

Expand Down Expand Up @@ -329,58 +423,41 @@ func (m *bootstrapManager) bootstrap() error {
})
}

logFields := []zapcore.Field{
zap.Int("numShards", len(uniqueShards)),
}
m.log.Info("bootstrap started", logFields...)

m.instrumentation.bootstrapStarted(len(uniqueShards))
// Run the bootstrap.
bootstrapResult, err := process.Run(ctx, start, targets)

bootstrapDuration := m.nowFn().Sub(start)
m.bootstrapDuration.Record(bootstrapDuration)
logFields = append(logFields,
zap.Duration("bootstrapDuration", bootstrapDuration))

bootstrapResult, err := process.Run(ctx, m.instrumentation.start, targets)
if err != nil {
m.log.Error("bootstrap failed",
append(logFields, zap.Error(err))...)
m.instrumentation.bootstrapFailed(err)
return err
}

m.log.Info("bootstrap succeeded, marking namespaces complete", logFields...)
m.instrumentation.bootstrapSucceeded()
// Use a multi-error here because we want to at least bootstrap
// as many of the namespaces as possible.

m.instrumentation.bootstrapNamespacesStarted()
multiErr := xerrors.NewMultiError()
for _, namespace := range namespaces {
id := namespace.ID()
result, ok := bootstrapResult.Results.Get(id)
if !ok {
err := fmt.Errorf("missing namespace from bootstrap result: %v",
id.String())
i := m.opts.InstrumentOptions()
instrument.EmitAndLogInvariantViolation(i, func(l *zap.Logger) {
l.Error("bootstrap failed",
append(logFields, zap.Error(err))...)
})
m.instrumentation.missingNamespaceFromResult(err)
return err
}

if err := namespace.Bootstrap(ctx, result); err != nil {
m.log.Info("bootstrap error", append(logFields, []zapcore.Field{
zap.String("namespace", id.String()),
zap.Error(err),
}...)...)
m.instrumentation.bootstrapNamespaceFailed(err, id.String())
multiErr = multiErr.Add(err)
}
}

if err := multiErr.FinalError(); err != nil {
m.log.Info("bootstrap namespaces failed",
append(logFields, zap.Error(err))...)
m.instrumentation.bootstrapNamespacesFailed(err)
return err
}

m.log.Info("bootstrap success", logFields...)
m.instrumentation.bootstrapNamespacesSucceeded()
return nil
}
Loading

0 comments on commit a548480

Please sign in to comment.