Skip to content

Commit

Permalink
Merge pull request #5839 from onflow/leo/v0.33-check-local-collection…
Browse files Browse the repository at this point in the history
…-if-exist

Backport v0.33: [Execution] Check local collection exists before fetching
  • Loading branch information
zhangchiqing authored May 3, 2024
2 parents 451a31e + 1e55c82 commit c7570a2
Showing 1 changed file with 35 additions and 7 deletions.
42 changes: 35 additions & 7 deletions engine/execution/ingestion/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,10 @@ func (e *Core) onBlockExecuted(
}

func (e *Core) onCollection(col *flow.Collection) error {
lg := e.log.With().
Hex("collection_id", logging.Entity(col)).
Logger()
lg.Info().Msgf("handle collection")
colID := col.ID()
e.log.Info().
Hex("collection_id", colID[:]).
Msgf("handle collection")
// EN might request a collection from multiple collection nodes,
// therefore might receive multiple copies of the same collection.
// we only need to store it once.
Expand All @@ -433,6 +433,10 @@ func (e *Core) onCollection(col *flow.Collection) error {
return fmt.Errorf("failed to store collection %v: %w", col.ID(), err)
}

return e.handleCollection(colID, col)
}

func (e *Core) handleCollection(colID flow.Identifier, col *flow.Collection) error {
// if the collection is a duplication, it's still good to add it to the block queue,
// because chances are the collection was stored before a restart, and
// is not in the queue after the restart.
Expand All @@ -444,8 +448,10 @@ func (e *Core) onCollection(col *flow.Collection) error {
return fmt.Errorf("unexpected error while adding collection to block queue")
}

lg.Debug().
e.log.Debug().
Hex("collection_id", colID[:]).
Int("executables", len(executables)).Msgf("executeConcurrently: collection is handled, ready to execute block")

e.executeConcurrently(executables)

return nil
Expand Down Expand Up @@ -506,15 +512,37 @@ func (e *Core) execute(ctx context.Context, executable *entity.ExecutableBlock)
}

func (e *Core) fetch(missingColls []*block_queue.MissingCollection) error {
missingCount := 0
for _, col := range missingColls {
err := e.collectionFetcher.FetchCollection(col.BlockID, col.Height, col.Guarantee)

// if we've requested this collection, we will store it in the storage,
// so check the storage to see whether we've seen it.
collection, err := e.collections.ByID(col.Guarantee.CollectionID)

if err == nil {
// we found the collection from storage, forward this collection to handler
err = e.handleCollection(col.Guarantee.CollectionID, collection)
if err != nil {
return fmt.Errorf("could not handle collection: %w", err)
}

continue
}

// check if there was exception
if !errors.Is(err, storage.ErrNotFound) {
return fmt.Errorf("error while querying for collection: %w", err)
}

err = e.collectionFetcher.FetchCollection(col.BlockID, col.Height, col.Guarantee)
if err != nil {
return fmt.Errorf("failed to fetch collection %v for block %v (height: %v): %w",
col.Guarantee.ID(), col.BlockID, col.Height, err)
}
missingCount++
}

if len(missingColls) > 0 {
if missingCount > 0 {
e.collectionFetcher.Force()
}

Expand Down

0 comments on commit c7570a2

Please sign in to comment.