Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/ingest: Fix deadlock in parallel ingestion #5263

Merged
merged 3 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions services/horizon/internal/ingest/fsm_history_range_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ func (h historyRangeState) run(s *system) (transition, error) {
ledgers = append(ledgers, ledgerCloseMeta)

if len(ledgers) == cap(ledgers) {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers, false); err != nil {
return start(), errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence())
}
ledgers = ledgers[0:0]
}
}

if len(ledgers) > 0 {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers, false); err != nil {
return start(), errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ func (reingestHistoryRangeState) GetState() State {
return ReingestHistoryRange
}

func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32) error {
if s.historyQ.GetTx() == nil {
return errors.New("expected transaction to be present")
}

func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger uint32, execBatchInTx bool) error {
if s.maxLedgerPerFlush < 1 {
return errors.New("invalid maxLedgerPerFlush, must be greater than 0")
}
Expand Down Expand Up @@ -75,15 +71,15 @@ func (h reingestHistoryRangeState) ingestRange(s *system, fromLedger, toLedger u
ledgers = append(ledgers, ledgerCloseMeta)

if len(ledgers)%int(s.maxLedgerPerFlush) == 0 {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers, execBatchInTx); err != nil {
return errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence())
}
ledgers = ledgers[0:0]
}
}

if len(ledgers) > 0 {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers); err != nil {
if err = s.runner.RunTransactionProcessorsOnLedgers(ledgers, execBatchInTx); err != nil {
return errors.Wrapf(err, "error processing ledger range %d - %d", ledgers[0].LedgerSequence(), ledgers[len(ledgers)-1].LedgerSequence())
}
}
Expand Down Expand Up @@ -142,7 +138,7 @@ func (h reingestHistoryRangeState) run(s *system) (transition, error) {
return stop(), errors.Wrap(err, getLastIngestedErrMsg)
}

if ingestErr := h.ingestRange(s, h.fromLedger, h.toLedger); ingestErr != nil {
if ingestErr := h.ingestRange(s, h.fromLedger, h.toLedger, false); ingestErr != nil {
if err := s.historyQ.Commit(); err != nil {
return stop(), errors.Wrap(ingestErr, commitErrMsg)
}
Expand All @@ -169,18 +165,9 @@ func (h reingestHistoryRangeState) run(s *system) (transition, error) {
}

startTime = time.Now()
if err := s.historyQ.Begin(s.ctx); err != nil {
return stop(), errors.Wrap(err, "Error starting a transaction")
}
defer s.historyQ.Rollback()

if e := h.ingestRange(s, h.fromLedger, h.toLedger); e != nil {
if e := h.ingestRange(s, h.fromLedger, h.toLedger, true); e != nil {
return stop(), e
}

if e := s.historyQ.Commit(); e != nil {
return stop(), errors.Wrap(e, commitErrMsg)
}
}

log.WithFields(logpkg.F{
Expand Down
63 changes: 40 additions & 23 deletions services/horizon/internal/ingest/group_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,49 @@ func (g groupChangeProcessors) Commit(ctx context.Context) error {
return nil
}

type groupLoaders struct {
lazyLoaders []horizonLazyLoader
runDurations runDurations
stats map[string]history.LoaderStats
}

func newGroupLoaders(lazyLoaders []horizonLazyLoader) groupLoaders {
return groupLoaders{
lazyLoaders: lazyLoaders,
runDurations: make(map[string]time.Duration),
stats: make(map[string]history.LoaderStats),
}
}

func (g groupLoaders) Flush(ctx context.Context, session db.SessionInterface, execInTx bool) error {
if execInTx {
if err := session.Begin(ctx); err != nil {
return err
}
defer session.Rollback()
}

for _, loader := range g.lazyLoaders {
startTime := time.Now()
if err := loader.Exec(ctx, session); err != nil {
return errors.Wrapf(err, "error during lazy loader resolution, %T.Exec", loader)
}
name := loader.Name()
g.runDurations.AddRunDuration(name, startTime)
g.stats[name] = loader.Stats()
}

if execInTx {
if err := session.Commit(); err != nil {
return err
}
}
return nil
}

type groupTransactionProcessors struct {
processors []horizonTransactionProcessor
lazyLoaders []horizonLazyLoader
processorsRunDurations runDurations
loaderRunDurations runDurations
loaderStats map[string]history.LoaderStats
transactionStatsProcessor *processors.StatsLedgerTransactionProcessor
tradeProcessor *processors.TradeProcessor
}
Expand All @@ -76,17 +113,13 @@ type groupTransactionProcessors struct {
//
// so group processing will reset stats as needed
func newGroupTransactionProcessors(processors []horizonTransactionProcessor,
lazyLoaders []horizonLazyLoader,
transactionStatsProcessor *processors.StatsLedgerTransactionProcessor,
tradeProcessor *processors.TradeProcessor,
) *groupTransactionProcessors {

return &groupTransactionProcessors{
processors: processors,
processorsRunDurations: make(map[string]time.Duration),
loaderRunDurations: make(map[string]time.Duration),
loaderStats: make(map[string]history.LoaderStats),
lazyLoaders: lazyLoaders,
transactionStatsProcessor: transactionStatsProcessor,
tradeProcessor: tradeProcessor,
}
Expand All @@ -104,20 +137,6 @@ func (g groupTransactionProcessors) ProcessTransaction(lcm xdr.LedgerCloseMeta,
}

func (g groupTransactionProcessors) Flush(ctx context.Context, session db.SessionInterface) error {
// need to trigger all lazy loaders to now resolve their future placeholders
// with real db values first
for _, loader := range g.lazyLoaders {
startTime := time.Now()
if err := loader.Exec(ctx, session); err != nil {
return errors.Wrapf(err, "error during lazy loader resolution, %T.Exec", loader)
}
name := loader.Name()
g.loaderRunDurations.AddRunDuration(name, startTime)
g.loaderStats[name] = loader.Stats()
}

// now flush each processor which may call loader.GetNow(), which
// required the prior loader.Exec() to have been called.
for _, p := range g.processors {
startTime := time.Now()
if err := p.Flush(ctx, session); err != nil {
Expand All @@ -130,8 +149,6 @@ func (g groupTransactionProcessors) Flush(ctx context.Context, session db.Sessio

func (g *groupTransactionProcessors) ResetStats() {
g.processorsRunDurations = make(map[string]time.Duration)
g.loaderRunDurations = make(map[string]time.Duration)
g.loaderStats = make(map[string]history.LoaderStats)
if g.tradeProcessor != nil {
g.tradeProcessor.ResetStats()
}
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/ingest/group_processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (s *GroupTransactionProcessorsTestSuiteLedger) SetupTest() {
s.processors = newGroupTransactionProcessors([]horizonTransactionProcessor{
s.processorA,
s.processorB,
}, nil, statsProcessor, tradesProcessor)
}, statsProcessor, tradesProcessor)
s.session = &db.MockSession{}
}

Expand Down
Loading
Loading