Skip to content

Commit

Permalink
Merge #4386
Browse files Browse the repository at this point in the history
4386: BlockRateController: Metrics and debug logging r=jordanschalm a=jordanschalm



Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>
  • Loading branch information
bors[bot] and jordanschalm authored May 30, 2023
2 parents b419adf + a2ba83c commit 5ab3243
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 16 deletions.
77 changes: 62 additions & 15 deletions consensus/hotstuff/cruisectl/block_rate_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.uber.org/atomic"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/state/protocol"
Expand Down Expand Up @@ -65,9 +66,10 @@ func (epoch *epochInfo) fractionComplete(curView uint64) float64 {
type BlockRateController struct {
component.Component

config *Config
state protocol.State
log zerolog.Logger
config *Config
state protocol.State
log zerolog.Logger
metrics module.CruiseCtlMetrics

lastMeasurement measurement // the most recently taken measurement
epochInfo // scheduled transition view for current/next epoch
Expand All @@ -81,10 +83,11 @@ type BlockRateController struct {
}

// NewBlockRateController returns a new BlockRateController.
func NewBlockRateController(log zerolog.Logger, config *Config, state protocol.State, curView uint64) (*BlockRateController, error) {
func NewBlockRateController(log zerolog.Logger, metrics module.CruiseCtlMetrics, config *Config, state protocol.State, curView uint64) (*BlockRateController, error) {
ctl := &BlockRateController{
config: config,
log: log.With().Str("component", "cruise_ctl").Logger(),
log: log.With().Str("hotstuff", "cruise_ctl").Logger(),
metrics: metrics,
state: state,
viewChanges: make(chan uint64, 10),
epochSetups: make(chan *flow.Header, 5),
Expand Down Expand Up @@ -114,7 +117,17 @@ func (ctl *BlockRateController) initLastMeasurement(curView uint64, now time.Tim
integralErr: 0,
derivativeErr: 0,
}
ctl.proposalDuration.Store(ctl.config.DefaultProposalDuration.Nanoseconds())
initialProposalDuration := ctl.config.DefaultProposalDuration
ctl.proposalDuration.Store(initialProposalDuration.Nanoseconds())

ctl.log.Debug().
Uint64("view", curView).
Time("time", now).
Dur("proposal_duration", initialProposalDuration).
Msg("initialized last measurement")
ctl.metrics.PIDError(ctl.lastMeasurement.proportionalErr, ctl.lastMeasurement.integralErr, ctl.lastMeasurement.derivativeErr)
ctl.metrics.TargetProposalDuration(initialProposalDuration)
ctl.metrics.ControllerOutput(0)
}

// initEpochInfo initializes the epochInfo state upon component startup.
Expand Down Expand Up @@ -155,6 +168,13 @@ func (ctl *BlockRateController) initEpochInfo(curView uint64) error {
}
ctl.epochFallbackTriggered = epochFallbackTriggered

ctl.log.Debug().
Uint64("cur_epoch_first_view", curEpochFirstView).
Uint64("cur_epoch_final_view", curEpochFinalView).
Str("phase", phase.String()).
Bool("epoch_fallback", epochFallbackTriggered).
Msg("initialized epoch config")

return nil
}

Expand Down Expand Up @@ -275,6 +295,15 @@ func (ctl *BlockRateController) checkForEpochTransition(curView uint64, now time
ctl.curEpochFinalView = *ctl.nextEpochFinalView
ctl.nextEpochFinalView = nil
ctl.curEpochTargetEndTime = ctl.config.TargetTransition.inferTargetEndTime(now, ctl.epochInfo.fractionComplete(curView))

ctl.log.Info().
Uint64("cur_view", curView).
Time("now", now).
Uint64("cur_epoch_first_view", ctl.curEpochFirstView).
Uint64("cur_epoch_final_view", ctl.curEpochFinalView).
Time("cur_epoch_target_end_time", ctl.curEpochTargetEndTime).
Msg("processed epoch transition")

return nil
}

Expand Down Expand Up @@ -308,17 +337,35 @@ func (ctl *BlockRateController) measureViewDuration(view uint64, now time.Time)
ctl.lastMeasurement = curMeasurement

// compute the controller output for this measurement
proposalTime := ctl.config.DefaultProposalDuration - ctl.controllerOutput()
controllerOutput := ctl.controllerOutput()
unconstrainedProposalDuration := ctl.config.DefaultProposalDuration - controllerOutput
constrainedProposalDuration := unconstrainedProposalDuration
// constrain the proposal time according to configured boundaries
if proposalTime < ctl.config.MinProposalDuration {
ctl.proposalDuration.Store(ctl.config.MinProposalDuration.Nanoseconds())
return nil
}
if proposalTime > ctl.config.MaxProposalDuration {
ctl.proposalDuration.Store(ctl.config.MaxProposalDuration.Nanoseconds())
return nil
if unconstrainedProposalDuration < ctl.config.MinProposalDuration {
constrainedProposalDuration = ctl.config.MinProposalDuration
} else if unconstrainedProposalDuration > ctl.config.MaxProposalDuration {
constrainedProposalDuration = ctl.config.MaxProposalDuration
}
ctl.proposalDuration.Store(proposalTime.Nanoseconds())
ctl.proposalDuration.Store(constrainedProposalDuration.Nanoseconds())

ctl.log.Debug().
Uint64("last_view", lastMeasurement.view).
Uint64("cur_view", view).
Dur("since_last_view", curMeasurement.viewTime).
Dur("projected_time_remaining", estTimeRemaining).
Float64("inst_err", curMeasurement.instErr).
Float64("proportional_err", curMeasurement.proportionalErr).
Float64("integral_err", curMeasurement.integralErr).
Float64("derivative_err", curMeasurement.derivativeErr).
Dur("controller_output", controllerOutput).
Dur("unconstrained_proposal_duration", unconstrainedProposalDuration).
Dur("constrained_proposal_duration", constrainedProposalDuration).
Msg("measured error upon view change")

ctl.metrics.PIDError(ctl.lastMeasurement.proportionalErr, ctl.lastMeasurement.integralErr, ctl.lastMeasurement.derivativeErr)
ctl.metrics.TargetProposalDuration(constrainedProposalDuration)
ctl.metrics.ControllerOutput(controllerOutput)

return nil
}

Expand Down
44 changes: 43 additions & 1 deletion consensus/hotstuff/cruisectl/block_rate_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/irrecoverable"
mockmodule "github.com/onflow/flow-go/module/mock"
mockprotocol "github.com/onflow/flow-go/state/protocol/mock"
"github.com/onflow/flow-go/utils/unittest"
"github.com/onflow/flow-go/utils/unittest/mocks"
Expand All @@ -28,6 +29,7 @@ type BlockRateControllerSuite struct {
curEpochFinalView uint64
epochFallbackTriggered bool

metrics *mockmodule.CruiseCtlMetrics
state *mockprotocol.State
params *mockprotocol.Params
snapshot *mockprotocol.Snapshot
Expand All @@ -53,6 +55,11 @@ func (bs *BlockRateControllerSuite) SetupTest() {
bs.curEpochFinalView = uint64(604_800) // 1 view/sec
bs.epochFallbackTriggered = false

bs.metrics = mockmodule.NewCruiseCtlMetrics(bs.T())
bs.metrics.On("PIDError", mock.Anything, mock.Anything, mock.Anything).Maybe()
bs.metrics.On("TargetProposalDuration", mock.Anything).Maybe()
bs.metrics.On("ControllerOutput", mock.Anything).Maybe()

bs.state = mockprotocol.NewState(bs.T())
bs.params = mockprotocol.NewParams(bs.T())
bs.snapshot = mockprotocol.NewSnapshot(bs.T())
Expand Down Expand Up @@ -80,7 +87,7 @@ func (bs *BlockRateControllerSuite) SetupTest() {
// CreateAndStartController creates and starts the BlockRateController.
// Should be called only once per test case.
func (bs *BlockRateControllerSuite) CreateAndStartController() {
ctl, err := NewBlockRateController(unittest.Logger(), bs.config, bs.state, bs.initialView)
ctl, err := NewBlockRateController(unittest.Logger(), bs.metrics, bs.config, bs.state, bs.initialView)
require.NoError(bs.T(), err)
bs.ctl = ctl
bs.ctl.Start(bs.ctx)
Expand Down Expand Up @@ -392,3 +399,38 @@ func (bs *BlockRateControllerSuite) TestProposalDelay_AheadOfSchedule() {
}
}
}

// TestMetrics tests that correct metrics are tracked when expected.
func (bs *BlockRateControllerSuite) TestMetrics() {
bs.metrics = mockmodule.NewCruiseCtlMetrics(bs.T())
// should set metrics upon initialization
bs.metrics.On("PIDError", float64(0), float64(0), float64(0)).Once()
bs.metrics.On("TargetProposalDuration", bs.config.DefaultProposalDuration).Once()
bs.metrics.On("ControllerOutput", time.Duration(0)).Once()
bs.CreateAndStartController()
defer bs.StopController()
bs.metrics.AssertExpectations(bs.T())

// we are at view 1 of the epoch, but the time is suddenly the target end time
enteredViewAt := bs.ctl.curEpochTargetEndTime
view := bs.initialView + 1
// we should observe a large error
bs.metrics.On("PIDError", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
p := args[0].(float64)
i := args[1].(float64)
d := args[2].(float64)
assert.Greater(bs.T(), p, float64(0))
assert.Greater(bs.T(), i, float64(0))
assert.Greater(bs.T(), d, float64(0))
}).Once()
// should immediately use min proposal duration
bs.metrics.On("TargetProposalDuration", bs.config.MinProposalDuration).Once()
// should have a large negative controller output
bs.metrics.On("ControllerOutput", mock.Anything).Run(func(args mock.Arguments) {
output := args[0].(time.Duration)
assert.Greater(bs.T(), output, time.Duration(0))
}).Once()

err := bs.ctl.measureViewDuration(view, enteredViewAt)
require.NoError(bs.T(), err)
}
15 changes: 15 additions & 0 deletions module/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,21 @@ type HotstuffMetrics interface {
PayloadProductionDuration(duration time.Duration)
}

type CruiseCtlMetrics interface {

// PIDError measures the current error values for the proportional, integration,
// and derivative terms of the PID controller.
PIDError(p, i, d float64)

// TargetProposalDuration measures the current value of the Block Rate Controller output:
// the target duration for a proposal, from entering the view to broadcasting.
TargetProposalDuration(duration time.Duration)

// ControllerOutput measures the output of the cruise control PID controller.
// Concretely, this is the quantity to subtract from the baseline proposal duration.
ControllerOutput(duration time.Duration)
}

type CollectionMetrics interface {
// TransactionIngested is called when a new transaction is ingested by the
// node. It increments the total count of ingested transactions and starts
Expand Down
67 changes: 67 additions & 0 deletions module/metrics/cruisectl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package metrics

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// CruiseCtlMetrics captures metrics about the Block Rate Controller, which adjusts
// the proposal duration to attain a target epoch switchover time.
type CruiseCtlMetrics struct {
proportionalErr prometheus.Gauge
integralErr prometheus.Gauge
derivativeErr prometheus.Gauge
targetProposalDur prometheus.Gauge
controllerOutput prometheus.Gauge
}

func NewCruiseCtlMetrics() *CruiseCtlMetrics {
return &CruiseCtlMetrics{
proportionalErr: promauto.NewGauge(prometheus.GaugeOpts{
Name: "proportional_err_s",
Namespace: namespaceConsensus,
Subsystem: subsystemCruiseCtl,
Help: "The current proportional error measured by the controller",
}),
integralErr: promauto.NewGauge(prometheus.GaugeOpts{
Name: "integral_err_s",
Namespace: namespaceConsensus,
Subsystem: subsystemCruiseCtl,
Help: "The current integral error measured by the controller",
}),
derivativeErr: promauto.NewGauge(prometheus.GaugeOpts{
Name: "derivative_err_per_s",
Namespace: namespaceConsensus,
Subsystem: subsystemCruiseCtl,
Help: "The current derivative error measured by the controller",
}),
targetProposalDur: promauto.NewGauge(prometheus.GaugeOpts{
Name: "target_proposal_dur_s",
Namespace: namespaceConsensus,
Subsystem: subsystemCruiseCtl,
Help: "The current target duration for a proposal",
}),
controllerOutput: promauto.NewGauge(prometheus.GaugeOpts{
Name: "controller_output_s",
Namespace: namespaceConsensus,
Subsystem: subsystemCruiseCtl,
Help: "The most recent output of the controller; the adjust to subtract from the baseline proposal duration",
}),
}
}

func (c *CruiseCtlMetrics) PIDError(p, i, d float64) {
c.proportionalErr.Set(p)
c.integralErr.Set(i)
c.derivativeErr.Set(d)
}

func (c *CruiseCtlMetrics) TargetProposalDuration(duration time.Duration) {
c.targetProposalDur.Set(duration.Seconds())
}

func (c *CruiseCtlMetrics) ControllerOutput(duration time.Duration) {
c.controllerOutput.Set(duration.Seconds())
}
1 change: 1 addition & 0 deletions module/metrics/namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
const (
subsystemCompliance = "compliance"
subsystemHotstuff = "hotstuff"
subsystemCruiseCtl = "cruisectl"
subsystemMatchEngine = "match"
)

Expand Down
44 changes: 44 additions & 0 deletions module/mock/cruise_ctl_metrics.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5ab3243

Please sign in to comment.