Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/ingest: Add metrics for ingestion loaders #5209

Merged
merged 4 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type AccountLoader struct {
sealed bool
set set.Set[string]
ids map[string]int64
stats LoaderStats
}

var errSealed = errors.New("cannot register more entries to loader after calling Exec()")
Expand All @@ -49,6 +50,7 @@ func NewAccountLoader() *AccountLoader {
sealed: false,
set: set.Set[string]{},
ids: map[string]int64{},
stats: LoaderStats{},
}
}

Expand Down Expand Up @@ -99,6 +101,14 @@ func (a *AccountLoader) lookupKeys(ctx context.Context, q *Q, addresses []string
return nil
}

// LoaderStats describes the result of executing a history lookup id loader
type LoaderStats struct {
// Total is the number of elements registered to the loader
Total int
// Inserted is the number of elements inserted into the lookup table
Inserted int
}

// Exec will look up all the history account ids for the addresses registered in the loader.
// If there are no history account ids for a given set of addresses, Exec will insert rows
// into the history_accounts table to establish a mapping between address and history account id.
Expand All @@ -116,6 +126,7 @@ func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) e
if err := a.lookupKeys(ctx, q, addresses); err != nil {
return err
}
a.stats.Total += len(addresses)

insert := 0
for _, address := range addresses {
Expand Down Expand Up @@ -149,10 +160,21 @@ func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) e
if err != nil {
return err
}
a.stats.Inserted += insert

return a.lookupKeys(ctx, q, addresses)
}

// Stats returns the number of addresses registered in the loader and the number of addresses
// inserted into the history_accounts table.
func (a *AccountLoader) Stats() LoaderStats {
return a.stats
}

func (a *AccountLoader) Name() string {
return "AccountLoader"
}

type bulkInsertField struct {
name string
dbType string
Expand Down
12 changes: 9 additions & 3 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,28 @@
assert.Equal(t, future, duplicateFuture)
}

assert.NoError(t, loader.Exec(context.Background(), session))
err := loader.Exec(context.Background(), session)
assert.NoError(t, err)
assert.Equal(t, LoaderStats{
Total: 100,
Inserted: 100,
}, loader.Stats())
assert.Panics(t, func() {
loader.GetFuture(keypair.MustRandom().Address())
})

q := &Q{session}
for _, address := range addresses {
internalId, err := loader.GetNow(address)
var internalId int64

Check failure on line 46 in services/horizon/internal/db2/history/account_loader_test.go

View workflow job for this annotation

GitHub Actions / golangci

ST1003: var internalId should be internalID (stylecheck)
internalId, err = loader.GetNow(address)
assert.NoError(t, err)
var account Account
assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
assert.Equal(t, account.ID, internalId)
assert.Equal(t, account.Address, address)
}

_, err := loader.GetNow("not present")
_, err = loader.GetNow("not present")
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
14 changes: 14 additions & 0 deletions services/horizon/internal/db2/history/asset_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type AssetLoader struct {
sealed bool
set set.Set[AssetKey]
ids map[AssetKey]int64
stats LoaderStats
}

// NewAssetLoader will construct a new AssetLoader instance.
Expand All @@ -68,6 +69,7 @@ func NewAssetLoader() *AssetLoader {
sealed: false,
set: set.Set[AssetKey]{},
ids: map[AssetKey]int64{},
stats: LoaderStats{},
}
}

Expand Down Expand Up @@ -145,6 +147,7 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err
if err := a.lookupKeys(ctx, q, keys); err != nil {
return err
}
a.stats.Total += len(keys)

assetTypes := make([]string, 0, len(a.set)-len(a.ids))
assetCodes := make([]string, 0, len(a.set)-len(a.ids))
Expand Down Expand Up @@ -196,10 +199,21 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err
if err != nil {
return err
}
a.stats.Inserted += insert

return a.lookupKeys(ctx, q, keys)
}

// Stats returns the number of assets registered in the loader and the number of assets
// inserted into the history_assets table.
func (a *AssetLoader) Stats() LoaderStats {
return a.stats
}

func (a *AssetLoader) Name() string {
return "AssetLoader"
}

