Skip to content

Commit

Permalink
feat: introduce feature MOVES_HISTORY
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 16, 2024
1 parent 2007a08 commit b38b09d
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 141 deletions.
1 change: 1 addition & 0 deletions internal/controller/system/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (c *DefaultController) GetLedgerController(ctx context.Context, name string
func (c *DefaultController) CreateLedger(ctx context.Context, name string, configuration ledger.Configuration) error {
return tracing.SkipResult(tracing.Trace(ctx, "CreateLedger", tracing.NoResult(func(ctx context.Context) error {
configuration.SetDefaults()
// todo: validate queried features
l, err := ledger.New(name, configuration)
if err != nil {
return err
Expand Down
63 changes: 35 additions & 28 deletions internal/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ import (
)

const (
FeaturePostCommitVolumes = "POST_COMMIT_VOLUMES"
FeaturePostCommitEffectiveVolumes = "POST_COMMIT_EFFECTIVE_VOLUMES"
FeatureHashLogs = "HASH_LOGS"
FeatureAccountMetadataHistories = "ACCOUNT_METADATA_HISTORIES"
FeatureTransactionMetadataHistories = "TRANSACTION_METADATA_HISTORIES"
FeatureIndexAddressSegments = "INDEX_ADDRESS_SEGMENTS"
FeatureIndexTransactionAccounts = "INDEX_TRANSACTION_ACCOUNTS"
FeatureMovesHistory = "MOVES_HISTORY"
// todo: depends on FeatureMovesHistory
// todo: it should not be required as we have the information when updating volumes
FeatureMovesHistoryPostCommitVolumes = "MOVES_HISTORY_POST_COMMIT_VOLUMES"
// todo: depends on FeatureMovesHistory
FeatureMovesHistoryPostCommitEffectiveVolumes = "MOVES_HISTORY_POST_COMMIT_EFFECTIVE_VOLUMES"
FeatureHashLogs = "HASH_LOGS"
FeatureAccountMetadataHistory = "ACCOUNT_METADATA_HISTORY"
FeatureTransactionMetadataHistory = "TRANSACTION_METADATA_HISTORY"
FeatureIndexAddressSegments = "INDEX_ADDRESS_SEGMENTS"
FeatureIndexTransactionAccounts = "INDEX_TRANSACTION_ACCOUNTS"

StateInitializing = "initializing"
StateInUse = "in-use"
Expand All @@ -25,31 +29,34 @@ const (

var (
DefaultFeatures = FeatureSet{
FeaturePostCommitVolumes: "SYNC",
FeaturePostCommitEffectiveVolumes: "SYNC",
FeatureHashLogs: "SYNC",
FeatureAccountMetadataHistories: "SYNC",
FeatureTransactionMetadataHistories: "SYNC",
FeatureIndexAddressSegments: "ON",
FeatureIndexTransactionAccounts: "ON",
FeatureMovesHistory: "ON",
FeatureMovesHistoryPostCommitVolumes: "SYNC",
FeatureMovesHistoryPostCommitEffectiveVolumes: "SYNC",
FeatureHashLogs: "SYNC",
FeatureAccountMetadataHistory: "SYNC",
FeatureTransactionMetadataHistory: "SYNC",
FeatureIndexAddressSegments: "ON",
FeatureIndexTransactionAccounts: "ON",
}
MinimalFeatureSet = FeatureSet{
FeaturePostCommitVolumes: "DISABLED",
FeaturePostCommitEffectiveVolumes: "DISABLED",
FeatureHashLogs: "DISABLED",
FeatureAccountMetadataHistories: "DISABLED",
FeatureTransactionMetadataHistories: "DISABLED",
FeatureIndexAddressSegments: "OFF",
FeatureIndexTransactionAccounts: "OFF",
FeatureMovesHistory: "OFF",
FeatureMovesHistoryPostCommitVolumes: "DISABLED",
FeatureMovesHistoryPostCommitEffectiveVolumes: "DISABLED",
FeatureHashLogs: "DISABLED",
FeatureAccountMetadataHistory: "DISABLED",
FeatureTransactionMetadataHistory: "DISABLED",
FeatureIndexAddressSegments: "OFF",
FeatureIndexTransactionAccounts: "OFF",
}
FeatureConfigurations = map[string][]string{
FeaturePostCommitVolumes: {"SYNC", "DISABLED"},
FeaturePostCommitEffectiveVolumes: {"SYNC", "DISABLED"},
FeatureHashLogs: {"SYNC", "DISABLED"},
FeatureAccountMetadataHistories: {"SYNC", "DISABLED"},
FeatureTransactionMetadataHistories: {"SYNC", "DISABLED"},
FeatureIndexAddressSegments: {"ON", "OFF"},
FeatureIndexTransactionAccounts: {"ON", "OFF"},
FeatureMovesHistory: {"ON", "OFF"},
FeatureMovesHistoryPostCommitVolumes: {"SYNC", "DISABLED"},
FeatureMovesHistoryPostCommitEffectiveVolumes: {"SYNC", "DISABLED"},
FeatureHashLogs: {"SYNC", "DISABLED"},
FeatureAccountMetadataHistory: {"SYNC", "DISABLED"},
FeatureTransactionMetadataHistory: {"SYNC", "DISABLED"},
FeatureIndexAddressSegments: {"ON", "OFF"},
FeatureIndexTransactionAccounts: {"ON", "OFF"},
}

ledgerNameFormat = regexp.MustCompile("^[0-9a-zA-Z_-]{1,63}$")
Expand Down
17 changes: 10 additions & 7 deletions internal/storage/ledger/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
}
}

if needPCV && !s.ledger.HasFeature(ledger.FeaturePostCommitVolumes, "SYNC") {
return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitVolumes))
if needPCV && !s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitVolumes, "SYNC") {
return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeatureMovesHistoryPostCommitVolumes))
}

// build the query
Expand All @@ -175,7 +175,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
ret = ret.Where("accounts.first_usage <= ?", date)
}

if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistories, "SYNC") && date != nil && !date.IsZero() {
if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistory, "SYNC") && date != nil && !date.IsZero() {
ret = ret.
Join(
`left join (?) accounts_metadata on accounts_metadata.accounts_seq = accounts.seq`,
Expand All @@ -186,7 +186,9 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
ret = ret.ColumnExpr("accounts.metadata")
}

if s.ledger.HasFeature(ledger.FeaturePostCommitVolumes, "SYNC") && needPCV {
// todo: should join on histories only if pit is specified
// otherwise the accounts_volumes table is enough
if s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitVolumes, "SYNC") && needPCV {
ret = ret.
Join(
`left join (?) pcv on pcv.accounts_seq = accounts.seq`,
Expand All @@ -199,7 +201,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
ColumnExpr("pcv.*")
}

if s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") && expandEffectiveVolumes {
if s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") && expandEffectiveVolumes {
ret = ret.
Join(
`left join (?) pcev on pcev.accounts_seq = accounts.seq`,
Expand All @@ -223,6 +225,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
match := balanceRegex.FindAllStringSubmatch(key, 2)
asset := match[0][1]

// todo: use moves only if feature is enabled
return s.db.NewSelect().
TableExpr(
"(?) balance",
Expand All @@ -243,15 +246,15 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
String(), nil, nil

case key == "metadata":
if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistories, "SYNC") && date != nil && !date.IsZero() {
if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistory, "SYNC") && date != nil && !date.IsZero() {
key = "accounts_metadata.metadata"
}

return key + " -> ? is not null", []any{value}, nil

case metadataRegex.Match([]byte(key)):
match := metadataRegex.FindAllStringSubmatch(key, 3)
if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistories, "SYNC") && date != nil && !date.IsZero() {
if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistory, "SYNC") && date != nil && !date.IsZero() {
key = "accounts_metadata.metadata"
} else {
key = "metadata"
Expand Down
46 changes: 7 additions & 39 deletions internal/storage/ledger/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ledger

import (
"context"
"github.com/formancehq/go-libs/platform/postgres"
"github.com/pkg/errors"
"math/big"
"strings"
Expand Down Expand Up @@ -69,16 +70,16 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool,
var selectAccountsWithVolumes *bun.SelectQuery
if date != nil && !date.IsZero() {
if useInsertionDate {
if !s.ledger.HasFeature(ledger.FeaturePostCommitVolumes, "SYNC") {
return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitVolumes))
if !s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitVolumes, "SYNC") {
return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeatureMovesHistoryPostCommitVolumes))
}
selectAccountsWithVolumes = s.db.NewSelect().
TableExpr("(?) moves", s.SelectDistinctMovesBySeq(date)).
Column("asset", "accounts_seq", "account_address", "account_address_array").
ColumnExpr("post_commit_volumes as volumes")
} else {
if !s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") {
return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeaturePostCommitEffectiveVolumes))
if !s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") {
return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes))
}
selectAccountsWithVolumes = s.db.NewSelect().
TableExpr("(?) moves", s.SelectDistinctMovesByEffectiveDate(date)).
Expand All @@ -99,7 +100,7 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool,
TableExpr("(?) accounts_volumes", selectAccountsWithVolumes)

