diff --git a/core/data.go b/core/data.go index aeaf082bd..bff1bf23f 100644 --- a/core/data.go +++ b/core/data.go @@ -475,14 +475,14 @@ func (cb Bundles) FromEncodedBundles(eb EncodedBundles) (Bundles, error) { // PaymentMetadata represents the header information for a blob type PaymentMetadata struct { - // Existing fields - AccountID string + // AccountID is the ETH account address for the payer + AccountID string `json:"account_id"` - // New fields - BinIndex uint32 + // BinIndex represents the range of time at which the dispersal is made + BinIndex uint32 `json:"bin_index"` // TODO: we are thinking the contract can use uint128 for cumulative payment, // but the definition on v2 uses uint64. Double check with team. - CumulativePayment uint64 + CumulativePayment uint64 `json:"cumulative_payment"` } // Hash returns the Keccak256 hash of the PaymentMetadata diff --git a/disperser/common/v2/blob.go b/disperser/common/v2/blob.go new file mode 100644 index 000000000..8b87cb71f --- /dev/null +++ b/disperser/common/v2/blob.go @@ -0,0 +1,51 @@ +package v2 + +import ( + "encoding/hex" + + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/encoding" +) + +type BlobStatus uint + +const ( + Queued BlobStatus = iota + Encoded + Certified + Failed +) + +type BlobVersion uint32 + +type BlobKey [32]byte + +func (b BlobKey) Hex() string { + return hex.EncodeToString(b[:]) +} + +func HexToBlobKey(h string) (BlobKey, error) { + b, err := hex.DecodeString(h) + if err != nil { + return BlobKey{}, err + } + return BlobKey(b), nil +} + +type BlobHeader struct { + BlobVersion BlobVersion `json:"version"` + BlobQuorumInfos []*core.BlobQuorumInfo `json:"blob_quorum_infos"` + BlobCommitment encoding.BlobCommitments `json:"commitments"` + + core.PaymentMetadata `json:"payment_metadata"` +} + +type BlobMetadata struct { + BlobHeader `json:"blob_header"` + + BlobStatus BlobStatus `json:"blob_status"` + Expiry uint64 `json:"expiry"` + NumRetries uint `json:"num_retries"` + BlobSize uint64 `json:"blob_size"` + RequestedAt uint64 `json:"requested_at"` +} diff --git a/disperser/common/v2/blobstore/blob_metadata_store_v2.go b/disperser/common/v2/blobstore/blob_metadata_store_v2.go new file mode 100644 index 000000000..6808ef90f --- /dev/null +++ b/disperser/common/v2/blobstore/blob_metadata_store_v2.go @@ -0,0 +1,551 @@ +package blobstore + +import ( + "context" + "fmt" + "strconv" + "time" + + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/disperser" + v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/ethereum/go-ethereum/common/hexutil" +) + +const ( + StatusIndexName = "StatusIndex" + OperatorDispersalIndexName = "OperatorDispersalIndex" + OperatorResponseIndexName = "OperatorResponseIndex" +) + +// BlobMetadataStore is a blob metadata storage backed by DynamoDB +type BlobMetadataStore struct { + dynamoDBClient *commondynamodb.Client + logger logging.Logger + tableName string + ttl time.Duration +} + +func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *BlobMetadataStore { + logger.Debugf("creating blob metadata store v2 with table %s with TTL: %s", tableName, ttl) + return &BlobMetadataStore{ + dynamoDBClient: dynamoDBClient, + logger: logger.With("component", "BlobMetadataStoreV2"), + tableName: tableName, + ttl: ttl, + } +} + +func (s *BlobMetadataStore) CreateBlobMetadata(ctx context.Context, blobMetadata *v2.BlobMetadata) error { + item, err := MarshalBlobMetadata(blobMetadata) + if err != nil { + return err + } + + return s.dynamoDBClient.PutItem(ctx, s.tableName, item) +} + +func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey disperser.BlobKey) (*v2.BlobMetadata, error) { + item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ + "PK": &types.AttributeValueMemberS{ + Value: blobKey.BlobHash, + }, + "SK": &types.AttributeValueMemberS{ + Value: blobKey.MetadataHash, + }, + }) + + if item == nil { + return nil, fmt.Errorf("%w: metadata not found for key %s", disperser.ErrMetadataNotFound, blobKey) + } + + if err != nil { + return nil, err + } + + metadata, err := UnmarshalBlobMetadata(item) + if err != nil { + return nil, err + } + + return metadata, nil +} + +// GetBulkBlobMetadata returns the metadata for the given blob keys +// Note: ordering of items is not guaranteed +func (s *BlobMetadataStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []disperser.BlobKey) ([]*v2.BlobMetadata, error) { + keys := make([]map[string]types.AttributeValue, len(blobKeys)) + for i := 0; i < len(blobKeys); i += 1 { + keys[i] = map[string]types.AttributeValue{ + "BlobHash": &types.AttributeValueMemberS{Value: blobKeys[i].BlobHash}, + "MetadataHash": &types.AttributeValueMemberS{Value: blobKeys[i].MetadataHash}, + } + } + items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys) + if err != nil { + return nil, err + } + + metadata := make([]*v2.BlobMetadata, len(items)) + for i, item := range items { + metadata[i], err = UnmarshalBlobMetadata(item) + if err != nil { + return nil, err + } + } + + return metadata, nil +} + +// GetBlobMetadataByStatus returns all the metadata with the given status +// Because this function scans the entire index, it should only be used for status with a limited number of items. +// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented. +func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status disperser.BlobStatus) ([]*v2.BlobMetadata, error) { + items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + ":expiry": &types.AttributeValueMemberN{ + Value: strconv.FormatInt(time.Now().Unix(), 10), + }}) + if err != nil { + return nil, err + } + + metadata := make([]*v2.BlobMetadata, len(items)) + for i, item := range items { + metadata[i], err = UnmarshalBlobMetadata(item) + if err != nil { + return nil, err + } + } + + return metadata, nil +} + +// GetBlobMetadataCountByStatus returns the count of all the metadata with the given status +// Because this function scans the entire index, it should only be used for status with a limited number of items. +// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented. +func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, status disperser.BlobStatus) (int32, error) { + count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + ":expiry": &types.AttributeValueMemberN{ + Value: strconv.FormatInt(time.Now().Unix(), 10), + }, + }) + if err != nil { + return 0, err + } + + return count, nil +} + +// GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit +// along with items, also returns a pagination token that can be used to fetch the next set of items +// +// Note that this may not return all the metadata for the batch if dynamodb query limit is reached. +// e.g 1mb limit for a single query +func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*v2.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) { + + var attributeMap map[string]types.AttributeValue + var err error + + // Convert the exclusive start key to a map of AttributeValue + if exclusiveStartKey != nil { + attributeMap, err = convertToAttribMap(exclusiveStartKey) + if err != nil { + return nil, nil, err + } + } + + queryResult, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + ":expiry": &types.AttributeValueMemberN{ + Value: strconv.FormatInt(time.Now().Unix(), 10), + }, + }, limit, attributeMap) + + if err != nil { + return nil, nil, err + } + + // When no more results to fetch, the LastEvaluatedKey is nil + if queryResult.Items == nil && queryResult.LastEvaluatedKey == nil { + return nil, nil, nil + } + + metadata := make([]*v2.BlobMetadata, len(queryResult.Items)) + for i, item := range queryResult.Items { + metadata[i], err = UnmarshalBlobMetadata(item) + if err != nil { + return nil, nil, err + } + } + + lastEvaluatedKey := queryResult.LastEvaluatedKey + if lastEvaluatedKey == nil { + return metadata, nil, nil + } + + // Convert the last evaluated key to a disperser.BlobStoreExclusiveStartKey + exclusiveStartKey, err = convertToExclusiveStartKey(lastEvaluatedKey) + if err != nil { + return nil, nil, err + } + return metadata, exclusiveStartKey, nil +} + +func (s *BlobMetadataStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*v2.BlobMetadata, error) { + items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, batchIndexName, "BatchHeaderHash = :batch_header_hash", commondynamodb.ExpresseionValues{ + ":batch_header_hash": &types.AttributeValueMemberB{ + Value: batchHeaderHash[:], + }, + }) + if err != nil { + return nil, err + } + + if len(items) == 0 { + return nil, fmt.Errorf("there is no metadata for batch %x", batchHeaderHash) + } + + metadatas := make([]*v2.BlobMetadata, len(items)) + for i, item := range items { + metadatas[i], err = UnmarshalBlobMetadata(item) + if err != nil { + return nil, err + } + } + + return metadatas, nil +} + +// GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit +// along with items, also returns a pagination token that can be used to fetch the next set of items +// +// Note that this may not return all the metadata for the batch if dynamodb query limit is reached. +// e.g 1mb limit for a single query +func (s *BlobMetadataStore) GetAllBlobMetadataByBatchWithPagination( + ctx context.Context, + batchHeaderHash [32]byte, + limit int32, + exclusiveStartKey *disperser.BatchIndexExclusiveStartKey, +) ([]*v2.BlobMetadata, *disperser.BatchIndexExclusiveStartKey, error) { + var attributeMap map[string]types.AttributeValue + var err error + + // Convert the exclusive start key to a map of AttributeValue + if exclusiveStartKey != nil { + attributeMap, err = convertToAttribMapBatchIndex(exclusiveStartKey) + if err != nil { + return nil, nil, err + } + } + + queryResult, err := s.dynamoDBClient.QueryIndexWithPagination( + ctx, + s.tableName, + batchIndexName, + "BatchHeaderHash = :batch_header_hash", + commondynamodb.ExpresseionValues{ + ":batch_header_hash": &types.AttributeValueMemberB{ + Value: batchHeaderHash[:], + }, + }, + limit, + attributeMap, + ) + if err != nil { + return nil, nil, err + } + + s.logger.Info("Query result", "items", len(queryResult.Items), "lastEvaluatedKey", queryResult.LastEvaluatedKey) + // When no more results to fetch, the LastEvaluatedKey is nil + if queryResult.Items == nil && queryResult.LastEvaluatedKey == nil { + return nil, nil, nil + } + + metadata := make([]*v2.BlobMetadata, len(queryResult.Items)) + for i, item := range queryResult.Items { + metadata[i], err = UnmarshalBlobMetadata(item) + if err != nil { + return nil, nil, err + } + } + + lastEvaluatedKey := queryResult.LastEvaluatedKey + if lastEvaluatedKey == nil { + return metadata, nil, nil + } + + // Convert the last evaluated key to a disperser.BatchIndexExclusiveStartKey + exclusiveStartKey, err = convertToExclusiveStartKeyBatchIndex(lastEvaluatedKey) + if err != nil { + return nil, nil, err + } + return metadata, exclusiveStartKey, nil +} + +func (s *BlobMetadataStore) GetBlobMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*v2.BlobMetadata, error) { + items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, batchIndexName, "BatchHeaderHash = :batch_header_hash AND BlobIndex = :blob_index", commondynamodb.ExpresseionValues{ + ":batch_header_hash": &types.AttributeValueMemberB{ + Value: batchHeaderHash[:], + }, + ":blob_index": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(blobIndex)), + }}) + if err != nil { + return nil, err + } + + if len(items) == 0 { + return nil, fmt.Errorf("%w: there is no metadata for batch %s and blob index %d", disperser.ErrMetadataNotFound, hexutil.Encode(batchHeaderHash[:]), blobIndex) + } + + if len(items) > 1 { + s.logger.Error("there are multiple metadata for batch %s and blob index %d", hexutil.Encode(batchHeaderHash[:]), blobIndex) + } + + metadata, err := UnmarshalBlobMetadata(items[0]) + if err != nil { + return nil, err + } + return metadata, nil +} + +func (s *BlobMetadataStore) IncrementNumRetries(ctx context.Context, existingMetadata *v2.BlobMetadata) error { + _, err := s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ + "BlobHash": &types.AttributeValueMemberS{ + Value: existingMetadata.BlobHash, + }, + "MetadataHash": &types.AttributeValueMemberS{ + Value: existingMetadata.MetadataHash, + }, + }, commondynamodb.Item{ + "NumRetries": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(existingMetadata.NumRetries + 1)), + }, + }) + + return err +} + +func (s *BlobMetadataStore) UpdateConfirmationBlockNumber(ctx context.Context, existingMetadata *v2.BlobMetadata, confirmationBlockNumber uint32) error { + updated := *existingMetadata + if updated.ConfirmationInfo == nil { + return fmt.Errorf("failed to update confirmation block number because confirmation info is missing for blob key %s", existingMetadata.GetBlobKey().String()) + } + + updated.ConfirmationInfo.ConfirmationBlockNumber = confirmationBlockNumber + item, err := MarshalBlobMetadata(&updated) + if err != nil { + return err + } + + _, err = s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ + "BlobHash": &types.AttributeValueMemberS{ + Value: existingMetadata.BlobHash, + }, + "MetadataHash": &types.AttributeValueMemberS{ + Value: existingMetadata.MetadataHash, + }, + }, item) + + return err +} + +func (s *BlobMetadataStore) UpdateBlobMetadata(ctx context.Context, metadataKey disperser.BlobKey, updated *v2.BlobMetadata) error { + item, err := MarshalBlobMetadata(updated) + if err != nil { + return err + } + + _, err = s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ + "BlobHash": &types.AttributeValueMemberS{ + Value: metadataKey.BlobHash, + }, + "MetadataHash": &types.AttributeValueMemberS{ + Value: metadataKey.MetadataHash, + }, + }, item) + + return err +} + +func (s *BlobMetadataStore) SetBlobStatus(ctx context.Context, metadataKey disperser.BlobKey, status disperser.BlobStatus) error { + _, err := s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ + "BlobHash": &types.AttributeValueMemberS{ + Value: metadataKey.BlobHash, + }, + "MetadataHash": &types.AttributeValueMemberS{ + Value: metadataKey.MetadataHash, + }, + }, commondynamodb.Item{ + "BlobStatus": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + }) + + return err +} + +func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { + return &dynamodb.CreateTableInput{ + AttributeDefinitions: []types.AttributeDefinition{ + { + AttributeName: aws.String("PK"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("SK"), + AttributeType: types.ScalarAttributeTypeS, + }, + }, + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("PK"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("SK"), + KeyType: types.KeyTypeRange, + }, + }, + TableName: aws.String(tableName), + GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ + { + IndexName: aws.String(StatusIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("BlobStatus"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("RequestedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + { + IndexName: aws.String(OperatorDispersalIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("OperatorID"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("DispersedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + { + IndexName: aws.String(OperatorResponseIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("OperatorID"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("RespondedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + } +} + +func MarshalBlobMetadata(metadata *v2.BlobMetadata) (commondynamodb.Item, error) { + return attributevalue.MarshalMap(metadata) +} + +func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) { + metadata := v2.BlobMetadata{} + err := attributevalue.UnmarshalMap(item, &metadata) + if err != nil { + return nil, err + } + + return &metadata, nil +} + +func convertToExclusiveStartKey(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.BlobStoreExclusiveStartKey, error) { + blobStoreExclusiveStartKey := disperser.BlobStoreExclusiveStartKey{} + err := attributevalue.UnmarshalMap(exclusiveStartKeyMap, &blobStoreExclusiveStartKey) + if err != nil { + return nil, err + } + + return &blobStoreExclusiveStartKey, nil +} + +func convertToExclusiveStartKeyBatchIndex(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.BatchIndexExclusiveStartKey, error) { + blobStoreExclusiveStartKey := disperser.BatchIndexExclusiveStartKey{} + err := attributevalue.UnmarshalMap(exclusiveStartKeyMap, &blobStoreExclusiveStartKey) + if err != nil { + return nil, err + } + + return &blobStoreExclusiveStartKey, nil +} + +func convertToAttribMap(blobStoreExclusiveStartKey *disperser.BlobStoreExclusiveStartKey) (map[string]types.AttributeValue, error) { + if blobStoreExclusiveStartKey == nil { + // Return an empty map or nil + return nil, nil + } + + avMap, err := attributevalue.MarshalMap(blobStoreExclusiveStartKey) + if err != nil { + return nil, err + } + return avMap, nil +} + +func convertToAttribMapBatchIndex(blobStoreExclusiveStartKey *disperser.BatchIndexExclusiveStartKey) (map[string]types.AttributeValue, error) { + if blobStoreExclusiveStartKey == nil { + // Return an empty map or nil + return nil, nil + } + + avMap, err := attributevalue.MarshalMap(blobStoreExclusiveStartKey) + if err != nil { + return nil, err + } + return avMap, nil +} diff --git a/disperser/common/v2/blobstore/blob_metadata_store_v2_test.go b/disperser/common/v2/blobstore/blob_metadata_store_v2_test.go new file mode 100644 index 000000000..c1b1c92b7 --- /dev/null +++ b/disperser/common/v2/blobstore/blob_metadata_store_v2_test.go @@ -0,0 +1,444 @@ +package blobstore_test + +import ( + "context" + "testing" + "time" + + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" +) + +func TestBlobMetadataStoreOperations(t *testing.T) { + ctx := context.Background() + blobKey1 := disperser.BlobKey{ + BlobHash: blobHash, + MetadataHash: "hash", + } + now := time.Now() + metadata1 := &disperser.BlobMetadata{ + MetadataHash: blobKey1.MetadataHash, + BlobHash: blobHash, + BlobStatus: disperser.Processing, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: uint64(now.Unix()), + }, + } + blobKey2 := disperser.BlobKey{ + BlobHash: "blob2", + MetadataHash: "hash2", + } + metadata2 := &disperser.BlobMetadata{ + MetadataHash: blobKey2.MetadataHash, + BlobHash: blobKey2.BlobHash, + BlobStatus: disperser.Finalized, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: uint64(now.Unix()), + }, + ConfirmationInfo: &disperser.ConfirmationInfo{}, + } + err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1) + assert.NoError(t, err) + err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2) + assert.NoError(t, err) + + fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1) + assert.NoError(t, err) + assert.Equal(t, metadata1, fetchedMetadata) + fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey2) + assert.NoError(t, err) + assert.Equal(t, metadata2, fetchedMetadata) + + fetchBulk, err := blobMetadataStore.GetBulkBlobMetadata(ctx, []disperser.BlobKey{blobKey1, blobKey2}) + assert.NoError(t, err) + assert.Equal(t, metadata1, fetchBulk[0]) + assert.Equal(t, metadata2, fetchBulk[1]) + + processing, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Processing) + assert.NoError(t, err) + assert.Len(t, processing, 1) + assert.Equal(t, metadata1, processing[0]) + + processingCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, disperser.Processing) + assert.NoError(t, err) + assert.Equal(t, int32(1), processingCount) + + err = blobMetadataStore.IncrementNumRetries(ctx, metadata1) + assert.NoError(t, err) + fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey1) + assert.NoError(t, err) + metadata1.NumRetries = 1 + assert.Equal(t, metadata1, fetchedMetadata) + + finalized, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Finalized) + assert.NoError(t, err) + assert.Len(t, finalized, 1) + assert.Equal(t, metadata2, finalized[0]) + + finalizedCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, disperser.Finalized) + assert.NoError(t, err) + assert.Equal(t, int32(1), finalizedCount) + + confirmedMetadata := getConfirmedMetadata(t, metadata1, 1) + err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey1, confirmedMetadata) + assert.NoError(t, err) + + metadata, err := blobMetadataStore.GetBlobMetadataInBatch(ctx, confirmedMetadata.ConfirmationInfo.BatchHeaderHash, confirmedMetadata.ConfirmationInfo.BlobIndex) + assert.NoError(t, err) + assert.Equal(t, metadata, confirmedMetadata) + + confirmedCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, disperser.Confirmed) + assert.NoError(t, err) + assert.Equal(t, int32(1), confirmedCount) + + deleteItems(t, []commondynamodb.Key{ + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash}, + }, + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash}, + }, + }) +} + +func TestBlobMetadataStoreOperationsWithPagination(t *testing.T) { + ctx := context.Background() + blobKey1 := disperser.BlobKey{ + BlobHash: blobHash, + MetadataHash: "hash", + } + now := time.Now() + metadata1 := &disperser.BlobMetadata{ + MetadataHash: blobKey1.MetadataHash, + BlobHash: blobHash, + BlobStatus: disperser.Processing, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: uint64(now.Unix()), + }, + } + blobKey2 := disperser.BlobKey{ + BlobHash: "blob2", + MetadataHash: "hash2", + } + metadata2 := &disperser.BlobMetadata{ + MetadataHash: blobKey2.MetadataHash, + BlobHash: blobKey2.BlobHash, + BlobStatus: disperser.Finalized, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: uint64(now.Unix()), + }, + ConfirmationInfo: &disperser.ConfirmationInfo{}, + } + err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1) + assert.NoError(t, err) + err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2) + assert.NoError(t, err) + + fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1) + assert.NoError(t, err) + assert.Equal(t, metadata1, fetchedMetadata) + fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey2) + assert.NoError(t, err) + assert.Equal(t, metadata2, fetchedMetadata) + + processing, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, nil) + assert.NoError(t, err) + assert.Len(t, processing, 1) + assert.Equal(t, metadata1, processing[0]) + assert.NotNil(t, lastEvaluatedKey) + + finalized, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Finalized, 1, nil) + assert.NoError(t, err) + assert.Len(t, finalized, 1) + assert.Equal(t, metadata2, finalized[0]) + assert.NotNil(t, lastEvaluatedKey) + + finalized, lastEvaluatedKey, err = blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Finalized, 1, lastEvaluatedKey) + assert.NoError(t, err) + assert.Len(t, finalized, 0) + assert.Nil(t, lastEvaluatedKey) + + deleteItems(t, []commondynamodb.Key{ + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash}, + }, + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash}, + }, + }) +} + +func TestGetAllBlobMetadataByBatchWithPagination(t *testing.T) { + ctx := context.Background() + blobKey1 := disperser.BlobKey{ + BlobHash: blobHash, + MetadataHash: "hash", + } + expiry := uint64(time.Now().Add(time.Hour).Unix()) + metadata1 := &disperser.BlobMetadata{ + MetadataHash: blobKey1.MetadataHash, + BlobHash: blobHash, + BlobStatus: disperser.Processing, + Expiry: expiry, + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: 123, + }, + } + blobKey2 := disperser.BlobKey{ + BlobHash: "blob2", + MetadataHash: "hash2", + } + metadata2 := &disperser.BlobMetadata{ + MetadataHash: blobKey2.MetadataHash, + BlobHash: blobKey2.BlobHash, + BlobStatus: disperser.Finalized, + Expiry: expiry, + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: 123, + }, + ConfirmationInfo: &disperser.ConfirmationInfo{}, + } + err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1) + assert.NoError(t, err) + err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2) + assert.NoError(t, err) + + confirmedMetadata1 := getConfirmedMetadata(t, metadata1, 1) + err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey1, confirmedMetadata1) + assert.NoError(t, err) + + confirmedMetadata2 := getConfirmedMetadata(t, metadata2, 2) + err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey2, confirmedMetadata2) + assert.NoError(t, err) + + // Fetch the blob metadata with limit 1 + metadata, exclusiveStartKey, err := blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, nil) + assert.NoError(t, err) + assert.Equal(t, metadata[0], confirmedMetadata1) + assert.NotNil(t, exclusiveStartKey) + assert.Equal(t, confirmedMetadata1.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex) + + // Get the next blob metadata with limit 1 and the exclusive start key + metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, exclusiveStartKey) + assert.NoError(t, err) + assert.Equal(t, metadata[0], confirmedMetadata2) + assert.Equal(t, confirmedMetadata2.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex) + + // Fetching the next blob metadata should return an empty list + metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, exclusiveStartKey) + assert.NoError(t, err) + assert.Len(t, metadata, 0) + assert.Nil(t, exclusiveStartKey) + + // Fetch the blob metadata with limit 2 + metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 2, nil) + assert.NoError(t, err) + assert.Len(t, metadata, 2) + assert.Equal(t, metadata[0], confirmedMetadata1) + assert.Equal(t, metadata[1], confirmedMetadata2) + assert.NotNil(t, exclusiveStartKey) + assert.Equal(t, confirmedMetadata2.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex) + + // Fetch the blob metadata with limit 3 should return only 2 items + metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 3, nil) + assert.NoError(t, err) + assert.Len(t, metadata, 2) + assert.Equal(t, metadata[0], confirmedMetadata1) + assert.Equal(t, metadata[1], confirmedMetadata2) + assert.Nil(t, exclusiveStartKey) + + deleteItems(t, []commondynamodb.Key{ + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash}, + }, + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash}, + }, + }) +} + +func TestBlobMetadataStoreOperationsWithPaginationNoStoredBlob(t *testing.T) { + ctx := context.Background() + // Query BlobMetadataStore for a blob that does not exist + // This should return nil for both the blob and lastEvaluatedKey + processing, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, nil) + assert.NoError(t, err) + assert.Nil(t, processing) + assert.Nil(t, lastEvaluatedKey) +} + +func TestShadowWriteBlobMetadata(t *testing.T) { + ctx := context.Background() + + blobKey := disperser.BlobKey{ + BlobHash: "shadowblob", + MetadataHash: "shadowhash", + } + expiry := uint64(time.Now().Add(time.Hour).Unix()) + metadata := &disperser.BlobMetadata{ + MetadataHash: blobKey.MetadataHash, + BlobHash: blobKey.BlobHash, + BlobStatus: disperser.Processing, + Expiry: expiry, + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: 123, + }, + ConfirmationInfo: &disperser.ConfirmationInfo{}, + } + + err := shadowBlobMetadataStore.QueueNewBlobMetadata(ctx, metadata) + assert.NoError(t, err) + err = blobMetadataStore.SetBlobStatus(context.Background(), blobKey, disperser.Dispersing) + assert.NoError(t, err) + primaryMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey) + assert.NoError(t, err) + assert.Equal(t, disperser.Dispersing, primaryMetadata.BlobStatus) + + // Check that the shadow metadata exists but status has NOT been updated + shadowMetadataItem, err := dynamoClient.GetItem(ctx, shadowMetadataTableName, map[string]types.AttributeValue{ + "MetadataHash": &types.AttributeValueMemberS{ + Value: blobKey.MetadataHash, + }, + "BlobHash": &types.AttributeValueMemberS{ + Value: blobKey.BlobHash, + }, + }) + assert.NoError(t, err) + shadowMetadata := disperser.BlobMetadata{} + err = attributevalue.UnmarshalMap(shadowMetadataItem, &shadowMetadata) + assert.NoError(t, err) + assert.Equal(t, disperser.Processing, shadowMetadata.BlobStatus) + deleteItems(t, []commondynamodb.Key{ + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey.BlobHash}, + }, + }) +} + +func TestFilterOutExpiredBlobMetadata(t *testing.T) { + ctx := context.Background() + + blobKey := disperser.BlobKey{ + BlobHash: "blob1", + MetadataHash: "hash1", + } + now := time.Now() + metadata := &disperser.BlobMetadata{ + MetadataHash: blobKey.MetadataHash, + BlobHash: blobKey.BlobHash, + BlobStatus: disperser.Processing, + Expiry: uint64(now.Add(-1).Unix()), + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: uint64(now.Add(-1000).Unix()), + }, + ConfirmationInfo: &disperser.ConfirmationInfo{}, + } + + err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata) + assert.NoError(t, err) + + processing, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Processing) + assert.NoError(t, err) + assert.Len(t, processing, 0) + + processingCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, disperser.Processing) + assert.NoError(t, err) + assert.Equal(t, int32(0), processingCount) + + processing, _, err = blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 10, nil) + assert.NoError(t, err) + assert.Len(t, processing, 0) + + deleteItems(t, []commondynamodb.Key{ + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey.BlobHash}, + }, + }) +} + +func deleteItems(t *testing.T, keys []commondynamodb.Key) { + _, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys) + assert.NoError(t, err) +} + +func getConfirmedMetadata(t *testing.T, metadata *disperser.BlobMetadata, blobIndex uint32) *disperser.BlobMetadata { + batchHeaderHash := [32]byte{1, 2, 3} + var commitX, commitY fp.Element + _, err := commitX.SetString("21661178944771197726808973281966770251114553549453983978976194544185382599016") + assert.NoError(t, err) + _, err = commitY.SetString("9207254729396071334325696286939045899948985698134704137261649190717970615186") + assert.NoError(t, err) + commitment := &encoding.G1Commitment{ + X: commitX, + Y: commitY, + } + dataLength := 32 + batchID := uint32(99) + batchRoot := []byte("hello") + referenceBlockNumber := uint32(132) + confirmationBlockNumber := uint32(150) + sigRecordHash := [32]byte{0} + fee := []byte{0} + inclusionProof := []byte{1, 2, 3, 4, 5} + confirmationInfo := &disperser.ConfirmationInfo{ + BatchHeaderHash: batchHeaderHash, + BlobIndex: blobIndex, + SignatoryRecordHash: sigRecordHash, + ReferenceBlockNumber: referenceBlockNumber, + BatchRoot: batchRoot, + BlobInclusionProof: inclusionProof, + BlobCommitment: &encoding.BlobCommitments{ + Commitment: commitment, + Length: uint(dataLength), + }, + BatchID: batchID, + ConfirmationTxnHash: common.HexToHash("0x123"), + ConfirmationBlockNumber: confirmationBlockNumber, + Fee: fee, + } + metadata.BlobStatus = disperser.Confirmed + metadata.ConfirmationInfo = confirmationInfo + return metadata +}