Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(dot/sync): simplify processBlockData and associated tests #2811

Merged
merged 21 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions dot/sync/block_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,19 @@ func (bq *blockQueue) push(blockData *types.BlockData) {
bq.queue <- blockData
}

// pop pops an item from the queue. It blocks if the queue is empty.
func (bq *blockQueue) pop(ctx context.Context) (blockData *types.BlockData) {
// pop pops the next item from the queue. It blocks if the queue is empty
// until the context is cancelled. If the context is canceled, it returns
// the error from the context.
func (bq *blockQueue) pop(ctx context.Context) (blockData *types.BlockData, err error) {
select {
case <-ctx.Done():
return nil
return blockData, ctx.Err()
case blockData = <-bq.queue:
}
bq.hashesSetMutex.Lock()
delete(bq.hashesSet, blockData.Hash)
bq.hashesSetMutex.Unlock()
return blockData
return blockData, nil
}

func (bq *blockQueue) has(blockHash common.Hash) (has bool) {
Expand Down
14 changes: 9 additions & 5 deletions dot/sync/block_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ func Test_blockQueue_pop(t *testing.T) {
const capacity = 1
bq := newBlockQueue(capacity)

blockData := bq.pop(ctx)
blockData, err := bq.pop(ctx)
assert.Nil(t, blockData)
assert.ErrorIs(t, err, context.Canceled)
})

t.Run("get block data after waiting", func(t *testing.T) {
Expand All @@ -92,12 +93,13 @@ func Test_blockQueue_pop(t *testing.T) {
bq.push(blockData)
})

blockData := bq.pop(ctx)
blockData, err := bq.pop(ctx)

expectedBlockData := &types.BlockData{
Hash: common.Hash{1},
}
assert.Equal(t, expectedBlockData, blockData)
assert.NoError(t, err)

assert.Len(t, bq.queue, 0)
bq.queue = nil
Expand Down Expand Up @@ -158,8 +160,9 @@ func Test_lockQueue_endToEnd(t *testing.T) {
blockQueue.push(newBlockData(2))
blockQueue.push(newBlockData(3))

blockData := blockQueue.pop(context.Background())
blockData, err := blockQueue.pop(context.Background())
assert.Equal(t, newBlockData(1), blockData)
assert.NoError(t, err)

has := blockQueue.has(newBlockData(2).Hash)
assert.True(t, has)
Expand All @@ -171,8 +174,9 @@ func Test_lockQueue_endToEnd(t *testing.T) {
has = blockQueue.has(newBlockData(4).Hash)
assert.True(t, has)

blockData = blockQueue.pop(context.Background())
blockData, err = blockQueue.pop(context.Background())
assert.Equal(t, newBlockData(2), blockData)
assert.NoError(t, err)

// drain queue
for len(blockQueue.queue) > 0 {
Expand Down Expand Up @@ -236,7 +240,7 @@ func Test_lockQueue_threadSafety(t *testing.T) {
})

go runInLoop(func() {
_ = blockQueue.pop(ctx)
_, _ = blockQueue.pop(ctx)
})

go runInLoop(func() {
Expand Down
158 changes: 87 additions & 71 deletions dot/sync/chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,15 @@ func (s *chainProcessor) stop() {

func (s *chainProcessor) processReadyBlocks() {
for {
bd := s.readyBlocks.pop(s.ctx)
if s.ctx.Err() != nil {
return
bd, err := s.readyBlocks.pop(s.ctx)
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
panic(fmt.Sprintf("unhandled error: %s", err))
}

if err := s.processBlockData(bd); err != nil {
if err := s.processBlockData(*bd); err != nil {
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved
// depending on the error, we might want to save this block for later
if !errors.Is(err, errFailedToGetParent) {
logger.Errorf("block data processing for block with hash %s failed: %s", bd.Hash, err)
Expand All @@ -108,101 +111,116 @@ func (s *chainProcessor) processReadyBlocks() {
// processBlockData processes the BlockData from a BlockResponse and
// returns the index of the last BlockData it handled on success,
// or the index of the block data that errored on failure.
func (s *chainProcessor) processBlockData(bd *types.BlockData) error {
if bd == nil {
return ErrNilBlockData
}
func (c *chainProcessor) processBlockData(blockData types.BlockData) error { //nolint:revive
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
logger.Debugf("processing block data with hash %s", blockData.Hash)

hasHeader, err := s.blockState.HasHeader(bd.Hash)
headerInState, err := c.blockState.HasHeader(blockData.Hash)
if err != nil {
return fmt.Errorf("failed to check if block state has header for hash %s: %w", bd.Hash, err)
return fmt.Errorf("checking if block state has header: %w", err)
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}
hasBody, err := s.blockState.HasBlockBody(bd.Hash)

bodyInState, err := c.blockState.HasBlockBody(blockData.Hash)
if err != nil {
return fmt.Errorf("failed to check block state has body for hash %s: %w", bd.Hash, err)
return fmt.Errorf("checking if block state has body: %w", err)
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}

// while in bootstrap mode we don't need to broadcast block announcements
announceImportedBlock := s.chainSync.syncState() == tip
if hasHeader && hasBody {
// TODO: fix this; sometimes when the node shuts down the "best block" isn't stored properly,
// so when the node restarts it has blocks higher than what it thinks is the best, causing it not to sync
// if we update the node to only store finalised blocks in the database, this should be fixed and the entire
// code block can be removed (#1784)
block, err := s.blockState.GetBlockByHash(bd.Hash)
announceImportedBlock := c.chainSync.syncState() == tip
if headerInState && bodyInState {
err = c.processBlockDataWithStateHeaderAndBody(blockData, announceImportedBlock)
if err != nil {
logger.Debugf("failed to get block header for hash %s: %s", bd.Hash, err)
return err
return fmt.Errorf("processing block data with header and "+
"body in block state: %w", err)
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

logger.Debugf(
"skipping block number %d with hash %s, already have",
block.Header.Number, bd.Hash) // TODO is this valid?

err = s.blockState.AddBlockToBlockTree(block)
if errors.Is(err, blocktree.ErrBlockExists) {
return nil
} else if err != nil {
logger.Warnf("failed to add block with hash %s to blocktree: %s", bd.Hash, err)
return err
if blockData.Header != nil {
if blockData.Body != nil {
err = c.processBlockDataWithHeaderAndBody(blockData, announceImportedBlock)
if err != nil {
return fmt.Errorf("processing block data with header and body: %w", err)
}
logger.Debugf("block with hash %s processed", blockData.Hash)
}

if bd.Justification != nil {
logger.Debugf("handling Justification for block number %d with hash %s...", block.Header.Number, bd.Hash)
err = s.handleJustification(&block.Header, *bd.Justification)
if blockData.Justification != nil && len(*blockData.Justification) > 0 {
err = c.handleJustification(blockData.Header, *blockData.Justification)
if err != nil {
return fmt.Errorf("handling justification: %w", err)
}
}
}

// TODO: this is probably unnecessary, since the state is already in the database
// however, this case shouldn't be hit often, since it's only hit if the node state
// is rewinded or if the node shuts down unexpectedly (#1784)
state, err := s.storageState.TrieState(&block.Header.StateRoot)
if err != nil {
logger.Warnf("failed to load state for block with hash %s: %s", block.Header.Hash(), err)
return err
}
err = c.blockState.CompareAndSetBlockData(&blockData)
if err != nil {
return fmt.Errorf("comparing and setting block data: %w", err)
}

if err := s.blockImportHandler.HandleBlockImport(block, state, announceImportedBlock); err != nil {
logger.Warnf("failed to handle block import: %s", err)
}
return nil
}

return nil
func (c *chainProcessor) processBlockDataWithStateHeaderAndBody(blockData types.BlockData, //nolint:revive
announceImportedBlock bool) (err error) {
// TODO: fix this; sometimes when the node shuts down the "best block" isn't stored properly,
// so when the node restarts it has blocks higher than what it thinks is the best, causing it not to sync
// if we update the node to only store finalised blocks in the database, this should be fixed and the entire
// code block can be removed (#1784)
block, err := c.blockState.GetBlockByHash(blockData.Hash)
if err != nil {
return fmt.Errorf("getting block by hash: %w", err)
}

logger.Debugf("processing block data with hash %s", bd.Hash)
err = c.blockState.AddBlockToBlockTree(block)
if errors.Is(err, blocktree.ErrBlockExists) {
logger.Debugf(
"block number %d with hash %s already exists in block tree, skipping it.",
block.Header.Number, blockData.Hash)
return nil
} else if err != nil {
return fmt.Errorf("adding block to blocktree: %w", err)
}

if bd.Header != nil && bd.Body != nil {
if err := s.babeVerifier.VerifyBlock(bd.Header); err != nil {
return err
if blockData.Justification != nil && len(*blockData.Justification) > 0 {
err = c.handleJustification(&block.Header, *blockData.Justification)
if err != nil {
return fmt.Errorf("handling justification: %w", err)
}
}

s.handleBody(bd.Body)
// TODO: this is probably unnecessary, since the state is already in the database
// however, this case shouldn't be hit often, since it's only hit if the node state
// is rewinded or if the node shuts down unexpectedly (#1784)
state, err := c.storageState.TrieState(&block.Header.StateRoot)
if err != nil {
return fmt.Errorf("loading trie state: %w", err)
}

block := &types.Block{
Header: *bd.Header,
Body: *bd.Body,
}
err = c.blockImportHandler.HandleBlockImport(block, state, announceImportedBlock)
if err != nil {
return fmt.Errorf("handling block import: %w", err)
}

if err := s.handleBlock(block, announceImportedBlock); err != nil {
logger.Debugf("failed to handle block number %d: %s", block.Header.Number, err)
return err
}
return nil
}

logger.Debugf("block with hash %s processed", bd.Hash)
func (c *chainProcessor) processBlockDataWithHeaderAndBody(blockData types.BlockData, //nolint:revive
announceImportedBlock bool) (err error) {
err = c.babeVerifier.VerifyBlock(blockData.Header)
if err != nil {
return fmt.Errorf("babe verifying block: %w", err)
}

if bd.Justification != nil && bd.Header != nil {
logger.Debugf("handling Justification for block number %d with hash %s...", bd.Number(), bd.Hash)
err = s.handleJustification(bd.Header, *bd.Justification)
if err != nil {
return fmt.Errorf("handling justification: %w", err)
}
c.handleBody(blockData.Body)

block := &types.Block{
Header: *blockData.Header,
Body: *blockData.Body,
}

if err := s.blockState.CompareAndSetBlockData(bd); err != nil {
return fmt.Errorf("failed to compare and set data: %w", err)
err = c.handleBlock(block, announceImportedBlock)
if err != nil {
return fmt.Errorf("handling block: %w", err)
}

return nil
Expand Down Expand Up @@ -263,9 +281,7 @@ func (s *chainProcessor) handleBlock(block *types.Block, announceImportedBlock b
}

func (s *chainProcessor) handleJustification(header *types.Header, justification []byte) (err error) {
if len(justification) == 0 {
return nil
}
qdm12 marked this conversation as resolved.
Show resolved Hide resolved
logger.Debugf("handling justification for block %d...", header.Number)

headerHash := header.Hash()
returnedJustification, err := s.finalityGadget.VerifyBlockJustification(headerHash, justification)
Expand Down
14 changes: 4 additions & 10 deletions dot/sync/chain_processor_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestChainProcessor_HandleBlockResponse_ValidChain(t *testing.T) {

// process response
for _, bd := range resp.BlockData {
err = syncer.chainProcessor.(*chainProcessor).processBlockData(bd)
err = syncer.chainProcessor.(*chainProcessor).processBlockData(*bd)
require.NoError(t, err)
}

Expand All @@ -77,7 +77,7 @@ func TestChainProcessor_HandleBlockResponse_ValidChain(t *testing.T) {

// process response
for _, bd := range resp.BlockData {
err = syncer.chainProcessor.(*chainProcessor).processBlockData(bd)
err = syncer.chainProcessor.(*chainProcessor).processBlockData(*bd)
require.NoError(t, err)
}
}
Expand Down Expand Up @@ -130,7 +130,7 @@ func TestChainProcessor_HandleBlockResponse_MissingBlocks(t *testing.T) {
require.NoError(t, err)

for _, bd := range resp.BlockData {
err = syncer.chainProcessor.(*chainProcessor).processBlockData(bd)
err = syncer.chainProcessor.(*chainProcessor).processBlockData(*bd)
require.True(t, errors.Is(err, errFailedToGetParent))
}
}
Expand All @@ -154,12 +154,6 @@ func TestChainProcessor_handleBody_ShouldRemoveIncludedExtrinsics(t *testing.T)
require.Nil(t, inQueue, "queue should be empty")
}

func TestChainProcessor_HandleBlockResponse_NoBlockData(t *testing.T) {
syncer := newTestSyncer(t)
err := syncer.chainProcessor.(*chainProcessor).processBlockData(nil)
require.Equal(t, ErrNilBlockData, err)
}

// TODO: add test against latest gssmr runtime
// See https://github.com/ChainSafe/gossamer/issues/2703
func TestChainProcessor_HandleBlockResponse_BlockData(t *testing.T) {
Expand All @@ -186,7 +180,7 @@ func TestChainProcessor_HandleBlockResponse_BlockData(t *testing.T) {
}

for _, bd := range msg.BlockData {
err = syncer.chainProcessor.(*chainProcessor).processBlockData(bd)
err = syncer.chainProcessor.(*chainProcessor).processBlockData(*bd)
require.NoError(t, err)
}
}
Expand Down
Loading