if needMetadata {
if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistories, "SYNC") && date != nil && !date.IsZero() {
if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistory, "SYNC") && date != nil && !date.IsZero() {
selectAccountsWithVolumes = selectAccountsWithVolumes.
Join(
`left join (?) accounts_metadata on accounts_metadata.accounts_seq = accounts_volumes.accounts_seq`,
Expand Down Expand Up @@ -208,7 +209,7 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ
Order("account", "asset").
Scan(ctx)
if err != nil {
return nil, err
return nil, postgres.ResolveError(err)
}

ret := ledgercontroller.Balances{}
Expand All @@ -234,36 +235,3 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ
return ret, nil
})
}

/**
SELECT *
FROM (
SELECT *, accounts.address_array AS account_address_array
FROM (
SELECT
"asset",
"accounts_seq",
"account_address",
"account_address_array",
post_commit_volumes AS volumes
FROM (
SELECT DISTINCT ON (accounts_seq, account_address, asset)
"accounts_seq",
"account_address",
"asset",
first_value(account_address_array) OVER (PARTITION BY (accounts_seq, account_address, asset) ORDER BY seq DESC) AS account_address_array,
first_value(post_commit_volumes) OVER (PARTITION BY (accounts_seq, account_address, asset) ORDER BY seq DESC) AS post_commit_volumes
FROM (
SELECT *
FROM "7c44551f".moves
WHERE (ledger = '7c44551f') AND (insertion_date <= '2024-09-25T12:01:13.895812Z')
ORDER BY "seq" DESC
) moves
WHERE (ledger = '7c44551f') AND (insertion_date <= '2024-09-25T12:01:13.895812Z')) moves
) accounts_volumes
JOIN "7c44551f".accounts accounts ON accounts.seq = accounts_volumes.accounts_seq
) accounts
WHERE (jsonb_array_length(account_address_array) = 2 AND account_address_array @@ ('$[0] == "users"')::jsonpath)
*/
8 changes: 4 additions & 4 deletions internal/storage/ledger/migrations/0-add-sequences.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ select setval('"{{.Bucket}}"."log_id_{{.ID}}"', coalesce((


-- enable post commit volumes synchronously
{{ if .HasFeature "POST_COMMIT_VOLUMES" "SYNC" }}
{{ if .HasFeature "MOVES_HISTORY_POST_COMMIT_VOLUMES" "SYNC" }}
create index "pcv_{{.ID}}" on "{{.Bucket}}".moves (accounts_seq, asset, seq) where ledger = '{{.Name}}';

create trigger "set_volumes_{{.ID}}"
Expand All @@ -34,7 +34,7 @@ execute procedure "{{.Bucket}}".set_volumes();

-- enable post commit effective volumes synchronously

{{ if .HasFeature "POST_COMMIT_EFFECTIVE_VOLUMES" "SYNC" }}
{{ if .HasFeature "MOVES_HISTORY_POST_COMMIT_EFFECTIVE_VOLUMES" "SYNC" }}
create index "pcev_{{.ID}}" on "{{.Bucket}}".moves (accounts_seq, asset, effective_date desc) where ledger = '{{.Name}}';

create trigger "set_effective_volumes_{{.ID}}"
Expand Down Expand Up @@ -69,7 +69,7 @@ when (
execute procedure "{{.Bucket}}".set_log_hash();
{{ end }}

{{ if .HasFeature "ACCOUNT_METADATA_HISTORIES" "SYNC" }}
{{ if .HasFeature "ACCOUNT_METADATA_HISTORY" "SYNC" }}
create trigger "update_account_metadata_history_{{.ID}}"
after update
on "{{.Bucket}}"."accounts"
Expand All @@ -89,7 +89,7 @@ when (
execute procedure "{{.Bucket}}".insert_account_metadata_history();
{{ end }}

{{ if .HasFeature "TRANSACTION_METADATA_HISTORIES" "SYNC" }}
{{ if .HasFeature "TRANSACTION_METADATA_HISTORY" "SYNC" }}
create trigger "update_transaction_metadata_history_{{.ID}}"
after update
on "{{.Bucket}}"."transactions"
Expand Down
Loading

0 comments on commit b38b09d

Please sign in to comment.