Skip to content

Commit

Permalink
feat: store worker is now configurable (#206)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored and flemzord committed May 12, 2023
1 parent 2a354fe commit f8b162f
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Procfile
Original file line number Diff line number Diff line change
@@ -1 +1 @@
web: bin/ledger server start --server.http.bind_address 0.0.0.0:${PORT} --storage.driver postgres --storage.postgres.conn_string ${DATABASE_URL} --lock-strategy redis --lock-strategy-redis-url ${REDIS_URL}
web: bin/ledger server start --server.http.bind_address 0.0.0.0:${PORT} --storage.driver postgres --storage-postgres-conn-string ${DATABASE_URL} --lock-strategy redis --lock-strategy-redis-url ${REDIS_URL}
8 changes: 1 addition & 7 deletions cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,7 @@ func resolveOptions(v *viper.Viper, userOptions ...fx.Option) []fx.Option {
api.Module(api.Config{
Version: Version,
}),
sqlstorage.DriverModule(sqlstorage.ModuleConfig{
PostgresConfig: func() *sqlstorage.PostgresConfig {
return &sqlstorage.PostgresConfig{
ConnString: v.GetString(storagePostgresConnectionStringFlag),
}
}(),
}),
sqlstorage.CLIDriverModule(v),
internal.NewAnalyticsModule(v, Version),
ledger.Module(v.GetString(commitPolicyFlag) == "allow-past-timestamps"),
)
Expand Down
6 changes: 3 additions & 3 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"

"github.com/formancehq/ledger/cmd/internal"
"github.com/formancehq/ledger/pkg/storage/sqlstorage"
_ "github.com/formancehq/ledger/pkg/storage/sqlstorage/ledger/migrates/9-add-pre-post-volumes"
"github.com/formancehq/stack/libs/go-libs/otlp/otlpmetrics"
"github.com/formancehq/stack/libs/go-libs/otlp/otlptraces"
Expand All @@ -15,8 +16,7 @@ import (
)

const (
storagePostgresConnectionStringFlag = "storage.postgres.conn_string"
bindFlag = "bind"
bindFlag = "bind"

commitPolicyFlag = "commit-policy"
)
Expand Down Expand Up @@ -57,14 +57,14 @@ func NewRootCommand() *cobra.Command {
root.AddCommand(NewDocCommand())

root.PersistentFlags().Bool(service.DebugFlag, false, "Debug mode")
root.PersistentFlags().String(storagePostgresConnectionStringFlag, "postgresql://localhost/postgres", "Postgre connection string")
root.PersistentFlags().String(bindFlag, "0.0.0.0:3068", "API bind address")
root.PersistentFlags().String(commitPolicyFlag, "", "Transaction commit policy (default or allow-past-timestamps)")

otlpmetrics.InitOTLPMetricsFlags(root.PersistentFlags())
otlptraces.InitOTLPTracesFlags(root.PersistentFlags())
internal.InitAnalyticsFlags(root, DefaultSegmentWriteKey)
publish.InitCLIFlags(root)
sqlstorage.InitCLIFlags(root)

if err := viper.BindPFlags(root.PersistentFlags()); err != nil {
panic(err)
Expand Down
3 changes: 2 additions & 1 deletion cmd/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"testing"

"github.com/formancehq/ledger/pkg/storage/sqlstorage"
"github.com/formancehq/stack/libs/go-libs/pgtesting"
"github.com/google/uuid"
"github.com/spf13/viper"
Expand All @@ -12,7 +13,7 @@ import (
func Test_StorageCommands(t *testing.T) {
db := pgtesting.NewPostgresDatabase(t)

viper.Set(storagePostgresConnectionStringFlag, db.ConnString())
viper.Set(sqlstorage.StoragePostgresConnectionStringFlag, db.ConnString())

require.NoError(t, NewStorageList().Execute())

Expand Down
33 changes: 31 additions & 2 deletions pkg/storage/sqlstorage/module.go → pkg/storage/sqlstorage/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,36 @@ import (
"database/sql"

"github.com/formancehq/ledger/pkg/storage"
ledgerstore "github.com/formancehq/ledger/pkg/storage/sqlstorage/ledger"
"github.com/formancehq/ledger/pkg/storage/sqlstorage/schema"
"github.com/formancehq/ledger/pkg/storage/sqlstorage/worker"
"github.com/formancehq/stack/libs/go-libs/health"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/uptrace/bun"
"github.com/uptrace/bun/dialect/pgdialect"
"go.uber.org/fx"
)

const (
StoreWorkerMaxPendingSize = "store-worker-max-pending-size"
StoreWorkerMaxWriteChanSize = "store-worker-max-write-chan-size"
StoragePostgresConnectionStringFlag = "storage-postgres-conn-string"
)

func InitCLIFlags(cmd *cobra.Command) {
cmd.PersistentFlags().Int(StoreWorkerMaxPendingSize, 0, "Max pending size for store worker")
cmd.PersistentFlags().Int(StoreWorkerMaxWriteChanSize, 1024, "Max write channel size for store worker")
cmd.PersistentFlags().String(StoragePostgresConnectionStringFlag, "postgresql://localhost/postgres", "Postgres connection string")
}

type PostgresConfig struct {
ConnString string
}

type ModuleConfig struct {
PostgresConfig *PostgresConfig
StoreConfig ledgerstore.StoreConfig
}

func OpenSQLDB(dataSourceName string) (*bun.DB, error) {
Expand All @@ -34,7 +51,19 @@ func OpenSQLDB(dataSourceName string) (*bun.DB, error) {
return db, nil
}

func DriverModule(cfg ModuleConfig) fx.Option {
func CLIDriverModule(v *viper.Viper) fx.Option {
cfg := ModuleConfig{
PostgresConfig: &PostgresConfig{
ConnString: v.GetString(StoragePostgresConnectionStringFlag),
},
StoreConfig: ledgerstore.StoreConfig{
StoreWorkerConfig: worker.WorkerConfig{
MaxPendingSize: v.GetInt(StoreWorkerMaxPendingSize),
MaxWriteChanSize: v.GetInt(StoreWorkerMaxWriteChanSize),
},
},
}

options := make([]fx.Option, 0)

options = append(options, fx.Provide(func() (*bun.DB, error) {
Expand All @@ -44,7 +73,7 @@ func DriverModule(cfg ModuleConfig) fx.Option {
return schema.NewPostgresDB(db)
}))
options = append(options, fx.Provide(func(db schema.DB) (*Driver, error) {
return NewDriver("postgres", db), nil
return NewDriver("postgres", db, cfg.StoreConfig), nil
}))
options = append(options, health.ProvideHealthCheck(func(db *bun.DB) health.NamedCheck {
return health.NewNamedCheck("postgres", health.CheckFn(db.PingContext))
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/sqlstorage/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type Driver struct {
systemStore *systemstore.Store
registeredLedgers map[string]storage.LedgerStore
lock sync.Mutex
storeConfig ledgerstore.StoreConfig
}

func (d *Driver) GetSystemStore() storage.SystemStore {
Expand Down Expand Up @@ -121,7 +122,7 @@ func (d *Driver) GetLedgerStore(ctx context.Context, name string, create bool) (
return schema.Close(context.Background())
}, func(ctx context.Context) error {
return d.GetSystemStore().DeleteLedger(ctx, name)
})
}, d.storeConfig)
if err != nil {
return nil, false, errors.Wrap(err, "creating ledger store")
}
Expand Down Expand Up @@ -169,11 +170,12 @@ func (d *Driver) Close(ctx context.Context) error {
return d.db.Close(ctx)
}

func NewDriver(name string, db schema.DB) *Driver {
func NewDriver(name string, db schema.DB, storeConfig ledgerstore.StoreConfig) *Driver {
return &Driver{
db: db,
name: name,
registeredLedgers: map[string]storage.LedgerStore{},
storeConfig: storeConfig,
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/sqlstorage/ledger/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/formancehq/ledger/pkg/storage"
"github.com/formancehq/ledger/pkg/storage/sqlstorage"
ledgerstore "github.com/formancehq/ledger/pkg/storage/sqlstorage/ledger"
"github.com/formancehq/ledger/pkg/storage/sqlstorage/schema"
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/formancehq/stack/libs/go-libs/pgtesting"
Expand Down Expand Up @@ -37,7 +38,7 @@ func newLedgerStore(t *testing.T) storage.LedgerStore {
require.NoError(t, db.Close())
})

driver := sqlstorage.NewDriver("postgres", schema.NewPostgresDB(db))
driver := sqlstorage.NewDriver("postgres", schema.NewPostgresDB(db), ledgerstore.DefaultStoreConfig)
require.NoError(t, driver.Initialize(context.Background()))

ledgerStore, _, err := driver.GetLedgerStore(context.Background(), uuid.NewString(), true)
Expand Down
22 changes: 18 additions & 4 deletions pkg/storage/sqlstorage/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
type Store struct {
schema schema.Schema
metricsRegistry *metrics.SQLStorageMetricsRegistry
storeConfig StoreConfig
onClose func(ctx context.Context) error
onDelete func(ctx context.Context) error

Expand All @@ -33,6 +34,16 @@ type Store struct {
isInitialized bool
}

type StoreConfig struct {
StoreWorkerConfig worker.WorkerConfig
}

var (
DefaultStoreConfig = StoreConfig{
StoreWorkerConfig: worker.DefaultConfig,
}
)

func (s *Store) Schema() schema.Schema {
return s.schema
}
Expand Down Expand Up @@ -86,6 +97,7 @@ func (s *Store) RunInTransaction(ctx context.Context, f func(ctx context.Context
schema.NewSchema(tx.Tx, s.schema.Name()),
s.onClose,
s.onDelete,
s.storeConfig,
)
if err != nil {
return errors.Wrap(err, "creating new store")
Expand Down Expand Up @@ -124,14 +136,16 @@ func NewStore(
ctx context.Context,
schema schema.Schema,
onClose, onDelete func(ctx context.Context) error,
storeConfig StoreConfig,
) (*Store, error) {
s := &Store{
schema: schema,
onClose: onClose,
onDelete: onDelete,
schema: schema,
onClose: onClose,
onDelete: onDelete,
storeConfig: storeConfig,
}

logsBatchWorker := worker.NewWorker(s.batchLogs)
logsBatchWorker := worker.NewWorker(s.batchLogs, storeConfig.StoreWorkerConfig)
s.logsBatchWorker = logsBatchWorker

metricsRegistry, err := metrics.RegisterSQLStorageMetrics(s.schema.Name())
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/sqlstorage/sqlstoragetesting/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sqlstoragetesting

import (
"github.com/formancehq/ledger/pkg/storage/sqlstorage"
ledgerstore "github.com/formancehq/ledger/pkg/storage/sqlstorage/ledger"
"github.com/formancehq/ledger/pkg/storage/sqlstorage/schema"
"github.com/formancehq/stack/libs/go-libs/pgtesting"
"github.com/stretchr/testify/require"
Expand All @@ -17,5 +18,5 @@ func StorageDriver(t pgtesting.TestingT) *sqlstorage.Driver {
db.Close()
})

return sqlstorage.NewDriver("postgres", schema.NewPostgresDB(db))
return sqlstorage.NewDriver("postgres", schema.NewPostgresDB(db), ledgerstore.DefaultStoreConfig)
}
18 changes: 15 additions & 3 deletions pkg/storage/sqlstorage/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,24 @@ type Worker[MODEL any] struct {
stopChan chan chan struct{}
}

func NewWorker[MODEL any](workerJob Job[MODEL]) *Worker[MODEL] {
type WorkerConfig struct {
MaxPendingSize int
MaxWriteChanSize int
}

var (
DefaultConfig = WorkerConfig{
MaxPendingSize: 0,
MaxWriteChanSize: 1024,
}
)

func NewWorker[MODEL any](workerJob Job[MODEL], cfg WorkerConfig) *Worker[MODEL] {
return &Worker[MODEL]{
workerJob: workerJob,
pending: make([]modelsHolder[MODEL], 0), // TODO(gfyrag): we need to limit the worker capacity
pending: make([]modelsHolder[MODEL], cfg.MaxPendingSize),
jobs: make(chan []modelsHolder[MODEL]),
writeChannel: make(chan modelsHolder[MODEL], 1024), // TODO(gfyrag): Make configurable
writeChannel: make(chan modelsHolder[MODEL], cfg.MaxWriteChanSize),
releasedJob: make(chan struct{}, 1),
stopChan: make(chan chan struct{}, 1),
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/sqlstorage/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestSimpleWorker(t *testing.T) {
ctx := context.Background()
db := NewMockDB()

w := worker.NewWorker(db.Write)
w := worker.NewWorker(db.Write, worker.DefaultConfig)
go w.Run(ctx)
defer func() {
require.NoError(t, w.Stop(context.Background()))
Expand Down Expand Up @@ -47,7 +47,7 @@ func TestBatchWorker(t *testing.T) {
ctx := context.Background()
db := NewMockDB()

w := worker.NewWorker(db.Write)
w := worker.NewWorker(db.Write, worker.DefaultConfig)
go w.Run(ctx)

ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
Expand Down

0 comments on commit f8b162f

Please sign in to comment.