Skip to content

Commit

Permalink
services/horizon/internal/ingest: Add metrics for ingestion loaders (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms authored Feb 21, 2024
1 parent 84c21d0 commit b0d394e
Show file tree
Hide file tree
Showing 13 changed files with 213 additions and 38 deletions.
22 changes: 22 additions & 0 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type AccountLoader struct {
sealed bool
set set.Set[string]
ids map[string]int64
stats LoaderStats
}

var errSealed = errors.New("cannot register more entries to loader after calling Exec()")
Expand All @@ -49,6 +50,7 @@ func NewAccountLoader() *AccountLoader {
sealed: false,
set: set.Set[string]{},
ids: map[string]int64{},
stats: LoaderStats{},
}
}

Expand Down Expand Up @@ -99,6 +101,14 @@ func (a *AccountLoader) lookupKeys(ctx context.Context, q *Q, addresses []string
return nil
}

// LoaderStats describes the result of executing a history lookup id loader
type LoaderStats struct {
// Total is the number of elements registered to the loader
Total int
// Inserted is the number of elements inserted into the lookup table
Inserted int
}

// Exec will look up all the history account ids for the addresses registered in the loader.
// If there are no history account ids for a given set of addresses, Exec will insert rows
// into the history_accounts table to establish a mapping between address and history account id.
Expand All @@ -116,6 +126,7 @@ func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) e
if err := a.lookupKeys(ctx, q, addresses); err != nil {
return err
}
a.stats.Total += len(addresses)

insert := 0
for _, address := range addresses {
Expand Down Expand Up @@ -149,10 +160,21 @@ func (a *AccountLoader) Exec(ctx context.Context, session db.SessionInterface) e
if err != nil {
return err
}
a.stats.Inserted += insert

return a.lookupKeys(ctx, q, addresses)
}

// Stats returns the number of addresses registered in the loader and the number of addresses
// inserted into the history_accounts table.
func (a *AccountLoader) Stats() LoaderStats {
return a.stats
}

func (a *AccountLoader) Name() string {
return "AccountLoader"
}

type bulkInsertField struct {
name string
dbType string
Expand Down
12 changes: 9 additions & 3 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,28 @@ func TestAccountLoader(t *testing.T) {
assert.Equal(t, future, duplicateFuture)
}

assert.NoError(t, loader.Exec(context.Background(), session))
err := loader.Exec(context.Background(), session)
assert.NoError(t, err)
assert.Equal(t, LoaderStats{
Total: 100,
Inserted: 100,
}, loader.Stats())
assert.Panics(t, func() {
loader.GetFuture(keypair.MustRandom().Address())
})

q := &Q{session}
for _, address := range addresses {
internalId, err := loader.GetNow(address)
var internalId int64
internalId, err = loader.GetNow(address)
assert.NoError(t, err)
var account Account
assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
assert.Equal(t, account.ID, internalId)
assert.Equal(t, account.Address, address)
}

_, err := loader.GetNow("not present")
_, err = loader.GetNow("not present")
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
14 changes: 14 additions & 0 deletions services/horizon/internal/db2/history/asset_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type AssetLoader struct {
sealed bool
set set.Set[AssetKey]
ids map[AssetKey]int64
stats LoaderStats
}

// NewAssetLoader will construct a new AssetLoader instance.
Expand All @@ -68,6 +69,7 @@ func NewAssetLoader() *AssetLoader {
sealed: false,
set: set.Set[AssetKey]{},
ids: map[AssetKey]int64{},
stats: LoaderStats{},
}
}

Expand Down Expand Up @@ -145,6 +147,7 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err
if err := a.lookupKeys(ctx, q, keys); err != nil {
return err
}
a.stats.Total += len(keys)

assetTypes := make([]string, 0, len(a.set)-len(a.ids))
assetCodes := make([]string, 0, len(a.set)-len(a.ids))
Expand Down Expand Up @@ -196,10 +199,21 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err
if err != nil {
return err
}
a.stats.Inserted += insert

return a.lookupKeys(ctx, q, keys)
}

// Stats returns the number of assets registered in the loader and the number of assets
// inserted into the history_assets table.
func (a *AssetLoader) Stats() LoaderStats {
return a.stats
}

func (a *AssetLoader) Name() string {
return "AssetLoader"
}

