Skip to content

Commit

Permalink
Merge branch 'master' into update-bn2-automation
Browse files Browse the repository at this point in the history
  • Loading branch information
haroldsphinx authored Apr 3, 2023
2 parents 0c146e8 + ae79b52 commit 1d47246
Show file tree
Hide file tree
Showing 99 changed files with 3,944 additions and 2,666 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ generate-mocks: install-mock-generators
mockery --name '.*' --dir=engine/execution/computation/computer --case=underscore --output="./engine/execution/computation/computer/mock" --outpkg="mock"
mockery --name '.*' --dir=engine/execution/state --case=underscore --output="./engine/execution/state/mock" --outpkg="mock"
mockery --name '.*' --dir=engine/collection --case=underscore --output="./engine/collection/mock" --outpkg="mock"
mockery --name 'complianceCore' --dir=engine/common/follower --exported --case=underscore --output="./engine/common/follower/mock" --outpkg="mock"
mockery --name '.*' --dir=engine/common/follower/cache --case=underscore --output="./engine/common/follower/cache/mock" --outpkg="mock"
mockery --name '.*' --dir=engine/consensus --case=underscore --output="./engine/consensus/mock" --outpkg="mock"
mockery --name '.*' --dir=engine/consensus/approvals --case=underscore --output="./engine/consensus/approvals/mock" --outpkg="mock"
Expand Down
45 changes: 27 additions & 18 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/onflow/flow-go/consensus"
"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/committees"
"github.com/onflow/flow-go/consensus/hotstuff/notifications"
consensuspubsub "github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
"github.com/onflow/flow-go/consensus/hotstuff/signature"
hotstuffvalidator "github.com/onflow/flow-go/consensus/hotstuff/validator"
Expand All @@ -39,17 +40,15 @@ import (
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/common/follower"
followereng "github.com/onflow/flow-go/engine/common/follower"
"github.com/onflow/flow-go/engine/common/requester"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/blobs"
"github.com/onflow/flow-go/module/buffer"
"github.com/onflow/flow-go/module/chainsync"
"github.com/onflow/flow-go/module/compliance"
modulecompliance "github.com/onflow/flow-go/module/compliance"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
"github.com/onflow/flow-go/module/id"
Expand Down Expand Up @@ -223,7 +222,7 @@ type FlowAccessNodeBuilder struct {
// engines
IngestEng *ingestion.Engine
RequestEng *requester.Engine
FollowerEng *followereng.Engine
FollowerEng *followereng.ComplianceEngine
SyncEng *synceng.Engine
StateStreamEng *state_stream.Engine
}
Expand Down Expand Up @@ -319,31 +318,39 @@ func (builder *FlowAccessNodeBuilder) buildFollowerCore() *FlowAccessNodeBuilder

func (builder *FlowAccessNodeBuilder) buildFollowerEngine() *FlowAccessNodeBuilder {
builder.Component("follower engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// initialize cleaner for DB
cleaner := bstorage.NewCleaner(node.Logger, node.DB, builder.Metrics.CleanCollector, flow.DefaultValueLogGCFrequency)
conCache := buffer.NewPendingBlocks()
var heroCacheCollector module.HeroCacheMetrics = metrics.NewNoopCollector()
if node.HeroCacheMetricsEnable {
heroCacheCollector = metrics.FollowerCacheMetrics(node.MetricsRegisterer)
}

followerEng, err := follower.New(
core, err := followereng.NewComplianceCore(
node.Logger,
node.Network,
node.Me,
node.Metrics.Engine,
node.Metrics.Mempool,
cleaner,
node.Storage.Headers,
node.Storage.Payloads,
heroCacheCollector,
builder.FinalizationDistributor,
builder.FollowerState,
conCache,
builder.FollowerCore,
builder.Validator,
builder.SyncCore,
node.Tracer,
follower.WithComplianceOptions(compliance.WithSkipNewProposalsThreshold(builder.ComplianceConfig.SkipNewProposalsThreshold)),
modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
)
if err != nil {
return nil, fmt.Errorf("could not create follower core: %w", err)
}

builder.FollowerEng, err = followereng.NewComplianceLayer(
node.Logger,
node.Network,
node.Me,
node.Metrics.Engine,
node.Storage.Headers,
builder.Finalized,
core,
)
if err != nil {
return nil, fmt.Errorf("could not create follower engine: %w", err)
}
builder.FollowerEng = followerEng

return builder.FollowerEng, nil
})
Expand Down Expand Up @@ -560,10 +567,12 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN
}

func FlowAccessNode(nodeBuilder *cmd.FlowNodeBuilder) *FlowAccessNodeBuilder {
dist := consensuspubsub.NewFinalizationDistributor()
dist.AddConsumer(notifications.NewSlashingViolationsConsumer(nodeBuilder.Logger))
return &FlowAccessNodeBuilder{
AccessNodeConfig: DefaultAccessNodeConfig(),
FlowNodeBuilder: nodeBuilder,
FinalizationDistributor: consensuspubsub.NewFinalizationDistributor(),
FinalizationDistributor: dist,
}
}

Expand Down
1 change: 0 additions & 1 deletion cmd/bootstrap/run/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func GenerateServiceAccountPrivateKey(seed []byte) (flow.AccountPrivateKey, erro
}, nil
}

