Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/ingest: Fix transaction processor metrics #5216

Merged
merged 4 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions services/horizon/internal/ingest/filters/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func NewAccountFilter() AccountFilter {
}
}

func (filter *accountFilter) Name() string {
return "filters.accountFilter"
}

func (filter *accountFilter) RefreshAccountFilter(filterConfig *history.AccountFilterConfig) error {
// only need to re-initialize the filter config state(rules) if its cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
Expand Down
4 changes: 4 additions & 0 deletions services/horizon/internal/ingest/filters/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func NewAssetFilter() AssetFilter {
}
}

func (filter *assetFilter) Name() string {
return "filters.assetFilter"
}

func (filter *assetFilter) RefreshAssetFilter(filterConfig *history.AssetFilterConfig) error {
// only need to re-initialize the filter config state(rules) if it's cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
Expand Down
2 changes: 0 additions & 2 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,6 @@ func (r resumeState) addLedgerStatsMetricFromMap(s *system, prefix string, m map

func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string]time.Duration) {
for processorName, value := range m {
// * is not accepted in Prometheus labels
processorName = strings.Replace(processorName, "*", "", -1)
s.Metrics().ProcessorsRunDuration.
With(prometheus.Labels{"name": processorName}).Add(value.Seconds())
s.Metrics().ProcessorsRunDurationSummary.
Expand Down
22 changes: 13 additions & 9 deletions services/horizon/internal/ingest/group_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ingest

import (
"context"
"fmt"
"time"

"github.com/stellar/go/ingest"
Expand Down Expand Up @@ -31,13 +30,17 @@ func newGroupChangeProcessors(processors []horizonChangeProcessor) *groupChangeP
}
}

func (g groupChangeProcessors) Name() string {
return "groupChangeProcessors"
}

func (g groupChangeProcessors) ProcessChange(ctx context.Context, change ingest.Change) error {
for _, p := range g.processors {
startTime := time.Now()
if err := p.ProcessChange(ctx, change); err != nil {
return errors.Wrapf(err, "error in %T.ProcessChange", p)
}
g.processorsRunDurations.AddRunDuration(fmt.Sprintf("%T", p), startTime)
g.processorsRunDurations.AddRunDuration(p.Name(), startTime)
}
return nil
}
Expand All @@ -48,7 +51,7 @@ func (g groupChangeProcessors) Commit(ctx context.Context) error {
if err := p.Commit(ctx); err != nil {
return errors.Wrapf(err, "error in %T.Commit", p)
}
g.processorsRunDurations.AddRunDuration(fmt.Sprintf("%T", p), startTime)
g.processorsRunDurations.AddRunDuration(p.Name(), startTime)
}
return nil
}
Expand Down Expand Up @@ -95,7 +98,7 @@ func (g groupTransactionProcessors) ProcessTransaction(lcm xdr.LedgerCloseMeta,
if err := p.ProcessTransaction(lcm, tx); err != nil {
return errors.Wrapf(err, "error in %T.ProcessTransaction", p)
}
g.processorsRunDurations.AddRunDuration(fmt.Sprintf("%T", p), startTime)
g.processorsRunDurations.AddRunDuration(p.Name(), startTime)
}
return nil
}
Expand All @@ -110,9 +113,6 @@ func (g groupTransactionProcessors) Flush(ctx context.Context, session db.Sessio
}
name := loader.Name()
g.loaderRunDurations.AddRunDuration(name, startTime)
if _, ok := g.loaderStats[name]; ok {
return fmt.Errorf("%s is present multiple times", name)
}
g.loaderStats[name] = loader.Stats()
}

Expand All @@ -123,7 +123,7 @@ func (g groupTransactionProcessors) Flush(ctx context.Context, session db.Sessio
if err := p.Flush(ctx, session); err != nil {
return errors.Wrapf(err, "error in %T.Flush", p)
}
g.processorsRunDurations.AddRunDuration(fmt.Sprintf("%T", p), startTime)
g.processorsRunDurations.AddRunDuration(p.Name(), startTime)
}
return nil
}
Expand Down Expand Up @@ -153,14 +153,18 @@ func newGroupTransactionFilterers(filterers []processors.LedgerTransactionFilter
}
}

func (g *groupTransactionFilterers) Name() string {
return "groupTransactionFilterers"
}

