Commit

Exposed transaction metrics to grpc endpoint

janezpodhostnik committed Jul 3, 2024
1 parent 665163d commit 31f1186
Showing 8 changed files with 688 additions and 3 deletions.
40 changes: 38 additions & 2 deletions cmd/execution_builder.go
@@ -49,6 +49,7 @@ import (
"github.com/onflow/flow-go/engine/execution/checker"
"github.com/onflow/flow-go/engine/execution/computation"
"github.com/onflow/flow-go/engine/execution/computation/committer"
txmetrics "github.com/onflow/flow-go/engine/execution/computation/metrics"
"github.com/onflow/flow-go/engine/execution/ingestion"
"github.com/onflow/flow-go/engine/execution/ingestion/fetcher"
"github.com/onflow/flow-go/engine/execution/ingestion/loader"
@@ -127,7 +128,7 @@ type ExecutionNode struct {

ingestionUnit *engine.Unit

collector module.ExecutionMetrics
collector *metrics.ExecutionCollector
executionState state.ExecutionState
followerState protocol.FollowerState
committee hotstuff.DynamicCommittee
@@ -160,6 +161,7 @@ type ExecutionNode struct {
executionDataTracker tracker.Storage
blobService network.BlobService
blobserviceDependable *module.ProxiedReadyDoneAware
metricsProvider txmetrics.TransactionExecutionMetricsProvider
}

func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
@@ -228,6 +230,7 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
Component("block data upload manager", exeNode.LoadBlockUploaderManager).
Component("GCP block data uploader", exeNode.LoadGCPBlockDataUploader).
Component("S3 block data uploader", exeNode.LoadS3BlockDataUploader).
Component("transaction execution metrics", exeNode.LoadTransactionExecutionMetrics).
Component("provider engine", exeNode.LoadProviderEngine).
Component("checker engine", exeNode.LoadCheckerEngine).
Component("ingestion engine", exeNode.LoadIngestionEngine).
@@ -544,10 +547,23 @@ func (exeNode *ExecutionNode) LoadProviderEngine(

vmCtx := fvm.NewContext(opts...)

collector := exeNode.collector.WithTransactionCallback(
func(dur time.Duration, stats module.TransactionExecutionResultStats, info module.TransactionExecutionResultInfo) {
exeNode.metricsProvider.Collect(
info.BlockID,
info.BlockHeight,
txmetrics.TransactionExecutionMetrics{
TransactionID: info.TransactionID,
ExecutionTime: dur,
ExecutionEffortWeights: stats.ComputationIntensities,
})
})

ledgerViewCommitter := committer.NewLedgerViewCommitter(exeNode.ledgerStorage, node.Tracer)
manager, err := computation.New(
node.Logger,
exeNode.collector,
// todo inject metrics for computation intensities
collector,
node.Tracer,
node.Me,
node.State,
@@ -1127,6 +1143,26 @@ func (exeNode *ExecutionNode) LoadScriptsEngine(node *NodeConfig) (module.ReadyD
return exeNode.scriptsEng, nil
}

func (exeNode *ExecutionNode) LoadTransactionExecutionMetrics(
node *NodeConfig,
) (module.ReadyDoneAware, error) {
latestFinalizedBlock, err := node.State.Final().Head()
if err != nil {
return nil, fmt.Errorf("could not get latest finalized block: %w", err)
}

metricsProvider := txmetrics.NewTransactionExecutionMetricsProvider(
node.Logger,
exeNode.executionState,
node.Storage.Headers,
latestFinalizedBlock,
0,
)
node.ProtocolEvents.AddConsumer(metricsProvider)
exeNode.metricsProvider = metricsProvider
return metricsProvider, nil
}

func (exeNode *ExecutionNode) LoadConsensusCommittee(
node *NodeConfig,
) (
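The wiring above decorates the node-wide execution collector with a per-transaction callback and hands the decorated value to computation.New, so the computation manager keeps reporting through a single metrics object while a copy of each result also reaches the new metrics provider. The diff does not show WithTransactionCallback's body (it lives in one of the files not expanded on this page); purely as an illustrative sketch of one way such a decorator-with-callback can be shaped, where every type and name below is hypothetical and not a flow-go API:

// Editor's sketch (not part of the commit): a generic decorator that forwards
// each report to the wrapped reporter and, in addition, to a callback.
package main

import (
	"fmt"
	"time"
)

// txReporter stands in for the execution metrics interface.
type txReporter interface {
	TransactionExecuted(dur time.Duration, txID string)
}

// baseCollector plays the role of the existing, node-wide collector.
type baseCollector struct{}

func (baseCollector) TransactionExecuted(dur time.Duration, txID string) {
	fmt.Printf("base collector: %s took %s\n", txID, dur)
}

// withCallback wraps a reporter so every report is also handed to cb,
// mirroring how the provider engine feeds the transaction metrics provider.
type withCallback struct {
	txReporter
	cb func(dur time.Duration, txID string)
}

func (w withCallback) TransactionExecuted(dur time.Duration, txID string) {
	w.txReporter.TransactionExecuted(dur, txID)
	w.cb(dur, txID)
}

func main() {
	collector := withCallback{
		txReporter: baseCollector{},
		cb: func(dur time.Duration, txID string) {
			fmt.Printf("forwarded to provider: %s took %s\n", txID, dur)
		},
	}
	collector.TransactionExecuted(5*time.Millisecond, "tx-1")
}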
1 change: 0 additions & 1 deletion engine/execution/computation/computer/computer.go
@@ -255,7 +255,6 @@ func (e *blockComputer) queueTransactionRequests(
i == len(collection.Transactions)-1)
txnIndex += 1
}

}

systemCtx := fvm.NewContextFromParent(
130 changes: 130 additions & 0 deletions engine/execution/computation/metrics/collector.go
@@ -0,0 +1,130 @@
package metrics

import (
"sync"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
)

type collector struct {
log zerolog.Logger

collection chan metrics

mu *sync.Mutex

latestHeight uint64
blocksAtHeight map[uint64]map[flow.Identifier]struct{}
metrics map[flow.Identifier][]TransactionExecutionMetrics
}

func newCollector(
log zerolog.Logger,
latestHeight uint64,
) *collector {
return &collector{
log: log,
latestHeight: latestHeight,

collection: make(chan metrics, 1000),
mu: &sync.Mutex{},
blocksAtHeight: make(map[uint64]map[flow.Identifier]struct{}),
metrics: make(map[flow.Identifier][]TransactionExecutionMetrics),
}
}

// Collect should never block because it's called from the execution path
func (c *collector) Collect(
blockId flow.Identifier,
blockHeight uint64,
t TransactionExecutionMetrics,
) {
select {
case c.collection <- metrics{
TransactionExecutionMetrics: t,
blockHeight: blockHeight,
blockId: blockId,
}:
default:
c.log.Warn().
Uint64("height", blockHeight).
Msg("dropping metrics because the collection channel is full")
}
}

func (c *collector) metricsCollectorWorker(
ctx irrecoverable.SignalerContext,
ready component.ReadyFunc,
) {
ready()

for {
select {
case <-ctx.Done():
return
case m := <-c.collection:
c.collect(m.blockId, m.blockHeight, m.TransactionExecutionMetrics)
}
}
}

func (c *collector) collect(
blockId flow.Identifier,
blockHeight uint64,
t TransactionExecutionMetrics,
) {
c.mu.Lock()
defer c.mu.Unlock()

if blockHeight <= c.latestHeight {
c.log.Warn().
Uint64("height", blockHeight).
Uint64("latestHeight", c.latestHeight).
Msg("received metrics for a block that is older or equal than the most recent block")
return
}

if _, ok := c.blocksAtHeight[blockHeight]; !ok {
c.blocksAtHeight[blockHeight] = make(map[flow.Identifier]struct{})
}
c.blocksAtHeight[blockHeight][blockId] = struct{}{}
c.metrics[blockId] = append(c.metrics[blockId], t)
}

// Pop returns the metrics for the given block at the given height
// and clears all data up to and including the given height.
func (c *collector) Pop(height uint64, block flow.Identifier) []TransactionExecutionMetrics {
c.mu.Lock()
defer c.mu.Unlock()

if height <= c.latestHeight && c.latestHeight != 0 {
c.log.Warn().
Uint64("height", height).
Stringer("block", block).
Msg("requested metrics for a block that is older or equal than the most recent block")
return nil
}

metrics := c.metrics[block]

c.advanceTo(height)

return metrics
}

// advanceTo advances the latest height to the given height.
// All data at or below the new height is deleted.
func (c *collector) advanceTo(height uint64) {
for c.latestHeight < height {
c.latestHeight++
blocks := c.blocksAtHeight[c.latestHeight]
for block := range blocks {
delete(c.metrics, block)
}
delete(c.blocksAtHeight, c.latestHeight)
}
}
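To make the Pop semantics concrete: popping a block at height H returns whatever has been buffered for that block and then prunes every block recorded at or below H, so a second request for an already-pruned height yields nil. A minimal in-package sketch (editor's illustration, not part of the commit; it bypasses the channel by calling the internal collect directly, and the block IDs and durations are made up):

// Editor's sketch, assumed to live in the same metrics package as collector.go.
package metrics

import (
	"time"

	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/model/flow"
)

func examplePruning(log zerolog.Logger) {
	c := newCollector(log, 100) // latest known height is 100

	blockA := flow.Identifier{0x0a} // hypothetical block at height 101
	blockB := flow.Identifier{0x0b} // hypothetical block at height 102

	// collect is the synchronous path the worker takes after draining the channel.
	c.collect(blockA, 101, TransactionExecutionMetrics{ExecutionTime: time.Millisecond})
	c.collect(blockB, 102, TransactionExecutionMetrics{ExecutionTime: 2 * time.Millisecond})

	_ = c.Pop(101, blockA) // returns blockA's metrics and prunes heights <= 101
	_ = c.Pop(102, blockB) // still returns blockB's metrics; height 102 was untouched
	_ = c.Pop(101, blockA) // nil: 101 is now at or below latestHeight
}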
82 changes: 82 additions & 0 deletions engine/execution/computation/metrics/collector_test.go
@@ -0,0 +1,82 @@
package metrics

import (
"context"
"sync"
"testing"
"time"

"github.com/rs/zerolog"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/irrecoverable"
)

func Test_CollectorPopOnEmpty(t *testing.T) {
t.Parallel()

log := zerolog.New(zerolog.NewTestWriter(t))
latestHeight := uint64(100)

collector := newCollector(log, latestHeight)

data := collector.Pop(latestHeight, flow.ZeroID)
require.Nil(t, data)
}

func Test_CollectorCollection(t *testing.T) {
log := zerolog.New(zerolog.NewTestWriter(t))
latestHeight := uint64(100)

collector := newCollector(log, latestHeight)

ctx := context.Background()
go func() {
ictx := irrecoverable.NewMockSignalerContext(t, ctx)
collector.metricsCollectorWorker(ictx, func() {})
}()

wg := sync.WaitGroup{}

wg.Add(16 * 16 * 16)
for h := 0; h < 16; h++ {
for b := 0; b < 16; b++ {
for t := 0; t < 16; t++ {
go func(h, b, t int) {
defer wg.Done()

block := flow.Identifier{}
// 16 different blocks per height (one per value of b)
block[0] = byte(h)
block[1] = byte(b)

collector.Collect(block, latestHeight+1+uint64(h), TransactionExecutionMetrics{
ExecutionTime: time.Duration(b + t),
})
}(h, b, t)
}
// wait a bit for the collector to process the data
<-time.After(1 * time.Millisecond)
}
}

wg.Wait()
// wait a bit for the collector to process the data
<-time.After(10 * time.Millisecond)

for h := 0; h < 16; h++ {
block := flow.Identifier{}
block[0] = byte(h)

data := collector.Pop(latestHeight+1+uint64(h), block)

require.Len(t, data, 16)
}

data := collector.Pop(latestHeight, flow.ZeroID)
require.Nil(t, data)

data = collector.Pop(latestHeight+17, flow.ZeroID)
require.Nil(t, data)
}
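For readers who want the smallest possible end-to-end check, a single-block variant in the same style as the tests above could look like the following editor's sketch (not part of the commit; it uses only the helpers already imported in this test file, and the block ID is hypothetical):

func Test_CollectorSingleBlockLifecycle(t *testing.T) {
	log := zerolog.New(zerolog.NewTestWriter(t))
	collector := newCollector(log, 100)

	// Run the worker the same way the provider would, driven by a signaler context.
	go collector.metricsCollectorWorker(
		irrecoverable.NewMockSignalerContext(t, context.Background()),
		func() {},
	)

	blockID := flow.Identifier{0x01} // hypothetical block at height 101
	collector.Collect(blockID, 101, TransactionExecutionMetrics{ExecutionTime: 5 * time.Millisecond})

	// Give the worker a moment to drain the channel; production code is driven
	// by protocol events rather than sleeps.
	<-time.After(10 * time.Millisecond)

	// Pop returns the buffered metrics for block 101 and prunes data up to that height.
	data := collector.Pop(101, blockID)
	require.Len(t, data, 1)
}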