Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/ingest: added 'horizon_ingest_errors_total' metric key #5302

Merged
merged 7 commits into from
May 10, 2024
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
github.com/spf13/viper v1.17.0
github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2
github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible
sreuland marked this conversation as resolved.
Show resolved Hide resolved
github.com/stretchr/testify v1.8.4
github.com/stretchr/testify v1.9.0
github.com/tyler-smith/go-bip39 v0.0.0-20180618194314-52158e4697b8
github.com/xdrpp/goxdr v0.1.1
google.golang.org/api v0.170.0
Expand Down Expand Up @@ -136,7 +136,7 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/sergi/go-diff v0.0.0-20161205080420-83532ca1c1ca // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/stretchr/objx v0.5.1 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.34.0 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20151027082146-e0fe6f683076 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -419,18 +419,18 @@ github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible/g
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.1 h1:4VhoImhV/Bm0ToFkXFi8hXNXwpDRZ/ynw3amt82mzq0=
github.com/stretchr/objx v0.5.1/go.mod h1:/iHQpkQwBD6DLUmQ4pE+s1TXdob1mORJ4/UFdrifcy0=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/tyler-smith/go-bip39 v0.0.0-20180618194314-52158e4697b8 h1:g3yQGZK+G6dfF/mw/SOwsTMzUVkpT4hB8pHxpbTXkKw=
Expand Down
26 changes: 26 additions & 0 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,32 @@ const (
ReingestHistoryRange
)

// provide a name represention for a state
func (state State) Name() string {
switch state {
case Start:
return "start"
case Stop:
return "stop"
case Build:
return "build"
case Resume:
return "resume"
case WaitForCheckpoint:
return "waitforcheckpoint"
case StressTest:
return "stresstest"
case VerifyRange:
return "verifyrange"
case HistoryRange:
return "historyrange"
case ReingestHistoryRange:
return "reingesthistoryrange"
default:
return "none"
}
}

type stateMachineNode interface {
run(*system) (transition, error)
String() string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func (s *ReingestHistoryRangeStateTestSuite) SetupTest() {
}
s.historyQ.On("GetTx").Return(nil).Once()
s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(100, 200)).Return(nil).Once()
s.system.initMetrics()
}

func (s *ReingestHistoryRangeStateTestSuite) TearDownTest() {
Expand Down
20 changes: 20 additions & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ type Metrics struct {

// ArchiveRequestCounter counts how many http requests are sent to history server
HistoryArchiveStatsCounter *prometheus.CounterVec

// IngestionErrorCounter counts the number of times the live/forward ingestion state machine
// encounters an error condition.
IngestionErrorCounter *prometheus.CounterVec
}

type System interface {
Expand Down Expand Up @@ -443,6 +447,16 @@ func (s *system) initMetrics() {
},
[]string{"source", "type"},
)

s.metrics.IngestionErrorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "errors_total",
Help: "Counters of the number of times the live/forward ingestion state machine encountered an error. " +
"'current_state' label has the name of the state where the error occurred. " +
"'next_state' label has the name of the next state requested from the current_state.",
},
[]string{"current_state", "next_state"},
)
}

func (s *system) GetCurrentState() State {
Expand Down Expand Up @@ -471,6 +485,7 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(s.metrics.LoadersStatsSummary)
registry.MustRegister(s.metrics.StateVerifyLedgerEntriesCount)
registry.MustRegister(s.metrics.HistoryArchiveStatsCounter)
registry.MustRegister(s.metrics.IngestionErrorCounter)
s.ledgerBackend = ledgerbackend.WithMetrics(s.ledgerBackend, registry, "horizon")
}

Expand Down Expand Up @@ -643,6 +658,11 @@ func (s *system) runStateMachine(cur stateMachineNode) error {
// so we log these errors using the info log level
logger.Info("Error in ingestion state machine")
} else {
s.Metrics().IngestionErrorCounter.
With(prometheus.Labels{
"current_state": cur.GetState().Name(),
"next_state": next.node.GetState().Name(),
}).Inc()
logger.Error("Error in ingestion state machine")
}
}
Expand Down
104 changes: 102 additions & 2 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"database/sql"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -113,10 +114,13 @@ func TestStateMachineRunReturnsUnexpectedTransaction(t *testing.T) {
historyQ: historyQ,
ctx: context.Background(),
}
reg := setupMetrics(system)

historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()

assert.PanicsWithValue(t, "unexpected transaction", func() {
defer func() {
assertErrorRestartMetrics(reg, "", "", 0, t)
}()
system.Run()
})
}
Expand All @@ -127,12 +131,17 @@ func TestStateMachineTransition(t *testing.T) {
historyQ: historyQ,
ctx: context.Background(),
}
reg := setupMetrics(system)

historyQ.On("GetTx").Return(nil).Once()
historyQ.On("Begin", mock.Anything).Return(errors.New("my error")).Once()
historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()