// AssetLoaderStub is a stub wrapper around AssetLoader which allows
// you to manually configure the mapping of assets to history asset ids
type AssetLoaderStub struct {
Expand Down
15 changes: 11 additions & 4 deletions services/horizon/internal/db2/history/asset_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,34 @@ func TestAssetLoader(t *testing.T) {
assert.Equal(t, future, duplicateFuture)
}

assert.NoError(t, loader.Exec(context.Background(), session))
err := loader.Exec(context.Background(), session)
assert.NoError(t, err)
assert.Equal(t, LoaderStats{
Total: 100,
Inserted: 100,
}, loader.Stats())
assert.Panics(t, func() {
loader.GetFuture(AssetKey{Type: "invalid"})
})

q := &Q{session}
for _, key := range keys {
internalID, err := loader.GetNow(key)
var internalID int64
internalID, err = loader.GetNow(key)
assert.NoError(t, err)
var assetXDR xdr.Asset
if key.Type == "native" {
assetXDR = xdr.MustNewNativeAsset()
} else {
assetXDR = xdr.MustNewCreditAsset(key.Code, key.Issuer)
}
assetID, err := q.GetAssetID(context.Background(), assetXDR)
var assetID int64
assetID, err = q.GetAssetID(context.Background(), assetXDR)
assert.NoError(t, err)
assert.Equal(t, assetID, internalID)
}

_, err := loader.GetNow(AssetKey{})
_, err = loader.GetNow(AssetKey{})
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
14 changes: 14 additions & 0 deletions services/horizon/internal/db2/history/claimable_balance_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ClaimableBalanceLoader struct {
sealed bool
set set.Set[string]
ids map[string]int64
stats LoaderStats
}

// NewClaimableBalanceLoader will construct a new ClaimableBalanceLoader instance.
Expand All @@ -42,6 +43,7 @@ func NewClaimableBalanceLoader() *ClaimableBalanceLoader {
sealed: false,
set: set.Set[string]{},
ids: map[string]int64{},
stats: LoaderStats{},
}
}

