Skip to content

Commit

Permalink
Fix tradeagg rebuild from reingest command with parallel workers (#5168)
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland authored Jan 18, 2024
1 parent 3483910 commit 33bf9b6
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 45 deletions.
4 changes: 3 additions & 1 deletion services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
## Unreleased

### Fixed
- http archive requests include user agent and metrics ([5166](https://github.com/stellar/go/pull/5166))
- Trade agg rebuild errors reported on `db reingest range` with parellel workers ([5168](https://github.com/stellar/go/pull/5168))
- http archive requests include user agent ([5166](https://github.com/stellar/go/pull/5166))

### Added
- http archive requests include metrics ([5166](https://github.com/stellar/go/pull/5166))
- Add a deprecation warning for using command-line flags when running Horizon ([5051](https://github.com/stellar/go/pull/5051))

### Breaking Changes
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
}
defer system.Shutdown()

err = system.ReingestRange(ledgerRanges, reingestForce)
err = system.ReingestRange(ledgerRanges, reingestForce, true)
if err != nil {
if _, ok := errors.Cause(err).(ingest.ErrReingestRangeConflict); ok {
return fmt.Errorf(`The range you have provided overlaps with Horizon's most recently ingested ledger.
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (r resumeState) run(s *system) (transition, error) {
}

rebuildStart := time.Now()
err = s.historyQ.RebuildTradeAggregationBuckets(s.ctx, ingestLedger, ingestLedger, s.config.RoundingSlippageFilter)
err = s.RebuildTradeAggregationBuckets(ingestLedger, ingestLedger)
if err != nil {
return retryResume(r), errors.Wrap(err, "error rebuilding trade aggregations")
}
Expand Down Expand Up @@ -741,7 +741,7 @@ func (v verifyRangeState) run(s *system) (transition, error) {
Info("Processed ledger")
}

err = s.historyQ.RebuildTradeAggregationBuckets(s.ctx, v.fromLedger, v.toLedger, s.config.RoundingSlippageFilter)
err = s.RebuildTradeAggregationBuckets(v.fromLedger, v.toLedger)
if err != nil {
return stop(), errors.Wrap(err, "error rebuilding trade aggregations")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,6 @@ func (h reingestHistoryRangeState) run(s *system) (transition, error) {
}
}

err := s.historyQ.RebuildTradeAggregationBuckets(s.ctx, h.fromLedger, h.toLedger, s.config.RoundingSlippageFilter)
if err != nil {
return stop(), errors.Wrap(err, "Error rebuilding trade aggregations")
}

log.WithFields(logpkg.F{
"from": h.fromLedger,
"to": h.toLedger,
Expand Down
40 changes: 20 additions & 20 deletions services/horizon/internal/ingest/ingest_history_range_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,16 +304,16 @@ func (s *ReingestHistoryRangeStateTestSuite) TearDownTest() {
func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateInvalidRange() {
// Recreate mock in this single test to remove Rollback assertion.
s.historyQ = &mockDBQ{}
err := s.system.ReingestRange([]history.LedgerRange{{0, 0}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{0, 0}}, false, true)
s.Assert().EqualError(err, "Invalid range: {0 0} genesis ledger starts at 1")

err = s.system.ReingestRange([]history.LedgerRange{{0, 100}}, false)
err = s.system.ReingestRange([]history.LedgerRange{{0, 100}}, false, true)
s.Assert().EqualError(err, "Invalid range: {0 100} genesis ledger starts at 1")

err = s.system.ReingestRange([]history.LedgerRange{{100, 0}}, false)
err = s.system.ReingestRange([]history.LedgerRange{{100, 0}}, false, true)
s.Assert().EqualError(err, "Invalid range: {100 0} from > to")

err = s.system.ReingestRange([]history.LedgerRange{{100, 99}}, false)
err = s.system.ReingestRange([]history.LedgerRange{{100, 99}}, false, true)
s.Assert().EqualError(err, "Invalid range: {100 99} from > to")
}

Expand All @@ -323,7 +323,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateInvali
s.historyQ.On("Rollback").Return(nil).Once()
s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()
s.system.maxLedgerPerFlush = 0
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().EqualError(err, "invalid maxLedgerPerFlush, must be greater than 0")
}

Expand All @@ -332,28 +332,28 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateBeginR
s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), nil).Once()
s.historyQ.On("Begin", s.ctx).Return(errors.New("my error")).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().EqualError(err, "Error starting a transaction: my error")
}

func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateGetLastLedgerIngestNonBlockingError() {
s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(0), errors.New("my error")).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().EqualError(err, "Error getting last ingested ledger: my error")
}

func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateRangeOverlaps() {
s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(190), nil).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().Equal(ErrReingestRangeConflict{190}, err)
}

func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStatRangeOverlapsAtEnd() {
s.historyQ.On("GetLastLedgerIngestNonBlocking", s.ctx).Return(uint32(200), nil).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().Equal(ErrReingestRangeConflict{200}, err)
}

Expand All @@ -369,7 +369,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateClearH
"DeleteRangeAll", s.ctx, toidFrom.ToInt64(), toidTo.ToInt64(),
).Return(errors.New("my error")).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().EqualError(err, "error in DeleteRangeAll: my error")
}

Expand Down Expand Up @@ -397,7 +397,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateRunTra
s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(errors.New("my error")).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().EqualError(err, "error processing ledger range 100 - 100: my error")
}

Expand Down Expand Up @@ -428,7 +428,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateCommit
s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once()
}

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().EqualError(err, "Error committing db transaction: my error")
}

Expand Down Expand Up @@ -460,7 +460,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces
}

// system.maxLedgerPerFlush has been set by default to 1 in test suite setup
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().NoError(err)
}

Expand Down Expand Up @@ -500,7 +500,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces
s.runner.On("RunTransactionProcessorsOnLedgers", firstLedgersBatch).Return(nil).Once()
s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch).Return(nil).Once()
s.system.maxLedgerPerFlush = 60
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, false, true)
s.Assert().NoError(err)
}

Expand Down Expand Up @@ -534,7 +534,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateSucces
s.ledgerBackend.On("GetLedger", s.ctx, uint32(100)).Return(meta, nil).Once()
s.runner.On("RunTransactionProcessorsOnLedgers", []xdr.LedgerCloseMeta{meta}).Return(nil).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 100}}, false)
err := s.system.ReingestRange([]history.LedgerRange{{100, 100}}, false, true)
s.Assert().NoError(err)
}

Expand All @@ -543,7 +543,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceG
s.historyQ.On("Rollback").Return(nil).Once()
s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(0), errors.New("my error")).Once()

err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true, true)
s.Assert().EqualError(err, "Error getting last ingested ledger: my error")
}

Expand Down Expand Up @@ -576,7 +576,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForce(
}

// system.maxLedgerPerFlush has been set by default to 1 in test suite setup
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true, true)
s.Assert().NoError(err)
}

Expand Down Expand Up @@ -610,7 +610,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceL
s.ledgerBackend.On("GetLedger", s.ctx, uint32(106)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once()

// system.maxLedgerPerFlush has been set by default to 1 in test suite setup
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true, true)
s.Assert().EqualError(err, "error getting ledger: my error")
}

Expand Down Expand Up @@ -644,7 +644,7 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceL
s.ledgerBackend.On("GetLedger", s.ctx, uint32(106)).Return(xdr.LedgerCloseMeta{}, errors.New("my error")).Once()

// system.maxLedgerPerFlush has been set by default to 1 in test suite setup
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true, true)
s.Assert().EqualError(err, "Error committing db transaction: error getting ledger: my error")
}

Expand Down Expand Up @@ -686,6 +686,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestReingestHistoryRangeStateForceW
s.runner.On("RunTransactionProcessorsOnLedgers", secondLedgersBatch).Return(nil).Once()

s.system.maxLedgerPerFlush = 60
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true)
err := s.system.ReingestRange([]history.LedgerRange{{100, 200}}, true, true)
s.Assert().NoError(err)
}
15 changes: 13 additions & 2 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,11 @@ type System interface {
StressTest(numTransactions, changesPerTransaction int) error
VerifyRange(fromLedger, toLedger uint32, verifyState bool) error
BuildState(sequence uint32, skipChecks bool) error
ReingestRange(ledgerRanges []history.LedgerRange, force bool) error
ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error
BuildGenesisState() error
Shutdown()
GetCurrentState() State
RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) error
}

type system struct {
Expand Down Expand Up @@ -521,7 +522,7 @@ func validateRanges(ledgerRanges []history.LedgerRange) error {

// ReingestRange runs the ingestion pipeline on the range of ledgers ingesting
// history data only.
func (s *system) ReingestRange(ledgerRanges []history.LedgerRange, force bool) error {
func (s *system) ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error {
if err := validateRanges(ledgerRanges); err != nil {
return err
}
Expand All @@ -542,10 +543,20 @@ func (s *system) ReingestRange(ledgerRanges []history.LedgerRange, force bool) e
if err != nil {
return err
}
if rebuildTradeAgg {
err = s.RebuildTradeAggregationBuckets(cur.StartSequence, cur.EndSequence)
if err != nil {
return errors.Wrap(err, "Error rebuilding trade aggregations")
}
}
}
return nil
}

func (s *system) RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) error {
return s.historyQ.RebuildTradeAggregationBuckets(s.ctx, fromLedger, toLedger, s.config.RoundingSlippageFilter)
}

// BuildGenesisState runs the ingestion pipeline on genesis ledger. Transitions
// to stopState when done.
func (s *system) BuildGenesisState() error {
Expand Down
9 changes: 7 additions & 2 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,8 +592,8 @@ func (m *mockSystem) BuildState(sequence uint32, skipChecks bool) error {
return args.Error(0)
}

func (m *mockSystem) ReingestRange(ledgerRanges []history.LedgerRange, force bool) error {
args := m.Called(ledgerRanges, force)
func (m *mockSystem) ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error {
args := m.Called(ledgerRanges, force, rebuildTradeAgg)
return args.Error(0)
}

Expand All @@ -607,6 +607,11 @@ func (m *mockSystem) GetCurrentState() State {
return args.Get(0).(State)
}

func (m *mockSystem) RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) error {
args := m.Called(fromLedger, toLedger)
return args.Error(0)
}

func (m *mockSystem) Shutdown() {
m.Called()
}
Expand Down
36 changes: 32 additions & 4 deletions services/horizon/internal/ingest/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingest

import (
"fmt"
"math"
"sync"

"github.com/stellar/go/services/horizon/internal/db2/history"
Expand Down Expand Up @@ -61,7 +62,7 @@ func (ps *ParallelSystems) runReingestWorker(s System, stop <-chan struct{}, rei
case <-stop:
return rangeError{}
case reingestRange := <-reingestJobQueue:
err := s.ReingestRange([]history.LedgerRange{reingestRange}, false)
err := s.ReingestRange([]history.LedgerRange{reingestRange}, false, false)
if err != nil {
return rangeError{
err: err,
Expand All @@ -73,7 +74,24 @@ func (ps *ParallelSystems) runReingestWorker(s System, stop <-chan struct{}, rei
}
}

func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32, stop <-chan struct{}, reingestJobQueue chan<- history.LedgerRange) {
func (ps *ParallelSystems) rebuildTradeAggRanges(ledgerRanges []history.LedgerRange) error {
s, err := ps.systemFactory(ps.config)
if err != nil {
return err
}

for _, cur := range ledgerRanges {
err := s.RebuildTradeAggregationBuckets(cur.StartSequence, cur.EndSequence)
if err != nil {
return errors.Wrapf(err, "Error rebuilding trade aggregations for range start=%v, stop=%v", cur.StartSequence, cur.EndSequence)
}
}
return nil
}

// returns the lowest ledger to start from of all ledgerRanges
func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32, stop <-chan struct{}, reingestJobQueue chan<- history.LedgerRange) uint32 {
lowestLedger := uint32(math.MaxUint32)
for _, cur := range ledgerRanges {
for subRangeFrom := cur.StartSequence; subRangeFrom < cur.EndSequence; {
// job queuing
Expand All @@ -83,12 +101,16 @@ func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32,
}
select {
case <-stop:
return
return lowestLedger
case reingestJobQueue <- history.LedgerRange{StartSequence: subRangeFrom, EndSequence: subRangeTo}:
}
if subRangeFrom < lowestLedger {
lowestLedger = subRangeFrom
}
subRangeFrom = subRangeTo + 1
}
}
return lowestLedger
}

func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 {
Expand Down Expand Up @@ -166,7 +188,7 @@ func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, bat
}()
}

enqueueReingestTasks(ledgerRanges, batchSize, stop, reingestJobQueue)
lowestLedger := enqueueReingestTasks(ledgerRanges, batchSize, stop, reingestJobQueue)

stopOnce.Do(func() {
close(stop)
Expand All @@ -176,7 +198,13 @@ func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, bat

if lowestRangeErr != nil {
lastLedger := ledgerRanges[len(ledgerRanges)-1].EndSequence
if err := ps.rebuildTradeAggRanges([]history.LedgerRange{{StartSequence: lowestLedger, EndSequence: lowestRangeErr.ledgerRange.StartSequence}}); err != nil {
log.WithError(err).Errorf("error when trying to rebuild trade agg for partially completed portion of overall parallel reingestion range, start=%v, stop=%v", lowestLedger, lowestRangeErr.ledgerRange.StartSequence)
}
return errors.Wrapf(lowestRangeErr, "job failed, recommended restart range: [%d, %d]", lowestRangeErr.ledgerRange.StartSequence, lastLedger)
}
if err := ps.rebuildTradeAggRanges(ledgerRanges); err != nil {
return err
}
return nil
}
Loading

0 comments on commit 33bf9b6

Please sign in to comment.