From b38b09db7536b869836c70cc380781cddd117214 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 25 Sep 2024 19:50:30 +0200 Subject: [PATCH] feat: introduce feature MOVES_HISTORY --- internal/controller/system/controller.go | 1 + internal/ledger.go | 63 ++++++----- internal/storage/ledger/accounts.go | 17 +-- internal/storage/ledger/balances.go | 46 ++------ .../ledger/migrations/0-add-sequences.sql | 8 +- internal/storage/ledger/transactions.go | 104 ++++++++---------- internal/storage/ledger/transactions_test.go | 2 +- internal/storage/ledger/volumes.go | 4 + internal/tracing/tracer.go | 1 + test/e2e/stress_test.go | 11 +- 10 files changed, 116 insertions(+), 141 deletions(-) diff --git a/internal/controller/system/controller.go b/internal/controller/system/controller.go index b6ef53bdf..e252c1e43 100644 --- a/internal/controller/system/controller.go +++ b/internal/controller/system/controller.go @@ -55,6 +55,7 @@ func (c *DefaultController) GetLedgerController(ctx context.Context, name string func (c *DefaultController) CreateLedger(ctx context.Context, name string, configuration ledger.Configuration) error { return tracing.SkipResult(tracing.Trace(ctx, "CreateLedger", tracing.NoResult(func(ctx context.Context) error { configuration.SetDefaults() + // todo: validate queried features l, err := ledger.New(name, configuration) if err != nil { return err diff --git a/internal/ledger.go b/internal/ledger.go index 8d6bb7051..249369792 100644 --- a/internal/ledger.go +++ b/internal/ledger.go @@ -9,13 +9,17 @@ import ( ) const ( - FeaturePostCommitVolumes = "POST_COMMIT_VOLUMES" - FeaturePostCommitEffectiveVolumes = "POST_COMMIT_EFFECTIVE_VOLUMES" - FeatureHashLogs = "HASH_LOGS" - FeatureAccountMetadataHistories = "ACCOUNT_METADATA_HISTORIES" - FeatureTransactionMetadataHistories = "TRANSACTION_METADATA_HISTORIES" - FeatureIndexAddressSegments = "INDEX_ADDRESS_SEGMENTS" - FeatureIndexTransactionAccounts = "INDEX_TRANSACTION_ACCOUNTS" + FeatureMovesHistory = "MOVES_HISTORY" + // todo: depends on FeatureMovesHistory + // todo: it should not be required as we have the information when updating volumes + FeatureMovesHistoryPostCommitVolumes = "MOVES_HISTORY_POST_COMMIT_VOLUMES" + // todo: depends on FeatureMovesHistory + FeatureMovesHistoryPostCommitEffectiveVolumes = "MOVES_HISTORY_POST_COMMIT_EFFECTIVE_VOLUMES" + FeatureHashLogs = "HASH_LOGS" + FeatureAccountMetadataHistory = "ACCOUNT_METADATA_HISTORY" + FeatureTransactionMetadataHistory = "TRANSACTION_METADATA_HISTORY" + FeatureIndexAddressSegments = "INDEX_ADDRESS_SEGMENTS" + FeatureIndexTransactionAccounts = "INDEX_TRANSACTION_ACCOUNTS" StateInitializing = "initializing" StateInUse = "in-use" @@ -25,31 +29,34 @@ const ( var ( DefaultFeatures = FeatureSet{ - FeaturePostCommitVolumes: "SYNC", - FeaturePostCommitEffectiveVolumes: "SYNC", - FeatureHashLogs: "SYNC", - FeatureAccountMetadataHistories: "SYNC", - FeatureTransactionMetadataHistories: "SYNC", - FeatureIndexAddressSegments: "ON", - FeatureIndexTransactionAccounts: "ON", + FeatureMovesHistory: "ON", + FeatureMovesHistoryPostCommitVolumes: "SYNC", + FeatureMovesHistoryPostCommitEffectiveVolumes: "SYNC", + FeatureHashLogs: "SYNC", + FeatureAccountMetadataHistory: "SYNC", + FeatureTransactionMetadataHistory: "SYNC", + FeatureIndexAddressSegments: "ON", + FeatureIndexTransactionAccounts: "ON", } MinimalFeatureSet = FeatureSet{ - FeaturePostCommitVolumes: "DISABLED", - FeaturePostCommitEffectiveVolumes: "DISABLED", - FeatureHashLogs: "DISABLED", - FeatureAccountMetadataHistories: "DISABLED", - FeatureTransactionMetadataHistories: "DISABLED", - FeatureIndexAddressSegments: "OFF", - FeatureIndexTransactionAccounts: "OFF", + FeatureMovesHistory: "OFF", + FeatureMovesHistoryPostCommitVolumes: "DISABLED", + FeatureMovesHistoryPostCommitEffectiveVolumes: "DISABLED", + FeatureHashLogs: "DISABLED", + FeatureAccountMetadataHistory: "DISABLED", + FeatureTransactionMetadataHistory: "DISABLED", + FeatureIndexAddressSegments: "OFF", + FeatureIndexTransactionAccounts: "OFF", } FeatureConfigurations = map[string][]string{ - FeaturePostCommitVolumes: {"SYNC", "DISABLED"}, - FeaturePostCommitEffectiveVolumes: {"SYNC", "DISABLED"}, - FeatureHashLogs: {"SYNC", "DISABLED"}, - FeatureAccountMetadataHistories: {"SYNC", "DISABLED"}, - FeatureTransactionMetadataHistories: {"SYNC", "DISABLED"}, - FeatureIndexAddressSegments: {"ON", "OFF"}, - FeatureIndexTransactionAccounts: {"ON", "OFF"}, + FeatureMovesHistory: {"ON", "OFF"}, + FeatureMovesHistoryPostCommitVolumes: {"SYNC", "DISABLED"}, + FeatureMovesHistoryPostCommitEffectiveVolumes: {"SYNC", "DISABLED"}, + FeatureHashLogs: {"SYNC", "DISABLED"}, + FeatureAccountMetadataHistory: {"SYNC", "DISABLED"}, + FeatureTransactionMetadataHistory: {"SYNC", "DISABLED"}, + FeatureIndexAddressSegments: {"ON", "OFF"}, + FeatureIndexTransactionAccounts: {"ON", "OFF"}, } ledgerNameFormat = regexp.MustCompile("^[0-9a-zA-Z_-]{1,63}$") diff --git a/internal/storage/ledger/accounts.go b/internal/storage/ledger/accounts.go index c12540f6f..db6e65220 100644 --- a/internal/storage/ledger/accounts.go +++ b/internal/storage/ledger/accounts.go @@ -160,8 +160,8 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo } } - if needPCV && !s.ledger.HasFeature(ledger.FeaturePostCommitVolumes, "SYNC") { - return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitVolumes)) + if needPCV && !s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitVolumes, "SYNC") { + return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeatureMovesHistoryPostCommitVolumes)) } // build the query @@ -175,7 +175,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo ret = ret.Where("accounts.first_usage <= ?", date) } - if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistories, "SYNC") && date != nil && !date.IsZero() { + if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistory, "SYNC") && date != nil && !date.IsZero() { ret = ret. Join( `left join (?) accounts_metadata on accounts_metadata.accounts_seq = accounts.seq`, @@ -186,7 +186,9 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo ret = ret.ColumnExpr("accounts.metadata") } - if s.ledger.HasFeature(ledger.FeaturePostCommitVolumes, "SYNC") && needPCV { + // todo: should join on histories only if pit is specified + // otherwise the accounts_volumes table is enough + if s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitVolumes, "SYNC") && needPCV { ret = ret. Join( `left join (?) pcv on pcv.accounts_seq = accounts.seq`, @@ -199,7 +201,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo ColumnExpr("pcv.*") } - if s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") && expandEffectiveVolumes { + if s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") && expandEffectiveVolumes { ret = ret. Join( `left join (?) pcev on pcev.accounts_seq = accounts.seq`, @@ -223,6 +225,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo match := balanceRegex.FindAllStringSubmatch(key, 2) asset := match[0][1] + // todo: use moves only if feature is enabled return s.db.NewSelect(). TableExpr( "(?) balance", @@ -243,7 +246,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo String(), nil, nil case key == "metadata": - if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistories, "SYNC") && date != nil && !date.IsZero() { + if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistory, "SYNC") && date != nil && !date.IsZero() { key = "accounts_metadata.metadata" } @@ -251,7 +254,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo case metadataRegex.Match([]byte(key)): match := metadataRegex.FindAllStringSubmatch(key, 3) - if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistories, "SYNC") && date != nil && !date.IsZero() { + if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistory, "SYNC") && date != nil && !date.IsZero() { key = "accounts_metadata.metadata" } else { key = "metadata" diff --git a/internal/storage/ledger/balances.go b/internal/storage/ledger/balances.go index 62e2e133d..af33ab8e4 100644 --- a/internal/storage/ledger/balances.go +++ b/internal/storage/ledger/balances.go @@ -2,6 +2,7 @@ package ledger import ( "context" + "github.com/formancehq/go-libs/platform/postgres" "github.com/pkg/errors" "math/big" "strings" @@ -69,16 +70,16 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool, var selectAccountsWithVolumes *bun.SelectQuery if date != nil && !date.IsZero() { if useInsertionDate { - if !s.ledger.HasFeature(ledger.FeaturePostCommitVolumes, "SYNC") { - return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitVolumes)) + if !s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitVolumes, "SYNC") { + return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeatureMovesHistoryPostCommitVolumes)) } selectAccountsWithVolumes = s.db.NewSelect(). TableExpr("(?) moves", s.SelectDistinctMovesBySeq(date)). Column("asset", "accounts_seq", "account_address", "account_address_array"). ColumnExpr("post_commit_volumes as volumes") } else { - if !s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") { - return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitEffectiveVolumes)) + if !s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") { + return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes)) } selectAccountsWithVolumes = s.db.NewSelect(). TableExpr("(?) moves", s.SelectDistinctMovesByEffectiveDate(date)). @@ -99,7 +100,7 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool, TableExpr("(?) accounts_volumes", selectAccountsWithVolumes) if needMetadata { - if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistories, "SYNC") && date != nil && !date.IsZero() { + if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistory, "SYNC") && date != nil && !date.IsZero() { selectAccountsWithVolumes = selectAccountsWithVolumes. Join( `left join (?) accounts_metadata on accounts_metadata.accounts_seq = accounts_volumes.accounts_seq`, @@ -208,7 +209,7 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ Order("account", "asset"). Scan(ctx) if err != nil { - return nil, err + return nil, postgres.ResolveError(err) } ret := ledgercontroller.Balances{} @@ -234,36 +235,3 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ return ret, nil }) } - -/** - -SELECT * -FROM ( - SELECT *, accounts.address_array AS account_address_array - FROM ( - SELECT - "asset", - "accounts_seq", - "account_address", - "account_address_array", - post_commit_volumes AS volumes - FROM ( - SELECT DISTINCT ON (accounts_seq, account_address, asset) - "accounts_seq", - "account_address", - "asset", - first_value(account_address_array) OVER (PARTITION BY (accounts_seq, account_address, asset) ORDER BY seq DESC) AS account_address_array, - first_value(post_commit_volumes) OVER (PARTITION BY (accounts_seq, account_address, asset) ORDER BY seq DESC) AS post_commit_volumes - FROM ( - SELECT * - FROM "7c44551f".moves - WHERE (ledger = '7c44551f') AND (insertion_date <= '2024-09-25T12:01:13.895812Z') - ORDER BY "seq" DESC - ) moves - WHERE (ledger = '7c44551f') AND (insertion_date <= '2024-09-25T12:01:13.895812Z')) moves - ) accounts_volumes - JOIN "7c44551f".accounts accounts ON accounts.seq = accounts_volumes.accounts_seq -) accounts -WHERE (jsonb_array_length(account_address_array) = 2 AND account_address_array @@ ('$[0] == "users"')::jsonpath) - -*/ diff --git a/internal/storage/ledger/migrations/0-add-sequences.sql b/internal/storage/ledger/migrations/0-add-sequences.sql index 2efb09bd5..b60cd83b6 100644 --- a/internal/storage/ledger/migrations/0-add-sequences.sql +++ b/internal/storage/ledger/migrations/0-add-sequences.sql @@ -19,7 +19,7 @@ select setval('"{{.Bucket}}"."log_id_{{.ID}}"', coalesce(( -- enable post commit volumes synchronously -{{ if .HasFeature "POST_COMMIT_VOLUMES" "SYNC" }} +{{ if .HasFeature "MOVES_HISTORY_POST_COMMIT_VOLUMES" "SYNC" }} create index "pcv_{{.ID}}" on "{{.Bucket}}".moves (accounts_seq, asset, seq) where ledger = '{{.Name}}'; create trigger "set_volumes_{{.ID}}" @@ -34,7 +34,7 @@ execute procedure "{{.Bucket}}".set_volumes(); -- enable post commit effective volumes synchronously -{{ if .HasFeature "POST_COMMIT_EFFECTIVE_VOLUMES" "SYNC" }} +{{ if .HasFeature "MOVES_HISTORY_POST_COMMIT_EFFECTIVE_VOLUMES" "SYNC" }} create index "pcev_{{.ID}}" on "{{.Bucket}}".moves (accounts_seq, asset, effective_date desc) where ledger = '{{.Name}}'; create trigger "set_effective_volumes_{{.ID}}" @@ -69,7 +69,7 @@ when ( execute procedure "{{.Bucket}}".set_log_hash(); {{ end }} -{{ if .HasFeature "ACCOUNT_METADATA_HISTORIES" "SYNC" }} +{{ if .HasFeature "ACCOUNT_METADATA_HISTORY" "SYNC" }} create trigger "update_account_metadata_history_{{.ID}}" after update on "{{.Bucket}}"."accounts" @@ -89,7 +89,7 @@ when ( execute procedure "{{.Bucket}}".insert_account_metadata_history(); {{ end }} -{{ if .HasFeature "TRANSACTION_METADATA_HISTORIES" "SYNC" }} +{{ if .HasFeature "TRANSACTION_METADATA_HISTORY" "SYNC" }} create trigger "update_transaction_metadata_history_{{.ID}}" after update on "{{.Bucket}}"."transactions" diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index f23f57324..b2e04a2e0 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -86,12 +86,13 @@ func (s *Store) selectDistinctTransactionMetadataHistories(date *time.Time) *bun func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffectiveVolumes bool, q query.Builder) *bun.SelectQuery { ret := s.db.NewSelect() - if expandVolumes && !s.ledger.HasFeature(ledger.FeaturePostCommitVolumes, "SYNC") { - return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitVolumes)) + // todo: no need this feature to grab pcv since those are included in transaction table + if expandVolumes && !s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitVolumes, "SYNC") { + return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeatureMovesHistoryPostCommitVolumes)) } - if expandEffectiveVolumes && !s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") { - return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitEffectiveVolumes)) + if expandEffectiveVolumes && !s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") { + return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes)) } if q != nil { @@ -156,7 +157,7 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti ret = ret.Where("timestamp <= ?", date) } - if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistories, "SYNC") && date != nil && !date.IsZero() { + if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistory, "SYNC") && date != nil && !date.IsZero() { ret = ret. Join( `left join (?) transactions_metadata on transactions_metadata.transactions_seq = transactions.seq`, @@ -167,23 +168,7 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti ret = ret.ColumnExpr("metadata") } - /** - select transactions_seq, jsonb_merge_agg(pcev::jsonb) as post_commit_effective_volumes - from ( - SELECT - distinct on (transactions_seq, account_address, asset) - "transactions_seq", - json_build_object( - moves.account_address, - json_build_object( - moves.asset, - first_value(moves.post_commit_effective_volumes) over (partition by (transactions_seq, account_address, asset) order by seq desc))) as pcev - FROM moves - ) data - group by transactions_seq; - */ - - if s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") && expandEffectiveVolumes { + if s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") && expandEffectiveVolumes { ret = ret. Join( `join (?) pcev on pcev.transactions_seq = transactions.seq`, @@ -196,7 +181,8 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti DistinctOn("transactions_seq, account_address, asset"). ModelTableExpr(s.GetPrefixedRelationName("moves")). Column("transactions_seq"). - ColumnExpr(` + // use strings.Replace for logs + ColumnExpr(strings.Replace(` json_build_object( moves.account_address, json_build_object( @@ -204,7 +190,7 @@ func (s *Store) selectTransactions(date *time.Time, expandVolumes, expandEffecti first_value(moves.post_commit_effective_volumes) over (partition by (transactions_seq, account_address, asset) order by seq desc) ) ) as pcev - `), + `, "\n", "", -1)), ). Group("transactions_seq"), ). @@ -343,37 +329,43 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e return errors.Wrap(err, "failed to insert transaction") } - moves := Moves{} - for _, p := range tx.Postings { - moves = append(moves, []*Move{ - { - Ledger: s.ledger.Name, - IsSource: true, - Account: p.Source, - AccountAddressArray: strings.Split(p.Source, ":"), - Amount: (*bunpaginate.BigInt)(p.Amount), - Asset: p.Asset, - InsertionDate: tx.InsertedAt, - EffectiveDate: tx.Timestamp, - TransactionSeq: mappedTx.Seq, - AccountSeq: accounts[p.Source].Seq, - }, - { - Ledger: s.ledger.Name, - Account: p.Destination, - AccountAddressArray: strings.Split(p.Destination, ":"), - Amount: (*bunpaginate.BigInt)(p.Amount), - Asset: p.Asset, - InsertionDate: tx.InsertedAt, - EffectiveDate: tx.Timestamp, - TransactionSeq: mappedTx.Seq, - AccountSeq: accounts[p.Destination].Seq, - }, - }...) - } + if s.ledger.HasFeature(ledger.FeatureMovesHistory, "ON") { + moves := Moves{} + for _, p := range tx.Postings { + moves = append(moves, []*Move{ + { + Ledger: s.ledger.Name, + IsSource: true, + Account: p.Source, + AccountAddressArray: strings.Split(p.Source, ":"), + Amount: (*bunpaginate.BigInt)(p.Amount), + Asset: p.Asset, + InsertionDate: tx.InsertedAt, + EffectiveDate: tx.Timestamp, + TransactionSeq: mappedTx.Seq, + AccountSeq: accounts[p.Source].Seq, + }, + { + Ledger: s.ledger.Name, + Account: p.Destination, + AccountAddressArray: strings.Split(p.Destination, ":"), + Amount: (*bunpaginate.BigInt)(p.Amount), + Asset: p.Asset, + InsertionDate: tx.InsertedAt, + EffectiveDate: tx.Timestamp, + TransactionSeq: mappedTx.Seq, + AccountSeq: accounts[p.Destination].Seq, + }, + }...) + } + + if err := s.insertMoves(ctx, moves...); err != nil { + return errors.Wrap(err, "failed to insert moves") + } - if err := s.insertMoves(ctx, moves...); err != nil { - return errors.Wrap(err, "failed to insert moves") + if s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") { + tx.PostCommitEffectiveVolumes = moves.ComputePostCommitEffectiveVolumes() + } } tx.ID = mappedTx.ID @@ -381,10 +373,6 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e tx.Timestamp = mappedTx.Timestamp tx.InsertedAt = mappedTx.InsertedAt - if s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") { - tx.PostCommitEffectiveVolumes = moves.ComputePostCommitEffectiveVolumes() - } - return nil } diff --git a/internal/storage/ledger/transactions_test.go b/internal/storage/ledger/transactions_test.go index 2e2f21795..92bf7225e 100644 --- a/internal/storage/ledger/transactions_test.go +++ b/internal/storage/ledger/transactions_test.go @@ -653,7 +653,7 @@ func TestTransactionsList(t *testing.T) { expected: []ledger.Transaction{tx3, tx2, tx1}, }, { - name: "filter using exists metadata and pit", + name: "filter using metadata and pit", query: ledgercontroller.NewPaginatedQueryOptions(ledgercontroller.PITFilterWithVolumes{ PITFilter: ledgercontroller.PITFilter{ PIT: pointer.For(tx3.Timestamp), diff --git a/internal/storage/ledger/volumes.go b/internal/storage/ledger/volumes.go index 40f365dad..b52fc50b1 100644 --- a/internal/storage/ledger/volumes.go +++ b/internal/storage/ledger/volumes.go @@ -60,6 +60,10 @@ func (s *Store) updateVolumes(ctx context.Context, accountVolumes ...AccountsVol func (s *Store) selectVolumes(oot, pit *time.Time, useInsertionDate bool, groupLevel int, q lquery.Builder) *bun.SelectQuery { ret := s.db.NewSelect() + if !s.ledger.HasFeature(ledger.FeatureMovesHistory, "ON") { + return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeatureMovesHistory)) + } + var useMetadata bool if q != nil { err := q.Walk(func(operator, key string, value any) error { diff --git a/internal/tracing/tracer.go b/internal/tracing/tracer.go index 7d1af209f..8e605d76f 100644 --- a/internal/tracing/tracer.go +++ b/internal/tracing/tracer.go @@ -27,6 +27,7 @@ func TraceWithLatency[RET any]( now := time.Now() ret, err := fn(ctx) if err != nil { + trace.SpanFromContext(ctx).RecordError(err) var zeroRet RET return zeroRet, err } diff --git a/test/e2e/stress_test.go b/test/e2e/stress_test.go index 900e528aa..198044ffb 100644 --- a/test/e2e/stress_test.go +++ b/test/e2e/stress_test.go @@ -38,8 +38,8 @@ var _ = Context("Ledger stress tests", func() { const ( countLedgers = 30 - countBuckets = 3 - countTransactions = 500 + countBuckets = 10 + countTransactions = 300 countAccounts = 20 ) @@ -51,8 +51,11 @@ var _ = Context("Ledger stress tests", func() { err := CreateLedger(ctx, testServer.GetValue(), operations.V2CreateLedgerRequest{ Ledger: ledgerName, V2CreateLedgerRequest: &components.V2CreateLedgerRequest{ - Bucket: &bucketName, - Features: ledger.MinimalFeatureSet.With(ledger.FeaturePostCommitVolumes, "SYNC"), + Bucket: &bucketName, + Features: ledger.MinimalFeatureSet. + // todo: as we are interested only by aggregated volumes at current date, these features should not be required + With(ledger.FeatureMovesHistory, "ON"). + With(ledger.FeatureMovesHistoryPostCommitVolumes, "SYNC"), }, }) Expect(err).ShouldNot(HaveOccurred())