Skip to content

Commit

Permalink
feat: remove redundant postings denormalization
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored and flemzord committed May 12, 2023
1 parent 33a6544 commit 97c9d75
Show file tree
Hide file tree
Showing 18 changed files with 445 additions and 394 deletions.
7 changes: 5 additions & 2 deletions cmd/container.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cmd

import (
"io"

"github.com/formancehq/ledger/cmd/internal"
"github.com/formancehq/ledger/pkg/api"
"github.com/formancehq/ledger/pkg/bus"
Expand All @@ -16,10 +18,11 @@ import (

const ServiceName = "ledger"

func resolveOptions(v *viper.Viper, userOptions ...fx.Option) []fx.Option {
func resolveOptions(output io.Writer, userOptions ...fx.Option) []fx.Option {
options := make([]fx.Option, 0)
options = append(options, fx.NopLogger)

v := viper.GetViper()
debug := v.GetBool(service.DebugFlag)
if debug {
sqlstorage.InstrumentalizeSQLDriver()
Expand All @@ -33,7 +36,7 @@ func resolveOptions(v *viper.Viper, userOptions ...fx.Option) []fx.Option {
api.Module(api.Config{
Version: Version,
}),
sqlstorage.CLIDriverModule(v),
sqlstorage.CLIDriverModule(v, output),
internal.NewAnalyticsModule(v, Version),
ledger.Module(ledger.Configuration{
AllowPastTimestamp: v.GetString(commitPolicyFlag) == "allow-past-timestamps",
Expand Down
3 changes: 1 addition & 2 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ const (
cacheEvictionPeriodFlag = "cache-eviction-period"
cacheEvictionRetainDelay = "cache-eviction-retain-delay"
queryLimitReadLogsFlag = "query-limit-read-logs"

ballastSizeInBytesFlag = "ballast-size"
)

Expand All @@ -24,7 +23,7 @@ func NewServe() *cobra.Command {
Use: "serve",
RunE: func(cmd *cobra.Command, args []string) error {
return app.New(cmd.OutOrStdout(), resolveOptions(
viper.GetViper(),
cmd.OutOrStdout(),
ballast.Module(viper.GetUint(ballastSizeInBytesFlag)),
fx.Invoke(func(lc fx.Lifecycle, h chi.Router) {
lc.Append(httpserver.NewHook(viper.GetString(bindFlag), h))
Expand Down
130 changes: 70 additions & 60 deletions cmd/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,32 @@ func NewStorageInit() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
app := service.New(
cmd.OutOrStdout(),
resolveOptions(viper.GetViper(), fx.Invoke(func(storageDriver storage.Driver, lc fx.Lifecycle) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
name := viper.GetString("name")
if name == "" {
return errors.New("name is empty")
}
s, created, err := storageDriver.GetLedgerStore(ctx, name, true)
if err != nil {
return err
}
resolveOptions(
cmd.OutOrStdout(),
fx.Invoke(func(storageDriver storage.Driver, lc fx.Lifecycle) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
name := viper.GetString("name")
if name == "" {
return errors.New("name is empty")
}
s, created, err := storageDriver.GetLedgerStore(ctx, name, true)
if err != nil {
return err
}

if !created {
return nil
}
if !created {
return nil
}

_, err = s.Initialize(ctx)
if err != nil {
return err
}
return nil
},
})
}))...,
_, err = s.Initialize(ctx)
if err != nil {
return err
}
return nil
},
})
}))...,
)
return app.Start(cmd.Context())
},
Expand All @@ -69,7 +71,8 @@ func NewStorageList() *cobra.Command {
Use: "list",
RunE: func(cmd *cobra.Command, args []string) error {
app := service.New(cmd.OutOrStdout(),
resolveOptions(viper.GetViper(),
resolveOptions(
cmd.OutOrStdout(),
fx.Invoke(func(storageDriver storage.Driver, lc fx.Lifecycle) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
Expand Down Expand Up @@ -100,27 +103,29 @@ func NewStorageUpgrade() *cobra.Command {
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
app := service.New(cmd.OutOrStdout(),
resolveOptions(viper.GetViper(), fx.Invoke(func(storageDriver storage.Driver, lc fx.Lifecycle) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
name := args[0]
store, _, err := storageDriver.GetLedgerStore(ctx, name, false)
if err != nil {
return err
}
modified, err := store.Initialize(ctx)
if err != nil {
return err
}
if modified {
logging.FromContext(ctx).Infof("Storage '%s' upgraded", name)
} else {
logging.FromContext(ctx).Infof("Storage '%s' is up to date", name)
}
return nil
},
})
}))...,
resolveOptions(
cmd.OutOrStdout(),
fx.Invoke(func(storageDriver storage.Driver, lc fx.Lifecycle) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
name := args[0]
store, _, err := storageDriver.GetLedgerStore(ctx, name, false)
if err != nil {
return err
}
modified, err := store.Initialize(ctx)
if err != nil {
return err
}
if modified {
logging.FromContext(ctx).Infof("Storage '%s' upgraded", name)
} else {
logging.FromContext(ctx).Infof("Storage '%s' is up to date", name)
}
return nil
},
})
}))...,
)
return app.Start(cmd.Context())
},
Expand Down Expand Up @@ -178,7 +183,10 @@ func NewStorageScan() *cobra.Command {
})
})

