Skip to content

Commit

Permalink
Merge pull request #5548 from onflow/janez/improve-tps-metering
Browse files Browse the repository at this point in the history
Improve CI TPS metering input
  • Loading branch information
janezpodhostnik authored Apr 12, 2024
2 parents 55fd3d0 + fa9f719 commit 2b7a321
Show file tree
Hide file tree
Showing 17 changed files with 120 additions and 804 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ generate-mocks: install-mock-generators
mockery --name '(Connector|PingInfoProvider)' --dir=network/p2p --case=underscore --output="./network/mocknetwork" --outpkg="mocknetwork"
CGO_CFLAGS=$(CRYPTO_FLAG) mockgen -destination=storage/mocks/storage.go -package=mocks github.com/onflow/flow-go/storage Blocks,Headers,Payloads,Collections,Commits,Events,ServiceEvents,TransactionResults
CGO_CFLAGS=$(CRYPTO_FLAG) mockgen -destination=network/mocknetwork/mock_network.go -package=mocknetwork github.com/onflow/flow-go/network EngineRegistry
mockery --name='.*' --dir=integration/benchmark/mocksiface --case=underscore --output="integration/benchmark/mock" --outpkg="mock"
mockery --name=ExecutionDataStore --dir=module/executiondatasync/execution_data --case=underscore --output="./module/executiondatasync/execution_data/mock" --outpkg="mock"
mockery --name=Downloader --dir=module/executiondatasync/execution_data --case=underscore --output="./module/executiondatasync/execution_data/mock" --outpkg="mock"
mockery --name '(ExecutionDataRequester|IndexReporter)' --dir=module/state_synchronization --case=underscore --output="./module/state_synchronization/mock" --outpkg="state_synchronization"
Expand Down
119 changes: 75 additions & 44 deletions integration/benchmark/cmd/ci/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,22 @@ package main
import (
"context"
"flag"
"net"
"os"
"strings"
"time"

"github.com/onflow/flow-go/integration/benchmark/load"

"github.com/prometheus/client_golang/prometheus"

"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"gopkg.in/yaml.v3"

flowsdk "github.com/onflow/flow-go-sdk"
"github.com/onflow/flow-go-sdk/access"
client "github.com/onflow/flow-go-sdk/access/grpc"

"github.com/onflow/flow-go/integration/benchmark"
pb "github.com/onflow/flow-go/integration/benchmark/proto"
"github.com/onflow/flow-go/integration/benchmark/load"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/utils/unittest"
Expand All @@ -33,7 +30,7 @@ type BenchmarkInfo struct {

// Hardcoded CI values
const (
defaultLoadType = "token-transfer"
defaultLoadType = load.TokenTransferLoadType
metricport = uint(8080)
accessNodeAddress = "127.0.0.1:4001"
pushgateway = "127.0.0.1:9091"
Expand All @@ -45,35 +42,43 @@ const (
defaultMetricCollectionInterval = 20 * time.Second

// gRPC constants
defaultMaxMsgSize = 1024 * 1024 * 16 // 16 MB
defaultGRPCAddress = "127.0.0.1:4777"
defaultMaxMsgSize = 1024 * 1024 * 16 // 16 MB
)

func main() {
logLvl := flag.String("log-level", "info", "set log level")

// CI relevant flags
grpcAddressFlag := flag.String("grpc-address", defaultGRPCAddress, "listen address for gRPC server")
initialTPSFlag := flag.Int("tps-initial", 10, "starting transactions per second")
maxTPSFlag := flag.Int("tps-max", *initialTPSFlag, "maximum transactions per second allowed")
minTPSFlag := flag.Int("tps-min", *initialTPSFlag, "minimum transactions per second allowed")
loadTypeFlag := flag.String("load-type", string(defaultLoadType), "load type (token-transfer / const-exec / evm) from the load config file")
loadConfigFileLocationFlag := flag.String("load-config", "", "load config file location. If not provided, default config will be used.")

adjustIntervalFlag := flag.Duration("tps-adjust-interval", defaultAdjustInterval, "interval for adjusting TPS")
adjustDelayFlag := flag.Duration("tps-adjust-delay", 120*time.Second, "delay before adjusting TPS")
statIntervalFlag := flag.Duration("stat-interval", defaultMetricCollectionInterval, "")
durationFlag := flag.Duration("duration", 10*time.Minute, "test duration")

statIntervalFlag := flag.Duration("stat-interval", defaultMetricCollectionInterval, "")
gitRepoPathFlag := flag.String("git-repo-path", "../..", "git repo path of the filesystem")
gitRepoURLFlag := flag.String("git-repo-url", "https://github.com/onflow/flow-go.git", "git repo URL")
bigQueryUpload := flag.Bool("bigquery-upload", true, "whether to upload results to BigQuery (true / false)")
bigQueryProjectFlag := flag.String("bigquery-project", "dapperlabs-data", "project name for the bigquery uploader")
bigQueryDatasetFlag := flag.String("bigquery-dataset", "dev_src_flow_tps_metrics", "dataset name for the bigquery uploader")
bigQueryRawTableFlag := flag.String("bigquery-raw-table", "rawResults", "table name for the bigquery raw results")
loadTypeFlag := flag.String("load-type", defaultLoadType, "load type (token-transfer / const-exec / evm)")
flag.Parse()

loadType := *loadTypeFlag

log := setupLogger(logLvl)

loadConfig := getLoadConfig(
log,
*loadConfigFileLocationFlag,
*loadTypeFlag,
*minTPSFlag,
*maxTPSFlag,
*initialTPSFlag,
)

if *gitRepoPathFlag == "" {
flag.PrintDefaults()
log.Fatal().Msg("git repo path is required")
Expand All @@ -86,26 +91,6 @@ func main() {
<-server.Ready()
loaderMetrics := metrics.NewLoaderCollector()

grpcServerOptions := []grpc.ServerOption{
grpc.MaxRecvMsgSize(defaultMaxMsgSize),
grpc.MaxSendMsgSize(defaultMaxMsgSize),
}
grpcServer := grpc.NewServer(grpcServerOptions...)
defer grpcServer.Stop()

pb.RegisterBenchmarkServer(grpcServer, &benchmarkServer{})

grpcListener, err := net.Listen("tcp", *grpcAddressFlag)
if err != nil {
log.Fatal().Err(err).Str("address", *grpcAddressFlag).Msg("failed to listen")
}

go func() {
if err := grpcServer.Serve(grpcListener); err != nil {
log.Fatal().Err(err).Msg("failed to serve")
}
}()

sp := benchmark.NewStatsPusher(ctx, log, pushgateway, "loader", prometheus.DefaultGatherer)
defer sp.Stop()

Expand Down Expand Up @@ -136,10 +121,7 @@ func main() {

// prepare load generator
log.Info().
Str("load_type", loadType).
Int("initialTPS", *initialTPSFlag).
Int("minTPS", *minTPSFlag).
Int("maxTPS", *maxTPSFlag).
Interface("loadConfig", loadConfig).
Dur("duration", *durationFlag).
Msg("Running load case")

Expand All @@ -164,7 +146,7 @@ func main() {
},
benchmark.LoadParams{
NumberOfAccounts: maxInflight,
LoadType: load.LoadType(loadType),
LoadConfig: loadConfig,
FeedbackEnabled: feedbackEnabled,
},
)
Expand All @@ -187,9 +169,9 @@ func main() {
AdjusterParams{
Delay: *adjustDelayFlag,
Interval: *adjustIntervalFlag,
InitialTPS: uint(*initialTPSFlag),
MinTPS: uint(*minTPSFlag),
MaxTPS: uint(*maxTPSFlag),
InitialTPS: uint(loadConfig.TPSInitial),
MinTPS: uint(loadConfig.TpsMin),
MaxTPS: uint(loadConfig.TpsMax),
MaxInflight: uint(maxInflight / 2),
},
)
Expand Down Expand Up @@ -218,7 +200,7 @@ func main() {
// only upload valid data
if *bigQueryUpload {
repoInfo := MustGetRepoInfo(log, *gitRepoURLFlag, *gitRepoPathFlag)
mustUploadData(ctx, log, recorder, repoInfo, *bigQueryProjectFlag, *bigQueryDatasetFlag, *bigQueryRawTableFlag, loadType)
mustUploadData(ctx, log, recorder, repoInfo, *bigQueryProjectFlag, *bigQueryDatasetFlag, *bigQueryRawTableFlag, loadConfig.LoadName)
} else {
log.Info().Int("raw_tps_size", len(recorder.BenchmarkResults.RawTPS)).Msg("logging tps results locally")
// log results locally when not uploading to BigQuery
Expand All @@ -228,6 +210,55 @@ func main() {
}
}

func getLoadConfig(
log zerolog.Logger,
loadConfigLocation string,
load string,
minTPS int,
maxTPS int,
initialTPS int,
) benchmark.LoadConfig {
if loadConfigLocation == "" {
lc := benchmark.LoadConfig{
LoadName: load,
LoadType: load,
TpsMax: maxTPS,
TpsMin: minTPS,
TPSInitial: initialTPS,
}

log.Info().
Interface("loadConfig", lc).
Msg("Load config file not provided, using parameters supplied in TPS flags")
return lc
}

var loadConfigs map[string]benchmark.LoadConfig

// check if the file exists
if _, err := os.Stat(loadConfigLocation); os.IsNotExist(err) {
log.Fatal().Err(err).Str("loadConfigLocation", loadConfigLocation).Msg("load config file not found")
}

yamlFile, err := os.ReadFile(loadConfigLocation)
if err != nil {
log.Fatal().Err(err).Str("loadConfigLocation", loadConfigLocation).Msg("failed to read load config file")
}

err = yaml.Unmarshal(yamlFile, &loadConfigs)
if err != nil {
log.Fatal().Err(err).Str("loadConfigLocation", loadConfigLocation).Msg("failed to unmarshal load config file")
}

lc, ok := loadConfigs[load]
if !ok {
log.Fatal().Str("load", load).Msg("load not found in load config file")
}
lc.LoadName = load

return lc
}

// setupLogger parses log level and apply to logger
func setupLogger(logLvl *string) zerolog.Logger {
log := zerolog.New(os.Stderr).
Expand All @@ -252,7 +283,7 @@ func mustUploadData(
bigQueryProject string,
bigQueryDataset string,
bigQueryRawTable string,
loadType string,
loadName string,
) {
log.Info().Msg("Initializing BigQuery")
db, err := NewDB(ctx, log, bigQueryProject)
Expand All @@ -278,7 +309,7 @@ func mustUploadData(
bigQueryRawTable,
recorder.BenchmarkResults,
*repoInfo,
BenchmarkInfo{BenchmarkType: loadType},
BenchmarkInfo{BenchmarkType: loadName},
MustGetDefaultEnvironment(),
)
if err != nil {
Expand Down
28 changes: 0 additions & 28 deletions integration/benchmark/cmd/ci/server.go

This file was deleted.

12 changes: 8 additions & 4 deletions integration/benchmark/cmd/manual/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"strings"
"time"

"github.com/onflow/flow-go/integration/benchmark/load"

"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
"google.golang.org/grpc"
Expand Down Expand Up @@ -132,8 +130,14 @@ func main() {
},
benchmark.LoadParams{
NumberOfAccounts: int(maxTPS) * *accountMultiplierFlag,
LoadType: load.LoadType(*loadTypeFlag),
FeedbackEnabled: *feedbackEnabled,
LoadConfig: benchmark.LoadConfig{
LoadName: *loadTypeFlag,
LoadType: *loadTypeFlag,
TpsMax: int(maxTPS),
TpsMin: int(maxTPS),
TPSInitial: int(maxTPS),
},
FeedbackEnabled: *feedbackEnabled,
},
)
if err != nil {
Expand Down
15 changes: 13 additions & 2 deletions integration/benchmark/contLoadGenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,20 @@ type NetworkParams struct {
ChainId flow.ChainID
}

type LoadConfig struct {
// LoadName is the name of the load. This can be different from the LoadType
// and is used to identify the load in the results. The use case is when a single
// load type is used to run multiple loads with different parameters.
LoadName string `yaml:"-"`
LoadType string `yaml:"load_type"`
TpsMax int `yaml:"tps_max"`
TpsMin int `yaml:"tps_min"`
TPSInitial int `yaml:"tps_initial"`
}

type LoadParams struct {
NumberOfAccounts int
LoadType load.LoadType
LoadConfig LoadConfig

// TODO(rbtz): inject a TxFollower
FeedbackEnabled bool
Expand Down Expand Up @@ -157,7 +168,7 @@ func New(
Proposer: servAcc,
}

l := load.CreateLoadType(log, loadParams.LoadType)
l := load.CreateLoadType(log, load.LoadType(loadParams.LoadConfig.LoadType))

err = l.Setup(log, lc)
if err != nil {
Expand Down
10 changes: 0 additions & 10 deletions integration/benchmark/mocksiface/mocks.go

This file was deleted.

3 changes: 0 additions & 3 deletions integration/benchmark/proto/generate.go

This file was deleted.

Loading

0 comments on commit 2b7a321

Please sign in to comment.