Skip to content

Commit

Permalink
fix stop control to check when ingestion engine is about to execute a…
Browse files Browse the repository at this point in the history
… block
  • Loading branch information
zhangchiqing committed Feb 1, 2024
1 parent adbf9d0 commit c21cda3
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
13 changes: 10 additions & 3 deletions engine/execution/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ func (e *Engine) BlockProcessable(b *flow.Header, _ *flow.QuorumCertificate) {

// TODO: this should not be blocking: https://github.com/onflow/flow-go/issues/4400

// skip if stopControl tells to skip
// skip if stopControl tells to skip, so that we can avoid fetching collections
// for this block
if !e.stopControl.ShouldExecuteBlock(b) {
return
}
Expand Down Expand Up @@ -363,6 +364,12 @@ func (e *Engine) executeBlock(
ctx context.Context,
executableBlock *entity.ExecutableBlock,
) {

// don't execute the block if the stop control says no
if !e.stopControl.ShouldExecuteBlock(executableBlock.Block.Header) {
return
}

lg := e.log.With().
Hex("block_id", logging.Entity(executableBlock)).
Uint64("height", executableBlock.Block.Header.Height).
Expand Down Expand Up @@ -445,6 +452,8 @@ func (e *Engine) executeBlock(
Int64("timeSpentInMS", time.Since(startedAt).Milliseconds()).
Msg("block executed")

e.stopControl.OnBlockExecuted(executableBlock.Block.Header)

err = e.onBlockExecuted(executableBlock, finalEndState)
if err != nil {
lg.Err(err).Msg("failed in process block's children")
Expand All @@ -454,8 +463,6 @@ func (e *Engine) executeBlock(
e.executionDataPruner.NotifyFulfilledHeight(executableBlock.Height())
}

e.stopControl.OnBlockExecuted(executableBlock.Block.Header)

e.unit.Ctx()

}
Expand Down
7 changes: 5 additions & 2 deletions engine/execution/state/unittest/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,19 @@ func ComputationResultForBlockFixture(

numberOfChunks := len(collections) + 1
ceds := make([]*execution_data.ChunkExecutionData, numberOfChunks)
startState := *completeBlock.StartState
for i := 0; i < numberOfChunks; i++ {
ceds[i] = unittest.ChunkExecutionDataFixture(t, 1024)
endState := unittest.StateCommitmentFixture()
computationResult.CollectionExecutionResultAt(i).UpdateExecutionSnapshot(StateInteractionsFixture())
computationResult.AppendCollectionAttestationResult(
*completeBlock.StartState,
*completeBlock.StartState,
startState,
endState,
nil,
unittest.IdentifierFixture(),
ceds[i],
)
startState = endState
}
bed := unittest.BlockExecutionDataFixture(
unittest.WithBlockExecutionDataBlockID(completeBlock.Block.ID()),
Expand Down

0 comments on commit c21cda3

Please sign in to comment.