Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Add metrics to execution state indexer #4801

Merged
merged 1 commit into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess

indexerCore, err := indexer.New(
builder.Logger,
metrics.NewExecutionStateIndexerCollector(),
builder.DB,
builder.Storage.RegisterIndex,
builder.Storage.Headers,
Expand Down
8 changes: 8 additions & 0 deletions module/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,14 @@ type ExecutionDataRequesterMetrics interface {
FetchRetried()
}

// ExecutionStateIndexerMetrics is the set of metrics reported while indexing
// execution data.
type ExecutionStateIndexerMetrics interface {
	// BlockIndexed records metrics from indexing execution data from a single block.
	//
	// height is the height of the block that was indexed and duration is the time
	// the indexing took. The three counts are positional and must be passed in the
	// order registers, events, transactionResults; this is the order used by the
	// collector implementation and by the indexer when it reports a block.
	BlockIndexed(height uint64, duration time.Duration, registers, events, transactionResults int)

	// BlockReindexed records that a previously indexed block was indexed again.
	BlockReindexed()
}

type RuntimeMetrics interface {
// RuntimeTransactionParsed reports the time spent parsing a single transaction
RuntimeTransactionParsed(dur time.Duration)
Expand Down
90 changes: 90 additions & 0 deletions module/metrics/execution_state_indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package metrics

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/onflow/flow-go/module"
)

// Compile-time assertion that *ExecutionStateIndexerCollector satisfies
// module.ExecutionStateIndexerMetrics.
var _ module.ExecutionStateIndexerMetrics = (*ExecutionStateIndexerCollector)(nil)

// ExecutionStateIndexerCollector groups the prometheus instruments used to
// report on execution state indexing.
type ExecutionStateIndexerCollector struct {
	// Observed once per indexed block.
	indexDuration        prometheus.Histogram
	highestIndexedHeight prometheus.Gauge

	// Running totals accumulated across blocks.
	indexedEvents, indexedRegisters, indexedTransactionResults prometheus.Counter

	// Incremented each time a reindex is reported.
	reindexedHeightCount prometheus.Counter
}

// NewExecutionStateIndexerCollector creates the histogram, gauge and counters
// backing an ExecutionStateIndexerCollector through promauto and returns the
// collector as a module.ExecutionStateIndexerMetrics.
//
// Every instrument shares the namespaceAccess namespace and the
// subsystemExecutionStateIndexer subsystem.
func NewExecutionStateIndexerCollector() module.ExecutionStateIndexerMetrics {
	// newCounter builds a counter that differs from its siblings only in name and help text.
	newCounter := func(name, help string) prometheus.Counter {
		return promauto.NewCounter(prometheus.CounterOpts{
			Namespace: namespaceAccess,
			Subsystem: subsystemExecutionStateIndexer,
			Name:      name,
			Help:      help,
		})
	}

	// The instruments are created in field order, which matches the order in
	// which they have always been created.
	return &ExecutionStateIndexerCollector{
		indexDuration: promauto.NewHistogram(prometheus.HistogramOpts{
			Namespace: namespaceAccess,
			Subsystem: subsystemExecutionStateIndexer,
			Name:      "index_duration_ms",
			Help:      "the duration of execution state indexing operation",
			Buckets:   []float64{1, 5, 10, 50, 100},
		}),
		highestIndexedHeight: promauto.NewGauge(prometheus.GaugeOpts{
			Namespace: namespaceAccess,
			Subsystem: subsystemExecutionStateIndexer,
			Name:      "highest_indexed_height",
			Help:      "highest block height that has been indexed",
		}),
		indexedEvents: newCounter(
			"indexed_events",
			"number of events indexed",
		),
		indexedRegisters: newCounter(
			"indexed_registers",
			"number of registers indexed",
		),
		indexedTransactionResults: newCounter(
			"indexed_transaction_results",
			"number of transaction results indexed",
		),
		reindexedHeightCount: newCounter(
			"reindexed_height_count",
			"number of times a previously indexed height is reindexed",
		),
	}
}

// BlockIndexed reports the outcome of indexing one block's execution data.
//
// The duration is observed in whole milliseconds, height overwrites the
// highest-indexed-height gauge, and the three counts are added to their
// respective running totals.
func (c *ExecutionStateIndexerCollector) BlockIndexed(height uint64, duration time.Duration, registers, events, transactionResults int) {
	elapsedMs := float64(duration.Milliseconds())
	c.indexDuration.Observe(elapsedMs)

	indexedHeight := float64(height)
	c.highestIndexedHeight.Set(indexedHeight)

	eventTotal, registerTotal, resultTotal := float64(events), float64(registers), float64(transactionResults)
	c.indexedEvents.Add(eventTotal)
	c.indexedRegisters.Add(registerTotal)
	c.indexedTransactionResults.Add(resultTotal)
}

// BlockReindexed bumps the reindex counter by one; it is called when a block
// that had already been indexed is indexed again.
func (c *ExecutionStateIndexerCollector) BlockReindexed() {
	const once = 1
	c.reindexedHeightCount.Add(once)
}
1 change: 1 addition & 0 deletions module/metrics/namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ const (
subsystemExeDataProvider = "provider"
subsystemExeDataPruner = "pruner"
subsystemExecutionDataRequester = "execution_data_requester"
subsystemExecutionStateIndexer = "execution_state_indexer"
subsystemExeDataBlobstore = "blobstore"
)

Expand Down
114 changes: 59 additions & 55 deletions module/metrics/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,61 +169,60 @@ func (nc *NoopCollector) ExecutionBlockExecutionEffortVectorComponent(_ string,
func (nc *NoopCollector) ExecutionBlockCachedPrograms(programs int) {}
func (nc *NoopCollector) ExecutionTransactionExecuted(_ time.Duration, _ int, _, _ uint64, _, _ int, _ bool) {
}
// The methods below are no-op implementations: each one ignores its arguments
// and returns immediately. Every method is declared exactly once.
func (nc *NoopCollector) ExecutionChunkDataPackGenerated(_, _ int) {}
func (nc *NoopCollector) ExecutionScriptExecuted(dur time.Duration, compUsed, _, _ uint64) {}
func (nc *NoopCollector) ForestApproxMemorySize(bytes uint64) {}
func (nc *NoopCollector) ForestNumberOfTrees(number uint64) {}
func (nc *NoopCollector) LatestTrieRegCount(number uint64) {}
func (nc *NoopCollector) LatestTrieRegCountDiff(number int64) {}
func (nc *NoopCollector) LatestTrieRegSize(size uint64) {}
func (nc *NoopCollector) LatestTrieRegSizeDiff(size int64) {}
func (nc *NoopCollector) LatestTrieMaxDepthTouched(maxDepth uint16) {}
func (nc *NoopCollector) UpdateCount() {}
func (nc *NoopCollector) ProofSize(bytes uint32) {}
func (nc *NoopCollector) UpdateValuesNumber(number uint64) {}
func (nc *NoopCollector) UpdateValuesSize(byte uint64) {}
func (nc *NoopCollector) UpdateDuration(duration time.Duration) {}
func (nc *NoopCollector) UpdateDurationPerItem(duration time.Duration) {}
func (nc *NoopCollector) ReadValuesNumber(number uint64) {}
func (nc *NoopCollector) ReadValuesSize(byte uint64) {}
func (nc *NoopCollector) ReadDuration(duration time.Duration) {}
func (nc *NoopCollector) ReadDurationPerItem(duration time.Duration) {}
func (nc *NoopCollector) ExecutionCollectionRequestSent() {}
func (nc *NoopCollector) ExecutionCollectionRequestRetried() {}
func (nc *NoopCollector) RuntimeTransactionParsed(dur time.Duration) {}
func (nc *NoopCollector) RuntimeTransactionChecked(dur time.Duration) {}
func (nc *NoopCollector) RuntimeTransactionInterpreted(dur time.Duration) {}
func (nc *NoopCollector) RuntimeSetNumberOfAccounts(count uint64) {}
func (nc *NoopCollector) RuntimeTransactionProgramsCacheMiss() {}
func (nc *NoopCollector) RuntimeTransactionProgramsCacheHit() {}
func (nc *NoopCollector) ScriptExecuted(dur time.Duration, size int) {}
func (nc *NoopCollector) ScriptExecutionErrorOnArchiveNode() {}
func (nc *NoopCollector) ScriptExecutionErrorOnExecutionNode() {}
func (nc *NoopCollector) ScriptExecutionResultMismatch() {}
func (nc *NoopCollector) ScriptExecutionResultMatch() {}
func (nc *NoopCollector) ScriptExecutionErrorMismatch() {}
func (nc *NoopCollector) ScriptExecutionErrorMatch() {}
func (nc *NoopCollector) TransactionResultFetched(dur time.Duration, size int) {}
func (nc *NoopCollector) TransactionReceived(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionFinalized(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionExecuted(txID flow.Identifier, when time.Time) {}
func (nc *NoopCollector) TransactionExpired(txID flow.Identifier) {}
func (nc *NoopCollector) TransactionSubmissionFailed() {}
func (nc *NoopCollector) UpdateExecutionReceiptMaxHeight(height uint64) {}
func (nc *NoopCollector) UpdateLastFullBlockHeight(height uint64) {}
func (nc *NoopCollector) ChunkDataPackRequestProcessed() {}
func (nc *NoopCollector) ExecutionSync(syncing bool) {}
func (nc *NoopCollector) ExecutionBlockDataUploadStarted() {}
func (nc *NoopCollector) ExecutionBlockDataUploadFinished(dur time.Duration) {}
func (nc *NoopCollector) ExecutionComputationResultUploaded() {}
func (nc *NoopCollector) ExecutionComputationResultUploadRetried() {}
func (nc *NoopCollector) RootIDComputed(duration time.Duration, numberOfChunks int) {}
func (nc *NoopCollector) AddBlobsSucceeded(duration time.Duration, totalSize uint64) {}
func (nc *NoopCollector) AddBlobsFailed() {}
func (nc *NoopCollector) FulfilledHeight(blockHeight uint64) {}
func (nc *NoopCollector) ReceiptSkipped() {}
func (nc *NoopCollector) RequestSucceeded(uint64, time.Duration, uint64, int) {}
func (nc *NoopCollector) RequestFailed(duration time.Duration, retryable bool) {}
func (nc *NoopCollector) RequestCanceled() {}
func (nc *NoopCollector) ResponseDropped() {}
Expand Down Expand Up @@ -312,3 +311,8 @@ func (nc *NoopCollector) OnViolationReportSkipped() {}
var _ ObserverMetrics = (*NoopCollector)(nil)

func (nc *NoopCollector) RecordRPC(handler, rpc string, code codes.Code) {}

// Compile-time assertion that *NoopCollector satisfies
// module.ExecutionStateIndexerMetrics.
var _ module.ExecutionStateIndexerMetrics = (*NoopCollector)(nil)

// BlockIndexed discards all of its arguments and does nothing.
func (nc *NoopCollector) BlockIndexed(_ uint64, _ time.Duration, _, _, _ int) {}

// BlockReindexed does nothing.
func (nc *NoopCollector) BlockReindexed() {}
21 changes: 17 additions & 4 deletions module/state_synchronization/indexer/indexer_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
Expand All @@ -18,11 +19,13 @@ import (

// IndexerCore indexes the execution state.
//
// Each field is declared exactly once; a struct may not contain two fields
// with the same name.
type IndexerCore struct {
	// log is the logger used by the indexer.
	log zerolog.Logger
	// metrics receives the BlockIndexed and BlockReindexed notifications.
	metrics module.ExecutionStateIndexerMetrics

	registers storage.RegisterIndex
	headers   storage.Headers
	events    storage.Events
	results   storage.LightTransactionResults
	batcher   bstorage.BatchBuilder
}

Expand All @@ -31,6 +34,7 @@ type IndexerCore struct {
// won't be initialized to ensure we have bootstrapped the storage first.
func New(
log zerolog.Logger,
metrics module.ExecutionStateIndexerMetrics,
batcher bstorage.BatchBuilder,
registers storage.RegisterIndex,
headers storage.Headers,
Expand All @@ -39,6 +43,7 @@ func New(
) (*IndexerCore, error) {
return &IndexerCore{
log: log.With().Str("component", "execution_indexer").Logger(),
metrics: metrics,
batcher: batcher,
registers: registers,
headers: headers,
Expand Down Expand Up @@ -92,6 +97,7 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
// for indexing resources which might fail to update the values, so this enables rerunning and reindexing those resources
if block.Height == latest {
lg.Warn().Msg("reindexing block data")
c.metrics.BlockReindexed()
}

start := time.Now()
Expand All @@ -103,6 +109,7 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
// downloaded and indexed before the block is sealed. However, when a node is catching up, it
// may download the execution data first. In that case, we should index the collections here.

var eventCount, resultCount, registerCount int
g.Go(func() error {
start := time.Now()

Expand Down Expand Up @@ -130,9 +137,12 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
return fmt.Errorf("batch flush error: %w", err)
}

eventCount = len(events)
resultCount = len(results)

lg.Debug().
Int("event_count", len(events)).
Int("result_count", len(results)).
Int("event_count", eventCount).
Int("result_count", resultCount).
Dur("duration_ms", time.Since(start)).
Msg("indexed badger data")

Expand Down Expand Up @@ -168,8 +178,10 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
return fmt.Errorf("could not index register payloads at height %d: %w", block.Height, err)
}

registerCount = len(payloads)

lg.Debug().
Int("register_count", len(payloads)).
Int("register_count", registerCount).
Dur("duration_ms", time.Since(start)).
Msg("indexed registers")

Expand All @@ -181,6 +193,7 @@ func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEnti
return fmt.Errorf("failed to index block data at height %d: %w", block.Height, err)
}

c.metrics.BlockIndexed(block.Height, time.Since(start), registerCount, eventCount, resultCount)
lg.Debug().
Dur("duration_ms", time.Since(start)).
Msg("indexed block data")
Expand Down
Loading
Loading