-
Notifications
You must be signed in to change notification settings - Fork 179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Execution] Ingestion - add ingestion core #5288
Conversation
0b6a5ba
to
0ed04d1
Compare
1385460
to
8573168
Compare
2e1f681
to
2ddeaf7
Compare
d9658cc
to
c2253ad
Compare
8b7b15b
to
732360d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mostly minor comments.
// when the block queue decides to fetch missing collections, it forwards to the collection fetcher | ||
// when a block is executed, it notifies the block queue and forwards to execution state to save them. | ||
type Core struct { | ||
unit *engine.Unit // for async block execution |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a way to avoid using unit
or would that require a refactor of other modules as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried, but wasn't as easy as using unit and cannot get it to work. See my changes in this PR
engine/execution/ingestion/core.go
Outdated
} | ||
|
||
func (e *Core) enqueuBlock(block *flow.Block, blockID flow.Identifier) ( | ||
[]*block_queue.MissingCollection, []*entity.ExecutableBlock, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[]*block_queue.MissingCollection, []*entity.ExecutableBlock, error) { | |
[]*block_queue.MissingCollection, | |
[]*entity.ExecutableBlock, | |
error, | |
) { |
engine/execution/ingestion/core.go
Outdated
// qc.Block is equivalent to header.ID() | ||
err := e.throttle.OnBlock(qc.BlockID) | ||
if err != nil { | ||
e.log.Fatal().Err(err).Msgf("error processing block %v (%v)", header.Height, header.ID()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
e.log.Fatal().Err(err).Msgf("error processing block %v (%v)", header.Height, header.ID()) | |
e.log.Fatal().Err(err).Msgf("error processing block %v (%v)", header.Height, qc.BlockID) |
engine/execution/ingestion/core.go
Outdated
|
||
// handle exception | ||
if !errors.Is(err, storage.ErrNotFound) { | ||
return nil, nil, fmt.Errorf("failed to get parent state commitment for block %v: %w", block.Header.ParentID, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this list the enqueued block?
return nil, nil, fmt.Errorf("failed to get parent state commitment for block %v: %w", block.Header.ParentID, err) | |
return nil, nil, fmt.Errorf("failed to get parent state commitment for block %v: %w", blockID, err) |
engine/execution/ingestion/core.go
Outdated
|
||
parentCommitment, err := e.execState.StateCommitmentByBlockID(block.Header.ParentID) | ||
if err != nil { | ||
return nil, nil, fmt.Errorf("failed to get parent state commitment when re-enqueue block %v (parent:%v): %w", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return nil, nil, fmt.Errorf("failed to get parent state commitment when re-enqueue block %v (parent:%v): %w", | |
return nil, nil, fmt.Errorf("failed to get parent state commitment when re-enqueue block %v (parent: %v): %w", |
engine/execution/ingestion/core.go
Outdated
// and before HandleBlock was called, therefore, we should re-enqueue the block with the | ||
// parent commit. It's necessary to check again whether the parent block is executed after the call. | ||
lg.Warn().Msgf( | ||
"block is missing parent block, re-enqueueing %v(parent: %v)", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"block is missing parent block, re-enqueueing %v(parent: %v)", | |
"block is missing parent block, re-enqueueing %v (parent: %v)", |
|
||
wg := sync.WaitGroup{} | ||
wg.Add(1) | ||
defer wg.Wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be synchronized so BeforeComputationResultSaved
is guaranteed to be called before OnComputationResultSaved
? it's currently possible for them to be out of order
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not need to be synchronized, in fact we want them to be performed parallel.
The use case is to performance some action while saving the execution result. In our scenario, it's to upload the execution result to GCP (which is deprecated). Since the execution result is deterministic, it's OK to upload while saving it to database. I called it Before...Saved
just to indicate this callback can be used to performance actions that are parallel to saving the execution result.
732360d
to
9a78f53
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #5288 +/- ##
==========================================
- Coverage 55.65% 55.51% -0.14%
==========================================
Files 1041 1042 +1
Lines 101935 102210 +275
==========================================
+ Hits 56729 56744 +15
- Misses 40846 41110 +264
+ Partials 4360 4356 -4
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
ba562db
to
e2a16a6
Compare
return e.unit.Done() | ||
} | ||
|
||
func (e *Core) OnBlock(header *flow.Header, qc *flow.QuorumCertificate) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OnBlock
and OnCollection
are not supposed to block right? should these just push to a channel?
They could be made using the NewComponentManagerBuilder. This way the fatal error log can instead be a irrecoverable.SignalerContext.Throw
. See StopControl.BlockFinalized
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OnBlock
will pass the block to throttle.OnBlock
which is pushing the block to a processables
channel, which currently can hold 10000 blocks. If the channel gets full, then yes it will block, which is probably good, since it's more or less a bug. Note processing a block from the processables
channel is to add it to the block queue, which is just in-memory operation, it doesn't involve block execution or heavy IO, so should be fast. If consuming a block from the processables
channel is fast, but the processables
channel is still full, then it's good to block, and slow down.
OnCollection
is called by the Collection Fetcher, which uses a common requester engine to fetch collection. Once collection is received, it creates a goroutine to deliver the collection to the ingestion core. So, the OnCollection method is just blocking that goroutine, it won't block any engine. Note, this is the same behavior as the old ingestion engine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This way the fatal error log can instead be a irrecoverable.SignalerContext.Throw. See StopControl.BlockFinalized
Right, in terms of error handling (using irrecoverable.Throw), I will try in a separate PR along with replacing the old engine.Unit with component manager, since this PR still uses the old engine.Unit.
Working towards #5297
The ingestion core implements the main business logic of execution node continuously executing blocks.