chore(dot/sync): simplify processBlockData and associated tests (#2811)

- `processBlockData` (new dispatch flow sketched below):
  - Split its sub-blocks into functions:
    - `processBlockDataWithHeaderAndBody` function with test
    - `processBlockDataWithStateHeaderAndBody` function with test
  - Take the block data `blockData` as a value argument instead of a pointer
  - Review error wrappings
  - Remove the block data nil check
  - Rename the receiver from `s` to `c` for `chainProcessor`
- Remove unneeded unit test cases for `processBlockData` (now covered by the sub-functions' associated tests)
- Push the (possibly unneeded) justification length check up into the caller of `handleJustification`
- Return an error on block import error
- Fix the position of debug logs
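
For orientation, here is a condensed sketch of the refactored `processBlockData` dispatch as it appears in the `dot/sync/chain_processor.go` diff further down. Error wrapping and debug logging are trimmed, so treat it as a reading aid rather than the verbatim implementation:

```go
// Condensed sketch of the refactored dispatch; error wrapping and debug
// logging from the real implementation are trimmed here.
func (c *chainProcessor) processBlockData(blockData types.BlockData) error {
	headerInState, err := c.blockState.HasHeader(blockData.Hash)
	if err != nil {
		return err
	}
	bodyInState, err := c.blockState.HasBlockBody(blockData.Hash)
	if err != nil {
		return err
	}

	// While in bootstrap mode we don't need to broadcast block announcements.
	announceImportedBlock := c.chainSync.syncState() == tip

	if headerInState && bodyInState {
		// Block already known to the block state: re-add it to the block tree
		// and re-import its state.
		return c.processBlockDataWithStateHeaderAndBody(blockData, announceImportedBlock)
	}

	if blockData.Header != nil {
		if blockData.Body != nil {
			// Fresh header and body: BABE-verify, handle the body, handle the block.
			if err := c.processBlockDataWithHeaderAndBody(blockData, announceImportedBlock); err != nil {
				return err
			}
		}
		if blockData.Justification != nil && len(*blockData.Justification) > 0 {
			// The length check now lives here, in the caller of handleJustification.
			if err := c.handleJustification(blockData.Header, *blockData.Justification); err != nil {
				return err
			}
		}
	}

	return c.blockState.CompareAndSetBlockData(&blockData)
}
```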
qdm12 authored Nov 15, 2022
1 parent 94dcef8 commit 8a17f37
Showing 8 changed files with 465 additions and 428 deletions.
10 changes: 6 additions & 4 deletions dot/sync/block_queue.go
@@ -34,17 +34,19 @@ func (bq *blockQueue) push(blockData *types.BlockData) {
bq.queue <- blockData
}

// pop pops an item from the queue. It blocks if the queue is empty.
func (bq *blockQueue) pop(ctx context.Context) (blockData *types.BlockData) {
// pop pops the next item from the queue. It blocks if the queue is empty
// until the context is cancelled. If the context is canceled, it returns
// the error from the context.
func (bq *blockQueue) pop(ctx context.Context) (blockData *types.BlockData, err error) {
select {
case <-ctx.Done():
return nil
return blockData, ctx.Err()
case blockData = <-bq.queue:
}
bq.hashesSetMutex.Lock()
delete(bq.hashesSet, blockData.Hash)
bq.hashesSetMutex.Unlock()
return blockData
return blockData, nil
}

func (bq *blockQueue) has(blockHash common.Hash) (has bool) {
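
The `pop` change above means callers now receive an error that can only be the context's own error. A minimal caller-side sketch, assuming it sits in the same `dot/sync` package so `blockQueue` and `types.BlockData` resolve; the helper name `popOrDone` is hypothetical, and the real caller is `processReadyBlocks` in `chain_processor.go` below:

```go
// popOrDone is a hypothetical wrapper around the new error-returning pop.
// The only error pop can return is ctx.Err(), i.e. context.Canceled or
// context.DeadlineExceeded once the context is done.
func popOrDone(ctx context.Context, bq *blockQueue) (*types.BlockData, error) {
	blockData, err := bq.pop(ctx)
	if err != nil {
		return nil, fmt.Errorf("popping block data: %w", err)
	}
	return blockData, nil
}
```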
14 changes: 9 additions & 5 deletions dot/sync/block_queue_test.go
@@ -72,8 +72,9 @@ func Test_blockQueue_pop(t *testing.T) {
const capacity = 1
bq := newBlockQueue(capacity)

blockData := bq.pop(ctx)
blockData, err := bq.pop(ctx)
assert.Nil(t, blockData)
assert.ErrorIs(t, err, context.Canceled)
})

t.Run("get block data after waiting", func(t *testing.T) {
@@ -92,12 +93,13 @@ func Test_blockQueue_pop(t *testing.T) {
bq.push(blockData)
})

blockData := bq.pop(ctx)
blockData, err := bq.pop(ctx)

expectedBlockData := &types.BlockData{
Hash: common.Hash{1},
}
assert.Equal(t, expectedBlockData, blockData)
assert.NoError(t, err)

assert.Len(t, bq.queue, 0)
bq.queue = nil
@@ -158,8 +160,9 @@ func Test_lockQueue_endToEnd(t *testing.T) {
blockQueue.push(newBlockData(2))
blockQueue.push(newBlockData(3))

blockData := blockQueue.pop(context.Background())
blockData, err := blockQueue.pop(context.Background())
assert.Equal(t, newBlockData(1), blockData)
assert.NoError(t, err)

has := blockQueue.has(newBlockData(2).Hash)
assert.True(t, has)
@@ -171,8 +174,9 @@ func Test_lockQueue_endToEnd(t *testing.T) {
has = blockQueue.has(newBlockData(4).Hash)
assert.True(t, has)

blockData = blockQueue.pop(context.Background())
blockData, err = blockQueue.pop(context.Background())
assert.Equal(t, newBlockData(2), blockData)
assert.NoError(t, err)

// drain queue
for len(blockQueue.queue) > 0 {
@@ -236,7 +240,7 @@ func Test_lockQueue_threadSafety(t *testing.T) {
})

go runInLoop(func() {
_ = blockQueue.pop(ctx)
_, _ = blockQueue.pop(ctx)
})

go runInLoop(func() {
158 changes: 87 additions & 71 deletions dot/sync/chain_processor.go
@@ -82,12 +82,15 @@ func (s *chainProcessor) stop() {

func (s *chainProcessor) processReadyBlocks() {
for {
bd := s.readyBlocks.pop(s.ctx)
if s.ctx.Err() != nil {
return
bd, err := s.readyBlocks.pop(s.ctx)
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
panic(fmt.Sprintf("unhandled error: %s", err))
}

if err := s.processBlockData(bd); err != nil {
if err := s.processBlockData(*bd); err != nil {
// depending on the error, we might want to save this block for later
if !errors.Is(err, errFailedToGetParent) {
logger.Errorf("block data processing for block with hash %s failed: %s", bd.Hash, err)
@@ -108,101 +111,116 @@ // processBlockData processes the BlockData from a BlockResponse and
// processBlockData processes the BlockData from a BlockResponse and
// returns the index of the last BlockData it handled on success,
// or the index of the block data that errored on failure.
func (s *chainProcessor) processBlockData(bd *types.BlockData) error {
if bd == nil {
return ErrNilBlockData
}
func (c *chainProcessor) processBlockData(blockData types.BlockData) error { //nolint:revive
logger.Debugf("processing block data with hash %s", blockData.Hash)

hasHeader, err := s.blockState.HasHeader(bd.Hash)
headerInState, err := c.blockState.HasHeader(blockData.Hash)
if err != nil {
return fmt.Errorf("failed to check if block state has header for hash %s: %w", bd.Hash, err)
return fmt.Errorf("checking if block state has header: %w", err)
}
hasBody, err := s.blockState.HasBlockBody(bd.Hash)

bodyInState, err := c.blockState.HasBlockBody(blockData.Hash)
if err != nil {
return fmt.Errorf("failed to check block state has body for hash %s: %w", bd.Hash, err)
return fmt.Errorf("checking if block state has body: %w", err)
}

// while in bootstrap mode we don't need to broadcast block announcements
announceImportedBlock := s.chainSync.syncState() == tip
if hasHeader && hasBody {
// TODO: fix this; sometimes when the node shuts down the "best block" isn't stored properly,
// so when the node restarts it has blocks higher than what it thinks is the best, causing it not to sync
// if we update the node to only store finalised blocks in the database, this should be fixed and the entire
// code block can be removed (#1784)
block, err := s.blockState.GetBlockByHash(bd.Hash)
announceImportedBlock := c.chainSync.syncState() == tip
if headerInState && bodyInState {
err = c.processBlockDataWithStateHeaderAndBody(blockData, announceImportedBlock)
if err != nil {
logger.Debugf("failed to get block header for hash %s: %s", bd.Hash, err)
return err
return fmt.Errorf("processing block data with header and "+
"body in block state: %w", err)
}
return nil
}

logger.Debugf(
"skipping block number %d with hash %s, already have",
block.Header.Number, bd.Hash) // TODO is this valid?

err = s.blockState.AddBlockToBlockTree(block)
if errors.Is(err, blocktree.ErrBlockExists) {
return nil
} else if err != nil {
logger.Warnf("failed to add block with hash %s to blocktree: %s", bd.Hash, err)
return err
if blockData.Header != nil {
if blockData.Body != nil {
err = c.processBlockDataWithHeaderAndBody(blockData, announceImportedBlock)
if err != nil {
return fmt.Errorf("processing block data with header and body: %w", err)
}
logger.Debugf("block with hash %s processed", blockData.Hash)
}

if bd.Justification != nil {
logger.Debugf("handling Justification for block number %d with hash %s...", block.Header.Number, bd.Hash)
err = s.handleJustification(&block.Header, *bd.Justification)
if blockData.Justification != nil && len(*blockData.Justification) > 0 {
err = c.handleJustification(blockData.Header, *blockData.Justification)
if err != nil {
return fmt.Errorf("handling justification: %w", err)
}
}
}

// TODO: this is probably unnecessary, since the state is already in the database
// however, this case shouldn't be hit often, since it's only hit if the node state
// is rewinded or if the node shuts down unexpectedly (#1784)
state, err := s.storageState.TrieState(&block.Header.StateRoot)
if err != nil {
logger.Warnf("failed to load state for block with hash %s: %s", block.Header.Hash(), err)
return err
}
err = c.blockState.CompareAndSetBlockData(&blockData)
if err != nil {
return fmt.Errorf("comparing and setting block data: %w", err)
}

if err := s.blockImportHandler.HandleBlockImport(block, state, announceImportedBlock); err != nil {
logger.Warnf("failed to handle block import: %s", err)
}
return nil
}

return nil
func (c *chainProcessor) processBlockDataWithStateHeaderAndBody(blockData types.BlockData, //nolint:revive
announceImportedBlock bool) (err error) {
// TODO: fix this; sometimes when the node shuts down the "best block" isn't stored properly,
// so when the node restarts it has blocks higher than what it thinks is the best, causing it not to sync
// if we update the node to only store finalised blocks in the database, this should be fixed and the entire
// code block can be removed (#1784)
block, err := c.blockState.GetBlockByHash(blockData.Hash)
if err != nil {
return fmt.Errorf("getting block by hash: %w", err)
}

logger.Debugf("processing block data with hash %s", bd.Hash)
err = c.blockState.AddBlockToBlockTree(block)
if errors.Is(err, blocktree.ErrBlockExists) {
logger.Debugf(
"block number %d with hash %s already exists in block tree, skipping it.",
block.Header.Number, blockData.Hash)
return nil
} else if err != nil {
return fmt.Errorf("adding block to blocktree: %w", err)
}

if bd.Header != nil && bd.Body != nil {
if err := s.babeVerifier.VerifyBlock(bd.Header); err != nil {
return err
if blockData.Justification != nil && len(*blockData.Justification) > 0 {
err = c.handleJustification(&block.Header, *blockData.Justification)
if err != nil {
return fmt.Errorf("handling justification: %w", err)
}
}

s.handleBody(bd.Body)
// TODO: this is probably unnecessary, since the state is already in the database
// however, this case shouldn't be hit often, since it's only hit if the node state
// is rewinded or if the node shuts down unexpectedly (#1784)
state, err := c.storageState.TrieState(&block.Header.StateRoot)
if err != nil {
return fmt.Errorf("loading trie state: %w", err)
}

block := &types.Block{
Header: *bd.Header,
Body: *bd.Body,
}
err = c.blockImportHandler.HandleBlockImport(block, state, announceImportedBlock)
if err != nil {
return fmt.Errorf("handling block import: %w", err)
}

if err := s.handleBlock(block, announceImportedBlock); err != nil {
logger.Debugf("failed to handle block number %d: %s", block.Header.Number, err)
return err
}
return nil
}

logger.Debugf("block with hash %s processed", bd.Hash)
func (c *chainProcessor) processBlockDataWithHeaderAndBody(blockData types.BlockData, //nolint:revive
announceImportedBlock bool) (err error) {
err = c.babeVerifier.VerifyBlock(blockData.Header)
if err != nil {
return fmt.Errorf("babe verifying block: %w", err)
}

if bd.Justification != nil && bd.Header != nil {
logger.Debugf("handling Justification for block number %d with hash %s...", bd.Number(), bd.Hash)
err = s.handleJustification(bd.Header, *bd.Justification)
if err != nil {
return fmt.Errorf("handling justification: %w", err)
}
c.handleBody(blockData.Body)

block := &types.Block{
Header: *blockData.Header,
Body: *blockData.Body,
}

if err := s.blockState.CompareAndSetBlockData(bd); err != nil {
return fmt.Errorf("failed to compare and set data: %w", err)
err = c.handleBlock(block, announceImportedBlock)
if err != nil {
return fmt.Errorf("handling block: %w", err)
}

return nil
@@ -263,9 +281,7 @@ func (s *chainProcessor) handleBlock(block *types.Block, announceImportedBlock b
}

func (s *chainProcessor) handleJustification(header *types.Header, justification []byte) (err error) {
if len(justification) == 0 {
return nil
}
logger.Debugf("handling justification for block %d...", header.Number)

headerHash := header.Hash()
returnedJustification, err := s.finalityGadget.VerifyBlockJustification(headerHash, justification)
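
Since `handleJustification` above no longer returns early on an empty justification, each caller is now expected to guard the call itself. A hypothetical helper capturing that caller-side pattern, mirroring the two call sites in this diff (`justifyIfPresent` is illustrative only and not part of the commit):

```go
// justifyIfPresent shows the guard that replaced the length check removed
// from handleJustification; hypothetical helper, not part of the commit.
func (c *chainProcessor) justifyIfPresent(header *types.Header, justification *[]byte) error {
	if justification == nil || len(*justification) == 0 {
		return nil // nothing to verify and store
	}
	if err := c.handleJustification(header, *justification); err != nil {
		return fmt.Errorf("handling justification: %w", err)
	}
	return nil
}
```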
14 changes: 4 additions & 10 deletions dot/sync/chain_processor_integration_test.go
@@ -57,7 +57,7 @@ func TestChainProcessor_HandleBlockResponse_ValidChain(t *testing.T) {

// process response
for _, bd := range resp.BlockData {
err = syncer.chainProcessor.(*chainProcessor).processBlockData(bd)
err = syncer.chainProcessor.(*chainProcessor).processBlockData(*bd)
require.NoError(t, err)
}

@@ -77,7 +77,7 @@ func TestChainProcessor_HandleBlockResponse_ValidChain(t *testing.T) {

// process response
for _, bd := range resp.BlockData {
err = syncer.chainProcessor.(*chainProcessor).processBlockData(bd)
err = syncer.chainProcessor.(*chainProcessor).processBlockData(*bd)
require.NoError(t, err)
}
}
@@ -130,7 +130,7 @@ func TestChainProcessor_HandleBlockResponse_MissingBlocks(t *testing.T) {
require.NoError(t, err)

for _, bd := range resp.BlockData {
err = syncer.chainProcessor.(*chainProcessor).processBlockData(bd)
err = syncer.chainProcessor.(*chainProcessor).processBlockData(*bd)
require.True(t, errors.Is(err, errFailedToGetParent))
}
}
@@ -154,12 +154,6 @@ func TestChainProcessor_handleBody_ShouldRemoveIncludedExtrinsics(t *testing.T)
require.Nil(t, inQueue, "queue should be empty")
}

func TestChainProcessor_HandleBlockResponse_NoBlockData(t *testing.T) {
syncer := newTestSyncer(t)
err := syncer.chainProcessor.(*chainProcessor).processBlockData(nil)
require.Equal(t, ErrNilBlockData, err)
}

// TODO: add test against latest gssmr runtime
// See https://github.com/ChainSafe/gossamer/issues/2703
func TestChainProcessor_HandleBlockResponse_BlockData(t *testing.T) {
@@ -186,7 +180,7 @@ func TestChainProcessor_HandleBlockResponse_BlockData(t *testing.T) {
}

for _, bd := range msg.BlockData {
err = syncer.chainProcessor.(*chainProcessor).processBlockData(bd)
err = syncer.chainProcessor.(*chainProcessor).processBlockData(*bd)
require.NoError(t, err)
}
}
(diffs for the remaining 4 changed files not shown)
