Skip to content

Commit

Permalink
feat: Add migration for pre/post commit volumes (#277)
Browse files Browse the repository at this point in the history
feat: Add a new migration to populate preCommitVolumes and postCommitVolumes properties of the transactions
  • Loading branch information
gfyrag authored Jul 20, 2022
1 parent 19304cb commit 9f04683
Show file tree
Hide file tree
Showing 7 changed files with 422 additions and 6 deletions.
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/numary/ledger/cmd/internal"
"github.com/numary/ledger/pkg/redis"
_ "github.com/numary/ledger/pkg/storage/sqlstorage/migrates/9-add-pre-post-volumes"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ require (
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/lib/pq v1.10.2
github.com/numary/go-libs v0.0.0-20220609103351-69aecd5d4097
github.com/psanford/memfs v0.0.0-20210214183328-a001468d78ef
github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.12
github.com/xdg-go/scram v1.1.0
gopkg.in/segmentio/analytics-go.v3 v3.1.0
Expand Down Expand Up @@ -135,7 +136,6 @@ require (
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/psanford/memfs v0.0.0-20210214183328-a001468d78ef // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/segmentio/backo-go v1.0.0 // indirect
github.com/spf13/afero v1.7.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -695,10 +695,6 @@ github.com/numary/ledger v0.0.0-20210702172952-a5bd30e551d0/go.mod h1:u2K28z9TDY
github.com/numary/ledger v0.0.0-20211227131550-dc7b78f85b5b/go.mod h1:uovuDsK7Gs7duqKQ9PgaFulJnPTDftGdR/n3rBRzNIs=
github.com/numary/machine v0.0.0-20210702091459-23a82555adbf/go.mod h1:WAFvefAGYNjdDmPtDoZ305F58QDtUJyB0QWN3vzSZao=
github.com/numary/machine v0.0.0-20210831114934-e54c99840e08/go.mod h1:KulcZIlMidEjXmuFSGNckmk0pKr4HFKFYy3bB+ksWSQ=
github.com/numary/machine v1.1.1 h1:ESJJ7pCIzeMPS3V6esffIPBBiGXjKRdgB5aqfqND4FM=
github.com/numary/machine v1.1.1/go.mod h1:lSdeCwegoylxgHOl6wBC9BgOo2N35ra53aTsRybmJsc=
github.com/numary/machine v1.1.2-0.20220707224429-2a09b3f81249 h1:02EHCDlgU2aNV/B1r9jyUthlKaSHil28rPvvtAoVafM=
github.com/numary/machine v1.1.2-0.20220707224429-2a09b3f81249/go.mod h1:lSdeCwegoylxgHOl6wBC9BgOo2N35ra53aTsRybmJsc=
github.com/numary/machine v1.1.2 h1:MoPu4I2NvkM/V0CT8xNngsZrKrA3X5jA8Ge1ZCDamXA=
github.com/numary/machine v1.1.2/go.mod h1:lSdeCwegoylxgHOl6wBC9BgOo2N35ra53aTsRybmJsc=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
Expand Down
51 changes: 51 additions & 0 deletions pkg/core/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,57 @@ func (v AssetsVolumes) Balances() AssetsBalances {

type AccountsAssetsVolumes map[string]AssetsVolumes

func (a AccountsAssetsVolumes) GetVolumes(account, asset string) Volumes {
if assetsVolumes, ok := a[account]; !ok {
return Volumes{}
} else {
return assetsVolumes[asset]
}
}

func (a AccountsAssetsVolumes) SetVolumes(account, asset string, volumes Volumes) {
if assetsVolumes, ok := a[account]; !ok {
a[account] = map[string]Volumes{
asset: volumes,
}
} else {
assetsVolumes[asset] = volumes
}
}

func (a AccountsAssetsVolumes) AddInput(account, asset string, input int64) {
if assetsVolumes, ok := a[account]; !ok {
a[account] = map[string]Volumes{
asset: {
Input: input,
},
}
} else {
volumes := assetsVolumes[asset]
volumes.Input += input
assetsVolumes[asset] = volumes
}
}

func (a AccountsAssetsVolumes) AddOutput(account, asset string, output int64) {
if assetsVolumes, ok := a[account]; !ok {
a[account] = map[string]Volumes{
asset: {
Output: output,
},
}
} else {
volumes := assetsVolumes[asset]
volumes.Output += output
assetsVolumes[asset] = volumes
}
}

func (a AccountsAssetsVolumes) HasAccount(account string) bool {
_, ok := a[account]
return ok
}

// Scan - Implement the database/sql scanner interface
func (a *AccountsAssetsVolumes) Scan(value interface{}) error {
if value == nil {
Expand Down
118 changes: 118 additions & 0 deletions pkg/storage/sqlstorage/migrates/9-add-pre-post-volumes/any.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package add_pre_post_volumes

import (
"context"
"database/sql"
"encoding/json"

"github.com/huandu/go-sqlbuilder"
"github.com/numary/ledger/pkg/core"
"github.com/numary/ledger/pkg/storage/sqlstorage"
"github.com/pkg/errors"
)

func init() {
sqlstorage.RegisterGoMigration(Upgrade)
}

type Transaction struct {
ID uint64 `json:"txid"`
Postings core.Postings `json:"postings"`
}

func Upgrade(ctx context.Context, schema sqlstorage.Schema, sqlTx *sql.Tx) error {
sb := sqlbuilder.NewSelectBuilder()
sb.
From(schema.Table("log")).
Select("data").
Where(sb.E("type", core.NewTransactionType)).
OrderBy("id").
Asc()

sqlq, args := sb.BuildWithFlavor(schema.Flavor())
rows, err := sqlTx.QueryContext(ctx, sqlq, args...)
if err != nil {
return errors.Wrap(err, "querying rows")
}
defer rows.Close()

updates := make([]*sqlbuilder.UpdateBuilder, 0)

aggregatedVolumes := core.AccountsAssetsVolumes{}
for rows.Next() {
var data string
err := rows.Scan(&data)
if err != nil {
return errors.Wrap(err, "scanning row")
}

var tx Transaction
err = json.Unmarshal([]byte(data), &tx)
if err != nil {
return errors.Wrap(err, "decoding transaction")
}

preCommitVolumes := core.AccountsAssetsVolumes{}
postCommitVolumes := core.AccountsAssetsVolumes{}
for _, posting := range tx.Postings {

preCommitVolumes.SetVolumes(posting.Source, posting.Asset,
aggregatedVolumes.GetVolumes(posting.Source, posting.Asset))
preCommitVolumes.SetVolumes(posting.Destination, posting.Asset,
aggregatedVolumes.GetVolumes(posting.Destination, posting.Asset))

if !postCommitVolumes.HasAccount(posting.Source) {
postCommitVolumes.SetVolumes(posting.Source, posting.Asset,
preCommitVolumes.GetVolumes(posting.Source, posting.Asset))
}
if !postCommitVolumes.HasAccount(posting.Destination) {
postCommitVolumes.SetVolumes(posting.Destination, posting.Asset,
preCommitVolumes.GetVolumes(posting.Destination, posting.Asset))
}

postCommitVolumes.AddOutput(posting.Source, posting.Asset, posting.Amount)
postCommitVolumes.AddInput(posting.Destination, posting.Asset, posting.Amount)
}

for account, accountVolumes := range postCommitVolumes {
for asset, volumes := range accountVolumes {
aggregatedVolumes.SetVolumes(account, asset, volumes)
}
}

preCommitVolumesData, err := json.Marshal(preCommitVolumes)
if err != nil {
return err
}

postCommitVolumesData, err := json.Marshal(postCommitVolumes)
if err != nil {
return err
}

ub := sqlbuilder.NewUpdateBuilder()
ub.Update(schema.Table("transactions"))
ub.Set(
ub.Assign("pre_commit_volumes", preCommitVolumesData),
ub.Assign("post_commit_volumes", postCommitVolumesData),
)
ub.Where(ub.E("id", tx.ID))

updates = append(updates, ub)
}
err = rows.Close()
if err != nil {
return err
}

for _, update := range updates {
sqlq, args := update.BuildWithFlavor(schema.Flavor())

_, err = sqlTx.ExecContext(ctx, sqlq, args...)
if err != nil {
return errors.Wrap(err, "executing update")
}
}

return nil
}
Loading

0 comments on commit 9f04683

Please sign in to comment.