assert.PanicsWithValue(t, "unexpected transaction", func() {
defer func() {
// the test triggers error in the first start state exec, so metric is added
assertErrorRestartMetrics(reg, "start", "start", 1, t)
}()
system.Run()
})
}
Expand All @@ -144,12 +153,14 @@ func TestContextCancel(t *testing.T) {
historyQ: historyQ,
ctx: ctx,
}
reg := setupMetrics(system)

historyQ.On("GetTx").Return(nil).Once()
historyQ.On("Begin", mock.AnythingOfType("*context.cancelCtx")).Return(errors.New("my error")).Once()
historyQ.On("Begin", mock.AnythingOfType("*context.cancelCtx")).Return(context.Canceled).Once()

cancel()
assert.NoError(t, system.runStateMachine(startState{}))
assertErrorRestartMetrics(reg, "", "", 0, t)
}

// TestStateMachineRunReturnsErrorWhenNextStateIsShutdownWithError checks if the
Expand All @@ -162,12 +173,61 @@ func TestStateMachineRunReturnsErrorWhenNextStateIsShutdownWithError(t *testing.
ctx: context.Background(),
historyQ: historyQ,
}
reg := setupMetrics(system)

historyQ.On("GetTx").Return(nil).Once()

err := system.runStateMachine(verifyRangeState{})
assert.Error(t, err)
assert.EqualError(t, err, "invalid range: [0, 0]")
assertErrorRestartMetrics(reg, "verifyrange", "stop", 1, t)
}

func TestStateMachineRestartEmitsMetric(t *testing.T) {
historyQ := &mockDBQ{}
ledgerBackend := &mockLedgerBackend{}
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
defer func() {
cancel()
wg.Wait()
}()

system := &system{
ctx: ctx,
historyQ: historyQ,
ledgerBackend: ledgerBackend,
}

ledgerBackend.On("IsPrepared", system.ctx, ledgerbackend.UnboundedRange(101)).Return(true, nil)
ledgerBackend.On("GetLedger", system.ctx, uint32(101)).Return(xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: 101,
LedgerVersion: xdr.Uint32(MaxSupportedProtocolVersion),
BucketListHash: xdr.Hash{1, 2, 3},
},
},
},
}, nil)

reg := setupMetrics(system)

historyQ.On("GetTx").Return(nil)
historyQ.On("Begin", system.ctx).Return(errors.New("stop state machine"))

wg.Add(1)
go func() {
defer wg.Done()
system.runStateMachine(resumeState{latestSuccessfullyProcessedLedger: 100})
}()

assert.EventuallyWithT(t, func(c *assert.CollectT) {
// this checks every 50ms up to 10s total, for at least 3 fsm retries based on a db Begin error
// this condition should be met as the fsm retries every second.
assertErrorRestartMetrics(reg, "resume", "resume", 3, c)
}, 10*time.Second, 50*time.Millisecond, "horizon_ingest_errors_total metric was not incremented on a fsm error")
}

func TestMaybeVerifyStateGetExpStateInvalidError(t *testing.T) {
Expand Down Expand Up @@ -248,6 +308,7 @@ func TestCurrentStateRaceCondition(t *testing.T) {
historyQ: historyQ,
ctx: context.Background(),
}
reg := setupMetrics(s)

historyQ.On("GetTx").Return(nil)
historyQ.On("Begin", s.ctx).Return(nil)
Expand Down Expand Up @@ -280,6 +341,45 @@ loop:
}
close(getCh)
<-doneCh
assertErrorRestartMetrics(reg, "", "", 0, t)
}

func setupMetrics(system *system) *prometheus.Registry {
registry := prometheus.NewRegistry()
system.initMetrics()
registry.Register(system.Metrics().IngestionErrorCounter)
return registry
}

func assertErrorRestartMetrics(reg *prometheus.Registry, assertCurrentState string, assertNextState string, assertRestartCount float64, t assert.TestingT) {
assert := assert.New(t)
metrics, err := reg.Gather()
assert.NoError(err)

for _, metricFamily := range metrics {
if metricFamily.GetName() == "horizon_ingest_errors_total" {
assert.Len(metricFamily.GetMetric(), 1)
assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), assertRestartCount)
var metricCurrentState = ""
var metricNextState = ""
for _, label := range metricFamily.GetMetric()[0].GetLabel() {
if label.GetName() == "current_state" {
metricCurrentState = label.GetValue()
}
if label.GetName() == "next_state" {
metricNextState = label.GetValue()
}
}

assert.Equal(metricCurrentState, assertCurrentState)
assert.Equal(metricNextState, assertNextState)
return
}
}

if assertRestartCount > 0.0 {
assert.Fail("horizon_ingest_errors_total metrics were not correct")
}
}

type mockDBQ struct {
Expand Down
Loading