// NOTE: this is now unused and should become part of another tool.
func GenerateExecutionState(
dbDir string,
accountKey flow.AccountPublicKey,
Expand Down
64 changes: 36 additions & 28 deletions cmd/collection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/onflow/flow-go/consensus"
"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/committees"
"github.com/onflow/flow-go/consensus/hotstuff/notifications"
"github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
"github.com/onflow/flow-go/consensus/hotstuff/pacemaker/timeout"
hotsignature "github.com/onflow/flow-go/consensus/hotstuff/signature"
Expand All @@ -37,7 +38,6 @@ import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/buffer"
builder "github.com/onflow/flow-go/module/builder/collection"
"github.com/onflow/flow-go/module/chainsync"
"github.com/onflow/flow-go/module/epochs"
Expand All @@ -50,7 +50,6 @@ import (
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/state/protocol/events/gadgets"
storagekv "github.com/onflow/flow-go/storage/badger"
)

func main() {
Expand Down Expand Up @@ -80,15 +79,14 @@ func main() {
clusterComplianceConfig modulecompliance.Config

pools *epochpool.TransactionPools // epoch-scoped transaction pools
followerBuffer *buffer.PendingBlocks // pending block cache for follower
finalizationDistributor *pubsub.FinalizationDistributor
finalizedHeader *consync.FinalizedHeaderCache

push *pusher.Engine
ing *ingest.Engine
mainChainSyncCore *chainsync.Core
followerCore *hotstuff.FollowerLoop // follower hotstuff logic
followerEng *followereng.Engine
followerEng *followereng.ComplianceEngine
colMetrics module.CollectionMetrics
err error

Expand Down Expand Up @@ -173,6 +171,11 @@ func main() {

nodeBuilder.
PreInit(cmd.DynamicStartPreInit).
Module("finalization distributor", func(node *cmd.NodeConfig) error {
finalizationDistributor = pubsub.NewFinalizationDistributor()
finalizationDistributor.AddConsumer(notifications.NewSlashingViolationsConsumer(node.Logger))
return nil
}).
Module("mutable follower state", func(node *cmd.NodeConfig) error {
// For now, we only support state implementations from package badger.
// If we ever support different implementations, the following can be replaced by a type-aware factory
Expand All @@ -199,10 +202,6 @@ func main() {
err := node.Metrics.Mempool.Register(metrics.ResourceTransaction, pools.CombinedSize)
return err
}).
Module("pending block cache", func(node *cmd.NodeConfig) error {
followerBuffer = buffer.NewPendingBlocks()
return nil
}).
Module("metrics", func(node *cmd.NodeConfig) error {
colMetrics = metrics.NewCollectionCollector(node.Tracer)
return nil
Expand Down Expand Up @@ -251,6 +250,14 @@ func main() {

return validator, err
}).
Component("finalized snapshot", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
finalizedHeader, err = consync.NewFinalizedHeaderCache(node.Logger, node.State, finalizationDistributor)
if err != nil {
return nil, fmt.Errorf("could not create finalized snapshot cache: %w", err)
}

return finalizedHeader, nil
}).
Component("consensus committee", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// initialize consensus committee's membership state
// This committee state is for the HotStuff follower, which follows the MAIN CONSENSUS Committee
Expand All @@ -270,7 +277,6 @@ func main() {
packer := hotsignature.NewConsensusSigDataPacker(mainConsensusCommittee)
// initialize the verifier for the protocol consensus
verifier := verification.NewCombinedVerifier(mainConsensusCommittee, packer)
finalizationDistributor = pubsub.NewFinalizationDistributor()
// creates a consensus follower with noop consumer as the notifier
followerCore, err = consensus.NewFollower(
node.Logger,
Expand All @@ -290,45 +296,47 @@ func main() {
return followerCore, nil
}).
Component("follower engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// initialize cleaner for DB
cleaner := storagekv.NewCleaner(node.Logger, node.DB, node.Metrics.CleanCollector, flow.DefaultValueLogGCFrequency)

packer := hotsignature.NewConsensusSigDataPacker(mainConsensusCommittee)
// initialize the verifier for the protocol consensus
verifier := verification.NewCombinedVerifier(mainConsensusCommittee, packer)

validator := validator.New(mainConsensusCommittee, verifier)

followerEng, err = followereng.New(
var heroCacheCollector module.HeroCacheMetrics = metrics.NewNoopCollector()
if node.HeroCacheMetricsEnable {
heroCacheCollector = metrics.FollowerCacheMetrics(node.MetricsRegisterer)
}

core, err := followereng.NewComplianceCore(
node.Logger,
node.Network,
node.Me,
node.Metrics.Engine,
node.Metrics.Mempool,
cleaner,
node.Storage.Headers,
node.Storage.Payloads,
heroCacheCollector,
finalizationDistributor,
followerState,
followerBuffer,
followerCore,
validator,
mainChainSyncCore,
node.Tracer,
followereng.WithComplianceOptions(modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold)),
modulecompliance.WithSkipNewProposalsThreshold(node.ComplianceConfig.SkipNewProposalsThreshold),
)
if err != nil {
return nil, fmt.Errorf("could not create follower engine: %w", err)
return nil, fmt.Errorf("could not create follower core: %w", err)
}

return followerEng, nil
}).
Component("finalized snapshot", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
finalizedHeader, err = consync.NewFinalizedHeaderCache(node.Logger, node.State, finalizationDistributor)
followerEng, err = followereng.NewComplianceLayer(
node.Logger,
node.Network,
node.Me,
node.Metrics.Engine,
node.Storage.Headers,
finalizedHeader.Get(),
core,
)
if err != nil {
return nil, fmt.Errorf("could not create finalized snapshot cache: %w", err)
return nil, fmt.Errorf("could not create follower engine: %w", err)
}

return finalizedHeader, nil
return followerEng, nil
}).
Component("main chain sync engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {

Expand Down
9 changes: 4 additions & 5 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/blockproducer"
"github.com/onflow/flow-go/consensus/hotstuff/committees"
"github.com/onflow/flow-go/consensus/hotstuff/notifications"
"github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
"github.com/onflow/flow-go/consensus/hotstuff/pacemaker/timeout"
"github.com/onflow/flow-go/consensus/hotstuff/persister"
Expand Down Expand Up @@ -355,6 +356,7 @@ func main() {
}).
Module("finalization distributor", func(node *cmd.NodeConfig) error {
finalizationDistributor = pubsub.NewFinalizationDistributor()
finalizationDistributor.AddConsumer(notifications.NewSlashingViolationsConsumer(nodeBuilder.Logger))
return nil
}).
Module("machine account config", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -683,20 +685,17 @@ func main() {
return hot, nil
}).
Component("consensus compliance engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// initialize the entity database accessors
cleaner := bstorage.NewCleaner(node.Logger, node.DB, node.Metrics.CleanCollector, flow.DefaultValueLogGCFrequency)

// initialize the pending blocks cache
proposals := buffer.NewPendingBlocks()

logger := createLogger(node.Logger, node.RootChainID)
complianceCore, err := compliance.NewCore(logger,
complianceCore, err := compliance.NewCore(
logger,
node.Metrics.Engine,
node.Metrics.Mempool,
mainMetrics,
node.Metrics.Compliance,
node.Tracer,
cleaner,
node.Storage.Headers,
node.Storage.Payloads,
mutableState,
Expand Down
5 changes: 0 additions & 5 deletions cmd/execution/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ required to trigger this condition is put in place to prevent triggering it in c
happen on unstable networks.
EN keeps track of the highest block it has executed. This is not a Flow protocol feature, and only serves synchronisation needs.

### Execution State syncing
Other execution node is queried for range of missing blocks and hold authority to decide if it's willing (and able) to answer this query.
If so, it sends the `ExecutionStateDelta` which contains all the block data and results of execution.
Currently, this is fully trusted operation, meaning data is applied as-is without any extra checks.

### Missing blocks
If no other EN are available, the block-level synchronisation is started. This requests blocks from consensus nodes, and
incoming blocks are processed as if they were received during normal mode of operation
Expand Down
Loading

0 comments on commit 1d47246

Please sign in to comment.