Skip to content

Commit

Permalink
fix: fix memory leak with store logs worker not closed (#257)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored and flemzord committed May 12, 2023
1 parent 2f9190d commit ea97ba7
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/sqlstorage/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (d *Driver) GetLedgerStore(ctx context.Context, name string, create bool) (
return schema.Close(context.Background())
}, func(ctx context.Context) error {
return d.GetSystemStore().DeleteLedger(ctx, name)
}, d.storeConfig)
}, d.storeConfig, true)
if err != nil {
return nil, false, errors.Wrap(err, "creating ledger store")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/sqlstorage/ledger/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (s *Store) batchLogs(ctx context.Context, logs []*core.Log) error {
}

func (s *Store) AppendLog(ctx context.Context, log *core.Log) error {
if !s.isInitialized {
if !s.isInitialized || s.logsBatchWorker == nil {
return storageerrors.StorageError(storage.ErrStoreNotInitialized)
}
recordMetrics := s.instrumentalized(ctx, "append_log")
Expand Down
32 changes: 23 additions & 9 deletions pkg/storage/sqlstorage/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ func (s *Store) Initialize(ctx context.Context) (bool, error) {
}

func (s *Store) Close(ctx context.Context) error {
if s.logsBatchWorker != nil {
if err := s.logsBatchWorker.Stop(ctx); err != nil {
return err
}
}

return s.onClose(ctx)
}

Expand All @@ -98,9 +104,14 @@ func (s *Store) RunInTransaction(ctx context.Context, f func(ctx context.Context
newStore, err := NewStore(
ctx,
schema.NewSchema(tx.Tx, s.schema.Name()),
s.onClose,
s.onDelete,
func(ctx context.Context) error {
return nil
},
func(ctx context.Context) error {
return nil
},
s.storeConfig,
false,
)
if err != nil {
return errors.Wrap(err, "creating new store")
Expand All @@ -110,6 +121,7 @@ func (s *Store) RunInTransaction(ctx context.Context, f func(ctx context.Context

defer func() {
_ = tx.Rollback()
_ = newStore.Close(context.Background())
}()

err = f(ctx, newStore)
Expand Down Expand Up @@ -138,6 +150,7 @@ func NewStore(
schema schema.Schema,
onClose, onDelete func(ctx context.Context) error,
storeConfig StoreConfig,
createLogsWorker bool,
) (*Store, error) {
s := &Store{
schema: schema,
Expand All @@ -146,19 +159,20 @@ func NewStore(
storeConfig: storeConfig,
}

logsBatchWorker := worker.NewWorker(s.batchLogs, storeConfig.StoreWorkerConfig)
s.logsBatchWorker = logsBatchWorker

metricsRegistry, err := metrics.RegisterSQLStorageMetrics(s.schema.Name())
if err != nil {
return nil, errors.Wrap(err, "registering metrics")
}
s.metricsRegistry = metricsRegistry

go logsBatchWorker.Run(logging.ContextWithLogger(
context.Background(),
logging.FromContext(ctx),
))
if createLogsWorker {
logsBatchWorker := worker.NewWorker(s.batchLogs, storeConfig.StoreWorkerConfig)
s.logsBatchWorker = logsBatchWorker
go logsBatchWorker.Run(logging.ContextWithLogger(
context.Background(),
logging.FromContext(ctx),
))
}

return s, nil
}
Expand Down

0 comments on commit ea97ba7

Please sign in to comment.