From 0b6a5baa061fdfc375bc67bdb5533dd48f4f5484 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Wed, 24 Jan 2024 14:17:01 -0800 Subject: [PATCH] handle race condition --- .../ingestion/block_queue/block_queue.go | 45 ++++++++++++++----- .../ingestion/block_queue/block_queue_test.go | 24 +++++++++- 2 files changed, 56 insertions(+), 13 deletions(-) diff --git a/engine/execution/ingestion/block_queue/block_queue.go b/engine/execution/ingestion/block_queue/block_queue.go index 54a73f40c1a..3d42d50bd5a 100644 --- a/engine/execution/ingestion/block_queue/block_queue.go +++ b/engine/execution/ingestion/block_queue/block_queue.go @@ -9,6 +9,8 @@ import ( "github.com/rs/zerolog" ) +var ErrMissingParent = fmt.Errorf("missing parent block") + // BlockQueue keeps track of state of blocks and determines which blocks are executable // A block becomes executable when all the following conditions are met: // 1. the block has been validated by consensus algorithm @@ -18,8 +20,14 @@ type BlockQueue struct { sync.Mutex log zerolog.Logger - // when receiving a new block, adding it to the map, and add missing collections to the map + // if a block still exists in this map, it means either some of its collection is missing, + // or its parent block has not been executed. + // if a block's StartState is not nil, it means its parent block has been executed, and + // its parent block must have been removed from this map + // if a block's StartState is nil, it means its parent block has not been executed yet. + // and its parent must be found in the this map as well blocks map[flow.Identifier]*entity.ExecutableBlock + // a collection could be included in multiple blocks, // when a missing block is received, it might trigger multiple blocks to be executable, which // can be looked up by the map @@ -30,8 +38,8 @@ type BlockQueue struct { // blockIDsByHeight is used to find next executable block. // when a block is executed, the next executable block must be a block with height = current block height + 1 - // the following map allows us to find the next executable block by height - blockIDsByHeight map[uint64]map[flow.Identifier]*entity.ExecutableBlock // for finding next executable block + // the following map allows us to find the next executable block by height and their parent block ID + blockIDsByHeight map[uint64]map[flow.Identifier]*entity.ExecutableBlock } type MissingCollection struct { @@ -62,11 +70,13 @@ func NewBlockQueue(logger zerolog.Logger) *BlockQueue { } } -// OnBlock is called when a new block is received, and its parent is not executed. -// It returns a list of missing collections and a list of executable blocks -// Note: caller must ensure when OnBlock is called with a block, -// if its parent is not executed, then the parent must be added to the queue first. -// if its parent is executed, then the parent's finalState must be passed in. +// OnBlock is called when a new block is received, the parentFinalState indicates +// whether its parent block has been executed. +// Caller must ensure: +// 1. blocks are passsed in order, i.e. parent block is passed in before its child block +// 2. if a block's parent is not executed, then the parent block must be passed in first +// 3. if a block's parent is executed, then the parent's finalState must be passed in +// It returns (nil, nil, nil) if this block is a duplication func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateCommitment) ( []*MissingCollection, // missing collections []*entity.ExecutableBlock, // blocks ready to execute @@ -125,8 +135,9 @@ func (q *BlockQueue) OnBlock(block *flow.Block, parentFinalState *flow.StateComm if parentFinalState == nil { _, parentExists := q.blocks[block.Header.ParentID] if !parentExists { - return nil, nil, fmt.Errorf("parent block %s of block %s is not in the queue", - block.Header.ParentID, blockID) + return nil, nil, + fmt.Errorf("block %s has no parent commitment, but its parent block %s does not exist in the queue: %w", + blockID, block.Header.ParentID, ErrMissingParent) } } @@ -235,6 +246,8 @@ func (q *BlockQueue) OnCollection(collection *flow.Collection) ([]*entity.Execut // OnBlockExecuted is called when a block is executed // It returns a list of executable blocks (usually its child blocks) +// The caller has to ensure OnBlockExecuted is not called in a wrong order, such as +// OnBlockExecuted(childBlock) being called before OnBlockExecuted(parentBlock). func (q *BlockQueue) OnBlockExecuted( blockID flow.Identifier, commit flow.StateCommitment, @@ -258,6 +271,15 @@ func (q *BlockQueue) onBlockExecuted( return nil, nil } + // sanity check + // if a block exists in the queue and is executed, then its parent block + // must not exist in the queue, otherwise the state is inconsistent + _, parentExists := q.blocks[block.Block.Header.ParentID] + if parentExists { + return nil, fmt.Errorf("parent block %s of block %s is in the queue", + block.Block.Header.ParentID, blockID) + } + delete(q.blocks, blockID) // remove height index @@ -326,7 +348,8 @@ func (q *BlockQueue) checkIfChildBlockBecomeExecutable( return executables, nil } -// GetMissingCollections returns the missing collections and the start state +// GetMissingCollections returns the missing collections and the start state for the given block +// Useful for debugging what is missing for the next unexecuted block to become executable. // It returns an error if the block is not found func (q *BlockQueue) GetMissingCollections(blockID flow.Identifier) ( []*MissingCollection, *flow.StateCommitment, error) { diff --git a/engine/execution/ingestion/block_queue/block_queue_test.go b/engine/execution/ingestion/block_queue/block_queue_test.go index 1c1da0d1ab8..376f1138b51 100644 --- a/engine/execution/ingestion/block_queue/block_queue_test.go +++ b/engine/execution/ingestion/block_queue/block_queue_test.go @@ -1,6 +1,7 @@ package block_queue import ( + "errors" "testing" "github.com/stretchr/testify/require" @@ -281,7 +282,7 @@ func TestOnBlockWithMissingParentCommit(t *testing.T) { require.Empty(t, executables) requireCollectionHas(t, missing, c1) - // block A is executable + // block A has all the collections and become executable executables, err = q.OnCollection(c1) require.NoError(t, err) requireExecutableHas(t, executables, blockA) @@ -294,11 +295,30 @@ func TestOnBlockWithMissingParentCommit(t *testing.T) { requireExecutableHas(t, executables) requireQueueIsEmpty(t, q) - missing, executables, err = q.OnBlock(blockB, nil) + // verify when race condition happens, ErrMissingParent will be returned + _, _, err = q.OnBlock(blockB, nil) + require.True(t, errors.Is(err, ErrMissingParent), err) + + // verify if called again with parent commit, it will be successful + missing, executables, err = q.OnBlock(blockB, commitFor("A")) require.NoError(t, err) require.Empty(t, executables) requireCollectionHas(t, missing, c2, c3) + // verify after receiving all collections, B becomes executable + executables, err = q.OnCollection(c2) + require.NoError(t, err) + require.Empty(t, executables) + + executables, err = q.OnCollection(c3) + require.NoError(t, err) + requireExecutableHas(t, executables, blockB) + + // verify after B is executed, the queue is empty + executables, err = q.OnBlockExecuted(blockB.ID(), *commitFor("B")) + require.NoError(t, err) + requireExecutableHas(t, executables) + requireQueueIsEmpty(t, q) } /* ==== Test utils ==== */