Skip to content

Commit

Permalink
feat: add waiting on cqrs to have a synchrone api (#176)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored and flemzord committed May 12, 2023
1 parent 402b024 commit b5eda90
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 13 deletions.
24 changes: 21 additions & 3 deletions pkg/ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ func (l *Ledger) CreateTransaction(ctx context.Context, dryRun bool, script core
return core.NewTransactionLog(expandedTx.Transaction, accountMetadata)
})
if err == nil && !dryRun {
l.queryWorker.QueueLog(ctx, log, l.store)
// Wait for CQRS ingestion
// TODO(polo/gfyrag): add possiblity to disable this via request param
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-l.queryWorker.QueueLog(ctx, log, l.store):
}
}

return tx, err
Expand Down Expand Up @@ -106,7 +112,13 @@ func (l *Ledger) RevertTransaction(ctx context.Context, id uint64) (*core.Expand
return core.NewRevertedTransactionLog(expandedTx.Timestamp, revertedTx.ID, expandedTx.Transaction)
})
if err == nil {
l.queryWorker.QueueLog(ctx, log, l.store)
// Wait for CQRS ingestion
// TODO(polo/gfyrag): add possiblity to disable this via request param
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-l.queryWorker.QueueLog(ctx, log, l.store):
}
}

return tx, err
Expand Down Expand Up @@ -188,7 +200,13 @@ func (l *Ledger) SaveMeta(ctx context.Context, targetType string, targetID inter

err = l.store.AppendLog(ctx, &log)
if err == nil {
l.queryWorker.QueueLog(ctx, log, l.store)
// Wait for CQRS ingestion
// TODO(polo/gfyrag): add possiblity to disable this via request param
select {
case <-ctx.Done():
return ctx.Err()
case <-l.queryWorker.QueueLog(ctx, log, l.store):
}
}

return err
Expand Down
27 changes: 19 additions & 8 deletions pkg/ledger/query/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ type workerConfig struct {
ChanSize int
}

type workerLog struct {
log core.Log
store storage.LedgerStore
type logHolder struct {
log core.Log
store storage.LedgerStore
waitChan chan struct{}
}

type Worker struct {
workerConfig
ctx context.Context
logChan chan workerLog
logChan chan logHolder
stopChan chan chan struct{}
driver storage.Driver
monitor Monitor
Expand Down Expand Up @@ -85,6 +86,7 @@ func (w *Worker) run() error {
return nil
case wl := <-w.logChan:
if w.lastProcessedLogID != nil && wl.log.ID <= *w.lastProcessedLogID {
close(wl.waitChan)
continue
}
if err := w.processLog(w.ctx, wl.store, wl.log); err != nil {
Expand All @@ -94,6 +96,7 @@ func (w *Worker) run() error {
logging.FromContext(w.ctx).Errorf("CQRS worker error: %s", err)
}

close(wl.waitChan)
// Return the error to restart the worker
return err
}
Expand All @@ -103,8 +106,11 @@ func (w *Worker) run() error {

// TODO(polo/gfyrag): add indempotency tests
// Return the error to restart the worker
close(wl.waitChan)
return err
}

close(wl.waitChan)
}
}
}
Expand Down Expand Up @@ -285,17 +291,22 @@ func (w *Worker) processLog(ctx context.Context, store storage.LedgerStore, log
return err
}

func (w *Worker) QueueLog(ctx context.Context, log core.Log, store storage.LedgerStore) {
func (w *Worker) QueueLog(ctx context.Context, log core.Log, store storage.LedgerStore) <-chan struct{} {
waitChan := make(chan struct{})

select {
case <-w.ctx.Done():
return
case w.logChan <- workerLog{log, store}:
close(waitChan)
return waitChan
case w.logChan <- logHolder{log, store, waitChan}:
}

return waitChan
}

func NewWorker(config workerConfig, driver storage.Driver, monitor Monitor) *Worker {
return &Worker{
logChan: make(chan workerLog, config.ChanSize),
logChan: make(chan logHolder, config.ChanSize),
stopChan: make(chan chan struct{}),
workerConfig: config,
driver: driver,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ledger/query/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestWorker(t *testing.T) {
}
for _, log := range logs {
require.NoError(t, ledgerStore.AppendLog(context.Background(), &log))
worker.QueueLog(context.Background(), log, ledgerStore)
<-worker.QueueLog(context.Background(), log, ledgerStore)
}
require.Eventually(t, func() bool {
nextLogID, err := ledgerStore.GetNextLogID(context.Background())
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/sqlstorage/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func NewStore(
logsBatchWorker := worker.NewWorker(s.batchLogs)
s.logsBatchWorker = logsBatchWorker

go logsBatchWorker.Run(ctx)
go logsBatchWorker.Run(logging.ContextWithLogger(
context.Background(),
logging.FromContext(ctx),
))

return s
}
Expand Down

0 comments on commit b5eda90

Please sign in to comment.