func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx ingest.LedgerTransaction) (bool, error) {
for _, f := range g.filterers {
startTime := time.Now()
include, err := f.FilterTransaction(ctx, tx)
if err != nil {
return false, errors.Wrapf(err, "error in %T.FilterTransaction", f)
}
g.AddRunDuration(fmt.Sprintf("%T", f), startTime)
g.AddRunDuration(f.Name(), startTime)
if !include {
// filter out, we can return early
g.droppedTransactions++
Expand Down
8 changes: 8 additions & 0 deletions services/horizon/internal/ingest/group_processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type mockHorizonChangeProcessor struct {
mock.Mock
}

func (m *mockHorizonChangeProcessor) Name() string {
return "mockHorizonChangeProcessor"
}

func (m *mockHorizonChangeProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
args := m.Called(ctx, change)
return args.Error(0)
Expand All @@ -39,6 +43,10 @@ type mockHorizonTransactionProcessor struct {
mock.Mock
}

func (m *mockHorizonTransactionProcessor) Name() string {
return "mockHorizonTransactionProcessor"
}

func (m *mockHorizonTransactionProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error {
args := m.Called(lcm, transaction)
return args.Error(0)
Expand Down
18 changes: 0 additions & 18 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/ingest/processors"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
logpkg "github.com/stellar/go/support/log"
Expand Down Expand Up @@ -529,23 +528,6 @@ func (m *mockProcessorsRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMe
args.Error(1)
}

func (m *mockProcessorsRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
processors.StatsLedgerTransactionProcessorResults,
runDurations,
processors.TradeStats,
runDurations,
map[string]history.LoaderStats,
error,
) {
args := m.Called(ledger)
return args.Get(0).(processors.StatsLedgerTransactionProcessorResults),
args.Get(1).(runDurations),
args.Get(2).(processors.TradeStats),
args.Get(3).(runDurations),
args.Get(4).(map[string]history.LoaderStats),
args.Error(3)
}

func (m *mockProcessorsRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta) error {
args := m.Called(ledgers)
return args.Error(0)
Expand Down
101 changes: 91 additions & 10 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ const (

type horizonChangeProcessor interface {
processors.ChangeProcessor
Name() string
Commit(context.Context) error
}

type horizonTransactionProcessor interface {
Name() string
processors.LedgerTransactionProcessor
}

Expand All @@ -45,6 +47,10 @@ type statsChangeProcessor struct {
*ingest.StatsChangeProcessor
}

func (statsChangeProcessor) Name() string {
return "ingest.statsChangeProcessor"
}

func (statsChangeProcessor) Commit(ctx context.Context) error {
return nil
}
Expand All @@ -70,14 +76,6 @@ type ProcessorRunnerInterface interface {
ledgerProtocolVersion uint32,
bucketListHash xdr.Hash,
) (ingest.StatsChangeProcessorResults, error)
RunTransactionProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
transactionStats processors.StatsLedgerTransactionProcessorResults,
transactionDurations runDurations,
tradeStats processors.TradeStats,
loaderDurations runDurations,
loaderStats map[string]history.LoaderStats,
err error,
)
RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta) error
RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
stats ledgerStats,
Expand Down Expand Up @@ -243,6 +241,13 @@ func (s *ProcessorRunner) RunHistoryArchiveIngestion(
s.config.NetworkPassphrase,
)

if err := registerChangeProcessors(
nameRegistry{},
changeProcessor,
); err != nil {
return ingest.StatsChangeProcessorResults{}, err
}

if checkpointLedger == 1 {
if err := changeProcessor.ProcessChange(s.ctx, ingest.GenesisChange(s.config.NetworkPassphrase)); err != nil {
return changeStats.GetResults(), errors.Wrap(err, "Error ingesting genesis ledger")
Expand Down Expand Up @@ -364,7 +369,7 @@ func (s *ProcessorRunner) streamLedger(ledger xdr.LedgerCloseMeta,
return nil
}

func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
func (s *ProcessorRunner) runTransactionProcessorsOnLedger(registry nameRegistry, ledger xdr.LedgerCloseMeta) (
transactionStats processors.StatsLedgerTransactionProcessorResults,
transactionDurations runDurations,
tradeStats processors.TradeStats,
Expand All @@ -380,6 +385,15 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos
groupFilteredOutProcessors := s.buildFilteredOutProcessor()
groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor)

if err = registerTransactionProcessors(
registry,
groupTransactionFilterers,
groupFilteredOutProcessors,
groupTransactionProcessors,
); err != nil {
return
}

err = s.streamLedger(ledger,
groupTransactionFilterers,
groupFilteredOutProcessors,
Expand Down Expand Up @@ -413,6 +427,64 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos
return
}

// nameRegistry ensures all ingestion components have a unique name
// for metrics reporting
type nameRegistry map[string]struct{}

func (n nameRegistry) add(name string) error {
if _, ok := n[name]; ok {
return fmt.Errorf("%s is duplicated", name)
}
n[name] = struct{}{}
return nil
}

func registerChangeProcessors(
registry nameRegistry,
group *groupChangeProcessors,
) error {
for _, p := range group.processors {
if err := registry.add(p.Name()); err != nil {
return err
}
}
return nil
}

func registerTransactionProcessors(
registry nameRegistry,
groupTransactionFilterers *groupTransactionFilterers,
groupFilteredOutProcessors *groupTransactionProcessors,
groupTransactionProcessors *groupTransactionProcessors,
) error {
for _, f := range groupTransactionFilterers.filterers {
if err := registry.add(f.Name()); err != nil {
return err
}
}
for _, p := range groupTransactionProcessors.processors {
if err := registry.add(p.Name()); err != nil {
return err
}
}
for _, l := range groupTransactionProcessors.lazyLoaders {
if err := registry.add(l.Name()); err != nil {
return err
}
}
for _, p := range groupFilteredOutProcessors.processors {
if err := registry.add(p.Name()); err != nil {
return err
}
}
for _, l := range groupFilteredOutProcessors.lazyLoaders {
if err := registry.add(l.Name()); err != nil {
return err
}
}
return nil
}

func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta) (err error) {
ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), CurrentVersion)

Expand Down Expand Up @@ -504,12 +576,21 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
ledger.LedgerSequence(),
s.config.NetworkPassphrase,
)

registry := nameRegistry{}
if err = registerChangeProcessors(
registry,
groupChangeProcessors,
); err != nil {
return
}

err = s.runChangeProcessorOnLedger(groupChangeProcessors, ledger)
if err != nil {
return
}

transactionStats, transactionDurations, tradeStats, loaderDurations, loaderStats, err := s.RunTransactionProcessorsOnLedger(ledger)
transactionStats, transactionDurations, tradeStats, loaderDurations, loaderStats, err := s.runTransactionProcessorsOnLedger(registry, ledger)

stats.changeStats = changeStatsProcessor.GetResults()
stats.changeDurations = groupChangeProcessors.processorsRunDurations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func (p *AccountDataProcessor) reset() {
p.batchInsertBuilder = p.dataQ.NewAccountDataBatchInsertBuilder()
}

func (p *AccountDataProcessor) Name() string {
return "processors.AccountDataProcessor"
}

func (p *AccountDataProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
// We're interested in data only
if change.Type != xdr.LedgerEntryTypeData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/guregu/null/zero"

"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/errors"
Expand All @@ -28,6 +29,10 @@ func (p *AccountsProcessor) reset() {
p.batchInsertBuilder = p.accountsQ.NewAccountsBatchInsertBuilder()
}

func (p *AccountsProcessor) Name() string {
return "processors.AccountsProcessor"
}

func (p *AccountsProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
if change.Type != xdr.LedgerEntryTypeAccount {
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func NewAssetStatsProcessor(
return p
}

func (p *AssetStatsProcessor) Name() string {
return "processors.AssetStatsProcessor"
}

func (p *AssetStatsProcessor) ProcessChange(ctx context.Context, change ingest.Change) error {
if change.Type != xdr.LedgerEntryTypeLiquidityPool &&
change.Type != xdr.LedgerEntryTypeClaimableBalance &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func NewClaimableBalancesChangeProcessor(Q history.QClaimableBalances) *Claimabl
return p
}

func (p *ClaimableBalancesChangeProcessor) Name() string {
return "processors.ClaimableBalancesChangeProcessor"
}

func (p *ClaimableBalancesChangeProcessor) reset() {
p.cache = ingest.NewChangeCompactor()
p.claimantsInsertBuilder = p.qClaimableBalances.NewClaimableBalanceClaimantBatchInsertBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func NewClaimableBalancesTransactionProcessor(
}
}

func (p *ClaimableBalancesTransactionProcessor) Name() string {
return "processors.ClaimableBalancesTransactionProcessor"
}

func (p *ClaimableBalancesTransactionProcessor) ProcessTransaction(
lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction,
) error {
Expand Down
Loading
Loading