// AssetLoaderStub is a stub wrapper around AssetLoader which allows
// you to manually configure the mapping of assets to history asset ids
type AssetLoaderStub struct {
Expand Down
15 changes: 11 additions & 4 deletions services/horizon/internal/db2/history/asset_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,34 @@ func TestAssetLoader(t *testing.T) {
assert.Equal(t, future, duplicateFuture)
}

assert.NoError(t, loader.Exec(context.Background(), session))
err := loader.Exec(context.Background(), session)
assert.NoError(t, err)
assert.Equal(t, LoaderStats{
Total: 100,
Inserted: 100,
}, loader.Stats())
assert.Panics(t, func() {
loader.GetFuture(AssetKey{Type: "invalid"})
})

q := &Q{session}
for _, key := range keys {
internalID, err := loader.GetNow(key)
var internalID int64
internalID, err = loader.GetNow(key)
assert.NoError(t, err)
var assetXDR xdr.Asset
if key.Type == "native" {
assetXDR = xdr.MustNewNativeAsset()
} else {
assetXDR = xdr.MustNewCreditAsset(key.Code, key.Issuer)
}
assetID, err := q.GetAssetID(context.Background(), assetXDR)
var assetID int64
assetID, err = q.GetAssetID(context.Background(), assetXDR)
assert.NoError(t, err)
assert.Equal(t, assetID, internalID)
}

_, err := loader.GetNow(AssetKey{})
_, err = loader.GetNow(AssetKey{})
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
14 changes: 14 additions & 0 deletions services/horizon/internal/db2/history/claimable_balance_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ClaimableBalanceLoader struct {
sealed bool
set set.Set[string]
ids map[string]int64
stats LoaderStats
}

// NewClaimableBalanceLoader will construct a new ClaimableBalanceLoader instance.
Expand All @@ -42,6 +43,7 @@ func NewClaimableBalanceLoader() *ClaimableBalanceLoader {
sealed: false,
set: set.Set[string]{},
ids: map[string]int64{},
stats: LoaderStats{},
}
}

