Skip to content

Commit

Permalink
#5256: use existing ingestion state machine error trap to emit new er…
Browse files Browse the repository at this point in the history
…ror counting metrics, per review feedback
  • Loading branch information
sreuland committed May 9, 2024
1 parent 14a2085 commit 3319801
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 109 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
github.com/spf13/viper v1.17.0
github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2
github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible
github.com/stretchr/testify v1.8.4
github.com/stretchr/testify v1.9.0
github.com/tyler-smith/go-bip39 v0.0.0-20180618194314-52158e4697b8
github.com/xdrpp/goxdr v0.1.1
google.golang.org/api v0.170.0
Expand Down Expand Up @@ -136,7 +136,7 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/sergi/go-diff v0.0.0-20161205080420-83532ca1c1ca // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/stretchr/objx v0.5.1 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.34.0 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20151027082146-e0fe6f683076 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -419,18 +419,18 @@ github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible/g
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.1 h1:4VhoImhV/Bm0ToFkXFi8hXNXwpDRZ/ynw3amt82mzq0=
github.com/stretchr/objx v0.5.1/go.mod h1:/iHQpkQwBD6DLUmQ4pE+s1TXdob1mORJ4/UFdrifcy0=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/tyler-smith/go-bip39 v0.0.0-20180618194314-52158e4697b8 h1:g3yQGZK+G6dfF/mw/SOwsTMzUVkpT4hB8pHxpbTXkKw=
Expand Down
44 changes: 26 additions & 18 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ingest
import (
"context"
"fmt"
"reflect"
"strings"
"time"

Expand Down Expand Up @@ -47,6 +46,32 @@ const (
ReingestHistoryRange
)

// Name returns the lowercase label that identifies the state. Any state
// without a dedicated label is reported as "none".
func (state State) Name() string {
	// Start from the fallback label and overwrite it when the state is known.
	label := "none"
	switch state {
	case Start:
		label = "start"
	case Stop:
		label = "stop"
	case Build:
		label = "build"
	case Resume:
		label = "resume"
	case WaitForCheckpoint:
		label = "waitforcheckpoint"
	case StressTest:
		label = "stresstest"
	case VerifyRange:
		label = "verifyrange"
	case HistoryRange:
		label = "historyrange"
	case ReingestHistoryRange:
		label = "reingesthistoryrange"
	}
	return label
}

type stateMachineNode interface {
run(*system) (transition, error)
String() string
Expand Down Expand Up @@ -407,18 +432,6 @@ func (resumeState) GetState() State {
}

func (r resumeState) run(s *system) (transitionResult transition, errorResult error) {
defer func() {
if errorResult != nil {
// capture any restarts that are being triggered by the state
switch reflect.TypeOf(transitionResult.node) {
case (reflect.TypeFor[startState]()):
r.incrementRestartMetric(s, "start")
case (reflect.TypeFor[resumeState]()):
r.incrementRestartMetric(s, "retry")
}
}
}()

if r.latestSuccessfullyProcessedLedger == 0 {
return start(), errors.New("unexpected latestSuccessfullyProcessedLedger value")
}
Expand Down Expand Up @@ -587,11 +600,6 @@ func (r resumeState) addProcessorDurationsMetricFromMap(s *system, m map[string]
}
}

func (r resumeState) incrementRestartMetric(s *system, restartType string) {
s.Metrics().IngestionErrorRestartCounter.
With(prometheus.Labels{"type": restartType}).Inc()
}

func (r resumeState) addLoaderDurationsMetricFromMap(s *system, m map[string]time.Duration) {
for loaderName, value := range m {
s.Metrics().LoadersRunDurationSummary.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func (s *ReingestHistoryRangeStateTestSuite) SetupTest() {
}
s.historyQ.On("GetTx").Return(nil).Once()
s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.BoundedRange(100, 200)).Return(nil).Once()
s.system.initMetrics()
}

func (s *ReingestHistoryRangeStateTestSuite) TearDownTest() {
Expand Down
28 changes: 15 additions & 13 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ type Metrics struct {
// ArchiveRequestCounter counts how many http requests are sent to history server
HistoryArchiveStatsCounter *prometheus.CounterVec

// IngestionErrorRestartCounter counts the number of times the live/forward ingestion state machine
// initiates a restart or retry.
IngestionErrorRestartCounter *prometheus.CounterVec
// IngestionErrorCounter counts the number of times the live/forward ingestion state machine
// encounters an error condition.
IngestionErrorCounter *prometheus.CounterVec
}

type System interface {
Expand Down Expand Up @@ -448,17 +448,14 @@ func (s *system) initMetrics() {
[]string{"source", "type"},
)

s.metrics.IngestionErrorRestartCounter = prometheus.NewCounterVec(
s.metrics.IngestionErrorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "error_restarts",
Help: "Counters of the number of times the live/forward ingestion state machine initiates a restart. " +
"when 'type' label is 'start' means some aspect of ledger order is out of sync between data from " +
"captive core meta pipe and horizon's db, restarting to see if condition resolves. " +
"when 'type' label is 'retry' means ingestion is getting an unexpected error while " +
"processing network ledger data which it can't resolve. If this metric is constantly increasing, " +
"it means ingestion is stuck in a retry loop on an error it can't resolve, effectively halted.",
Namespace: "horizon", Subsystem: "ingest", Name: "errors_total",
Help: "Counters of the number of times the live/forward ingestion state machine encountered an error. " +
"'current_state' label has the name of the state that error occurred. " +
"'next_state' label has the name of the next state requested from the current_state.",
},
[]string{"type"},
[]string{"current_state", "next_state"},
)
}

Expand Down Expand Up @@ -488,7 +485,7 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(s.metrics.LoadersStatsSummary)
registry.MustRegister(s.metrics.StateVerifyLedgerEntriesCount)
registry.MustRegister(s.metrics.HistoryArchiveStatsCounter)
registry.MustRegister(s.metrics.IngestionErrorRestartCounter)
registry.MustRegister(s.metrics.IngestionErrorCounter)
s.ledgerBackend = ledgerbackend.WithMetrics(s.ledgerBackend, registry, "horizon")
}

Expand Down Expand Up @@ -661,6 +658,11 @@ func (s *system) runStateMachine(cur stateMachineNode) error {
// so we log these errors using the info log level
logger.Info("Error in ingestion state machine")
} else {
s.Metrics().IngestionErrorCounter.
With(prometheus.Labels{
"current_state": cur.GetState().Name(),
"next_state": next.node.GetState().Name(),
}).Inc()
logger.Error("Error in ingestion state machine")
}
}
Expand Down
104 changes: 102 additions & 2 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"database/sql"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -113,10 +114,13 @@ func TestStateMachineRunReturnsUnexpectedTransaction(t *testing.T) {
historyQ: historyQ,
ctx: context.Background(),
}
reg := setupMetrics(system)

historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()

assert.PanicsWithValue(t, "unexpected transaction", func() {
defer func() {
assertErrorRestartMetrics(reg, "", "", 0, t)
}()
system.Run()
})
}
Expand All @@ -127,12 +131,17 @@ func TestStateMachineTransition(t *testing.T) {
historyQ: historyQ,
ctx: context.Background(),
}
reg := setupMetrics(system)

historyQ.On("GetTx").Return(nil).Once()
historyQ.On("Begin", mock.Anything).Return(errors.New("my error")).Once()
historyQ.On("GetTx").Return(&sqlx.Tx{}).Once()

assert.PanicsWithValue(t, "unexpected transaction", func() {
defer func() {
// the test triggers error in the first start state exec, so metric is added
assertErrorRestartMetrics(reg, "start", "start", 1, t)
}()
system.Run()
})
}
Expand All @@ -144,12 +153,14 @@ func TestContextCancel(t *testing.T) {
historyQ: historyQ,
ctx: ctx,
}
reg := setupMetrics(system)

historyQ.On("GetTx").Return(nil).Once()
historyQ.On("Begin", mock.AnythingOfType("*context.cancelCtx")).Return(errors.New("my error")).Once()
historyQ.On("Begin", mock.AnythingOfType("*context.cancelCtx")).Return(context.Canceled).Once()

Check failure on line 159 in services/horizon/internal/ingest/main_test.go

View workflow job for this annotation

GitHub Actions / golangci

historyQ.On undefined (type *mockDBQ has no field or method On) (typecheck)

cancel()
assert.NoError(t, system.runStateMachine(startState{}))
assertErrorRestartMetrics(reg, "", "", 0, t)
}

// TestStateMachineRunReturnsErrorWhenNextStateIsShutdownWithError checks if the
Expand All @@ -162,12 +173,61 @@ func TestStateMachineRunReturnsErrorWhenNextStateIsShutdownWithError(t *testing.
ctx: context.Background(),
historyQ: historyQ,
}
reg := setupMetrics(system)

historyQ.On("GetTx").Return(nil).Once()

err := system.runStateMachine(verifyRangeState{})
assert.Error(t, err)
assert.EqualError(t, err, "invalid range: [0, 0]")
assertErrorRestartMetrics(reg, "verifyrange", "stop", 1, t)
}

