Skip to content

Commit

Permalink
feat: move model in core
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 16, 2024
1 parent 566f25f commit 15ae219
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 62 deletions.
10 changes: 10 additions & 0 deletions internal/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/formancehq/go-libs/metadata"
"github.com/formancehq/go-libs/time"
"github.com/uptrace/bun"
"math/big"
)

const (
Expand All @@ -21,3 +22,12 @@ type Account struct {
Volumes VolumesByAssets `json:"volumes,omitempty" bun:"pcv,scanonly"`
EffectiveVolumes VolumesByAssets `json:"effectiveVolumes,omitempty" bun:"pcev,scanonly"`
}

type AccountsVolumes struct {
bun.BaseModel `bun:"accounts_volumes"`

Account string `bun:"accounts_address,type:varchar"`
Asset string `bun:"asset,type:varchar"`
Input *big.Int `bun:"input,type:numeric"`
Output *big.Int `bun:"output,type:numeric"`
}
50 changes: 14 additions & 36 deletions internal/storage/ledger/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,22 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ
}
}

accountsVolumes := make([]AccountsVolumes, 0)
type AccountsVolumesWithLedger struct {
ledger.AccountsVolumes `bun:",extend"`
Ledger string `bun:"ledger,type:varchar"`
}

accountsVolumes := make([]AccountsVolumesWithLedger, 0)
for account, assets := range query {
for _, asset := range assets {
accountsVolumes = append(accountsVolumes, AccountsVolumes{
Ledger: s.ledger.Name,
Account: account,
Asset: asset,
Input: new(big.Int),
Output: new(big.Int),
accountsVolumes = append(accountsVolumes, AccountsVolumesWithLedger{
Ledger: s.ledger.Name,
AccountsVolumes: ledger.AccountsVolumes{
Account: account,
Asset: asset,
Input: new(big.Int),
Output: new(big.Int),
},
})
}
}
Expand Down Expand Up @@ -262,32 +269,3 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ
return ret, nil
})
}

/**
SELECT to_json(array_agg(json_build_object('asset', accounts.asset, 'input', (accounts.volumes->>'input')::numeric, 'output', (accounts.volumes->>'output')::numeric))) AS aggregated
FROM (
SELECT *
FROM (
SELECT *, accounts.address_array
FROM (
SELECT "asset", "accounts_address", post_commit_volumes AS volumes
FROM (
SELECT DISTINCT ON (accounts_address, asset)
"accounts_address",
"asset",
first_value(post_commit_volumes) OVER (PARTITION BY (accounts_address, asset) ORDER BY seq DESC) AS post_commit_volumes
FROM (
SELECT *
FROM "87b28082".moves
WHERE (ledger = '87b28082') AND (insertion_date <= '2024-09-26T14:45:10.568382Z')
ORDER BY "seq" DESC
) moves
WHERE (ledger = '87b28082') AND (insertion_date <= '2024-09-26T14:45:10.568382Z')
) moves
) accounts_volumes
JOIN "87b28082".accounts accounts ON accounts.address = accounts_volumes.accounts_address
) accounts
WHERE (jsonb_array_length(accounts.address_array) = 2
AND accounts.address_array @@ ('$[0] == "users"')::jsonpath)
) accounts
*/
3 changes: 1 addition & 2 deletions internal/storage/ledger/balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ func TestBalancesGet(t *testing.T) {
_, err := store.UpsertAccount(ctx, world)
require.NoError(t, err)

_, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
_, err = store.updateVolumes(ctx, ledger.AccountsVolumes{
Account: "world",
Asset: "USD",
Input: new(big.Int),
Expand Down
9 changes: 4 additions & 5 deletions internal/storage/ledger/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
}
}

postCommitVolumes, err := s.updateVolumes(ctx, volumeUpdates(s.ledger.Name, tx)...)
postCommitVolumes, err := s.updateVolumes(ctx, volumeUpdates(tx)...)
if err != nil {
return errors.Wrap(err, "failed to update balances")
}
Expand Down Expand Up @@ -560,7 +560,7 @@ func filterAccountAddressOnTransactions(address string, source, destination bool
}
}

