Skip to content

Commit

Permalink
feat: Skip init if DB is not initialized (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored and flemzord committed May 12, 2023
1 parent ddba008 commit ff13b68
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 24 deletions.
1 change: 0 additions & 1 deletion pkg/api/controllers/transaction_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,6 @@ func TestPostTransactionsPreview(t *testing.T) {

internal.RunTest(t, func(api chi.Router, driver storage.Driver) {
store := internal.GetLedgerStore(t, driver, context.Background())

t.Run("postings true", func(t *testing.T) {
rsp := internal.PostTransaction(t, api, controllers.PostTransactionRequest{
Postings: core.Postings{
Expand Down
6 changes: 4 additions & 2 deletions pkg/ledger/query/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ func (w *Worker) Stop(ctx context.Context) error {
}

func (w *Worker) initLedgers(ctx context.Context) error {

ledgers, err := w.driver.GetSystemStore().ListLedgers(ctx)
if err != nil {
return err
Expand All @@ -143,7 +142,6 @@ func (w *Worker) initLedgers(ctx context.Context) error {
}

func (w *Worker) initLedger(ctx context.Context, ledger string) error {

store, _, err := w.driver.GetLedgerStore(ctx, ledger, false)
if err != nil && err != storage.ErrLedgerStoreNotFound {
return err
Expand All @@ -152,6 +150,10 @@ func (w *Worker) initLedger(ctx context.Context, ledger string) error {
return nil
}

if !store.IsInitialized() {
return nil
}

lastReadLogID, err := store.GetNextLogID(ctx)
if err != nil {
return errors.Wrap(err, "reading last log")
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type LedgerStore interface {
Delete(ctx context.Context) error
Initialize(ctx context.Context) (bool, error)
Close(ctx context.Context) error
IsInitialized() bool
Name() string

GetNextLogID(ctx context.Context) (uint64, error)
Expand Down
22 changes: 8 additions & 14 deletions pkg/storage/sqlstorage/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Driver struct {
name string
db schema.DB
systemStore *systemstore.Store
registeredLedgers map[string]struct{}
registeredLedgers map[string]storage.LedgerStore
lock sync.Mutex
}

Expand All @@ -91,7 +91,6 @@ func (d *Driver) GetLedgerStore(ctx context.Context, name string, create bool) (
var (
created bool
schema schema.Schema
err error
)
if _, exists := d.registeredLedgers[name]; !exists {
systemStore := d.systemStore
Expand All @@ -118,19 +117,14 @@ func (d *Driver) GetLedgerStore(ctx context.Context, name string, create bool) (
return nil, false, err
}

d.registeredLedgers[name] = struct{}{}
} else {
schema, err = d.db.Schema(ctx, name)
if err != nil {
return nil, false, errors.Wrap(err, "opening schema")
}
d.registeredLedgers[name] = ledgerstore.NewStore(ctx, schema, func(ctx context.Context) error {
return schema.Close(context.Background())
}, func(ctx context.Context) error {
return d.GetSystemStore().DeleteLedger(ctx, name)
})
}

return ledgerstore.NewStore(ctx, schema, func(ctx context.Context) error {
return schema.Close(context.Background())
}, func(ctx context.Context) error {
return d.GetSystemStore().DeleteLedger(ctx, name)
}), created, nil
return d.registeredLedgers[name], created, nil
}

func (d *Driver) Name() string {
Expand Down Expand Up @@ -174,7 +168,7 @@ func NewDriver(name string, db schema.DB) *Driver {
return &Driver{
db: db,
name: name,
registeredLedgers: map[string]struct{}{},
registeredLedgers: map[string]storage.LedgerStore{},
}
}

Expand Down
28 changes: 28 additions & 0 deletions pkg/storage/sqlstorage/ledger/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (s *Store) buildAccountsQuery(ctx context.Context, p storage.AccountsQuery)
}

func (s *Store) GetAccounts(ctx context.Context, q storage.AccountsQuery) (api.Cursor[core.Account], error) {
if !s.isInitialized {
return api.Cursor[core.Account]{}, ErrStoreNotInitialized
}

accounts := make([]core.Account, 0)

if q.PageSize == 0 {
Expand Down Expand Up @@ -178,6 +182,10 @@ func (s *Store) GetAccounts(ctx context.Context, q storage.AccountsQuery) (api.C
}

func (s *Store) GetAccount(ctx context.Context, addr string) (*core.Account, error) {
if !s.isInitialized {
return nil, ErrStoreNotInitialized
}

query := s.schema.NewSelect(accountsTableName).
Model((*Accounts)(nil)).
Where("address = ?", addr).
Expand Down Expand Up @@ -274,16 +282,28 @@ func (s *Store) getAccountWithVolumes(ctx context.Context, exec interface {
}

func (s *Store) GetAccountWithVolumes(ctx context.Context, account string) (*core.AccountWithVolumes, error) {
if !s.isInitialized {
return nil, ErrStoreNotInitialized
}

return s.getAccountWithVolumes(ctx, s.schema, account)
}

func (s *Store) CountAccounts(ctx context.Context, q storage.AccountsQuery) (uint64, error) {
if !s.isInitialized {
return 0, ErrStoreNotInitialized
}

sb, _ := s.buildAccountsQuery(ctx, q)
count, err := sb.Count(ctx)
return uint64(count), s.error(err)
}

func (s *Store) EnsureAccountExists(ctx context.Context, account string) error {
if !s.isInitialized {
return ErrStoreNotInitialized
}

a := &Accounts{
Address: account,
Metadata: make(map[string]interface{}),
Expand All @@ -298,6 +318,10 @@ func (s *Store) EnsureAccountExists(ctx context.Context, account string) error {
}

func (s *Store) UpdateAccountMetadata(ctx context.Context, address string, metadata core.Metadata) error {
if !s.isInitialized {
return ErrStoreNotInitialized
}

a := &Accounts{
Address: address,
Metadata: metadata,
Expand All @@ -313,6 +337,10 @@ func (s *Store) UpdateAccountMetadata(ctx context.Context, address string, metad
}

func (s *Store) ComputeAccount(ctx context.Context, address string) (*core.AccountWithVolumes, error) {
if !s.isInitialized {
return nil, ErrStoreNotInitialized
}

tx, err := s.schema.BeginTx(ctx, &sql.TxOptions{
ReadOnly: true,
})
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/sqlstorage/ledger/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type BalancesPaginationToken struct {
}

func (s *Store) GetBalancesAggregated(ctx context.Context, q storage.BalancesQuery) (core.AssetsBalances, error) {
if !s.isInitialized {
return nil, ErrStoreNotInitialized
}

sb := s.schema.NewSelect(volumesTableName).
Model((*Volumes)(nil)).
ColumnExpr("asset").
Expand Down Expand Up @@ -64,6 +68,10 @@ func (s *Store) GetBalancesAggregated(ctx context.Context, q storage.BalancesQue
}

func (s *Store) GetBalances(ctx context.Context, q storage.BalancesQuery) (api.Cursor[core.AccountsBalances], error) {
if !s.isInitialized {
return api.Cursor[core.AccountsBalances]{}, ErrStoreNotInitialized
}

sb := s.schema.NewSelect(volumesTableName).
Model((*Volumes)(nil)).
ColumnExpr("account").
Expand Down
32 changes: 32 additions & 0 deletions pkg/storage/sqlstorage/ledger/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,18 @@ func (s *Store) batchLogs(ctx context.Context, logs []*core.Log) error {
}

func (s *Store) AppendLog(ctx context.Context, log *core.Log) error {
if !s.isInitialized {
return ErrStoreNotInitialized
}

return <-s.logsBatchWorker.WriteModels(ctx, log)
}

func (s *Store) GetLastLog(ctx context.Context) (*core.Log, error) {
if !s.isInitialized {
return nil, ErrStoreNotInitialized
}

raw := &LogsV2{}
err := s.schema.NewSelect(LogTableName).
Model(raw).
Expand Down Expand Up @@ -154,6 +162,10 @@ func (s *Store) GetLastLog(ctx context.Context) (*core.Log, error) {
}

func (s *Store) GetLogs(ctx context.Context, q *storage.LogsQuery) (api.Cursor[core.Log], error) {
if !s.isInitialized {
return api.Cursor[core.Log]{}, ErrStoreNotInitialized
}

res := []core.Log{}

if q.PageSize == 0 {
Expand Down Expand Up @@ -276,10 +288,18 @@ func (s *Store) getNextLogID(ctx context.Context, sq interface {
}

func (s *Store) GetNextLogID(ctx context.Context) (uint64, error) {
if !s.isInitialized {
return 0, ErrStoreNotInitialized
}

return s.getNextLogID(ctx, &s.schema)
}

func (s *Store) ReadLogsStartingFromID(ctx context.Context, id uint64) ([]core.Log, error) {
if !s.isInitialized {
return nil, ErrStoreNotInitialized
}

return s.readLogsStartingFromID(ctx, &s.schema, id)
}

Expand Down Expand Up @@ -316,6 +336,10 @@ func (s *Store) readLogsStartingFromID(ctx context.Context, exec interface {
}

func (s *Store) UpdateNextLogID(ctx context.Context, id uint64) error {
if !s.isInitialized {
return ErrStoreNotInitialized
}

_, err := s.schema.
NewInsert(LogIngestionTableName).
Model(&LogsIngestion{
Expand All @@ -328,6 +352,10 @@ func (s *Store) UpdateNextLogID(ctx context.Context, id uint64) error {
}

func (s *Store) ReadLogWithReference(ctx context.Context, reference string) (*core.Log, error) {
if !s.isInitialized {
return nil, ErrStoreNotInitialized
}

raw := &LogsV2{}
err := s.schema.
NewSelect(LogTableName).
Expand All @@ -353,6 +381,10 @@ func (s *Store) ReadLogWithReference(ctx context.Context, reference string) (*co
}

func (s *Store) ReadLastLogWithType(ctx context.Context, logTypes ...core.LogType) (*core.Log, error) {
if !s.isInitialized {
return nil, ErrStoreNotInitialized
}

raw := &LogsV2{}
err := s.schema.
NewSelect(LogTableName).
Expand Down
Loading

0 comments on commit ff13b68

Please sign in to comment.