func TestStateMachineRestartEmitsMetric(t *testing.T) {
historyQ := &mockDBQ{}
ledgerBackend := &mockLedgerBackend{}
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
defer func() {
cancel()
wg.Wait()
}()

system := &system{
ctx: ctx,
historyQ: historyQ,
ledgerBackend: ledgerBackend,
}

ledgerBackend.On("IsPrepared", system.ctx, ledgerbackend.UnboundedRange(101)).Return(true, nil)

Check failure on line 202 in services/horizon/internal/ingest/main_test.go

View workflow job for this annotation

GitHub Actions / golangci

ledgerBackend.On undefined (type *mockLedgerBackend has no field or method On) (typecheck)
ledgerBackend.On("GetLedger", system.ctx, uint32(101)).Return(xdr.LedgerCloseMeta{

Check failure on line 203 in services/horizon/internal/ingest/main_test.go

View workflow job for this annotation

GitHub Actions / golangci

ledgerBackend.On undefined (type *mockLedgerBackend has no field or method On) (typecheck)
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: 101,
LedgerVersion: xdr.Uint32(MaxSupportedProtocolVersion),
BucketListHash: xdr.Hash{1, 2, 3},
},
},
},
}, nil)

reg := setupMetrics(system)

historyQ.On("GetTx").Return(nil)

Check failure on line 217 in services/horizon/internal/ingest/main_test.go

View workflow job for this annotation

GitHub Actions / golangci

historyQ.On undefined (type *mockDBQ has no field or method On) (typecheck)
historyQ.On("Begin", system.ctx).Return(errors.New("stop state machine"))

Check failure on line 218 in services/horizon/internal/ingest/main_test.go

View workflow job for this annotation

GitHub Actions / golangci

historyQ.On undefined (type *mockDBQ has no field or method On) (typecheck)

wg.Add(1)
go func() {
defer wg.Done()
system.runStateMachine(resumeState{latestSuccessfullyProcessedLedger: 100})
}()

assert.EventuallyWithT(t, func(c *assert.CollectT) {
// this polls every 50ms, for up to 10s total, until the error counter reads exactly 3,
// i.e. three fsm errors caused by the db Begin failure (the helper asserts equality, not a minimum).
// this condition should be met as the fsm retries every second.
assertErrorRestartMetrics(reg, "resume", "resume", 3, c)
}, 10*time.Second, 50*time.Millisecond, "horizon_ingest_errors_total metric was not incremented on a fsm error")
}

func TestMaybeVerifyStateGetExpStateInvalidError(t *testing.T) {
Expand Down Expand Up @@ -248,6 +308,7 @@ func TestCurrentStateRaceCondition(t *testing.T) {
historyQ: historyQ,
ctx: context.Background(),
}
reg := setupMetrics(s)

historyQ.On("GetTx").Return(nil)
historyQ.On("Begin", s.ctx).Return(nil)
Expand Down Expand Up @@ -280,6 +341,45 @@ loop:
}
close(getCh)
<-doneCh
assertErrorRestartMetrics(reg, "", "", 0, t)
}

// setupMetrics initializes the metrics of the given system and returns a new
// registry that holds only the system's IngestionErrorCounter, so a test can
// gather and inspect that counter in isolation.
func setupMetrics(system *system) *prometheus.Registry {
	registry := prometheus.NewRegistry()
	system.initMetrics()
	// MustRegister panics if registration fails, rather than discarding the
	// error and leaving the test to assert against an empty registry.
	registry.MustRegister(system.Metrics().IngestionErrorCounter)
	return registry
}

// assertErrorRestartMetrics gathers the registry and checks the
// horizon_ingest_errors_total metric family.
//
// When the family is present it must contain a single metric whose counter
// value equals assertRestartCount and whose current_state / next_state labels
// equal assertCurrentState / assertNextState. When the family is absent the
// check only fails if a positive assertRestartCount was expected.
//
// t may be a *testing.T or any other assert.TestingT implementation.
func assertErrorRestartMetrics(reg *prometheus.Registry, assertCurrentState string, assertNextState string, assertRestartCount float64, t assert.TestingT) {
	// Named so that it does not shadow the imported assert package.
	assertions := assert.New(t)
	metrics, err := reg.Gather()
	assertions.NoError(err)

	for _, metricFamily := range metrics {
		if metricFamily.GetName() != "horizon_ingest_errors_total" {
			continue
		}

		assertions.Len(metricFamily.GetMetric(), 1)
		metric := metricFamily.GetMetric()[0]
		// Equal takes (expected, actual); keeping that order makes failure
		// output label the two values correctly.
		assertions.Equal(assertRestartCount, metric.GetCounter().GetValue())

		var metricCurrentState, metricNextState string
		for _, label := range metric.GetLabel() {
			switch label.GetName() {
			case "current_state":
				metricCurrentState = label.GetValue()
			case "next_state":
				metricNextState = label.GetValue()
			}
		}

		assertions.Equal(assertCurrentState, metricCurrentState)
		assertions.Equal(assertNextState, metricNextState)
		return
	}

	// The family was never gathered, which is only acceptable when no errors
	// were expected.
	if assertRestartCount > 0.0 {
		assertions.Fail("horizon_ingest_errors_total metrics were not correct")
	}
}

type mockDBQ struct {
Expand Down
Loading

0 comments on commit 3319801

Please sign in to comment.