app := service.New(cmd.OutOrStdout(), resolveOptions(viper.GetViper(), opt)...)
app := service.New(cmd.OutOrStdout(), resolveOptions(
cmd.OutOrStdout(),
opt,
)...)
return app.Start(cmd.Context())
},
}
Expand All @@ -192,21 +200,23 @@ func NewStorageDelete() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
app := service.New(
cmd.OutOrStdout(),
resolveOptions(viper.GetViper(), fx.Invoke(func(storageDriver storage.Driver, lc fx.Lifecycle) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
name := args[0]
store, _, err := storageDriver.GetLedgerStore(ctx, name, false)
if err != nil {
return err
}
if err := store.Delete(ctx); err != nil {
return err
}
return nil
},
})
}))...,
resolveOptions(
cmd.OutOrStdout(),
fx.Invoke(func(storageDriver storage.Driver, lc fx.Lifecycle) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
name := args[0]
store, _, err := storageDriver.GetLedgerStore(ctx, name, false)
if err != nil {
return err
}
if err := store.Delete(ctx); err != nil {
return err
}
return nil
},
})
}))...,
)
return app.Start(cmd.Context())
},
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/stretchr/testify v1.8.2
github.com/uptrace/bun v1.1.12
github.com/uptrace/bun/dialect/pgdialect v1.1.12
github.com/uptrace/bun/extra/bunbig v1.1.12
github.com/uptrace/bun/extra/bundebug v1.1.12
go.nhat.io/otelsql v0.9.0
go.opentelemetry.io/otel v1.14.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,8 @@ github.com/uptrace/bun v1.1.12 h1:sOjDVHxNTuM6dNGaba0wUuz7KvDE1BmNu9Gqs2gJSXQ=
github.com/uptrace/bun v1.1.12/go.mod h1:NPG6JGULBeQ9IU6yHp7YGELRa5Agmd7ATZdz4tGZ6z0=
github.com/uptrace/bun/dialect/pgdialect v1.1.12 h1:m/CM1UfOkoBTglGO5CUTKnIKKOApOYxkcP2qn0F9tJk=
github.com/uptrace/bun/dialect/pgdialect v1.1.12/go.mod h1:Ij6WIxQILxLlL2frUBxUBOZJtLElD2QQNDcu/PWDHTc=
github.com/uptrace/bun/extra/bunbig v1.1.12 h1:Dg/bCwO30zmOb/KwctsBVIoz+vOBBcKbBgHUlgK9q78=
github.com/uptrace/bun/extra/bunbig v1.1.12/go.mod h1:EU3WwCvNYFpJjCUI0EKTPVRlYW8kAXy6nUbhOlQl5NE=
github.com/uptrace/bun/extra/bundebug v1.1.12 h1:y8nrHvo7TUCR91kXngWuF7Bk0E1nCTsWzYL1CDEriTo=
github.com/uptrace/bun/extra/bundebug v1.1.12/go.mod h1:psjCrCMf5JaAyivW/A8MDBW5MwIy/jZFBCkIaBgabtM=
github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.21 h1:OXsouNDvuET5o1A4uvoCnAXuuNke8JlfZWceciyUlC8=
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ func (o Order) String() string {
panic("should not happen")
}

func (o Order) Reverse() Order {
return (o + 1) % 2
}

type ColumnPaginatedQuery[FILTERS any] struct {
PageSize uint64 `json:"pageSize"`
Bottom *uint64 `json:"bottom"`
Expand Down
8 changes: 6 additions & 2 deletions pkg/storage/sqlstorage/cli.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package sqlstorage

import (
"io"

"github.com/formancehq/ledger/pkg/storage"
ledgerstore "github.com/formancehq/ledger/pkg/storage/sqlstorage/ledger"
"github.com/formancehq/ledger/pkg/storage/sqlstorage/schema"
"github.com/formancehq/ledger/pkg/storage/sqlstorage/utils"
"github.com/formancehq/ledger/pkg/storage/sqlstorage/worker"
"github.com/formancehq/stack/libs/go-libs/health"
"github.com/formancehq/stack/libs/go-libs/service"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/uptrace/bun"
Expand Down Expand Up @@ -38,7 +41,7 @@ type ModuleConfig struct {
Debug bool
}

func CLIDriverModule(v *viper.Viper) fx.Option {
func CLIDriverModule(v *viper.Viper, output io.Writer) fx.Option {
cfg := ModuleConfig{
PostgresConfig: &PostgresConfig{
ConnString: v.GetString(StoragePostgresConnectionStringFlag),
Expand All @@ -49,12 +52,13 @@ func CLIDriverModule(v *viper.Viper) fx.Option {
MaxWriteChanSize: v.GetInt(StoreWorkerMaxWriteChanSize),
},
},
Debug: viper.GetBool(service.DebugFlag),
}

options := make([]fx.Option, 0)

options = append(options, fx.Provide(func() (*bun.DB, error) {
return utils.OpenSQLDB(cfg.PostgresConfig.ConnString, cfg.Debug)
return utils.OpenSQLDB(cfg.PostgresConfig.ConnString, cfg.Debug, output)
}))
options = append(options, fx.Provide(func(db *bun.DB) schema.DB {
return schema.NewPostgresDB(db)
Expand Down
Loading

0 comments on commit 97c9d75

Please sign in to comment.