diff --git a/engine/consensus/approvals/aggregated_signatures.go b/engine/consensus/approvals/aggregated_signatures.go index 7939c4bb35b..d914dcbc405 100644 --- a/engine/consensus/approvals/aggregated_signatures.go +++ b/engine/consensus/approvals/aggregated_signatures.go @@ -24,13 +24,13 @@ func NewAggregatedSignatures(chunks uint64) *AggregatedSignatures { // PutSignature adds the AggregatedSignature from the collector to `aggregatedSignatures`. // The returned int is the resulting number of approved chunks. -func (as *AggregatedSignatures) PutSignature(chunkIndex uint64, aggregatedSignature flow.AggregatedSignature) int { +func (as *AggregatedSignatures) PutSignature(chunkIndex uint64, aggregatedSignature flow.AggregatedSignature) uint64 { as.lock.Lock() defer as.lock.Unlock() if _, found := as.signatures[chunkIndex]; !found { as.signatures[chunkIndex] = aggregatedSignature } - return len(as.signatures) + return uint64(len(as.signatures)) } // HasSignature returns boolean depending if we have signature for particular chunk @@ -54,8 +54,8 @@ func (as *AggregatedSignatures) Collect() []flow.AggregatedSignature { return aggregatedSigs } -// CollectChunksWithMissingApprovals returns indexes of chunks that don't have an aggregated signature -func (as *AggregatedSignatures) CollectChunksWithMissingApprovals() []uint64 { +// ChunksWithoutAggregatedSignature returns indexes of chunks that don't have an aggregated signature +func (as *AggregatedSignatures) ChunksWithoutAggregatedSignature() []uint64 { // provide enough capacity to avoid allocations while we hold the lock missingChunks := make([]uint64, 0, as.numberOfChunks) as.lock.RLock() diff --git a/engine/consensus/approvals/approval_collector.go b/engine/consensus/approvals/approval_collector.go index 66f27c134d2..adae6a1edd0 100644 --- a/engine/consensus/approvals/approval_collector.go +++ b/engine/consensus/approvals/approval_collector.go @@ -18,7 +18,7 @@ type ApprovalCollector struct { chunkCollectors []*ChunkApprovalCollector // slice of chunk collectorTree that is created on construction and doesn't change aggregatedSignatures *AggregatedSignatures // aggregated signature for each chunk seals mempool.IncorporatedResultSeals // holds candidate seals for incorporated results that have acquired sufficient approvals; candidate seals are constructed without consideration of the sealability of parent results - numberOfChunks int // number of chunks for execution result, remains constant + numberOfChunks uint64 // number of chunks for execution result, remains constant } func NewApprovalCollector(result *flow.IncorporatedResult, incorporatedBlock *flow.Header, assignment *chunks.Assignment, seals mempool.IncorporatedResultSeals, requiredApprovalsForSealConstruction uint) *ApprovalCollector { @@ -29,15 +29,27 @@ func NewApprovalCollector(result *flow.IncorporatedResult, incorporatedBlock *fl chunkCollectors = append(chunkCollectors, collector) } - numberOfChunks := result.Result.Chunks.Len() - return &ApprovalCollector{ + numberOfChunks := uint64(result.Result.Chunks.Len()) + collector := ApprovalCollector{ incorporatedResult: result, incorporatedBlock: incorporatedBlock, numberOfChunks: numberOfChunks, chunkCollectors: chunkCollectors, - aggregatedSignatures: NewAggregatedSignatures(uint64(numberOfChunks)), + aggregatedSignatures: NewAggregatedSignatures(numberOfChunks), seals: seals, } + + // The following code implements a TEMPORARY SHORTCUT: In case no approvals are required + // to seal an incorporated result, we seal right away when creating the ApprovalCollector. + if requiredApprovalsForSealConstruction == 0 { + err := collector.SealResult() + if err != nil { + err = fmt.Errorf("sealing result %x failed: %w", result.ID(), err) + panic(err.Error()) + } + } + + return &collector } // IncorporatedBlockID returns the ID of block which incorporates execution result @@ -114,7 +126,7 @@ func (c *ApprovalCollector) ProcessApproval(approval *flow.ResultApproval) error // Returns: map { ChunkIndex -> []VerifierId } func (c *ApprovalCollector) CollectMissingVerifiers() map[uint64]flow.IdentifierList { targetIDs := make(map[uint64]flow.IdentifierList) - for _, chunkIndex := range c.aggregatedSignatures.CollectChunksWithMissingApprovals() { + for _, chunkIndex := range c.aggregatedSignatures.ChunksWithoutAggregatedSignature() { missingSigners := c.chunkCollectors[chunkIndex].GetMissingSigners() if missingSigners.Len() > 0 { targetIDs[chunkIndex] = missingSigners diff --git a/engine/consensus/approvals/approvals_cache.go b/engine/consensus/approvals/approvals_cache.go index 87fdedf612d..2a4db9cfade 100644 --- a/engine/consensus/approvals/approvals_cache.go +++ b/engine/consensus/approvals/approvals_cache.go @@ -20,11 +20,16 @@ func NewApprovalsCache(capacity uint) *Cache { } } -// Put saves approval into cache -func (c *Cache) Put(approval *flow.ResultApproval) { +// Put saves approval into cache; returns true iff approval was newly added +func (c *Cache) Put(approval *flow.ResultApproval) bool { + approvalCacheID := approval.Body.PartialID() c.lock.Lock() defer c.lock.Unlock() - c.cache[approval.Body.PartialID()] = approval + if _, found := c.cache[approvalCacheID]; !found { + c.cache[approvalCacheID] = approval + return true + } + return false } // Get returns approval that is saved in cache diff --git a/engine/consensus/approvals/assignment_collector.go b/engine/consensus/approvals/assignment_collector.go index b196df30abb..12c975ce7e1 100644 --- a/engine/consensus/approvals/assignment_collector.go +++ b/engine/consensus/approvals/assignment_collector.go @@ -27,12 +27,16 @@ const DefaultEmergencySealingThreshold = 400 // helper functor that can be used to retrieve cached block height type GetCachedBlockHeight = func(blockID flow.Identifier) (uint64, error) -// AssignmentCollector is responsible collecting approvals that satisfy one assignment, meaning that we will -// have multiple collectorTree for one execution result as same result can be incorporated in multiple forks. -// AssignmentCollector has a strict ordering of processing, before processing approvals at least one incorporated result has to be -// processed. +// AssignmentCollector +// Context: +// * When the same result is incorporated in multiple different forks, +// unique verifier assignment is determined for each fork. +// * The assignment collector is intended to encapsulate the known +// assignments for a particular execution result. +// AssignmentCollector has a strict ordering of processing, before processing +// approvals at least one incorporated result has to be processed. // AssignmentCollector takes advantage of internal caching to speed up processing approvals for different assignments -// AssignmentCollector is responsible for validating approvals on result-level(checking signature, identity). +// AssignmentCollector is responsible for validating approvals on result-level (checking signature, identity). // TODO: currently AssignmentCollector doesn't cleanup collectorTree when blocks that incorporate results get orphaned // For BFT milestone we need to ensure that this cleanup is properly implemented and all orphan collectorTree are pruned by height // when fork gets orphaned @@ -61,12 +65,19 @@ func NewAssignmentCollector(result *flow.ExecutionResult, state protocol.State, if err != nil { return nil, err } + // pre-select all authorized verifiers at the block that is being sealed + authorizedApprovers, err := authorizedVerifiersAtBlock(state, result.BlockID) + if err != nil { + return nil, engine.NewInvalidInputErrorf("could not determine authorized verifiers for sealing candidate: %w", err) + } collector := &AssignmentCollector{ ResultID: result.ID(), result: result, BlockHeight: block.Height, collectors: make(map[flow.Identifier]*ApprovalCollector), + authorizedApprovers: authorizedApprovers, + verifiedApprovalsCache: NewApprovalsCache(uint(result.Chunks.Len() * len(authorizedApprovers))), state: state, assigner: assigner, seals: seals, @@ -76,15 +87,6 @@ func NewAssignmentCollector(result *flow.ExecutionResult, state protocol.State, headers: headers, requiredApprovalsForSealConstruction: requiredApprovalsForSealConstruction, } - - // pre-select all authorized verifiers at the block that is being sealed - collector.authorizedApprovers, err = collector.authorizedVerifiersAtBlock(result.BlockID) - if err != nil { - return nil, engine.NewInvalidInputErrorf("could not determine authorized verifiers for sealing candidate: %w", err) - } - - collector.verifiedApprovalsCache = NewApprovalsCache(uint(result.Chunks.Len() * len(collector.authorizedApprovers))) - return collector, nil } @@ -99,36 +101,10 @@ func (ac *AssignmentCollector) collectorByBlockID(incorporatedBlockID flow.Ident return ac.collectors[incorporatedBlockID] } -// authorizedVerifiersAtBlock pre-select all authorized Verifiers at the block that incorporates the result. -// The method returns the set of all node IDs that: -// * are authorized members of the network at the given block and -// * have the Verification role and -// * have _positive_ weight and -// * are not ejected -func (ac *AssignmentCollector) authorizedVerifiersAtBlock(blockID flow.Identifier) (map[flow.Identifier]*flow.Identity, error) { - authorizedVerifierList, err := ac.state.AtBlockID(blockID).Identities( - filter.And( - filter.HasRole(flow.RoleVerification), - filter.HasStake(true), - filter.Not(filter.Ejected), - )) - if err != nil { - return nil, fmt.Errorf("failed to retrieve Identities for block %v: %w", blockID, err) - } - if len(authorizedVerifierList) == 0 { - return nil, fmt.Errorf("no authorized verifiers found for block %v", blockID) - } - identities := make(map[flow.Identifier]*flow.Identity) - for _, identity := range authorizedVerifierList { - identities[identity.NodeID] = identity - } - return identities, nil -} - // emergencySealable determines whether an incorporated Result qualifies for "emergency sealing". // ATTENTION: this is a temporary solution, which is NOT BFT compatible. When the approval process // hangs far enough behind finalization (measured in finalized but unsealed blocks), emergency -// sealing kicks in. This will be removed when implementation of seal & verification is finished. +// sealing kicks in. This will be removed when implementation of Sealing & Verification is finished. func (ac *AssignmentCollector) emergencySealable(collector *ApprovalCollector, finalizedBlockHeight uint64) bool { // Criterion for emergency sealing: // there must be at least DefaultEmergencySealingThreshold number of blocks between @@ -162,25 +138,25 @@ func (ac *AssignmentCollector) ProcessIncorporatedResult(incorporatedResult *flo return nil } - // This function is not exactly thread safe, it can perform double computation of assignment and authorized verifiers - // It is safe in regards that only one collector will be stored to the cache - // In terms of locking time it's better to perform extra computation in edge cases than lock this logic with mutex + // Constructing ApprovalCollector for IncorporatedResult + // The AssignmentCollector is not locked while instantiating the ApprovalCollector. Hence, it is possible that + // multiple threads simultaneously compute the verifier assignment. Nevertheless, the implementation is safe in + // that only one of the instantiated ApprovalCollectors will be stored in the cache. In terms of locking duration, + // it's better to perform extra computation in edge cases than lock this logic with a mutex, // since it's quite unlikely that same incorporated result will be processed by multiple goroutines simultaneously - - // chunk assigment is based on the first block in the fork that incorporates the result assignment, err := ac.assigner.Assign(incorporatedResult.Result, incorporatedBlockID) if err != nil { return fmt.Errorf("could not determine chunk assignment: %w", err) } - incorporatedBlock, err := ac.headers.ByBlockID(incorporatedBlockID) if err != nil { return fmt.Errorf("failed to retrieve header of incorporated block %s: %w", incorporatedBlockID, err) } - collector := NewApprovalCollector(incorporatedResult, incorporatedBlock, assignment, ac.seals, ac.requiredApprovalsForSealConstruction) + // Now, we add the ApprovalCollector to the AssignmentCollector: + // no-op if an ApprovalCollector has already been added by a different routine isDuplicate := ac.putCollector(incorporatedBlockID, collector) if isDuplicate { return nil @@ -233,7 +209,7 @@ func (ac *AssignmentCollector) verifySignature(approval *flow.ResultApproval, no id := approval.Body.ID() valid, err := ac.verifier.Verify(id[:], approval.VerifierSignature, nodeIdentity.StakingPubKey) if err != nil { - return fmt.Errorf("failed to verify signature: %w", err) + return fmt.Errorf("failed to verify approval signature: %w", err) } if !valid { @@ -290,18 +266,22 @@ func (ac *AssignmentCollector) validateApproval(approval *flow.ResultApproval) e } func (ac *AssignmentCollector) ProcessApproval(approval *flow.ResultApproval) error { + // we have this approval cached already, no need to process it again + approvalCacheID := approval.Body.PartialID() + if cached := ac.verifiedApprovalsCache.Get(approvalCacheID); cached != nil { + return nil + } + err := ac.validateApproval(approval) if err != nil { return fmt.Errorf("could not validate approval: %w", err) } - if cached := ac.verifiedApprovalsCache.Get(approval.Body.PartialID()); cached != nil { - // we have this approval cached already, no need to process it again + newlyAdded := ac.verifiedApprovalsCache.Put(approval) + if !newlyAdded { return nil } - ac.verifiedApprovalsCache.Put(approval) - for _, collector := range ac.allCollectors() { err := collector.ProcessApproval(approval) if err != nil { @@ -373,3 +353,29 @@ func (ac *AssignmentCollector) RequestMissingApprovals(sealingTracker *tracker.S } return requestCount, nil } + +// authorizedVerifiersAtBlock pre-select all authorized Verifiers at the block that incorporates the result. +// The method returns the set of all node IDs that: +// * are authorized members of the network at the given block and +// * have the Verification role and +// * have _positive_ weight and +// * are not ejected +func authorizedVerifiersAtBlock(state protocol.State, blockID flow.Identifier) (map[flow.Identifier]*flow.Identity, error) { + authorizedVerifierList, err := state.AtBlockID(blockID).Identities( + filter.And( + filter.HasRole(flow.RoleVerification), + filter.HasStake(true), + filter.Not(filter.Ejected), + )) + if err != nil { + return nil, fmt.Errorf("failed to retrieve Identities for block %v: %w", blockID, err) + } + if len(authorizedVerifierList) == 0 { + return nil, fmt.Errorf("no authorized verifiers found for block %v", blockID) + } + identities := make(map[flow.Identifier]*flow.Identity, len(authorizedVerifierList)) + for _, identity := range authorizedVerifierList { + identities[identity.NodeID] = identity + } + return identities, nil +} diff --git a/engine/consensus/approvals/assignment_collector_tree.go b/engine/consensus/approvals/assignment_collector_tree.go index 00ae2d66962..a49d4b95073 100644 --- a/engine/consensus/approvals/assignment_collector_tree.go +++ b/engine/consensus/approvals/assignment_collector_tree.go @@ -34,18 +34,18 @@ type NewCollectorFactoryMethod = func(result *flow.ExecutionResult) (*Assignment type AssignmentCollectorTree struct { forest *forest.LevelledForest lock sync.RWMutex - onCreateCollector NewCollectorFactoryMethod + createCollector NewCollectorFactoryMethod size uint64 lastSealedID flow.Identifier lastFinalizedHeight uint64 headers storage.Headers } -func NewAssignmentCollectorTree(lastSealed *flow.Header, headers storage.Headers, onCreateCollector NewCollectorFactoryMethod) *AssignmentCollectorTree { +func NewAssignmentCollectorTree(lastSealed *flow.Header, headers storage.Headers, createCollector NewCollectorFactoryMethod) *AssignmentCollectorTree { return &AssignmentCollectorTree{ forest: forest.NewLevelledForest(lastSealed.Height), lock: sync.RWMutex{}, - onCreateCollector: onCreateCollector, + createCollector: createCollector, size: 0, lastSealedID: lastSealed.ID(), lastFinalizedHeight: lastSealed.Height, @@ -141,8 +141,7 @@ type LazyInitCollector struct { Created bool // whether collector was created or retrieved from cache } -// GetOrCreateCollector performs lazy initialization of AssignmentCollector using double checked locking -// Returns, (AssignmentCollector, true or false whenever it was created, error) +// GetOrCreateCollector performs lazy initialization of AssignmentCollector using double-checked locking. func (t *AssignmentCollectorTree) GetOrCreateCollector(result *flow.ExecutionResult) (*LazyInitCollector, error) { resultID := result.ID() // first let's check if we have a collector already @@ -155,7 +154,7 @@ func (t *AssignmentCollectorTree) GetOrCreateCollector(result *flow.ExecutionRes }, nil } - collector, err := t.onCreateCollector(result) + collector, err := t.createCollector(result) if err != nil { return nil, fmt.Errorf("could not create assignment collector for %v: %w", resultID, err) } @@ -169,12 +168,11 @@ func (t *AssignmentCollectorTree) GetOrCreateCollector(result *flow.ExecutionRes return nil, fmt.Errorf("could not fetch executed block %v: %w", result.BlockID, err) } - // fast check shows that there is no collector, need to create one + // Initial check showed that there was no collector. However, it's possible that after the + // initial check but before acquiring the lock to add the newly-created collector, another + // goroutine already added the needed collector. Hence we need to check again: t.lock.Lock() defer t.lock.Unlock() - - // we need to check again, since it's possible that after checking for existing collector but before taking a lock - // new collector was created by concurrent goroutine v, found := t.forest.GetVertex(resultID) if found { return &LazyInitCollector{ @@ -183,6 +181,10 @@ func (t *AssignmentCollectorTree) GetOrCreateCollector(result *flow.ExecutionRes Created: false, }, nil } + + // An assignment collector is processable if and only if: + // either (i) the parent result is the latest sealed result (seal is finalized) + // or (ii) the result's parent is processable parent, parentFound := t.forest.GetVertex(result.PreviousResultID) if parentFound { vertex.processable = parent.(*assignmentCollectorVertex).processable diff --git a/engine/consensus/matching/core.go b/engine/consensus/matching/core.go index 2c32e03c252..d6715014850 100644 --- a/engine/consensus/matching/core.go +++ b/engine/consensus/matching/core.go @@ -169,15 +169,15 @@ func (c *Core) processReceipt(receipt *flow.ExecutionReceipt) (bool, error) { // if the receipt is for an unknown block, skip it. It will be re-requested // later by `requestPending` function. - head, err := c.headersDB.ByBlockID(receipt.ExecutionResult.BlockID) + executedBlock, err := c.headersDB.ByBlockID(receipt.ExecutionResult.BlockID) if err != nil { log.Debug().Msg("discarding receipt for unknown block") return false, nil } log = log.With(). - Uint64("block_view", head.View). - Uint64("block_height", head.Height). + Uint64("block_view", executedBlock.View). + Uint64("block_height", executedBlock.Height). Logger() log.Debug().Msg("execution receipt received") @@ -187,8 +187,7 @@ func (c *Core) processReceipt(receipt *flow.ExecutionReceipt) (bool, error) { if err != nil { return false, fmt.Errorf("could not find sealed block: %w", err) } - isSealed := head.Height <= sealed.Height - if isSealed { + if executedBlock.Height <= sealed.Height { log.Debug().Msg("discarding receipt for already sealed and finalized block height") return false, nil } @@ -224,7 +223,7 @@ func (c *Core) processReceipt(receipt *flow.ExecutionReceipt) (bool, error) { return false, fmt.Errorf("failed to validate execution receipt: %w", err) } - _, err = c.storeReceipt(receipt, head) + _, err = c.storeReceipt(receipt, executedBlock) if err != nil { return false, fmt.Errorf("failed to store receipt: %w", err) } diff --git a/engine/consensus/sealing/engine.go b/engine/consensus/sealing/engine.go index 1117bdedf44..55e9dedaae5 100644 --- a/engine/consensus/sealing/engine.go +++ b/engine/consensus/sealing/engine.go @@ -43,22 +43,21 @@ type ( // Purpose of this struct is to provide an efficient way how to consume messages from network layer and pass // them to `Core`. Engine runs 2 separate gorourtines that perform pre-processing and consuming messages by Core. type Engine struct { - unit *engine.Unit - core sealing.SealingCore - log zerolog.Logger - me module.Local - headers storage.Headers - payloads storage.Payloads - cacheMetrics module.MempoolMetrics - engineMetrics module.EngineMetrics - pendingApprovals engine.MessageStore - pendingRequestedApprovals engine.MessageStore - pendingFinalizationEvents *fifoqueue.FifoQueue - pendingIncorporatedResults *fifoqueue.FifoQueue - notifier engine.Notifier - messageHandler *engine.MessageHandler - requiredApprovalsForSealConstruction uint - rootHeader *flow.Header + unit *engine.Unit + core sealing.SealingCore + log zerolog.Logger + me module.Local + headers storage.Headers + payloads storage.Payloads + cacheMetrics module.MempoolMetrics + engineMetrics module.EngineMetrics + pendingApprovals engine.MessageStore + pendingRequestedApprovals engine.MessageStore + pendingFinalizationEvents *fifoqueue.FifoQueue + pendingIncorporatedResults *fifoqueue.FifoQueue + notifier engine.Notifier + messageHandler *engine.MessageHandler + rootHeader *flow.Header } // NewEngine constructs new `Engine` which runs on it's own unit. @@ -84,20 +83,24 @@ func NewEngine(log zerolog.Logger, } e := &Engine{ - unit: engine.NewUnit(), - log: log.With().Str("engine", "sealing.Engine").Logger(), - me: me, - engineMetrics: engineMetrics, - cacheMetrics: mempool, - headers: headers, - payloads: payloads, - requiredApprovalsForSealConstruction: options.RequiredApprovalsForSealConstruction, - rootHeader: rootHeader, + unit: engine.NewUnit(), + log: log.With().Str("engine", "sealing.Engine").Logger(), + me: me, + engineMetrics: engineMetrics, + cacheMetrics: mempool, + headers: headers, + payloads: payloads, + rootHeader: rootHeader, } - err = e.setupMessageHandler() + err = e.setupTrustedInboundQueues() if err != nil { - return nil, fmt.Errorf("could not initialize message handler: %w", err) + return nil, fmt.Errorf("initialization of inbound queues for trusted inputs failed: %w", err) + } + + err = e.setupMessageHandler(options.RequiredApprovalsForSealConstruction) + if err != nil { + return nil, fmt.Errorf("could not initialize message handler for untrusted inputs: %w", err) } // register engine with the approval provider @@ -120,7 +123,26 @@ func NewEngine(log zerolog.Logger, return e, nil } -func (e *Engine) setupMessageHandler() error { +// setupTrustedInboundQueues initializes inbound queues for TRUSTED INPUTS (from other components within the +// consensus node). We deliberately separate the queues for trusted inputs from the MessageHandler, which +// handles external, untrusted inputs. This reduces the attack surface, as it makes it impossible for an external +// attacker to feed values into the inbound channels for trusted inputs, even in the presence of bugs in +// the networking layer or message handler +func (e *Engine) setupTrustedInboundQueues() error { + var err error + e.pendingFinalizationEvents, err = fifoqueue.NewFifoQueue(fifoqueue.WithCapacity(defaultFinalizationEventsQueueCapacity)) + if err != nil { + return fmt.Errorf("failed to create queue for finalization events: %w", err) + } + e.pendingIncorporatedResults, err = fifoqueue.NewFifoQueue() + if err != nil { + return fmt.Errorf("failed to create queue for incorproated results: %w", err) + } + return nil +} + +// setupMessageHandler initializes the inbound queues and the MessageHandler for UNTRUSTED INPUTS. +func (e *Engine) setupMessageHandler(requiredApprovalsForSealConstruction uint) error { // FIFO queue for broadcasted approvals pendingApprovalsQueue, err := fifoqueue.NewFifoQueue( fifoqueue.WithCapacity(defaultApprovalQueueCapacity), @@ -145,17 +167,6 @@ func (e *Engine) setupMessageHandler() error { FifoQueue: pendingRequestedApprovalsQueue, } - e.pendingFinalizationEvents, err = fifoqueue.NewFifoQueue( - fifoqueue.WithCapacity(defaultFinalizationEventsQueueCapacity)) - if err != nil { - return fmt.Errorf("failed to create queue for finalization events: %w", err) - } - - e.pendingIncorporatedResults, err = fifoqueue.NewFifoQueue() - if err != nil { - return fmt.Errorf("failed to create queue for incorproated results: %w", err) - } - e.notifier = engine.NewNotifier() // define message queueing behaviour e.messageHandler = engine.NewMessageHandler( @@ -170,7 +181,7 @@ func (e *Engine) setupMessageHandler() error { return ok }, Map: func(msg *engine.Message) (*engine.Message, bool) { - if e.requiredApprovalsForSealConstruction < 1 { + if requiredApprovalsForSealConstruction < 1 { // if we don't require approvals to construct a seal, don't even process approvals. return nil, false } @@ -188,7 +199,7 @@ func (e *Engine) setupMessageHandler() error { return ok }, Map: func(msg *engine.Message) (*engine.Message, bool) { - if e.requiredApprovalsForSealConstruction < 1 { + if requiredApprovalsForSealConstruction < 1 { // if we don't require approvals to construct a seal, don't even process approvals. return nil, false } diff --git a/engine/consensus/sealing/engine_test.go b/engine/consensus/sealing/engine_test.go index aa97055e820..47984fbdc8b 100644 --- a/engine/consensus/sealing/engine_test.go +++ b/engine/consensus/sealing/engine_test.go @@ -45,18 +45,19 @@ func (s *SealingEngineSuite) SetupTest() { require.NoError(s.T(), err) s.engine = &Engine{ - log: log, - unit: engine.NewUnit(), - core: s.core, - me: me, - engineMetrics: metrics, - cacheMetrics: metrics, - requiredApprovalsForSealConstruction: RequiredApprovalsForSealConstructionTestingValue, - rootHeader: rootHeader, + log: log, + unit: engine.NewUnit(), + core: s.core, + me: me, + engineMetrics: metrics, + cacheMetrics: metrics, + rootHeader: rootHeader, } - // setups message handler - err = s.engine.setupMessageHandler() + // setup inbound queues for trusted inputs and message handler for untrusted inputs + err = s.engine.setupTrustedInboundQueues() + require.NoError(s.T(), err) + err = s.engine.setupMessageHandler(RequiredApprovalsForSealConstructionTestingValue) require.NoError(s.T(), err) <-s.engine.Ready() diff --git a/module/mempool/consensus/execution_tree.go b/module/mempool/consensus/execution_tree.go index de69b52dd46..5fa90f5f506 100644 --- a/module/mempool/consensus/execution_tree.go +++ b/module/mempool/consensus/execution_tree.go @@ -84,9 +84,9 @@ func (et *ExecutionTree) getEquivalenceClass(result *flow.ExecutionResult, block return vertex.(*ReceiptsOfSameResult), nil } -// Add the given execution receipt to the memory pool. Requires height -// of the block the receipt is for. We enforce data consistency on an API -// level by using the block header as input. +// AddReceipt adds the given execution receipt to the memory pool. Requires +// height of the block the receipt is for. We enforce data consistency on +// an API level by using the block header as input. func (et *ExecutionTree) AddReceipt(receipt *flow.ExecutionReceipt, block *flow.Header) (bool, error) { et.Lock() defer et.Unlock()