diff --git a/core/thegraph/state.go b/core/thegraph/state.go index 7a90cfd2c..b0f99fe54 100644 --- a/core/thegraph/state.go +++ b/core/thegraph/state.go @@ -133,7 +133,7 @@ func (ics *indexedChainState) GetIndexedOperatorState(ctx context.Context, block aggKeys := make(map[uint8]*core.G1Point) for _, apk := range aggregatePublicKeys { if apk.Err != nil { - ics.logger.Warn("Error getting aggregate public key", "err", apk.Err) + ics.logger.Error("Error getting aggregate public key", "err", apk.Err) continue } if apk.Err == nil && apk.AggregatePubk != nil { diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index a4b5c10e2..46f0ed794 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -565,6 +565,9 @@ func (e *EncodingStreamer) CreateMinibatch(ctx context.Context) (*batch, error) state, err := e.getOperatorState(timeoutCtx, metadatas, e.ReferenceBlockNumber) if err != nil { + for _, metadata := range metadatas { + _ = e.handleFailedMetadata(ctx, metadata) + } return nil, err } @@ -576,6 +579,9 @@ func (e *EncodingStreamer) CreateMinibatch(ctx context.Context) (*batch, error) _, err = batchHeader.SetBatchRoot(blobHeaders) if err != nil { + for _, metadata := range metadatas { + _ = e.handleFailedMetadata(ctx, metadata) + } return nil, err } @@ -717,6 +723,9 @@ func (e *EncodingStreamer) CreateBatch(ctx context.Context) (*batch, error) { state, err := e.getOperatorState(timeoutCtx, metadatas, e.ReferenceBlockNumber) if err != nil { + for _, metadata := range metadatas { + _ = e.handleFailedMetadata(ctx, metadata) + } return nil, err } @@ -728,6 +737,9 @@ func (e *EncodingStreamer) CreateBatch(ctx context.Context) (*batch, error) { tree, err := batchHeader.SetBatchRoot(blobHeaders) if err != nil { + for _, metadata := range metadatas { + _ = e.handleFailedMetadata(ctx, metadata) + } return nil, err } @@ -743,6 +755,15 @@ func (e *EncodingStreamer) CreateBatch(ctx context.Context) (*batch, error) { }, nil } +func (e *EncodingStreamer) handleFailedMetadata(ctx context.Context, metadata *disperser.BlobMetadata) error { + err := e.blobStore.MarkBlobProcessing(ctx, metadata.GetBlobKey()) + if err != nil { + e.logger.Error("error marking blob as processing", "err", err) + } + + return err +} + func (e *EncodingStreamer) transitionBlobToDispersing(ctx context.Context, metadata *disperser.BlobMetadata) error { blobKey := metadata.GetBlobKey() err := e.blobStore.MarkBlobDispersing(ctx, blobKey)