Expand Down Expand Up @@ -109,6 +111,7 @@ func (a *ClaimableBalanceLoader) Exec(ctx context.Context, session db.SessionInt
if err := a.lookupKeys(ctx, q, ids); err != nil {
return err
}
a.stats.Total += len(ids)

insert := 0
for _, id := range ids {
Expand Down Expand Up @@ -142,6 +145,17 @@ func (a *ClaimableBalanceLoader) Exec(ctx context.Context, session db.SessionInt
if err != nil {
return err
}
a.stats.Inserted += insert

return a.lookupKeys(ctx, q, ids)
}

// Stats returns the number of claimable balances registered in the loader and the number of claimable balances
// inserted into the history_claimable_balances table.
func (a *ClaimableBalanceLoader) Stats() LoaderStats {
return a.stats
}

func (a *ClaimableBalanceLoader) Name() string {
return "ClaimableBalanceLoader"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package history

import (
"context"
"database/sql/driver"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -39,24 +40,31 @@ func TestClaimableBalanceLoader(t *testing.T) {
assert.Equal(t, future, duplicateFuture)
}

assert.NoError(t, loader.Exec(context.Background(), session))
err := loader.Exec(context.Background(), session)
assert.NoError(t, err)
assert.Equal(t, LoaderStats{
Total: 100,
Inserted: 100,
}, loader.Stats())
assert.Panics(t, func() {
loader.GetFuture("not-present")
})

q := &Q{session}
for i, id := range ids {
future := futures[i]
internalID, err := future.Value()
var internalID driver.Value
internalID, err = future.Value()
assert.NoError(t, err)
cb, err := q.ClaimableBalanceByID(context.Background(), id)
var cb HistoryClaimableBalance
cb, err = q.ClaimableBalanceByID(context.Background(), id)
assert.NoError(t, err)
assert.Equal(t, cb.BalanceID, id)
assert.Equal(t, cb.InternalID, internalID)
}

futureCb := &FutureClaimableBalanceID{id: "not-present", loader: loader}
_, err := futureCb.Value()
_, err = futureCb.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
14 changes: 14 additions & 0 deletions services/horizon/internal/db2/history/liquidity_pool_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type LiquidityPoolLoader struct {
sealed bool
set set.Set[string]
ids map[string]int64
stats LoaderStats
}

// NewLiquidityPoolLoader will construct a new LiquidityPoolLoader instance.
Expand All @@ -42,6 +43,7 @@ func NewLiquidityPoolLoader() *LiquidityPoolLoader {
sealed: false,
set: set.Set[string]{},
ids: map[string]int64{},
stats: LoaderStats{},
}
}

Expand Down Expand Up @@ -109,6 +111,7 @@ func (a *LiquidityPoolLoader) Exec(ctx context.Context, session db.SessionInterf
if err := a.lookupKeys(ctx, q, ids); err != nil {
return err
}
a.stats.Total += len(ids)

insert := 0
for _, id := range ids {
Expand Down Expand Up @@ -142,10 +145,21 @@ func (a *LiquidityPoolLoader) Exec(ctx context.Context, session db.SessionInterf
if err != nil {
return err
}
a.stats.Inserted += insert

return a.lookupKeys(ctx, q, ids)
}

// Stats returns the number of liquidity pools registered in the loader and the number of liquidity pools
// inserted into the history_liquidity_pools table.
func (a *LiquidityPoolLoader) Stats() LoaderStats {
return a.stats
}

func (a *LiquidityPoolLoader) Name() string {
return "LiquidityPoolLoader"
}

// LiquidityPoolLoaderStub is a stub wrapper around LiquidityPoolLoader which allows
// you to manually configure the mapping of liquidity pools to history liquidity ppol ids
type LiquidityPoolLoaderStub struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,29 @@ func TestLiquidityPoolLoader(t *testing.T) {
assert.Equal(t, future, duplicateFuture)
}

assert.NoError(t, loader.Exec(context.Background(), session))
err := loader.Exec(context.Background(), session)
assert.NoError(t, err)
assert.Equal(t, LoaderStats{
Total: 100,
Inserted: 100,
}, loader.Stats())
assert.Panics(t, func() {
loader.GetFuture("not-present")
})

q := &Q{session}
for _, id := range ids {
internalID, err := loader.GetNow(id)
var internalID int64
internalID, err = loader.GetNow(id)
assert.NoError(t, err)
lp, err := q.LiquidityPoolByID(context.Background(), id)
var lp HistoryLiquidityPool
lp, err = q.LiquidityPoolByID(context.Background(), id)
assert.NoError(t, err)
assert.Equal(t, lp.PoolID, id)
assert.Equal(t, lp.InternalID, internalID)
}

_, err := loader.GetNow("not present")
_, err = loader.GetNow("not present")
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
27 changes: 27 additions & 0 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/errors"
logpkg "github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -523,6 +524,8 @@ func (r resumeState) run(s *system) (transition, error) {
tradeStatsMap := stats.tradeStats.Map()
r.addLedgerStatsMetricFromMap(s, "trades", tradeStatsMap)
r.addProcessorDurationsMetricFromMap(s, stats.transactionDurations)
r.addLoaderDurationsMetricFromMap(s, stats.transactionDurations)
r.addLoaderStatsMetric(s, stats.loaderStats)

// since a single system instance is shared throughout all states,
// this will sweep up increments to history archive counters
Expand Down Expand Up @@ -573,6 +576,30 @@ func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string]
}
}

func (r resumeState) addLoaderDurationsMetricFromMap(s *system, m map[string]time.Duration) {
for loaderName, value := range m {
s.Metrics().LoadersRunDurationSummary.
With(prometheus.Labels{"name": loaderName}).Observe(value.Seconds())
}
}

func (r resumeState) addLoaderStatsMetric(s *system, loaderSTats map[string]history.LoaderStats) {
for loaderName, stats := range loaderSTats {
s.Metrics().LoadersStatsSummary.
With(prometheus.Labels{
"name": loaderName,
"stat": "total_queried",
}).
Observe(float64(stats.Total))
s.Metrics().LoadersStatsSummary.
With(prometheus.Labels{
"name": loaderName,
"stat": "total_inserted",
}).
Observe(float64(stats.Inserted))
}
}

func addHistoryArchiveStatsMetrics(s *system, stats []historyarchive.ArchiveStats) {
for _, historyServerStat := range stats {
s.Metrics().HistoryArchiveStatsCounter.
Expand Down
Loading
Loading