From 1e55c82494f53a1d3ddd403e3a5b14770bd83840 Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Thu, 2 May 2024 22:58:11 -0700 Subject: [PATCH] check local collection exists before fetching --- engine/execution/ingestion/core.go | 42 +++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/engine/execution/ingestion/core.go b/engine/execution/ingestion/core.go index c0244d29b86..da118d87e38 100644 --- a/engine/execution/ingestion/core.go +++ b/engine/execution/ingestion/core.go @@ -421,10 +421,10 @@ func (e *Core) onBlockExecuted( } func (e *Core) onCollection(col *flow.Collection) error { - lg := e.log.With(). - Hex("collection_id", logging.Entity(col)). - Logger() - lg.Info().Msgf("handle collection") + colID := col.ID() + e.log.Info(). + Hex("collection_id", colID[:]). + Msgf("handle collection") // EN might request a collection from multiple collection nodes, // therefore might receive multiple copies of the same collection. // we only need to store it once. @@ -433,6 +433,10 @@ func (e *Core) onCollection(col *flow.Collection) error { return fmt.Errorf("failed to store collection %v: %w", col.ID(), err) } + return e.handleCollection(colID, col) +} + +func (e *Core) handleCollection(colID flow.Identifier, col *flow.Collection) error { // if the collection is a duplication, it's still good to add it to the block queue, // because chances are the collection was stored before a restart, and // is not in the queue after the restart. @@ -444,8 +448,10 @@ func (e *Core) onCollection(col *flow.Collection) error { return fmt.Errorf("unexpected error while adding collection to block queue") } - lg.Debug(). + e.log.Debug(). + Hex("collection_id", colID[:]). Int("executables", len(executables)).Msgf("executeConcurrently: collection is handled, ready to execute block") + e.executeConcurrently(executables) return nil @@ -506,15 +512,37 @@ func (e *Core) execute(ctx context.Context, executable *entity.ExecutableBlock) } func (e *Core) fetch(missingColls []*block_queue.MissingCollection) error { + missingCount := 0 for _, col := range missingColls { - err := e.collectionFetcher.FetchCollection(col.BlockID, col.Height, col.Guarantee) + + // if we've requested this collection, we will store it in the storage, + // so check the storage to see whether we've seen it. + collection, err := e.collections.ByID(col.Guarantee.CollectionID) + + if err == nil { + // we found the collection from storage, forward this collection to handler + err = e.handleCollection(col.Guarantee.CollectionID, collection) + if err != nil { + return fmt.Errorf("could not handle collection: %w", err) + } + + continue + } + + // check if there was exception + if !errors.Is(err, storage.ErrNotFound) { + return fmt.Errorf("error while querying for collection: %w", err) + } + + err = e.collectionFetcher.FetchCollection(col.BlockID, col.Height, col.Guarantee) if err != nil { return fmt.Errorf("failed to fetch collection %v for block %v (height: %v): %w", col.Guarantee.ID(), col.BlockID, col.Height, err) } + missingCount++ } - if len(missingColls) > 0 { + if missingCount > 0 { e.collectionFetcher.Force() }