diff --git a/core/data.go b/core/data.go index bff1bf23f..e04de53d4 100644 --- a/core/data.go +++ b/core/data.go @@ -2,6 +2,7 @@ package core import ( "encoding/binary" + "encoding/hex" "errors" "fmt" "math/big" @@ -517,3 +518,27 @@ type ActiveReservation struct { type OnDemandPayment struct { CumulativePayment *big.Int // Total amount deposited by the user } + +type BlobVersion uint32 + +type BlobKey [32]byte + +func (b BlobKey) Hex() string { + return hex.EncodeToString(b[:]) +} + +func HexToBlobKey(h string) (BlobKey, error) { + b, err := hex.DecodeString(h) + if err != nil { + return BlobKey{}, err + } + return BlobKey(b), nil +} + +type BlobHeaderV2 struct { + BlobVersion BlobVersion `json:"version"` + QuorumIDs []QuorumID `json:"quorum_ids"` + BlobCommitment encoding.BlobCommitments `json:"commitments"` + + PaymentMetadata `json:"payment_metadata"` +} diff --git a/disperser/common/v2/blob.go b/disperser/common/v2/blob.go index 8b87cb71f..32ef9e8ad 100644 --- a/disperser/common/v2/blob.go +++ b/disperser/common/v2/blob.go @@ -1,11 +1,6 @@ package v2 -import ( - "encoding/hex" - - "github.com/Layr-Labs/eigenda/core" - "github.com/Layr-Labs/eigenda/encoding" -) +import "github.com/Layr-Labs/eigenda/core" type BlobStatus uint @@ -16,36 +11,14 @@ const ( Failed ) -type BlobVersion uint32 - -type BlobKey [32]byte - -func (b BlobKey) Hex() string { - return hex.EncodeToString(b[:]) -} - -func HexToBlobKey(h string) (BlobKey, error) { - b, err := hex.DecodeString(h) - if err != nil { - return BlobKey{}, err - } - return BlobKey(b), nil -} - -type BlobHeader struct { - BlobVersion BlobVersion `json:"version"` - BlobQuorumInfos []*core.BlobQuorumInfo `json:"blob_quorum_infos"` - BlobCommitment encoding.BlobCommitments `json:"commitments"` - - core.PaymentMetadata `json:"payment_metadata"` -} - type BlobMetadata struct { - BlobHeader `json:"blob_header"` - - BlobStatus BlobStatus `json:"blob_status"` - Expiry uint64 `json:"expiry"` - NumRetries uint `json:"num_retries"` - BlobSize uint64 `json:"blob_size"` - RequestedAt uint64 `json:"requested_at"` + core.BlobHeaderV2 `json:"blob_header"` + + BlobKey core.BlobKey `json:"blob_key"` + BlobStatus BlobStatus `json:"blob_status"` + // Expiry is Unix timestamp of the blob expiry in seconds from epoch + Expiry uint64 `json:"expiry"` + NumRetries uint `json:"num_retries"` + BlobSize uint64 `json:"blob_size"` + RequestedAt uint64 `json:"requested_at"` } diff --git a/disperser/common/v2/blobstore/blob_metadata_store.go b/disperser/common/v2/blobstore/blob_metadata_store.go new file mode 100644 index 000000000..8670d60bb --- /dev/null +++ b/disperser/common/v2/blobstore/blob_metadata_store.go @@ -0,0 +1,15 @@ +package blobstore + +import ( + "context" + + "github.com/Layr-Labs/eigenda/core" + v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" +) + +type BlobMetadataStore interface { + PutBlobMetadata(ctx context.Context, metadata *v2.BlobMetadata) error + GetBlobMetadata(ctx context.Context, blobKey core.BlobKey) (*v2.BlobMetadata, error) + GetBlobMetadataByStatus(ctx context.Context, status v2.BlobStatus) ([]*v2.BlobMetadata, error) + GetBlobMetadataCountByStatus(ctx context.Context, status v2.BlobStatus) (int32, error) +} diff --git a/disperser/common/v2/blobstore/blob_metadata_store_test.go b/disperser/common/v2/blobstore/blob_metadata_store_test.go new file mode 100644 index 000000000..7113bfb88 --- /dev/null +++ b/disperser/common/v2/blobstore/blob_metadata_store_test.go @@ -0,0 +1,137 @@ +package blobstore_test + +import ( + "context" + "fmt" + "math/big" + "os" + "testing" + "time" + + "github.com/Layr-Labs/eigenda/common/aws" + "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/consensys/gnark-crypto/ecc/bn254" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" + "github.com/google/uuid" + + "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" + "github.com/Layr-Labs/eigenda/inabox/deploy" + "github.com/ory/dockertest/v3" +) + +var ( + logger = logging.NewNoopLogger() + + dockertestPool *dockertest.Pool + dockertestResource *dockertest.Resource + + deployLocalStack bool + localStackPort = "4571" + + dynamoClient *dynamodb.Client + blobMetadataStore blobstore.BlobMetadataStore + + UUID = uuid.New() + metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID) + + mockCommitment = encoding.BlobCommitments{} +) + +func TestMain(m *testing.M) { + setup(m) + code := m.Run() + teardown() + os.Exit(code) +} + +func setup(m *testing.M) { + + deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false") + if !deployLocalStack { + localStackPort = os.Getenv("LOCALSTACK_PORT") + } + + if deployLocalStack { + var err error + dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localStackPort) + if err != nil { + teardown() + panic("failed to start localstack container") + } + + } + + cfg := aws.ClientConfig{ + Region: "us-east-1", + AccessKey: "localstack", + SecretAccessKey: "localstack", + EndpointURL: fmt.Sprintf("http://0.0.0.0:%s", localStackPort), + } + + _, err := test_utils.CreateTable(context.Background(), cfg, metadataTableName, blobstore.GenerateTableSchema(metadataTableName, 10, 10)) + if err != nil { + teardown() + panic("failed to create dynamodb table: " + err.Error()) + } + + dynamoClient, err = dynamodb.NewClient(cfg, logger) + if err != nil { + teardown() + panic("failed to create dynamodb client: " + err.Error()) + } + + blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, time.Hour) + + var X1, Y1 fp.Element + X1 = *X1.SetBigInt(big.NewInt(1)) + Y1 = *Y1.SetBigInt(big.NewInt(2)) + + var lengthXA0, lengthXA1, lengthYA0, lengthYA1 fp.Element + _, err = lengthXA0.SetString("10857046999023057135944570762232829481370756359578518086990519993285655852781") + if err != nil { + teardown() + panic("failed to create mock commitment: " + err.Error()) + } + _, err = lengthXA1.SetString("11559732032986387107991004021392285783925812861821192530917403151452391805634") + if err != nil { + teardown() + panic("failed to create mock commitment: " + err.Error()) + } + _, err = lengthYA0.SetString("8495653923123431417604973247489272438418190587263600148770280649306958101930") + if err != nil { + teardown() + panic("failed to create mock commitment: " + err.Error()) + } + _, err = lengthYA1.SetString("4082367875863433681332203403145435568316851327593401208105741076214120093531") + if err != nil { + teardown() + panic("failed to create mock commitment: " + err.Error()) + } + + var lengthProof, lengthCommitment bn254.G2Affine + lengthProof.X.A0 = lengthXA0 + lengthProof.X.A1 = lengthXA1 + lengthProof.Y.A0 = lengthYA0 + lengthProof.Y.A1 = lengthYA1 + + lengthCommitment = lengthProof + + mockCommitment = encoding.BlobCommitments{ + Commitment: &encoding.G1Commitment{ + X: X1, + Y: Y1, + }, + LengthCommitment: (*encoding.G2Commitment)(&lengthCommitment), + LengthProof: (*encoding.G2Commitment)(&lengthProof), + Length: 10, + } +} + +func teardown() { + if deployLocalStack { + deploy.PurgeDockertestResources(dockertestPool, dockertestResource) + } +} diff --git a/disperser/common/v2/blobstore/blob_metadata_store_v2.go b/disperser/common/v2/blobstore/blob_metadata_store_v2.go deleted file mode 100644 index 6808ef90f..000000000 --- a/disperser/common/v2/blobstore/blob_metadata_store_v2.go +++ /dev/null @@ -1,551 +0,0 @@ -package blobstore - -import ( - "context" - "fmt" - "strconv" - "time" - - commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" - "github.com/Layr-Labs/eigenda/disperser" - v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" - "github.com/Layr-Labs/eigensdk-go/logging" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" - "github.com/aws/aws-sdk-go-v2/service/dynamodb" - "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - "github.com/ethereum/go-ethereum/common/hexutil" -) - -const ( - StatusIndexName = "StatusIndex" - OperatorDispersalIndexName = "OperatorDispersalIndex" - OperatorResponseIndexName = "OperatorResponseIndex" -) - -// BlobMetadataStore is a blob metadata storage backed by DynamoDB -type BlobMetadataStore struct { - dynamoDBClient *commondynamodb.Client - logger logging.Logger - tableName string - ttl time.Duration -} - -func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *BlobMetadataStore { - logger.Debugf("creating blob metadata store v2 with table %s with TTL: %s", tableName, ttl) - return &BlobMetadataStore{ - dynamoDBClient: dynamoDBClient, - logger: logger.With("component", "BlobMetadataStoreV2"), - tableName: tableName, - ttl: ttl, - } -} - -func (s *BlobMetadataStore) CreateBlobMetadata(ctx context.Context, blobMetadata *v2.BlobMetadata) error { - item, err := MarshalBlobMetadata(blobMetadata) - if err != nil { - return err - } - - return s.dynamoDBClient.PutItem(ctx, s.tableName, item) -} - -func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey disperser.BlobKey) (*v2.BlobMetadata, error) { - item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ - "PK": &types.AttributeValueMemberS{ - Value: blobKey.BlobHash, - }, - "SK": &types.AttributeValueMemberS{ - Value: blobKey.MetadataHash, - }, - }) - - if item == nil { - return nil, fmt.Errorf("%w: metadata not found for key %s", disperser.ErrMetadataNotFound, blobKey) - } - - if err != nil { - return nil, err - } - - metadata, err := UnmarshalBlobMetadata(item) - if err != nil { - return nil, err - } - - return metadata, nil -} - -// GetBulkBlobMetadata returns the metadata for the given blob keys -// Note: ordering of items is not guaranteed -func (s *BlobMetadataStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []disperser.BlobKey) ([]*v2.BlobMetadata, error) { - keys := make([]map[string]types.AttributeValue, len(blobKeys)) - for i := 0; i < len(blobKeys); i += 1 { - keys[i] = map[string]types.AttributeValue{ - "BlobHash": &types.AttributeValueMemberS{Value: blobKeys[i].BlobHash}, - "MetadataHash": &types.AttributeValueMemberS{Value: blobKeys[i].MetadataHash}, - } - } - items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys) - if err != nil { - return nil, err - } - - metadata := make([]*v2.BlobMetadata, len(items)) - for i, item := range items { - metadata[i], err = UnmarshalBlobMetadata(item) - if err != nil { - return nil, err - } - } - - return metadata, nil -} - -// GetBlobMetadataByStatus returns all the metadata with the given status -// Because this function scans the entire index, it should only be used for status with a limited number of items. -// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented. -func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status disperser.BlobStatus) ([]*v2.BlobMetadata, error) { - items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{ - ":status": &types.AttributeValueMemberN{ - Value: strconv.Itoa(int(status)), - }, - ":expiry": &types.AttributeValueMemberN{ - Value: strconv.FormatInt(time.Now().Unix(), 10), - }}) - if err != nil { - return nil, err - } - - metadata := make([]*v2.BlobMetadata, len(items)) - for i, item := range items { - metadata[i], err = UnmarshalBlobMetadata(item) - if err != nil { - return nil, err - } - } - - return metadata, nil -} - -// GetBlobMetadataCountByStatus returns the count of all the metadata with the given status -// Because this function scans the entire index, it should only be used for status with a limited number of items. -// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented. -func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, status disperser.BlobStatus) (int32, error) { - count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{ - ":status": &types.AttributeValueMemberN{ - Value: strconv.Itoa(int(status)), - }, - ":expiry": &types.AttributeValueMemberN{ - Value: strconv.FormatInt(time.Now().Unix(), 10), - }, - }) - if err != nil { - return 0, err - } - - return count, nil -} - -// GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit -// along with items, also returns a pagination token that can be used to fetch the next set of items -// -// Note that this may not return all the metadata for the batch if dynamodb query limit is reached. -// e.g 1mb limit for a single query -func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*v2.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) { - - var attributeMap map[string]types.AttributeValue - var err error - - // Convert the exclusive start key to a map of AttributeValue - if exclusiveStartKey != nil { - attributeMap, err = convertToAttribMap(exclusiveStartKey) - if err != nil { - return nil, nil, err - } - } - - queryResult, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{ - ":status": &types.AttributeValueMemberN{ - Value: strconv.Itoa(int(status)), - }, - ":expiry": &types.AttributeValueMemberN{ - Value: strconv.FormatInt(time.Now().Unix(), 10), - }, - }, limit, attributeMap) - - if err != nil { - return nil, nil, err - } - - // When no more results to fetch, the LastEvaluatedKey is nil - if queryResult.Items == nil && queryResult.LastEvaluatedKey == nil { - return nil, nil, nil - } - - metadata := make([]*v2.BlobMetadata, len(queryResult.Items)) - for i, item := range queryResult.Items { - metadata[i], err = UnmarshalBlobMetadata(item) - if err != nil { - return nil, nil, err - } - } - - lastEvaluatedKey := queryResult.LastEvaluatedKey - if lastEvaluatedKey == nil { - return metadata, nil, nil - } - - // Convert the last evaluated key to a disperser.BlobStoreExclusiveStartKey - exclusiveStartKey, err = convertToExclusiveStartKey(lastEvaluatedKey) - if err != nil { - return nil, nil, err - } - return metadata, exclusiveStartKey, nil -} - -func (s *BlobMetadataStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*v2.BlobMetadata, error) { - items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, batchIndexName, "BatchHeaderHash = :batch_header_hash", commondynamodb.ExpresseionValues{ - ":batch_header_hash": &types.AttributeValueMemberB{ - Value: batchHeaderHash[:], - }, - }) - if err != nil { - return nil, err - } - - if len(items) == 0 { - return nil, fmt.Errorf("there is no metadata for batch %x", batchHeaderHash) - } - - metadatas := make([]*v2.BlobMetadata, len(items)) - for i, item := range items { - metadatas[i], err = UnmarshalBlobMetadata(item) - if err != nil { - return nil, err - } - } - - return metadatas, nil -} - -// GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit -// along with items, also returns a pagination token that can be used to fetch the next set of items -// -// Note that this may not return all the metadata for the batch if dynamodb query limit is reached. -// e.g 1mb limit for a single query -func (s *BlobMetadataStore) GetAllBlobMetadataByBatchWithPagination( - ctx context.Context, - batchHeaderHash [32]byte, - limit int32, - exclusiveStartKey *disperser.BatchIndexExclusiveStartKey, -) ([]*v2.BlobMetadata, *disperser.BatchIndexExclusiveStartKey, error) { - var attributeMap map[string]types.AttributeValue - var err error - - // Convert the exclusive start key to a map of AttributeValue - if exclusiveStartKey != nil { - attributeMap, err = convertToAttribMapBatchIndex(exclusiveStartKey) - if err != nil { - return nil, nil, err - } - } - - queryResult, err := s.dynamoDBClient.QueryIndexWithPagination( - ctx, - s.tableName, - batchIndexName, - "BatchHeaderHash = :batch_header_hash", - commondynamodb.ExpresseionValues{ - ":batch_header_hash": &types.AttributeValueMemberB{ - Value: batchHeaderHash[:], - }, - }, - limit, - attributeMap, - ) - if err != nil { - return nil, nil, err - } - - s.logger.Info("Query result", "items", len(queryResult.Items), "lastEvaluatedKey", queryResult.LastEvaluatedKey) - // When no more results to fetch, the LastEvaluatedKey is nil - if queryResult.Items == nil && queryResult.LastEvaluatedKey == nil { - return nil, nil, nil - } - - metadata := make([]*v2.BlobMetadata, len(queryResult.Items)) - for i, item := range queryResult.Items { - metadata[i], err = UnmarshalBlobMetadata(item) - if err != nil { - return nil, nil, err - } - } - - lastEvaluatedKey := queryResult.LastEvaluatedKey - if lastEvaluatedKey == nil { - return metadata, nil, nil - } - - // Convert the last evaluated key to a disperser.BatchIndexExclusiveStartKey - exclusiveStartKey, err = convertToExclusiveStartKeyBatchIndex(lastEvaluatedKey) - if err != nil { - return nil, nil, err - } - return metadata, exclusiveStartKey, nil -} - -func (s *BlobMetadataStore) GetBlobMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*v2.BlobMetadata, error) { - items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, batchIndexName, "BatchHeaderHash = :batch_header_hash AND BlobIndex = :blob_index", commondynamodb.ExpresseionValues{ - ":batch_header_hash": &types.AttributeValueMemberB{ - Value: batchHeaderHash[:], - }, - ":blob_index": &types.AttributeValueMemberN{ - Value: strconv.Itoa(int(blobIndex)), - }}) - if err != nil { - return nil, err - } - - if len(items) == 0 { - return nil, fmt.Errorf("%w: there is no metadata for batch %s and blob index %d", disperser.ErrMetadataNotFound, hexutil.Encode(batchHeaderHash[:]), blobIndex) - } - - if len(items) > 1 { - s.logger.Error("there are multiple metadata for batch %s and blob index %d", hexutil.Encode(batchHeaderHash[:]), blobIndex) - } - - metadata, err := UnmarshalBlobMetadata(items[0]) - if err != nil { - return nil, err - } - return metadata, nil -} - -func (s *BlobMetadataStore) IncrementNumRetries(ctx context.Context, existingMetadata *v2.BlobMetadata) error { - _, err := s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ - "BlobHash": &types.AttributeValueMemberS{ - Value: existingMetadata.BlobHash, - }, - "MetadataHash": &types.AttributeValueMemberS{ - Value: existingMetadata.MetadataHash, - }, - }, commondynamodb.Item{ - "NumRetries": &types.AttributeValueMemberN{ - Value: strconv.Itoa(int(existingMetadata.NumRetries + 1)), - }, - }) - - return err -} - -func (s *BlobMetadataStore) UpdateConfirmationBlockNumber(ctx context.Context, existingMetadata *v2.BlobMetadata, confirmationBlockNumber uint32) error { - updated := *existingMetadata - if updated.ConfirmationInfo == nil { - return fmt.Errorf("failed to update confirmation block number because confirmation info is missing for blob key %s", existingMetadata.GetBlobKey().String()) - } - - updated.ConfirmationInfo.ConfirmationBlockNumber = confirmationBlockNumber - item, err := MarshalBlobMetadata(&updated) - if err != nil { - return err - } - - _, err = s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ - "BlobHash": &types.AttributeValueMemberS{ - Value: existingMetadata.BlobHash, - }, - "MetadataHash": &types.AttributeValueMemberS{ - Value: existingMetadata.MetadataHash, - }, - }, item) - - return err -} - -func (s *BlobMetadataStore) UpdateBlobMetadata(ctx context.Context, metadataKey disperser.BlobKey, updated *v2.BlobMetadata) error { - item, err := MarshalBlobMetadata(updated) - if err != nil { - return err - } - - _, err = s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ - "BlobHash": &types.AttributeValueMemberS{ - Value: metadataKey.BlobHash, - }, - "MetadataHash": &types.AttributeValueMemberS{ - Value: metadataKey.MetadataHash, - }, - }, item) - - return err -} - -func (s *BlobMetadataStore) SetBlobStatus(ctx context.Context, metadataKey disperser.BlobKey, status disperser.BlobStatus) error { - _, err := s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ - "BlobHash": &types.AttributeValueMemberS{ - Value: metadataKey.BlobHash, - }, - "MetadataHash": &types.AttributeValueMemberS{ - Value: metadataKey.MetadataHash, - }, - }, commondynamodb.Item{ - "BlobStatus": &types.AttributeValueMemberN{ - Value: strconv.Itoa(int(status)), - }, - }) - - return err -} - -func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { - return &dynamodb.CreateTableInput{ - AttributeDefinitions: []types.AttributeDefinition{ - { - AttributeName: aws.String("PK"), - AttributeType: types.ScalarAttributeTypeS, - }, - { - AttributeName: aws.String("SK"), - AttributeType: types.ScalarAttributeTypeS, - }, - }, - KeySchema: []types.KeySchemaElement{ - { - AttributeName: aws.String("PK"), - KeyType: types.KeyTypeHash, - }, - { - AttributeName: aws.String("SK"), - KeyType: types.KeyTypeRange, - }, - }, - TableName: aws.String(tableName), - GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ - { - IndexName: aws.String(StatusIndexName), - KeySchema: []types.KeySchemaElement{ - { - AttributeName: aws.String("BlobStatus"), - KeyType: types.KeyTypeHash, - }, - { - AttributeName: aws.String("RequestedAt"), - KeyType: types.KeyTypeRange, - }, - }, - Projection: &types.Projection{ - ProjectionType: types.ProjectionTypeAll, - }, - ProvisionedThroughput: &types.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(readCapacityUnits), - WriteCapacityUnits: aws.Int64(writeCapacityUnits), - }, - }, - { - IndexName: aws.String(OperatorDispersalIndexName), - KeySchema: []types.KeySchemaElement{ - { - AttributeName: aws.String("OperatorID"), - KeyType: types.KeyTypeHash, - }, - { - AttributeName: aws.String("DispersedAt"), - KeyType: types.KeyTypeRange, - }, - }, - Projection: &types.Projection{ - ProjectionType: types.ProjectionTypeAll, - }, - ProvisionedThroughput: &types.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(readCapacityUnits), - WriteCapacityUnits: aws.Int64(writeCapacityUnits), - }, - }, - { - IndexName: aws.String(OperatorResponseIndexName), - KeySchema: []types.KeySchemaElement{ - { - AttributeName: aws.String("OperatorID"), - KeyType: types.KeyTypeHash, - }, - { - AttributeName: aws.String("RespondedAt"), - KeyType: types.KeyTypeRange, - }, - }, - Projection: &types.Projection{ - ProjectionType: types.ProjectionTypeAll, - }, - ProvisionedThroughput: &types.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(readCapacityUnits), - WriteCapacityUnits: aws.Int64(writeCapacityUnits), - }, - }, - }, - ProvisionedThroughput: &types.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(readCapacityUnits), - WriteCapacityUnits: aws.Int64(writeCapacityUnits), - }, - } -} - -func MarshalBlobMetadata(metadata *v2.BlobMetadata) (commondynamodb.Item, error) { - return attributevalue.MarshalMap(metadata) -} - -func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) { - metadata := v2.BlobMetadata{} - err := attributevalue.UnmarshalMap(item, &metadata) - if err != nil { - return nil, err - } - - return &metadata, nil -} - -func convertToExclusiveStartKey(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.BlobStoreExclusiveStartKey, error) { - blobStoreExclusiveStartKey := disperser.BlobStoreExclusiveStartKey{} - err := attributevalue.UnmarshalMap(exclusiveStartKeyMap, &blobStoreExclusiveStartKey) - if err != nil { - return nil, err - } - - return &blobStoreExclusiveStartKey, nil -} - -func convertToExclusiveStartKeyBatchIndex(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.BatchIndexExclusiveStartKey, error) { - blobStoreExclusiveStartKey := disperser.BatchIndexExclusiveStartKey{} - err := attributevalue.UnmarshalMap(exclusiveStartKeyMap, &blobStoreExclusiveStartKey) - if err != nil { - return nil, err - } - - return &blobStoreExclusiveStartKey, nil -} - -func convertToAttribMap(blobStoreExclusiveStartKey *disperser.BlobStoreExclusiveStartKey) (map[string]types.AttributeValue, error) { - if blobStoreExclusiveStartKey == nil { - // Return an empty map or nil - return nil, nil - } - - avMap, err := attributevalue.MarshalMap(blobStoreExclusiveStartKey) - if err != nil { - return nil, err - } - return avMap, nil -} - -func convertToAttribMapBatchIndex(blobStoreExclusiveStartKey *disperser.BatchIndexExclusiveStartKey) (map[string]types.AttributeValue, error) { - if blobStoreExclusiveStartKey == nil { - // Return an empty map or nil - return nil, nil - } - - avMap, err := attributevalue.MarshalMap(blobStoreExclusiveStartKey) - if err != nil { - return nil, err - } - return avMap, nil -} diff --git a/disperser/common/v2/blobstore/blob_metadata_store_v2_test.go b/disperser/common/v2/blobstore/blob_metadata_store_v2_test.go deleted file mode 100644 index c1b1c92b7..000000000 --- a/disperser/common/v2/blobstore/blob_metadata_store_v2_test.go +++ /dev/null @@ -1,444 +0,0 @@ -package blobstore_test - -import ( - "context" - "testing" - "time" - - commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" - "github.com/Layr-Labs/eigenda/disperser" - "github.com/Layr-Labs/eigenda/encoding" - "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" - "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - "github.com/consensys/gnark-crypto/ecc/bn254/fp" - "github.com/ethereum/go-ethereum/common" - "github.com/stretchr/testify/assert" -) - -func TestBlobMetadataStoreOperations(t *testing.T) { - ctx := context.Background() - blobKey1 := disperser.BlobKey{ - BlobHash: blobHash, - MetadataHash: "hash", - } - now := time.Now() - metadata1 := &disperser.BlobMetadata{ - MetadataHash: blobKey1.MetadataHash, - BlobHash: blobHash, - BlobStatus: disperser.Processing, - Expiry: uint64(now.Add(time.Hour).Unix()), - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: uint64(now.Unix()), - }, - } - blobKey2 := disperser.BlobKey{ - BlobHash: "blob2", - MetadataHash: "hash2", - } - metadata2 := &disperser.BlobMetadata{ - MetadataHash: blobKey2.MetadataHash, - BlobHash: blobKey2.BlobHash, - BlobStatus: disperser.Finalized, - Expiry: uint64(now.Add(time.Hour).Unix()), - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: uint64(now.Unix()), - }, - ConfirmationInfo: &disperser.ConfirmationInfo{}, - } - err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1) - assert.NoError(t, err) - err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2) - assert.NoError(t, err) - - fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1) - assert.NoError(t, err) - assert.Equal(t, metadata1, fetchedMetadata) - fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey2) - assert.NoError(t, err) - assert.Equal(t, metadata2, fetchedMetadata) - - fetchBulk, err := blobMetadataStore.GetBulkBlobMetadata(ctx, []disperser.BlobKey{blobKey1, blobKey2}) - assert.NoError(t, err) - assert.Equal(t, metadata1, fetchBulk[0]) - assert.Equal(t, metadata2, fetchBulk[1]) - - processing, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Processing) - assert.NoError(t, err) - assert.Len(t, processing, 1) - assert.Equal(t, metadata1, processing[0]) - - processingCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, disperser.Processing) - assert.NoError(t, err) - assert.Equal(t, int32(1), processingCount) - - err = blobMetadataStore.IncrementNumRetries(ctx, metadata1) - assert.NoError(t, err) - fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey1) - assert.NoError(t, err) - metadata1.NumRetries = 1 - assert.Equal(t, metadata1, fetchedMetadata) - - finalized, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Finalized) - assert.NoError(t, err) - assert.Len(t, finalized, 1) - assert.Equal(t, metadata2, finalized[0]) - - finalizedCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, disperser.Finalized) - assert.NoError(t, err) - assert.Equal(t, int32(1), finalizedCount) - - confirmedMetadata := getConfirmedMetadata(t, metadata1, 1) - err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey1, confirmedMetadata) - assert.NoError(t, err) - - metadata, err := blobMetadataStore.GetBlobMetadataInBatch(ctx, confirmedMetadata.ConfirmationInfo.BatchHeaderHash, confirmedMetadata.ConfirmationInfo.BlobIndex) - assert.NoError(t, err) - assert.Equal(t, metadata, confirmedMetadata) - - confirmedCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, disperser.Confirmed) - assert.NoError(t, err) - assert.Equal(t, int32(1), confirmedCount) - - deleteItems(t, []commondynamodb.Key{ - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash}, - }, - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash}, - }, - }) -} - -func TestBlobMetadataStoreOperationsWithPagination(t *testing.T) { - ctx := context.Background() - blobKey1 := disperser.BlobKey{ - BlobHash: blobHash, - MetadataHash: "hash", - } - now := time.Now() - metadata1 := &disperser.BlobMetadata{ - MetadataHash: blobKey1.MetadataHash, - BlobHash: blobHash, - BlobStatus: disperser.Processing, - Expiry: uint64(now.Add(time.Hour).Unix()), - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: uint64(now.Unix()), - }, - } - blobKey2 := disperser.BlobKey{ - BlobHash: "blob2", - MetadataHash: "hash2", - } - metadata2 := &disperser.BlobMetadata{ - MetadataHash: blobKey2.MetadataHash, - BlobHash: blobKey2.BlobHash, - BlobStatus: disperser.Finalized, - Expiry: uint64(now.Add(time.Hour).Unix()), - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: uint64(now.Unix()), - }, - ConfirmationInfo: &disperser.ConfirmationInfo{}, - } - err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1) - assert.NoError(t, err) - err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2) - assert.NoError(t, err) - - fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1) - assert.NoError(t, err) - assert.Equal(t, metadata1, fetchedMetadata) - fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey2) - assert.NoError(t, err) - assert.Equal(t, metadata2, fetchedMetadata) - - processing, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, nil) - assert.NoError(t, err) - assert.Len(t, processing, 1) - assert.Equal(t, metadata1, processing[0]) - assert.NotNil(t, lastEvaluatedKey) - - finalized, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Finalized, 1, nil) - assert.NoError(t, err) - assert.Len(t, finalized, 1) - assert.Equal(t, metadata2, finalized[0]) - assert.NotNil(t, lastEvaluatedKey) - - finalized, lastEvaluatedKey, err = blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Finalized, 1, lastEvaluatedKey) - assert.NoError(t, err) - assert.Len(t, finalized, 0) - assert.Nil(t, lastEvaluatedKey) - - deleteItems(t, []commondynamodb.Key{ - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash}, - }, - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash}, - }, - }) -} - -func TestGetAllBlobMetadataByBatchWithPagination(t *testing.T) { - ctx := context.Background() - blobKey1 := disperser.BlobKey{ - BlobHash: blobHash, - MetadataHash: "hash", - } - expiry := uint64(time.Now().Add(time.Hour).Unix()) - metadata1 := &disperser.BlobMetadata{ - MetadataHash: blobKey1.MetadataHash, - BlobHash: blobHash, - BlobStatus: disperser.Processing, - Expiry: expiry, - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: 123, - }, - } - blobKey2 := disperser.BlobKey{ - BlobHash: "blob2", - MetadataHash: "hash2", - } - metadata2 := &disperser.BlobMetadata{ - MetadataHash: blobKey2.MetadataHash, - BlobHash: blobKey2.BlobHash, - BlobStatus: disperser.Finalized, - Expiry: expiry, - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: 123, - }, - ConfirmationInfo: &disperser.ConfirmationInfo{}, - } - err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1) - assert.NoError(t, err) - err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2) - assert.NoError(t, err) - - confirmedMetadata1 := getConfirmedMetadata(t, metadata1, 1) - err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey1, confirmedMetadata1) - assert.NoError(t, err) - - confirmedMetadata2 := getConfirmedMetadata(t, metadata2, 2) - err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey2, confirmedMetadata2) - assert.NoError(t, err) - - // Fetch the blob metadata with limit 1 - metadata, exclusiveStartKey, err := blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, nil) - assert.NoError(t, err) - assert.Equal(t, metadata[0], confirmedMetadata1) - assert.NotNil(t, exclusiveStartKey) - assert.Equal(t, confirmedMetadata1.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex) - - // Get the next blob metadata with limit 1 and the exclusive start key - metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, exclusiveStartKey) - assert.NoError(t, err) - assert.Equal(t, metadata[0], confirmedMetadata2) - assert.Equal(t, confirmedMetadata2.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex) - - // Fetching the next blob metadata should return an empty list - metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, exclusiveStartKey) - assert.NoError(t, err) - assert.Len(t, metadata, 0) - assert.Nil(t, exclusiveStartKey) - - // Fetch the blob metadata with limit 2 - metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 2, nil) - assert.NoError(t, err) - assert.Len(t, metadata, 2) - assert.Equal(t, metadata[0], confirmedMetadata1) - assert.Equal(t, metadata[1], confirmedMetadata2) - assert.NotNil(t, exclusiveStartKey) - assert.Equal(t, confirmedMetadata2.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex) - - // Fetch the blob metadata with limit 3 should return only 2 items - metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 3, nil) - assert.NoError(t, err) - assert.Len(t, metadata, 2) - assert.Equal(t, metadata[0], confirmedMetadata1) - assert.Equal(t, metadata[1], confirmedMetadata2) - assert.Nil(t, exclusiveStartKey) - - deleteItems(t, []commondynamodb.Key{ - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash}, - }, - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash}, - }, - }) -} - -func TestBlobMetadataStoreOperationsWithPaginationNoStoredBlob(t *testing.T) { - ctx := context.Background() - // Query BlobMetadataStore for a blob that does not exist - // This should return nil for both the blob and lastEvaluatedKey - processing, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, nil) - assert.NoError(t, err) - assert.Nil(t, processing) - assert.Nil(t, lastEvaluatedKey) -} - -func TestShadowWriteBlobMetadata(t *testing.T) { - ctx := context.Background() - - blobKey := disperser.BlobKey{ - BlobHash: "shadowblob", - MetadataHash: "shadowhash", - } - expiry := uint64(time.Now().Add(time.Hour).Unix()) - metadata := &disperser.BlobMetadata{ - MetadataHash: blobKey.MetadataHash, - BlobHash: blobKey.BlobHash, - BlobStatus: disperser.Processing, - Expiry: expiry, - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: 123, - }, - ConfirmationInfo: &disperser.ConfirmationInfo{}, - } - - err := shadowBlobMetadataStore.QueueNewBlobMetadata(ctx, metadata) - assert.NoError(t, err) - err = blobMetadataStore.SetBlobStatus(context.Background(), blobKey, disperser.Dispersing) - assert.NoError(t, err) - primaryMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey) - assert.NoError(t, err) - assert.Equal(t, disperser.Dispersing, primaryMetadata.BlobStatus) - - // Check that the shadow metadata exists but status has NOT been updated - shadowMetadataItem, err := dynamoClient.GetItem(ctx, shadowMetadataTableName, map[string]types.AttributeValue{ - "MetadataHash": &types.AttributeValueMemberS{ - Value: blobKey.MetadataHash, - }, - "BlobHash": &types.AttributeValueMemberS{ - Value: blobKey.BlobHash, - }, - }) - assert.NoError(t, err) - shadowMetadata := disperser.BlobMetadata{} - err = attributevalue.UnmarshalMap(shadowMetadataItem, &shadowMetadata) - assert.NoError(t, err) - assert.Equal(t, disperser.Processing, shadowMetadata.BlobStatus) - deleteItems(t, []commondynamodb.Key{ - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey.BlobHash}, - }, - }) -} - -func TestFilterOutExpiredBlobMetadata(t *testing.T) { - ctx := context.Background() - - blobKey := disperser.BlobKey{ - BlobHash: "blob1", - MetadataHash: "hash1", - } - now := time.Now() - metadata := &disperser.BlobMetadata{ - MetadataHash: blobKey.MetadataHash, - BlobHash: blobKey.BlobHash, - BlobStatus: disperser.Processing, - Expiry: uint64(now.Add(-1).Unix()), - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: uint64(now.Add(-1000).Unix()), - }, - ConfirmationInfo: &disperser.ConfirmationInfo{}, - } - - err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata) - assert.NoError(t, err) - - processing, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Processing) - assert.NoError(t, err) - assert.Len(t, processing, 0) - - processingCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, disperser.Processing) - assert.NoError(t, err) - assert.Equal(t, int32(0), processingCount) - - processing, _, err = blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 10, nil) - assert.NoError(t, err) - assert.Len(t, processing, 0) - - deleteItems(t, []commondynamodb.Key{ - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey.BlobHash}, - }, - }) -} - -func deleteItems(t *testing.T, keys []commondynamodb.Key) { - _, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys) - assert.NoError(t, err) -} - -func getConfirmedMetadata(t *testing.T, metadata *disperser.BlobMetadata, blobIndex uint32) *disperser.BlobMetadata { - batchHeaderHash := [32]byte{1, 2, 3} - var commitX, commitY fp.Element - _, err := commitX.SetString("21661178944771197726808973281966770251114553549453983978976194544185382599016") - assert.NoError(t, err) - _, err = commitY.SetString("9207254729396071334325696286939045899948985698134704137261649190717970615186") - assert.NoError(t, err) - commitment := &encoding.G1Commitment{ - X: commitX, - Y: commitY, - } - dataLength := 32 - batchID := uint32(99) - batchRoot := []byte("hello") - referenceBlockNumber := uint32(132) - confirmationBlockNumber := uint32(150) - sigRecordHash := [32]byte{0} - fee := []byte{0} - inclusionProof := []byte{1, 2, 3, 4, 5} - confirmationInfo := &disperser.ConfirmationInfo{ - BatchHeaderHash: batchHeaderHash, - BlobIndex: blobIndex, - SignatoryRecordHash: sigRecordHash, - ReferenceBlockNumber: referenceBlockNumber, - BatchRoot: batchRoot, - BlobInclusionProof: inclusionProof, - BlobCommitment: &encoding.BlobCommitments{ - Commitment: commitment, - Length: uint(dataLength), - }, - BatchID: batchID, - ConfirmationTxnHash: common.HexToHash("0x123"), - ConfirmationBlockNumber: confirmationBlockNumber, - Fee: fee, - } - metadata.BlobStatus = disperser.Confirmed - metadata.ConfirmationInfo = confirmationInfo - return metadata -} diff --git a/disperser/common/v2/blobstore/dynamo_store.go b/disperser/common/v2/blobstore/dynamo_store.go new file mode 100644 index 000000000..d61616b7e --- /dev/null +++ b/disperser/common/v2/blobstore/dynamo_store.go @@ -0,0 +1,283 @@ +package blobstore + +import ( + "context" + "fmt" + "strconv" + "strings" + "time" + + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser" + v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +const ( + StatusIndexName = "StatusIndex" + OperatorDispersalIndexName = "OperatorDispersalIndex" + OperatorResponseIndexName = "OperatorResponseIndex" + + blobKeyPrefix = "BlobKey#" + dispersalIDPrefix = "DispersalID#" + blobMetadataSK = "BlobMetadata" + certificateSK = "Certificate" +) + +// blobMetadataStore is a blob metadata storage backed by DynamoDB +type blobMetadataStore struct { + dynamoDBClient *commondynamodb.Client + logger logging.Logger + tableName string + ttl time.Duration +} + +var _ BlobMetadataStore = (*blobMetadataStore)(nil) + +func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) BlobMetadataStore { + logger.Debugf("creating blob metadata store v2 with table %s with TTL: %s", tableName, ttl) + return &blobMetadataStore{ + dynamoDBClient: dynamoDBClient, + logger: logger.With("component", "BlobMetadataStoreV2"), + tableName: tableName, + ttl: ttl, + } +} + +func (s *blobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v2.BlobMetadata) error { + item, err := MarshalBlobMetadata(blobMetadata) + if err != nil { + return err + } + + return s.dynamoDBClient.PutItem(ctx, s.tableName, item) +} + +func (s *blobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey core.BlobKey) (*v2.BlobMetadata, error) { + item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ + "PK": &types.AttributeValueMemberS{ + Value: blobKeyPrefix + blobKey.Hex(), + }, + "SK": &types.AttributeValueMemberS{ + Value: blobMetadataSK, + }, + }) + + if item == nil { + return nil, fmt.Errorf("%w: metadata not found for key %s", disperser.ErrMetadataNotFound, blobKey.Hex()) + } + + if err != nil { + return nil, err + } + + metadata, err := UnmarshalBlobMetadata(item) + if err != nil { + return nil, err + } + + return metadata, nil +} + +// GetBlobMetadataByStatus returns all the metadata with the given status +// Because this function scans the entire index, it should only be used for status with a limited number of items. +func (s *blobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status v2.BlobStatus) ([]*v2.BlobMetadata, error) { + items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, StatusIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpressionValues{ + ":status": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + ":expiry": &types.AttributeValueMemberN{ + Value: strconv.FormatInt(time.Now().Unix(), 10), + }}) + if err != nil { + return nil, err + } + + metadata := make([]*v2.BlobMetadata, len(items)) + for i, item := range items { + metadata[i], err = UnmarshalBlobMetadata(item) + if err != nil { + return nil, err + } + } + + return metadata, nil +} + +// GetBlobMetadataCountByStatus returns the count of all the metadata with the given status +// Because this function scans the entire index, it should only be used for status with a limited number of items. +func (s *blobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, status v2.BlobStatus) (int32, error) { + count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, StatusIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpressionValues{ + ":status": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + ":expiry": &types.AttributeValueMemberN{ + Value: strconv.FormatInt(time.Now().Unix(), 10), + }, + }) + if err != nil { + return 0, err + } + + return count, nil +} + +func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { + return &dynamodb.CreateTableInput{ + AttributeDefinitions: []types.AttributeDefinition{ + { + AttributeName: aws.String("PK"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("SK"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("BlobStatus"), + AttributeType: types.ScalarAttributeTypeN, + }, + { + AttributeName: aws.String("Expiry"), + AttributeType: types.ScalarAttributeTypeN, + }, + { + AttributeName: aws.String("OperatorID"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("DispersedAt"), + AttributeType: types.ScalarAttributeTypeN, + }, + { + AttributeName: aws.String("RespondedAt"), + AttributeType: types.ScalarAttributeTypeN, + }, + }, + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("PK"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("SK"), + KeyType: types.KeyTypeRange, + }, + }, + TableName: aws.String(tableName), + GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ + { + IndexName: aws.String(StatusIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("BlobStatus"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("Expiry"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + { + IndexName: aws.String(OperatorDispersalIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("OperatorID"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("DispersedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + { + IndexName: aws.String(OperatorResponseIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("OperatorID"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("RespondedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + } +} + +func MarshalBlobMetadata(metadata *v2.BlobMetadata) (commondynamodb.Item, error) { + fields, err := attributevalue.MarshalMap(metadata) + if err != nil { + return nil, fmt.Errorf("failed to marshal blob metadata: %w", err) + } + + // Add PK and SK fields + fields["PK"] = &types.AttributeValueMemberS{Value: blobKeyPrefix + metadata.BlobKey.Hex()} + fields["SK"] = &types.AttributeValueMemberS{Value: blobMetadataSK} + + return fields, nil +} + +func UnmarshalBlobKey(item commondynamodb.Item) (core.BlobKey, error) { + type Blob struct { + PK string + } + + blob := Blob{} + err := attributevalue.UnmarshalMap(item, &blob) + if err != nil { + return core.BlobKey{}, err + } + + bk := strings.TrimPrefix(blob.PK, blobKeyPrefix) + return core.HexToBlobKey(bk) +} + +func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) { + metadata := v2.BlobMetadata{} + err := attributevalue.UnmarshalMap(item, &metadata) + if err != nil { + return nil, err + } + blobKey, err := UnmarshalBlobKey(item) + if err != nil { + return nil, err + } + metadata.BlobKey = blobKey + + return &metadata, nil +} diff --git a/disperser/common/v2/blobstore/dynamo_store_test.go b/disperser/common/v2/blobstore/dynamo_store_test.go new file mode 100644 index 000000000..3be5ca20f --- /dev/null +++ b/disperser/common/v2/blobstore/dynamo_store_test.go @@ -0,0 +1,96 @@ +package blobstore_test + +import ( + "context" + "testing" + "time" + + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/core" + v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/stretchr/testify/assert" +) + +func TestBlobMetadataStoreOperations(t *testing.T) { + ctx := context.Background() + blobHeader1 := &core.BlobHeaderV2{ + BlobVersion: 0, + QuorumIDs: []core.QuorumID{0}, + BlobCommitment: mockCommitment, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 0, + CumulativePayment: 531, + }, + } + blobKey1 := core.BlobKey([32]byte{1, 2, 3}) + blobHeader2 := &core.BlobHeaderV2{ + BlobVersion: 0, + QuorumIDs: []core.QuorumID{1}, + BlobCommitment: mockCommitment, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x456", + BinIndex: 2, + CumulativePayment: 999, + }, + } + blobKey2 := core.BlobKey([32]byte{4, 5, 6}) + + now := time.Now() + metadata1 := &v2.BlobMetadata{ + BlobHeaderV2: *blobHeader1, + BlobKey: blobKey1, + BlobStatus: v2.Queued, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + } + metadata2 := &v2.BlobMetadata{ + BlobHeaderV2: *blobHeader2, + BlobKey: blobKey2, + BlobStatus: v2.Certified, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + } + err := blobMetadataStore.PutBlobMetadata(ctx, metadata1) + assert.NoError(t, err) + err = blobMetadataStore.PutBlobMetadata(ctx, metadata2) + assert.NoError(t, err) + + fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1) + assert.NoError(t, err) + assert.Equal(t, metadata1, fetchedMetadata) + fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey2) + assert.NoError(t, err) + assert.Equal(t, metadata2, fetchedMetadata) + + queued, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Queued) + assert.NoError(t, err) + assert.Len(t, queued, 1) + assert.Equal(t, metadata1, queued[0]) + certified, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Certified) + assert.NoError(t, err) + assert.Len(t, certified, 1) + assert.Equal(t, metadata2, certified[0]) + + queuedCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, v2.Queued) + assert.NoError(t, err) + assert.Equal(t, int32(1), queuedCount) + + deleteItems(t, []commondynamodb.Key{ + { + "PK": &types.AttributeValueMemberS{Value: "BlobKey#" + blobKey1.Hex()}, + "SK": &types.AttributeValueMemberS{Value: "BlobMetadata"}, + }, + { + "PK": &types.AttributeValueMemberS{Value: "BlobKey#" + blobKey2.Hex()}, + "SK": &types.AttributeValueMemberS{Value: "BlobMetadata"}, + }, + }) +} + +func deleteItems(t *testing.T, keys []commondynamodb.Key) { + failed, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys) + assert.NoError(t, err) + assert.Len(t, failed, 0) +} diff --git a/node/metrics.go b/node/metrics.go index 1eab92524..35b727475 100644 --- a/node/metrics.go +++ b/node/metrics.go @@ -252,7 +252,7 @@ func (g *Metrics) collectOnchainMetrics() { continue } _, quorumRankedOperators := operators.GetRankedOperators(state) - for q, _ := range state.Operators { + for q := range state.Operators { for i, op := range quorumRankedOperators[q] { if op.OperatorId == g.operatorId { g.allQuorumCache[q] = true