func volumeUpdates(l string, transaction *ledger.Transaction) []AccountsVolumes {
func volumeUpdates(transaction *ledger.Transaction) []ledger.AccountsVolumes {
aggregatedVolumes := make(map[string]map[string][]ledger.Posting)
for _, posting := range transaction.Postings {
if _, ok := aggregatedVolumes[posting.Source]; !ok {
Expand All @@ -578,7 +578,7 @@ func volumeUpdates(l string, transaction *ledger.Transaction) []AccountsVolumes
aggregatedVolumes[posting.Destination][posting.Asset] = append(aggregatedVolumes[posting.Destination][posting.Asset], posting)
}

ret := make([]AccountsVolumes, 0)
ret := make([]ledger.AccountsVolumes, 0)
for account, movesByAsset := range aggregatedVolumes {
for asset, postings := range movesByAsset {
volumes := ledger.NewEmptyVolumes()
Expand All @@ -591,8 +591,7 @@ func volumeUpdates(l string, transaction *ledger.Transaction) []AccountsVolumes
}
}

ret = append(ret, AccountsVolumes{
Ledger: l,
ret = append(ret, ledger.AccountsVolumes{
Account: account,
Asset: asset,
Input: volumes.Input,
Expand Down
27 changes: 14 additions & 13 deletions internal/storage/ledger/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package ledger
import (
"context"
"fmt"
"math/big"

"github.com/formancehq/go-libs/collectionutils"
"github.com/formancehq/go-libs/platform/postgres"

"github.com/formancehq/ledger/internal/tracing"
Expand All @@ -17,21 +16,23 @@ import (
"github.com/uptrace/bun"
)

type AccountsVolumes struct {
bun.BaseModel `bun:"accounts_volumes"`
func (s *Store) updateVolumes(ctx context.Context, accountVolumes ...ledger.AccountsVolumes) (ledger.PostCommitVolumes, error) {
return tracing.TraceWithLatency(ctx, "UpdateBalances", func(ctx context.Context) (ledger.PostCommitVolumes, error) {

Ledger string `bun:"ledger,type:varchar"`
Account string `bun:"accounts_address,type:varchar"`
Asset string `bun:"asset,type:varchar"`
Input *big.Int `bun:"input,type:numeric"`
Output *big.Int `bun:"output,type:numeric"`
}
type AccountsVolumesWithLedger struct {
ledger.AccountsVolumes `bun:",extend"`
Ledger string `bun:"ledger,type:varchar"`
}

func (s *Store) updateVolumes(ctx context.Context, accountVolumes ...AccountsVolumes) (ledger.PostCommitVolumes, error) {
return tracing.TraceWithLatency(ctx, "UpdateBalances", func(ctx context.Context) (ledger.PostCommitVolumes, error) {
accountsVolumesWithLedger := collectionutils.Map(accountVolumes, func(from ledger.AccountsVolumes) AccountsVolumesWithLedger {
return AccountsVolumesWithLedger{
AccountsVolumes: from,
Ledger: s.ledger.Name,
}
})

_, err := s.db.NewInsert().
Model(&accountVolumes).
Model(&accountsVolumesWithLedger).
ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")).
On("conflict (ledger, accounts_address, asset) do update").
Set("input = accounts_volumes.input + excluded.input").
Expand Down
9 changes: 3 additions & 6 deletions internal/storage/ledger/volumes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,7 @@ func TestUpdateVolumes(t *testing.T) {
store := newLedgerStore(t)
ctx := logging.TestingContext()

volumes, err := store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
volumes, err := store.updateVolumes(ctx, ledger.AccountsVolumes{
Account: "world",
Asset: "USD/2",
Input: big.NewInt(0),
Expand All @@ -709,8 +708,7 @@ func TestUpdateVolumes(t *testing.T) {
},
}, volumes)

volumes, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
volumes, err = store.updateVolumes(ctx, ledger.AccountsVolumes{
Account: "world",
Asset: "USD/2",
Input: big.NewInt(50),
Expand All @@ -723,8 +721,7 @@ func TestUpdateVolumes(t *testing.T) {
},
}, volumes)

volumes, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
volumes, err = store.updateVolumes(ctx, ledger.AccountsVolumes{
Account: "world",
Asset: "USD/2",
Input: big.NewInt(50),
Expand Down

0 comments on commit 15ae219

Please sign in to comment.