Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR 736: suggestions pt4 #774

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions engine/consensus/approvals/aggregated_signatures.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ func NewAggregatedSignatures(chunks uint64) *AggregatedSignatures {

// PutSignature adds the AggregatedSignature from the collector to `aggregatedSignatures`.
// The returned int is the resulting number of approved chunks.
func (as *AggregatedSignatures) PutSignature(chunkIndex uint64, aggregatedSignature flow.AggregatedSignature) int {
func (as *AggregatedSignatures) PutSignature(chunkIndex uint64, aggregatedSignature flow.AggregatedSignature) uint64 {
as.lock.Lock()
defer as.lock.Unlock()
if _, found := as.signatures[chunkIndex]; !found {
as.signatures[chunkIndex] = aggregatedSignature
}
return len(as.signatures)
return uint64(len(as.signatures))
}

// HasSignature returns boolean depending if we have signature for particular chunk
Expand All @@ -54,8 +54,8 @@ func (as *AggregatedSignatures) Collect() []flow.AggregatedSignature {
return aggregatedSigs
}

// CollectChunksWithMissingApprovals returns indexes of chunks that don't have an aggregated signature
func (as *AggregatedSignatures) CollectChunksWithMissingApprovals() []uint64 {
// ChunksWithoutAggregatedSignature returns indexes of chunks that don't have an aggregated signature
func (as *AggregatedSignatures) ChunksWithoutAggregatedSignature() []uint64 {
// provide enough capacity to avoid allocations while we hold the lock
missingChunks := make([]uint64, 0, as.numberOfChunks)
as.lock.RLock()
Expand Down
22 changes: 17 additions & 5 deletions engine/consensus/approvals/approval_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ApprovalCollector struct {
chunkCollectors []*ChunkApprovalCollector // slice of chunk collectorTree that is created on construction and doesn't change
aggregatedSignatures *AggregatedSignatures // aggregated signature for each chunk
seals mempool.IncorporatedResultSeals // holds candidate seals for incorporated results that have acquired sufficient approvals; candidate seals are constructed without consideration of the sealability of parent results
numberOfChunks int // number of chunks for execution result, remains constant
numberOfChunks uint64 // number of chunks for execution result, remains constant
}

func NewApprovalCollector(result *flow.IncorporatedResult, incorporatedBlock *flow.Header, assignment *chunks.Assignment, seals mempool.IncorporatedResultSeals, requiredApprovalsForSealConstruction uint) *ApprovalCollector {
Expand All @@ -29,15 +29,27 @@ func NewApprovalCollector(result *flow.IncorporatedResult, incorporatedBlock *fl
chunkCollectors = append(chunkCollectors, collector)
}

numberOfChunks := result.Result.Chunks.Len()
return &ApprovalCollector{
numberOfChunks := uint64(result.Result.Chunks.Len())
collector := ApprovalCollector{
incorporatedResult: result,
incorporatedBlock: incorporatedBlock,
numberOfChunks: numberOfChunks,
chunkCollectors: chunkCollectors,
aggregatedSignatures: NewAggregatedSignatures(uint64(numberOfChunks)),
aggregatedSignatures: NewAggregatedSignatures(numberOfChunks),
seals: seals,
}

// The following code implements a TEMPORARY SHORTCUT: In case no approvals are required
// to seal an incorporated result, we seal right away when creating the ApprovalCollector.
if requiredApprovalsForSealConstruction == 0 {
err := collector.SealResult()
if err != nil {
err = fmt.Errorf("sealing result %x failed: %w", result.ID(), err)
panic(err.Error())
}
}

return &collector
}

// IncorporatedBlockID returns the ID of block which incorporates execution result
Expand Down Expand Up @@ -114,7 +126,7 @@ func (c *ApprovalCollector) ProcessApproval(approval *flow.ResultApproval) error
// Returns: map { ChunkIndex -> []VerifierId }
func (c *ApprovalCollector) CollectMissingVerifiers() map[uint64]flow.IdentifierList {
targetIDs := make(map[uint64]flow.IdentifierList)
for _, chunkIndex := range c.aggregatedSignatures.CollectChunksWithMissingApprovals() {
for _, chunkIndex := range c.aggregatedSignatures.ChunksWithoutAggregatedSignature() {
missingSigners := c.chunkCollectors[chunkIndex].GetMissingSigners()
if missingSigners.Len() > 0 {
targetIDs[chunkIndex] = missingSigners
Expand Down
11 changes: 8 additions & 3 deletions engine/consensus/approvals/approvals_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@ func NewApprovalsCache(capacity uint) *Cache {
}
}

// Put saves approval into cache
func (c *Cache) Put(approval *flow.ResultApproval) {
// Put saves approval into cache; returns true iff approval was newly added
func (c *Cache) Put(approval *flow.ResultApproval) bool {
approvalCacheID := approval.Body.PartialID()
c.lock.Lock()
defer c.lock.Unlock()
c.cache[approval.Body.PartialID()] = approval
if _, found := c.cache[approvalCacheID]; !found {
c.cache[approvalCacheID] = approval
return true
}
return false
}

// Get returns approval that is saved in cache
Expand Down
112 changes: 59 additions & 53 deletions engine/consensus/approvals/assignment_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@ const DefaultEmergencySealingThreshold = 400
// helper functor that can be used to retrieve cached block height
type GetCachedBlockHeight = func(blockID flow.Identifier) (uint64, error)

// AssignmentCollector is responsible collecting approvals that satisfy one assignment, meaning that we will
// have multiple collectorTree for one execution result as same result can be incorporated in multiple forks.
// AssignmentCollector has a strict ordering of processing, before processing approvals at least one incorporated result has to be
// processed.
// AssignmentCollector
// Context:
// * When the same result is incorporated in multiple different forks,
// unique verifier assignment is determined for each fork.
// * The assignment collector is intended to encapsulate the known
// assignments for a particular execution result.
// AssignmentCollector has a strict ordering of processing, before processing
// approvals at least one incorporated result has to be processed.
// AssignmentCollector takes advantage of internal caching to speed up processing approvals for different assignments
// AssignmentCollector is responsible for validating approvals on result-level(checking signature, identity).
// AssignmentCollector is responsible for validating approvals on result-level (checking signature, identity).
// TODO: currently AssignmentCollector doesn't cleanup collectorTree when blocks that incorporate results get orphaned
// For BFT milestone we need to ensure that this cleanup is properly implemented and all orphan collectorTree are pruned by height
// when fork gets orphaned
Expand Down Expand Up @@ -61,12 +65,19 @@ func NewAssignmentCollector(result *flow.ExecutionResult, state protocol.State,
if err != nil {
return nil, err
}
// pre-select all authorized verifiers at the block that is being sealed
authorizedApprovers, err := authorizedVerifiersAtBlock(state, result.BlockID)
if err != nil {
return nil, engine.NewInvalidInputErrorf("could not determine authorized verifiers for sealing candidate: %w", err)
}
Comment on lines +68 to +72
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just shifted this code up. Thereby, we are initializing the dependencies of AssignmentCollector first and subsequently the struct itself.


collector := &AssignmentCollector{
ResultID: result.ID(),
result: result,
BlockHeight: block.Height,
collectors: make(map[flow.Identifier]*ApprovalCollector),
authorizedApprovers: authorizedApprovers,
verifiedApprovalsCache: NewApprovalsCache(uint(result.Chunks.Len() * len(authorizedApprovers))),
state: state,
assigner: assigner,
seals: seals,
Expand All @@ -76,15 +87,6 @@ func NewAssignmentCollector(result *flow.ExecutionResult, state protocol.State,
headers: headers,
requiredApprovalsForSealConstruction: requiredApprovalsForSealConstruction,
}

// pre-select all authorized verifiers at the block that is being sealed
collector.authorizedApprovers, err = collector.authorizedVerifiersAtBlock(result.BlockID)
if err != nil {
return nil, engine.NewInvalidInputErrorf("could not determine authorized verifiers for sealing candidate: %w", err)
}

collector.verifiedApprovalsCache = NewApprovalsCache(uint(result.Chunks.Len() * len(collector.authorizedApprovers)))

return collector, nil
}

Expand All @@ -99,36 +101,10 @@ func (ac *AssignmentCollector) collectorByBlockID(incorporatedBlockID flow.Ident
return ac.collectors[incorporatedBlockID]
}

// authorizedVerifiersAtBlock pre-select all authorized Verifiers at the block that incorporates the result.
// The method returns the set of all node IDs that:
// * are authorized members of the network at the given block and
// * have the Verification role and
// * have _positive_ weight and
// * are not ejected
func (ac *AssignmentCollector) authorizedVerifiersAtBlock(blockID flow.Identifier) (map[flow.Identifier]*flow.Identity, error) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • I removed the pointer receiver, which allows to compute authorizedVerifiersAtBlock before initializing the AssignmentCollector.
  • As this method is now not part of AssignmentCollector, I moved it to the end of the file

authorizedVerifierList, err := ac.state.AtBlockID(blockID).Identities(
filter.And(
filter.HasRole(flow.RoleVerification),
filter.HasStake(true),
filter.Not(filter.Ejected),
))
if err != nil {
return nil, fmt.Errorf("failed to retrieve Identities for block %v: %w", blockID, err)
}
if len(authorizedVerifierList) == 0 {
return nil, fmt.Errorf("no authorized verifiers found for block %v", blockID)
}
identities := make(map[flow.Identifier]*flow.Identity)
for _, identity := range authorizedVerifierList {
identities[identity.NodeID] = identity
}
return identities, nil
}

// emergencySealable determines whether an incorporated Result qualifies for "emergency sealing".
// ATTENTION: this is a temporary solution, which is NOT BFT compatible. When the approval process
// hangs far enough behind finalization (measured in finalized but unsealed blocks), emergency
// sealing kicks in. This will be removed when implementation of seal & verification is finished.
// sealing kicks in. This will be removed when implementation of Sealing & Verification is finished.
func (ac *AssignmentCollector) emergencySealable(collector *ApprovalCollector, finalizedBlockHeight uint64) bool {
// Criterion for emergency sealing:
// there must be at least DefaultEmergencySealingThreshold number of blocks between
Expand Down Expand Up @@ -162,25 +138,25 @@ func (ac *AssignmentCollector) ProcessIncorporatedResult(incorporatedResult *flo
return nil
}

// This function is not exactly thread safe, it can perform double computation of assignment and authorized verifiers
// It is safe in regards that only one collector will be stored to the cache
// In terms of locking time it's better to perform extra computation in edge cases than lock this logic with mutex
// Constructing ApprovalCollector for IncorporatedResult
// The AssignmentCollector is not locked while instantiating the ApprovalCollector. Hence, it is possible that
// multiple threads simultaneously compute the verifier assignment. Nevertheless, the implementation is safe in
// that only one of the instantiated ApprovalCollectors will be stored in the cache. In terms of locking duration,
// it's better to perform extra computation in edge cases than lock this logic with a mutex,
// since it's quite unlikely that same incorporated result will be processed by multiple goroutines simultaneously

// chunk assigment is based on the first block in the fork that incorporates the result
assignment, err := ac.assigner.Assign(incorporatedResult.Result, incorporatedBlockID)
if err != nil {
return fmt.Errorf("could not determine chunk assignment: %w", err)
}

incorporatedBlock, err := ac.headers.ByBlockID(incorporatedBlockID)
if err != nil {
return fmt.Errorf("failed to retrieve header of incorporated block %s: %w",
incorporatedBlockID, err)
}

collector := NewApprovalCollector(incorporatedResult, incorporatedBlock, assignment, ac.seals, ac.requiredApprovalsForSealConstruction)

// Now, we add the ApprovalCollector to the AssignmentCollector:
// no-op if an ApprovalCollector has already been added by a different routine
isDuplicate := ac.putCollector(incorporatedBlockID, collector)
if isDuplicate {
return nil
Expand Down Expand Up @@ -233,7 +209,7 @@ func (ac *AssignmentCollector) verifySignature(approval *flow.ResultApproval, no
id := approval.Body.ID()
valid, err := ac.verifier.Verify(id[:], approval.VerifierSignature, nodeIdentity.StakingPubKey)
if err != nil {
return fmt.Errorf("failed to verify signature: %w", err)
return fmt.Errorf("failed to verify approval signature: %w", err)
}

if !valid {
Expand Down Expand Up @@ -290,18 +266,22 @@ func (ac *AssignmentCollector) validateApproval(approval *flow.ResultApproval) e
}

func (ac *AssignmentCollector) ProcessApproval(approval *flow.ResultApproval) error {
// we have this approval cached already, no need to process it again
approvalCacheID := approval.Body.PartialID()
if cached := ac.verifiedApprovalsCache.Get(approvalCacheID); cached != nil {
return nil
}

Comment on lines +269 to +274
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checks first whether an approval is already cached and drops duplicates, before doing expensive crypto verifications

err := ac.validateApproval(approval)
if err != nil {
return fmt.Errorf("could not validate approval: %w", err)
}

if cached := ac.verifiedApprovalsCache.Get(approval.Body.PartialID()); cached != nil {
// we have this approval cached already, no need to process it again
newlyAdded := ac.verifiedApprovalsCache.Put(approval)
if !newlyAdded {
return nil
}

ac.verifiedApprovalsCache.Put(approval)

for _, collector := range ac.allCollectors() {
err := collector.ProcessApproval(approval)
if err != nil {
Expand Down Expand Up @@ -373,3 +353,29 @@ func (ac *AssignmentCollector) RequestMissingApprovals(sealingTracker *tracker.S
}
return requestCount, nil
}

// authorizedVerifiersAtBlock pre-select all authorized Verifiers at the block that incorporates the result.
// The method returns the set of all node IDs that:
// * are authorized members of the network at the given block and
// * have the Verification role and
// * have _positive_ weight and
// * are not ejected
func authorizedVerifiersAtBlock(state protocol.State, blockID flow.Identifier) (map[flow.Identifier]*flow.Identity, error) {
authorizedVerifierList, err := state.AtBlockID(blockID).Identities(
filter.And(
filter.HasRole(flow.RoleVerification),
filter.HasStake(true),
filter.Not(filter.Ejected),
))
if err != nil {
return nil, fmt.Errorf("failed to retrieve Identities for block %v: %w", blockID, err)
}
if len(authorizedVerifierList) == 0 {
return nil, fmt.Errorf("no authorized verifiers found for block %v", blockID)
}
identities := make(map[flow.Identifier]*flow.Identity, len(authorizedVerifierList))
for _, identity := range authorizedVerifierList {
identities[identity.NodeID] = identity
}
return identities, nil
}
22 changes: 12 additions & 10 deletions engine/consensus/approvals/assignment_collector_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,18 @@ type NewCollectorFactoryMethod = func(result *flow.ExecutionResult) (*Assignment
type AssignmentCollectorTree struct {
forest *forest.LevelledForest
lock sync.RWMutex
onCreateCollector NewCollectorFactoryMethod
createCollector NewCollectorFactoryMethod
size uint64
lastSealedID flow.Identifier
lastFinalizedHeight uint64
headers storage.Headers
}

func NewAssignmentCollectorTree(lastSealed *flow.Header, headers storage.Headers, onCreateCollector NewCollectorFactoryMethod) *AssignmentCollectorTree {
func NewAssignmentCollectorTree(lastSealed *flow.Header, headers storage.Headers, createCollector NewCollectorFactoryMethod) *AssignmentCollectorTree {
return &AssignmentCollectorTree{
forest: forest.NewLevelledForest(lastSealed.Height),
lock: sync.RWMutex{},
onCreateCollector: onCreateCollector,
createCollector: createCollector,
size: 0,
lastSealedID: lastSealed.ID(),
lastFinalizedHeight: lastSealed.Height,
Expand Down Expand Up @@ -141,8 +141,7 @@ type LazyInitCollector struct {
Created bool // whether collector was created or retrieved from cache
}

// GetOrCreateCollector performs lazy initialization of AssignmentCollector using double checked locking
// Returns, (AssignmentCollector, true or false whenever it was created, error)
// GetOrCreateCollector performs lazy initialization of AssignmentCollector using double-checked locking.
func (t *AssignmentCollectorTree) GetOrCreateCollector(result *flow.ExecutionResult) (*LazyInitCollector, error) {
resultID := result.ID()
// first let's check if we have a collector already
Expand All @@ -155,7 +154,7 @@ func (t *AssignmentCollectorTree) GetOrCreateCollector(result *flow.ExecutionRes
}, nil
}

collector, err := t.onCreateCollector(result)
collector, err := t.createCollector(result)
if err != nil {
return nil, fmt.Errorf("could not create assignment collector for %v: %w", resultID, err)
}
Expand All @@ -169,12 +168,11 @@ func (t *AssignmentCollectorTree) GetOrCreateCollector(result *flow.ExecutionRes
return nil, fmt.Errorf("could not fetch executed block %v: %w", result.BlockID, err)
}

// fast check shows that there is no collector, need to create one
// Initial check showed that there was no collector. However, it's possible that after the
// initial check but before acquiring the lock to add the newly-created collector, another
// goroutine already added the needed collector. Hence we need to check again:
t.lock.Lock()
defer t.lock.Unlock()

// we need to check again, since it's possible that after checking for existing collector but before taking a lock
// new collector was created by concurrent goroutine
v, found := t.forest.GetVertex(resultID)
if found {
return &LazyInitCollector{
Expand All @@ -183,6 +181,10 @@ func (t *AssignmentCollectorTree) GetOrCreateCollector(result *flow.ExecutionRes
Created: false,
}, nil
}

// An assignment collector is processable if and only if:
// either (i) the parent result is the latest sealed result (seal is finalized)
// or (ii) the result's parent is processable
parent, parentFound := t.forest.GetVertex(result.PreviousResultID)
if parentFound {
vertex.processable = parent.(*assignmentCollectorVertex).processable
Expand Down
11 changes: 5 additions & 6 deletions engine/consensus/matching/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,15 @@ func (c *Core) processReceipt(receipt *flow.ExecutionReceipt) (bool, error) {

// if the receipt is for an unknown block, skip it. It will be re-requested
// later by `requestPending` function.
head, err := c.headersDB.ByBlockID(receipt.ExecutionResult.BlockID)
executedBlock, err := c.headersDB.ByBlockID(receipt.ExecutionResult.BlockID)
if err != nil {
log.Debug().Msg("discarding receipt for unknown block")
return false, nil
}

log = log.With().
Uint64("block_view", head.View).
Uint64("block_height", head.Height).
Uint64("block_view", executedBlock.View).
Uint64("block_height", executedBlock.Height).
Logger()
log.Debug().Msg("execution receipt received")

Expand All @@ -187,8 +187,7 @@ func (c *Core) processReceipt(receipt *flow.ExecutionReceipt) (bool, error) {
if err != nil {
return false, fmt.Errorf("could not find sealed block: %w", err)
}
isSealed := head.Height <= sealed.Height
if isSealed {
if executedBlock.Height <= sealed.Height {
log.Debug().Msg("discarding receipt for already sealed and finalized block height")
return false, nil
}
Expand Down Expand Up @@ -224,7 +223,7 @@ func (c *Core) processReceipt(receipt *flow.ExecutionReceipt) (bool, error) {
return false, fmt.Errorf("failed to validate execution receipt: %w", err)
}

_, err = c.storeReceipt(receipt, head)
_, err = c.storeReceipt(receipt, executedBlock)
if err != nil {
return false, fmt.Errorf("failed to store receipt: %w", err)
}
Expand Down
Loading