Skip to content

Commit

Permalink
Recover failed blobs in encoding streamer (#733)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Sep 4, 2024
1 parent 9e4a8a1 commit c8f3586
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
2 changes: 1 addition & 1 deletion core/thegraph/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (ics *indexedChainState) GetIndexedOperatorState(ctx context.Context, block
aggKeys := make(map[uint8]*core.G1Point)
for _, apk := range aggregatePublicKeys {
if apk.Err != nil {
ics.logger.Warn("Error getting aggregate public key", "err", apk.Err)
ics.logger.Error("Error getting aggregate public key", "err", apk.Err)
continue
}
if apk.Err == nil && apk.AggregatePubk != nil {
Expand Down
21 changes: 21 additions & 0 deletions disperser/batcher/encoding_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,9 @@ func (e *EncodingStreamer) CreateMinibatch(ctx context.Context) (*batch, error)

state, err := e.getOperatorState(timeoutCtx, metadatas, e.ReferenceBlockNumber)
if err != nil {
for _, metadata := range metadatas {
_ = e.handleFailedMetadata(ctx, metadata)
}
return nil, err
}

Expand All @@ -576,6 +579,9 @@ func (e *EncodingStreamer) CreateMinibatch(ctx context.Context) (*batch, error)

_, err = batchHeader.SetBatchRoot(blobHeaders)
if err != nil {
for _, metadata := range metadatas {
_ = e.handleFailedMetadata(ctx, metadata)
}
return nil, err
}

Expand Down Expand Up @@ -717,6 +723,9 @@ func (e *EncodingStreamer) CreateBatch(ctx context.Context) (*batch, error) {

state, err := e.getOperatorState(timeoutCtx, metadatas, e.ReferenceBlockNumber)
if err != nil {
for _, metadata := range metadatas {
_ = e.handleFailedMetadata(ctx, metadata)
}
return nil, err
}

Expand All @@ -728,6 +737,9 @@ func (e *EncodingStreamer) CreateBatch(ctx context.Context) (*batch, error) {

tree, err := batchHeader.SetBatchRoot(blobHeaders)
if err != nil {
for _, metadata := range metadatas {
_ = e.handleFailedMetadata(ctx, metadata)
}
return nil, err
}

Expand All @@ -743,6 +755,15 @@ func (e *EncodingStreamer) CreateBatch(ctx context.Context) (*batch, error) {
}, nil
}

func (e *EncodingStreamer) handleFailedMetadata(ctx context.Context, metadata *disperser.BlobMetadata) error {
err := e.blobStore.MarkBlobProcessing(ctx, metadata.GetBlobKey())
if err != nil {
e.logger.Error("error marking blob as processing", "err", err)
}

return err
}

func (e *EncodingStreamer) transitionBlobToDispersing(ctx context.Context, metadata *disperser.BlobMetadata) error {
blobKey := metadata.GetBlobKey()
err := e.blobStore.MarkBlobDispersing(ctx, blobKey)
Expand Down

0 comments on commit c8f3586

Please sign in to comment.