Expand Down Expand Up @@ -109,6 +111,7 @@ func (a *ClaimableBalanceLoader) Exec(ctx context.Context, session db.SessionInt
if err := a.lookupKeys(ctx, q, ids); err != nil {
return err
}
a.stats.Total += len(ids)

insert := 0
for _, id := range ids {
Expand Down Expand Up @@ -142,6 +145,17 @@ func (a *ClaimableBalanceLoader) Exec(ctx context.Context, session db.SessionInt
if err != nil {
return err
}
a.stats.Inserted += insert

return a.lookupKeys(ctx, q, ids)
}

// Stats returns the number of claimable balances registered in the loader and the number of claimable balances
// inserted into the history_claimable_balances table.
func (a *ClaimableBalanceLoader) Stats() LoaderStats {
return a.stats
}

func (a *ClaimableBalanceLoader) Name() string {
return "ClaimableBalanceLoader"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package history

import (
"context"
"database/sql/driver"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -39,24 +40,31 @@ func TestClaimableBalanceLoader(t *testing.T) {
assert.Equal(t, future, duplicateFuture)
}

assert.NoError(t, loader.Exec(context.Background(), session))
err := loader.Exec(context.Background(), session)
assert.NoError(t, err)
assert.Equal(t, LoaderStats{
Total: 100,
Inserted: 100,
}, loader.Stats())
assert.Panics(t, func() {
loader.GetFuture("not-present")
})

q := &Q{session}
for i, id := range ids {
future := futures[i]
internalID, err := future.Value()
var internalID driver.Value
internalID, err = future.Value()
assert.NoError(t, err)
cb, err := q.ClaimableBalanceByID(context.Background(), id)
var cb HistoryClaimableBalance
cb, err = q.ClaimableBalanceByID(context.Background(), id)
assert.NoError(t, err)
assert.Equal(t, cb.BalanceID, id)
assert.Equal(t, cb.InternalID, internalID)
}

futureCb := &FutureClaimableBalanceID{id: "not-present", loader: loader}
_, err := futureCb.Value()
_, err = futureCb.Value()
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
14 changes: 14 additions & 0 deletions services/horizon/internal/db2/history/liquidity_pool_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type LiquidityPoolLoader struct {
sealed bool
set set.Set[string]
ids map[string]int64
stats LoaderStats
}

// NewLiquidityPoolLoader will construct a new LiquidityPoolLoader instance.
Expand All @@ -42,6 +43,7 @@ func NewLiquidityPoolLoader() *LiquidityPoolLoader {
sealed: false,
set: set.Set[string]{},
ids: map[string]int64{},
stats: LoaderStats{},
}
}

Expand Down Expand Up @@ -109,6 +111,7 @@ func (a *LiquidityPoolLoader) Exec(ctx context.Context, session db.SessionInterf
if err := a.lookupKeys(ctx, q, ids); err != nil {
return err
}
a.stats.Total += len(ids)

insert := 0
for _, id := range ids {
Expand Down Expand Up @@ -142,10 +145,21 @@ func (a *LiquidityPoolLoader) Exec(ctx context.Context, session db.SessionInterf
if err != nil {
return err
}
a.stats.Inserted += insert

return a.lookupKeys(ctx, q, ids)
}

// Stats returns the number of liquidity pools registered in the loader and the number of liquidity pools
// inserted into the history_liquidity_pools table.
func (a *LiquidityPoolLoader) Stats() LoaderStats {
return a.stats
}

func (a *LiquidityPoolLoader) Name() string {
return "LiquidityPoolLoader"
}

// LiquidityPoolLoaderStub is a stub wrapper around LiquidityPoolLoader which allows
// you to manually configure the mapping of liquidity pools to history liquidity ppol ids
type LiquidityPoolLoaderStub struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,29 @@ func TestLiquidityPoolLoader(t *testing.T) {
assert.Equal(t, future, duplicateFuture)
}

assert.NoError(t, loader.Exec(context.Background(), session))
err := loader.Exec(context.Background(), session)
assert.NoError(t, err)
assert.Equal(t, LoaderStats{
Total: 100,
Inserted: 100,
}, loader.Stats())
assert.Panics(t, func() {
loader.GetFuture("not-present")
})

q := &Q{session}
for _, id := range ids {
internalID, err := loader.GetNow(id)
var internalID int64
internalID, err = loader.GetNow(id)
assert.NoError(t, err)
lp, err := q.LiquidityPoolByID(context.Background(), id)
var lp HistoryLiquidityPool
lp, err = q.LiquidityPoolByID(context.Background(), id)
assert.NoError(t, err)
assert.Equal(t, lp.PoolID, id)
assert.Equal(t, lp.InternalID, internalID)
}

_, err := loader.GetNow("not present")
_, err = loader.GetNow("not present")
assert.Error(t, err)
assert.Contains(t, err.Error(), `was not found`)
}
27 changes: 27 additions & 0 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/support/errors"
logpkg "github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
Expand Down Expand Up @@ -523,6 +524,8 @@ func (r resumeState) run(s *system) (transition, error) {
tradeStatsMap := stats.tradeStats.Map()
r.addLedgerStatsMetricFromMap(s, "trades", tradeStatsMap)
r.addProcessorDurationsMetricFromMap(s, stats.transactionDurations)
r.addLoaderDurationsMetricFromMap(s, stats.transactionDurations)
r.addLoaderStatsMetric(s, stats.loaderStats)

// since a single system instance is shared throughout all states,
// this will sweep up increments to history archive counters
Expand Down Expand Up @@ -573,6 +576,30 @@ func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string]
}
}

func (r resumeState) addLoaderDurationsMetricFromMap(s *system, m map[string]time.Duration) {
for loaderName, value := range m {
s.Metrics().LoadersRunDurationSummary.
With(prometheus.Labels{"name": loaderName}).Observe(value.Seconds())
}
}

func (r resumeState) addLoaderStatsMetric(s *system, loaderSTats map[string]history.LoaderStats) {
for loaderName, stats := range loaderSTats {
s.Metrics().LoadersStatsSummary.
With(prometheus.Labels{
"name": loaderName,
"stat": "total_queried",
}).
Observe(float64(stats.Total))
s.Metrics().LoadersStatsSummary.
With(prometheus.Labels{
"name": loaderName,
"stat": "total_inserted",
}).
Observe(float64(stats.Inserted))
}
}

func addHistoryArchiveStatsMetrics(s *system, stats []historyarchive.ArchiveStats) {
for _, historyServerStat := range stats {
s.Metrics().HistoryArchiveStatsCounter.
Expand Down
Loading

0 comments on commit b0d394e

Please sign in to comment.