From aea0db9cce70da387b6aa6ad9de3ec862b8778dc Mon Sep 17 00:00:00 2001 From: Kirill-Churkin Date: Mon, 16 Jan 2023 18:21:18 +0300 Subject: [PATCH] bench: cluster patch --- CHANGELOG.md | 8 ++ cli/bench/bench.go | 201 ++++++--------------------------- cli/bench/cluster.go | 231 ++++++++++++++++++++++++++++++++++++++ cli/bench/config.go | 28 ----- cli/bench/connection.go | 51 +++++++++ cli/bench/one_instance.go | 78 +++++++++++++ cli/bench/requests.go | 60 ++++++++-- cli/bench/types.go | 36 +++++- cli/bench/utils.go | 141 +++++++++++++++++++++++ cli/commands/bench.go | 2 + cli/context/context.go | 26 +++-- 11 files changed, 642 insertions(+), 220 deletions(-) create mode 100644 cli/bench/cluster.go create mode 100644 cli/bench/connection.go create mode 100644 cli/bench/one_instance.go create mode 100644 cli/bench/utils.go diff --git a/CHANGELOG.md b/CHANGELOG.md index dcd4bc1b2..0391398bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- Tarantool benchmark tool update (cluster bench): + * option --leader has been added - sest array of url's for leaders. + * option --replica has been added - sest array of url's for replicas. + ## [2.12.4] - 2022-12-31 ### Changed diff --git a/cli/bench/bench.go b/cli/bench/bench.go index 72521b00d..2c4f9725d 100644 --- a/cli/bench/bench.go +++ b/cli/bench/bench.go @@ -1,115 +1,38 @@ package bench import ( - bctx "context" "fmt" "math/rand" - "sync" "time" - "github.com/FZambia/tarantool" "github.com/tarantool/cartridge-cli/cli/context" ) -// printResults outputs benchmark foramatted results. -func printResults(results Results) { - fmt.Printf("\nResults:\n") - fmt.Printf("\tSuccess operations: %d\n", results.successResultCount) - fmt.Printf("\tFailed operations: %d\n", results.failedResultCount) - fmt.Printf("\tRequest count: %d\n", results.handledRequestsCount) - fmt.Printf("\tTime (seconds): %f\n", results.duration) - fmt.Printf("\tRequests per second: %d\n\n", results.requestsPerSecond) -} - -// verifyOperationsPercentage checks that the amount of operations percentage is 100. -func verifyOperationsPercentage(ctx *context.BenchCtx) error { - entire_percentage := ctx.InsertCount + ctx.SelectCount + ctx.UpdateCount - if entire_percentage != 100 { - return fmt.Errorf( - "The number of operations as a percentage should be equal to 100, " + - "note that by default the percentage of inserts is 100") - } - return nil -} - -// spacePreset prepares space for a benchmark. -func spacePreset(tarantoolConnection *tarantool.Connection) error { - dropBenchmarkSpace(tarantoolConnection) - return createBenchmarkSpace(tarantoolConnection) -} - -// incrementRequest increases the counter of successful/failed requests depending on the presence of an error. -func (results *Results) incrementRequestsCounters(err error) { - if err == nil { - results.successResultCount++ - } else { - results.failedResultCount++ - } - results.handledRequestsCount++ -} +// Main benchmark function. +func Run(ctx context.BenchCtx) error { + rand.Seed(time.Now().UnixNano()) -// requestsLoop continuously executes the insert query until the benchmark time runs out. -func requestsLoop(requestsSequence *RequestsSequence, backgroundCtx bctx.Context) { - for { - select { - case <-backgroundCtx.Done(): - return - default: - request := requestsSequence.getNext() - request.operation(&request) - } + if err := verifyOperationsPercentage(&ctx); err != nil { + return err } -} -// connectionLoop runs "ctx.SimultaneousRequests" requests execution goroutines -// through the same connection. -func connectionLoop( - ctx *context.BenchCtx, - requestsSequence *RequestsSequence, - backgroundCtx bctx.Context, -) { - var connectionWait sync.WaitGroup - for i := 0; i < ctx.SimultaneousRequests; i++ { - connectionWait.Add(1) - go func() { - defer connectionWait.Done() - requestsLoop(requestsSequence, backgroundCtx) - }() + // Check cluster topology for further actions. + cluster, err := isCluster(ctx) + if err != nil { + return err } - connectionWait.Wait() -} - -// preFillBenchmarkSpaceIfRequired fills benchmark space -// if insert count = 0 or PreFillingCount flag is explicitly specified. -func preFillBenchmarkSpaceIfRequired(ctx context.BenchCtx, connectionPool []*tarantool.Connection) error { - if ctx.InsertCount == 0 || ctx.PreFillingCount != PreFillingCount { - fmt.Println("\nThe pre-filling of the space has started,\n" + - "because the insert operation is not specified\n" + - "or there was an explicit instruction for pre-filling.") - fmt.Println("...") - filledCount, err := fillBenchmarkSpace(ctx, connectionPool) - if err != nil { + if cluster { + // Check cluster for wrong topology. + if err := verifyClusterTopology(ctx); err != nil { return err } - fmt.Printf("Pre-filling is finished. Number of records: %d\n\n", filledCount) - } - return nil -} - -// Main benchmark function. -func Run(ctx context.BenchCtx) error { - rand.Seed(time.Now().UnixNano()) - - if err := verifyOperationsPercentage(&ctx); err != nil { - return err + // Get url of one of instances in cluster for space preset and prefill. + ctx.URL = (*ctx.Leaders)[0] } // Connect to tarantool and preset space for benchmark. - tarantoolConnection, err := tarantool.Connect(ctx.URL, tarantool.Opts{ - User: ctx.User, - Password: ctx.Password, - }) + tarantoolConnection, err := createConnection(ctx) if err != nil { return fmt.Errorf( "Couldn't connect to Tarantool %s.", @@ -123,87 +46,37 @@ func Run(ctx context.BenchCtx) error { return err } - /// Сreate a "connectionPool" before starting the benchmark to exclude the connection establishment time from measurements. - connectionPool := make([]*tarantool.Connection, ctx.Connections) - for i := 0; i < ctx.Connections; i++ { - connectionPool[i], err = tarantool.Connect(ctx.URL, tarantool.Opts{ - User: ctx.User, - Password: ctx.Password, - }) - if err != nil { - return err - } - defer connectionPool[i].Close() - } - - if err := preFillBenchmarkSpaceIfRequired(ctx, connectionPool); err != nil { + if err := preFillBenchmarkSpaceIfRequired(ctx); err != nil { return err } fmt.Println("Benchmark start") fmt.Println("...") - // The "context" will be used to stop all "connectionLoop" when the time is out. - backgroundCtx, cancel := bctx.WithCancel(bctx.Background()) - var waitGroup sync.WaitGroup - results := Results{} - - startTime := time.Now() - timer := time.NewTimer(time.Duration(ctx.Duration * int(time.Second))) - - // Start detached connections. - for i := 0; i < ctx.Connections; i++ { - waitGroup.Add(1) - go func(connection *tarantool.Connection) { - defer waitGroup.Done() - requestsSequence := RequestsSequence{ - []RequestsGenerator{ - { - Request{ - insertOperation, - ctx, - connection, - &results, - }, - ctx.InsertCount, - }, - { - Request{ - selectOperation, - ctx, - connection, - &results, - }, - ctx.SelectCount, - }, - { - Request{ - updateOperation, - ctx, - connection, - &results, - }, - ctx.UpdateCount, - }, - }, - 0, - ctx.InsertCount, - sync.Mutex{}, - } - connectionLoop(&ctx, &requestsSequence, backgroundCtx) - }(connectionPool[i]) + // Bench one instance by default. + benchStart := benchOneInstance + if cluster { + benchStart = benchCluster + } + + // Prepare data for bench. + benchData := getBenchData(ctx) + + // Start benching. + if err := benchStart(ctx, &benchData); err != nil { + return err } - // Sends "signal" to all "connectionLoop" and waits for them to complete. - <-timer.C - cancel() - waitGroup.Wait() - results.duration = time.Since(startTime).Seconds() - results.requestsPerSecond = int(float64(results.handledRequestsCount) / results.duration) + // Calculate results. + benchData.results.duration = time.Since(benchData.startTime).Seconds() + benchData.results.requestsPerSecond = int(float64(benchData.results.handledRequestsCount) / benchData.results.duration) - dropBenchmarkSpace(tarantoolConnection) - fmt.Println("Benchmark stop") + // Benchmark space must exist after bench. + if err := dropBenchmarkSpace(tarantoolConnection); err != nil { + return err + } + fmt.Println("Benchmark stop.") - printResults(results) + printResults(benchData.results) return nil } diff --git a/cli/bench/cluster.go b/cli/bench/cluster.go new file mode 100644 index 000000000..c65f3c3e0 --- /dev/null +++ b/cli/bench/cluster.go @@ -0,0 +1,231 @@ +package bench + +import ( + "fmt" + "reflect" + "sync" + "time" + + "github.com/FZambia/tarantool" + "github.com/tarantool/cartridge-cli/cli/common" + "github.com/tarantool/cartridge-cli/cli/context" +) + +// verifyLeaders check each replica have leader. +func verifyReplicas(ctx context.BenchCtx) error { + foundLeaders := make(map[string]int) + for _, replica_url := range *ctx.Replicas { + haveLeader := false + tarantoolConnection, err := createConnection(ctx, replica_url) + if err != nil { + return err + } + defer tarantoolConnection.Close() + command := "return box.info.replication" + replication, err := tarantoolConnection.Exec(tarantool.Eval(command, []interface{}{})) + if err != nil { + return err + } + replicationValue := reflect.ValueOf(replication.Data).Index(0).Elem() + replicationIterator := replicationValue.MapRange() + for replicationIterator.Next() { + //fmt.Println(replicationIterator.Key(), ": ", replicationIterator.Value()) + upstream := replicationIterator.Value().Elem().MapIndex(reflect.ValueOf("upstream")) + if upstream.IsValid() && !upstream.IsZero() { + //fmt.Println(upstream) + peer := upstream.Elem().MapIndex(reflect.ValueOf("peer")) + if peer.IsValid() && !peer.IsZero() { + //fmt.Println(peer) + if common.StringSliceContains(*ctx.Leaders, peer.Elem().String()) { + haveLeader = true + foundLeaders[peer.Elem().String()] += 1 + } else { + return fmt.Errorf("Replica has a leader outside the cluster") + } + } + } + } + if !haveLeader { + return fmt.Errorf("Replica has no leader") + } + } + + if reflect.DeepEqual(reflect.ValueOf(foundLeaders).MapKeys(), *&ctx.Leaders) { + return fmt.Errorf("There are extra leaders") + } + return nil +} + +// verifyLeaders check each leader have replica. +func verifyLeaders(ctx context.BenchCtx) error { + for _, leader_url := range *ctx.Leaders { + haveReplica := false + tarantoolConnection, err := createConnection(ctx, leader_url) + defer tarantoolConnection.Close() + if err != nil { + return err + } + command := "return box.info.replication" + replication, err := tarantoolConnection.Exec(tarantool.Eval(command, []interface{}{})) + if err != nil { + return err + } + replicationValue := reflect.ValueOf(replication.Data).Index(0).Elem() + replicationIterator := replicationValue.MapRange() + for replicationIterator.Next() { + downstream := replicationIterator.Value().Elem().MapIndex(reflect.ValueOf("downstream")) + if downstream.IsValid() && !downstream.IsZero() { + haveReplica = true + } + } + if !haveReplica { + return fmt.Errorf("Leader has no replica") + } + } + return nil +} + +// verifyClusterTopology check cluster for wrong topology. +func verifyClusterTopology(ctx context.BenchCtx) error { + if err := verifyReplicas(ctx); err != nil { + return err + } + if err := verifyLeaders(ctx); err != nil { + return err + } + return nil +} + +// createNodesConnectionsPools creates connections pool for every node in cluster. +func createNodesConnectionsPools(ctx context.BenchCtx) (map[string][]RotaryConnectionsPool, error) { + nodesConnectionsPools := make(map[string][]RotaryConnectionsPool) + nodesConnectionsPools["leaders"] = make([]RotaryConnectionsPool, len(*ctx.Leaders)) + nodesConnectionsPools["replicas"] = make([]RotaryConnectionsPool, len(*ctx.Replicas)) + + for i, leader_url := range *ctx.Leaders { + connectionsPool, err := createConnectionsPool(ctx, leader_url) + if err != nil { + return nil, err + } + nodesConnectionsPools["leaders"][i] = RotaryConnectionsPool{ + connectionsPool: connectionsPool, + } + } + + for i, replica_url := range *ctx.Replicas { + connectionsPool, err := createConnectionsPool(ctx, replica_url) + if err != nil { + return nil, err + } + nodesConnectionsPools["replicas"][i] = RotaryConnectionsPool{ + connectionsPool: connectionsPool, + } + } + + return nodesConnectionsPools, nil +} + +// deleteNodesConnectionsPools delete all connections pools. +func deleteNodesConnectionsPools(nodesConnectionsPools map[string][]RotaryConnectionsPool) { + for _, connectionsPools := range nodesConnectionsPools { + for i := range connectionsPools { + deleteConnectionsPool(connectionsPools[i].connectionsPool) + } + } +} + +// getNextConnection retrun next connection of node connections pool. +func (rotaryConnectionsPool *RotaryConnectionsPool) getNextConnection() *tarantool.Connection { + rotaryConnectionsPool.mutex.Lock() + returnConnection := rotaryConnectionsPool.connectionsPool[rotaryConnectionsPool.currentIndex] + rotaryConnectionsPool.currentIndex++ + rotaryConnectionsPool.currentIndex %= len(rotaryConnectionsPool.connectionsPool) + rotaryConnectionsPool.mutex.Unlock() + return returnConnection +} + +// getNextConnectionsPool return next node represented by connections pool. +func (rotaryNodesConnectionsPools *RotaryNodesConnectionsPools) getNextConnectionsPool() *RotaryConnectionsPool { + rotaryNodesConnectionsPools.mutex.Lock() + returnConnectionsPool := &rotaryNodesConnectionsPools.rotaryConnectionsPool[rotaryNodesConnectionsPools.currentIndex] + rotaryNodesConnectionsPools.currentIndex++ + rotaryNodesConnectionsPools.currentIndex %= len(rotaryNodesConnectionsPools.rotaryConnectionsPool) + rotaryNodesConnectionsPools.mutex.Unlock() + return returnConnectionsPool +} + +// benchCluster execute bench algorithm for cluster. +func benchCluster(ctx context.BenchCtx, benchData *BenchmarkData) error { + // Сreate connections pools for all nodes in cluster before starting the benchmark + // to exclude the connection establishment time from measurements. + nodesConnectionsPools, err := createNodesConnectionsPools(ctx) + if err != nil { + return err + } + defer deleteNodesConnectionsPools(nodesConnectionsPools) + + mutationConnections := nodesConnectionsPools["leaders"] + selectConnections := nodesConnectionsPools["replicas"] + + if len(*ctx.Replicas) == 0 { + selectConnections = append(selectConnections, mutationConnections...) + } + + benchData.startTime = time.Now() + + // Start detached connections. + for i := 0; i < ctx.Connections; i++ { + benchData.waitGroup.Add(1) + go func() { + defer benchData.waitGroup.Done() + requestsSequence := RequestsSequence{ + requests: []RequestsGenerator{ + { + request: Request{ + operation: clusterInsertOperation, + ctx: ctx, + clusterNodesConnections: RotaryNodesConnectionsPools{rotaryConnectionsPool: mutationConnections}, + results: &benchData.results, + }, + count: ctx.InsertCount, + }, + { + request: Request{ + operation: clusterSelectOperation, + ctx: ctx, + clusterNodesConnections: RotaryNodesConnectionsPools{rotaryConnectionsPool: selectConnections}, + results: &benchData.results, + }, + count: ctx.SelectCount, + }, + { + request: Request{ + operation: clusterUpdateOperation, + ctx: ctx, + clusterNodesConnections: RotaryNodesConnectionsPools{rotaryConnectionsPool: mutationConnections}, + results: &benchData.results, + }, + count: ctx.UpdateCount, + }, + }, + currentRequestIndex: 0, + currentCounter: ctx.InsertCount, + findNewRequestsGeneratorMutex: sync.Mutex{}, + } + + // Start looped requests in connection. + var requestsWait sync.WaitGroup + for i := 0; i < ctx.SimultaneousRequests; i++ { + requestsWait.Add(1) + go func() { + defer requestsWait.Done() + requestsLoop(&requestsSequence, benchData.backgroundCtx) + }() + } + requestsWait.Wait() + }() + } + + waitBenchEnd(benchData) + return nil +} diff --git a/cli/bench/config.go b/cli/bench/config.go index b13ac4a48..bb1dc74c1 100644 --- a/cli/bench/config.go +++ b/cli/bench/config.go @@ -2,11 +2,6 @@ package bench import ( "fmt" - "os" - "text/tabwriter" - - "github.com/FZambia/tarantool" - "github.com/tarantool/cartridge-cli/cli/context" ) var ( @@ -19,26 +14,3 @@ var ( benchSpacePrimaryIndexName, ) ) - -// printConfig output formatted config parameters. -func printConfig(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection) { - fmt.Printf("%s\n", tarantoolConnection.Greeting().Version) - fmt.Printf("Parameters:\n") - fmt.Printf("\tURL: %s\n", ctx.URL) - fmt.Printf("\tuser: %s\n", ctx.User) - fmt.Printf("\tconnections: %d\n", ctx.Connections) - fmt.Printf("\tsimultaneous requests: %d\n", ctx.SimultaneousRequests) - fmt.Printf("\tduration: %d seconds\n", ctx.Duration) - fmt.Printf("\tkey size: %d bytes\n", ctx.KeySize) - fmt.Printf("\tdata size: %d bytes\n", ctx.DataSize) - fmt.Printf("\tinsert: %d percentages\n", ctx.InsertCount) - fmt.Printf("\tselect: %d percentages\n", ctx.SelectCount) - fmt.Printf("\tupdate: %d percentages\n\n", ctx.UpdateCount) - - fmt.Printf("Data schema\n") - w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0) - fmt.Fprintf(w, "|\tkey\t|\tvalue\n") - fmt.Fprintf(w, "|\t------------------------------\t|\t------------------------------\n") - fmt.Fprintf(w, "|\trandom(%d)\t|\trandom(%d)\n", ctx.KeySize, ctx.DataSize) - w.Flush() -} diff --git a/cli/bench/connection.go b/cli/bench/connection.go new file mode 100644 index 000000000..507f696ba --- /dev/null +++ b/cli/bench/connection.go @@ -0,0 +1,51 @@ +package bench + +import ( + "fmt" + + "github.com/FZambia/tarantool" + "github.com/tarantool/cartridge-cli/cli/context" +) + +// createConnection creates connection to tarantool, +// using specified url if necessary. +func createConnection(ctx context.BenchCtx, url_optional ...string) (*tarantool.Connection, error) { + connect_url := ctx.URL + if len(url_optional) > 1 { + return nil, fmt.Errorf("Otpional url is more than one") + } + if len(url_optional) == 1 { + connect_url = url_optional[0] + } + tarantoolConnection, err := tarantool.Connect(connect_url, tarantool.Opts{ + User: ctx.User, + Password: ctx.Password, + }) + if err != nil { + return nil, fmt.Errorf( + "Couldn't connect to Tarantool %s.", + connect_url) + } + return tarantoolConnection, nil +} + +// createConnection creates connections pool to tarantool, +// using specified url if necessary. +func createConnectionsPool(ctx context.BenchCtx, url_optional ...string) ([]*tarantool.Connection, error) { + connectionPool := make([]*tarantool.Connection, ctx.Connections) + var err error + for i := 0; i < ctx.Connections; i++ { + connectionPool[i], err = createConnection(ctx, url_optional...) + if err != nil { + return nil, err + } + } + return connectionPool, nil +} + +// deleteConnectionsPool delete connection pool. +func deleteConnectionsPool(connectionPool []*tarantool.Connection) { + for _, connection := range connectionPool { + connection.Close() + } +} diff --git a/cli/bench/one_instance.go b/cli/bench/one_instance.go new file mode 100644 index 000000000..208314da9 --- /dev/null +++ b/cli/bench/one_instance.go @@ -0,0 +1,78 @@ +package bench + +import ( + "sync" + "time" + + "github.com/FZambia/tarantool" + "github.com/tarantool/cartridge-cli/cli/context" +) + +// benchOneInstance execute bench algorithm for only one tarantool instance. +func benchOneInstance(ctx context.BenchCtx, benchData *BenchmarkData) error { + // Сreate a "connectionsPool" before starting the benchmark + // to exclude the connection establishment time from measurements. + connectionsPool, err := createConnectionsPool(ctx) + if err != nil { + return err + } + defer deleteConnectionsPool(connectionsPool) + + benchData.startTime = time.Now() + + // Start detached connections. + for i := 0; i < ctx.Connections; i++ { + benchData.waitGroup.Add(1) + go func(connection *tarantool.Connection) { + defer benchData.waitGroup.Done() + requestsSequence := RequestsSequence{ + requests: []RequestsGenerator{ + { + request: Request{ + operation: insertOperation, + ctx: ctx, + tarantoolConnection: connection, + results: &benchData.results, + }, + count: ctx.InsertCount, + }, + { + request: Request{ + operation: selectOperation, + ctx: ctx, + tarantoolConnection: connection, + results: &benchData.results, + }, + count: ctx.SelectCount, + }, + { + request: Request{ + operation: updateOperation, + ctx: ctx, + tarantoolConnection: connection, + results: &benchData.results, + }, + count: ctx.UpdateCount, + }, + }, + currentRequestIndex: 0, + currentCounter: ctx.InsertCount, + findNewRequestsGeneratorMutex: sync.Mutex{}, + } + + // Start looped requests in connection. + var connectionWait sync.WaitGroup + for i := 0; i < ctx.SimultaneousRequests; i++ { + connectionWait.Add(1) + go func() { + defer connectionWait.Done() + requestsLoop(&requestsSequence, benchData.backgroundCtx) + }() + } + connectionWait.Wait() + }(connectionsPool[i]) + } + + waitBenchEnd(benchData) + return nil +} diff --git a/cli/bench/requests.go b/cli/bench/requests.go index 313ebf123..4148c6445 100644 --- a/cli/bench/requests.go +++ b/cli/bench/requests.go @@ -8,9 +8,9 @@ import ( "github.com/tarantool/cartridge-cli/cli/common" ) -// insertOperation execute insert operation. -func insertOperation(request *Request) { - _, err := request.tarantoolConnection.Exec( +// insertOperationOnConnection execute insert operation with specified connection. +func insertOperationOnConnection(tarantoolConnection *tarantool.Connection, request *Request) { + _, err := tarantoolConnection.Exec( tarantool.Insert( benchSpaceName, []interface{}{ @@ -20,17 +20,41 @@ func insertOperation(request *Request) { request.results.incrementRequestsCounters(err) } -// selectOperation execute select operation. -func selectOperation(request *Request) { - _, err := request.tarantoolConnection.Exec(tarantool.Call( +// insertOperation execute insert operation. +func insertOperation(request *Request) { + insertOperationOnConnection(request.tarantoolConnection, request) +} + +// clusterInsertOperation execute insert operation on cluster topology. +func clusterInsertOperation(request *Request) { + connectionsPool := request.clusterNodesConnections.getNextConnectionsPool() + tarantoolConnection := connectionsPool.getNextConnection() + insertOperationOnConnection(tarantoolConnection, request) +} + +// selectOperationOnConnection execute select operation with specified connection. +func selectOperationOnConnection(tarantoolConnection *tarantool.Connection, request *Request) { + _, err := tarantoolConnection.Exec(tarantool.Call( getRandomTupleCommand, []interface{}{rand.Int()})) request.results.incrementRequestsCounters(err) } -// updateOperation execute update operation. -func updateOperation(request *Request) { - getRandomTupleResponse, err := request.tarantoolConnection.Exec( +// selectOperation execute select operation. +func selectOperation(request *Request) { + selectOperationOnConnection(request.tarantoolConnection, request) +} + +// clusterSelectOperation execute select operation on cluster topology. +func clusterSelectOperation(request *Request) { + connectionsPool := request.clusterNodesConnections.getNextConnectionsPool() + tarantoolConnection := connectionsPool.getNextConnection() + selectOperationOnConnection(tarantoolConnection, request) +} + +// updateOperationOnConnection execute update operation with specified connection. +func updateOperationOnConnection(tarantoolConnection *tarantool.Connection, request *Request) { + getRandomTupleResponse, err := tarantoolConnection.Exec( tarantool.Call(getRandomTupleCommand, []interface{}{rand.Int()})) if err == nil { @@ -51,8 +75,20 @@ func updateOperation(request *Request) { } } +// updateOperation execute update operation. +func updateOperation(request *Request) { + updateOperationOnConnection(request.tarantoolConnection, request) +} + +// clusterUpdateOperation execute update operation on cluster topology. +func clusterUpdateOperation(request *Request) { + connectionsPool := request.clusterNodesConnections.getNextConnectionsPool() + tarantoolConnection := connectionsPool.getNextConnection() + updateOperationOnConnection(tarantoolConnection, request) +} + // getNext return next operation in operations sequence. -func (requestsSequence *RequestsSequence) getNext() Request { +func (requestsSequence *RequestsSequence) getNext() *Request { // If at the moment the number of remaining requests = 0, // then find a new generator, which requests count > 0. // If new generator has requests count = 0, then repeat. @@ -63,11 +99,11 @@ func (requestsSequence *RequestsSequence) getNext() Request { requestsSequence.currentRequestIndex++ requestsSequence.currentRequestIndex %= len(requestsSequence.requests) // Get new generator by index. - nextRequestsGenerator := requestsSequence.requests[requestsSequence.currentRequestIndex] + nextRequestsGenerator := &requestsSequence.requests[requestsSequence.currentRequestIndex] // Get requests count for new operation. requestsSequence.currentCounter = nextRequestsGenerator.count } // Logical taking of a single request. requestsSequence.currentCounter-- - return requestsSequence.requests[requestsSequence.currentRequestIndex].request + return &requestsSequence.requests[requestsSequence.currentRequestIndex].request } diff --git a/cli/bench/types.go b/cli/bench/types.go index 440b01fde..427610f07 100644 --- a/cli/bench/types.go +++ b/cli/bench/types.go @@ -1,7 +1,9 @@ package bench import ( + bctx "context" "sync" + "time" "github.com/FZambia/tarantool" "github.com/tarantool/cartridge-cli/cli/context" @@ -16,15 +18,31 @@ type Results struct { requestsPerSecond int // Cumber of requests per second - the main measured value. } +// RotaryConnectionsPool describes round-cycled connection pool. +type RotaryConnectionsPool struct { + connectionsPool []*tarantool.Connection + currentIndex int + mutex sync.Mutex +} + +// RotaryNodesConnectionsPools describes round-cycled cluster nodes array, +// where each represented by RotaryConnectionsPool. +type RotaryNodesConnectionsPools struct { + rotaryConnectionsPool []RotaryConnectionsPool + currentIndex int + mutex sync.Mutex +} + // RequestOperaion describes insert, select or update operation in request. type RequestOperaion func(*Request) // Request describes various types of requests. type Request struct { - operation RequestOperaion // insertOperation, selectOperation or updateOperation. - ctx context.BenchCtx - tarantoolConnection *tarantool.Connection - results *Results + operation RequestOperaion // insertOperation, selectOperation or updateOperation. + ctx context.BenchCtx + tarantoolConnection *tarantool.Connection + clusterNodesConnections RotaryNodesConnectionsPools + results *Results } // RequestsGenerator data structure for abstraction of a renewable heap of identical requests. @@ -44,3 +62,13 @@ type RequestsSequence struct { // findNewRequestsGeneratorMutex provides goroutine-safe search for new generator. findNewRequestsGeneratorMutex sync.Mutex } + +// BenchmarkData describes necessary data for bench. +type BenchmarkData struct { + backgroundCtx bctx.Context + cancel bctx.CancelFunc + waitGroup sync.WaitGroup + results Results + startTime time.Time + timer *time.Timer +} diff --git a/cli/bench/utils.go b/cli/bench/utils.go new file mode 100644 index 000000000..852302e05 --- /dev/null +++ b/cli/bench/utils.go @@ -0,0 +1,141 @@ +package bench + +import ( + bctx "context" + "fmt" + "os" + "strings" + "text/tabwriter" + "time" + + "github.com/FZambia/tarantool" + "github.com/tarantool/cartridge-cli/cli/context" +) + +// printResults outputs benchmark foramatted results. +func printResults(results Results) { + fmt.Printf("\nResults:\n") + fmt.Printf("\tSuccess operations: %d\n", results.successResultCount) + fmt.Printf("\tFailed operations: %d\n", results.failedResultCount) + fmt.Printf("\tRequest count: %d\n", results.handledRequestsCount) + fmt.Printf("\tTime (seconds): %f\n", results.duration) + fmt.Printf("\tRequests per second: %d\n\n", results.requestsPerSecond) +} + +// printConfig output formatted config parameters. +func printConfig(ctx context.BenchCtx, tarantoolConnection *tarantool.Connection) { + fmt.Printf("%s\n", tarantoolConnection.Greeting().Version) + fmt.Printf("Parameters:\n") + if cluster, _ := isCluster(ctx); cluster { + fmt.Printf("\tLeaders:\n") + fmt.Printf("\t\t%s\n", strings.Join(*ctx.Leaders, "\n")) + fmt.Printf("\tReplicas:\n") + fmt.Printf("\t\t%s\n", strings.Join(*ctx.Replicas, "\n")) + } else { + fmt.Printf("\tURL: %s\n", ctx.URL) + } + fmt.Printf("\tuser: %s\n", ctx.User) + fmt.Printf("\tconnections: %d\n", ctx.Connections) + fmt.Printf("\tsimultaneous requests: %d\n", ctx.SimultaneousRequests) + fmt.Printf("\tduration: %d seconds\n", ctx.Duration) + fmt.Printf("\tkey size: %d bytes\n", ctx.KeySize) + fmt.Printf("\tdata size: %d bytes\n", ctx.DataSize) + fmt.Printf("\tinsert: %d percentages\n", ctx.InsertCount) + fmt.Printf("\tselect: %d percentages\n", ctx.SelectCount) + fmt.Printf("\tupdate: %d percentages\n\n", ctx.UpdateCount) + + fmt.Printf("Data schema\n") + w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0) + fmt.Fprintf(w, "|\tkey\t|\tvalue\n") + fmt.Fprintf(w, "|\t------------------------------\t|\t------------------------------\n") + fmt.Fprintf(w, "|\trandom(%d)\t|\trandom(%d)\n", ctx.KeySize, ctx.DataSize) + w.Flush() +} + +func isCluster(ctx context.BenchCtx) (bool, error) { + result := (len(*ctx.Leaders) > 0 && len(*ctx.Replicas) > 0) || (len(*ctx.Leaders) > 1) + if result == false && (len(*ctx.Leaders) > 0 || len(*ctx.Replicas) > 0) { + return result, fmt.Errorf("Cluster: at least one leader and replica, or two leaders must be specified") + } + return result, nil +} + +// verifyOperationsPercentage checks that the amount of operations percentage is 100. +func verifyOperationsPercentage(ctx *context.BenchCtx) error { + entire_percentage := ctx.InsertCount + ctx.SelectCount + ctx.UpdateCount + if entire_percentage != 100 { + return fmt.Errorf( + "The number of operations as a percentage should be equal to 100, " + + "note that by default the percentage of inserts is 100") + } + return nil +} + +// incrementRequest increases the counter of successful/failed requests depending on the presence of an error. +func (results *Results) incrementRequestsCounters(err error) { + if err == nil { + results.successResultCount++ + } else { + results.failedResultCount++ + } + results.handledRequestsCount++ +} + +// requestsLoop continuously executes the insert query until the benchmark time runs out. +func requestsLoop(requestsSequence *RequestsSequence, backgroundCtx bctx.Context) { + for { + select { + case <-backgroundCtx.Done(): + return + default: + request := requestsSequence.getNext() + request.operation(request) + } + } +} + +// spacePreset prepares space for a benchmark. +func spacePreset(tarantoolConnection *tarantool.Connection) error { + dropBenchmarkSpace(tarantoolConnection) + return createBenchmarkSpace(tarantoolConnection) +} + +// preFillBenchmarkSpaceIfRequired fills benchmark space +// if insert count = 0 or PreFillingCount flag is explicitly specified. +func preFillBenchmarkSpaceIfRequired(ctx context.BenchCtx) error { + if ctx.InsertCount == 0 || ctx.PreFillingCount != PreFillingCount { + connectionsPool, err := createConnectionsPool(ctx) + if err != nil { + return err + } + defer deleteConnectionsPool(connectionsPool) + + fmt.Println("\nThe pre-filling of the space has started,\n" + + "because the insert operation is not specified\n" + + "or there was an explicit instruction for pre-filling.") + fmt.Println("...") + filledCount, err := fillBenchmarkSpace(ctx, connectionsPool) + if err != nil { + return err + } + fmt.Printf("Pre-filling is finished. Number of records: %d\n\n", filledCount) + } + return nil +} + +func getBenchData(ctx context.BenchCtx) BenchmarkData { + backgroundCtx, cancel := bctx.WithCancel(bctx.Background()) + timer := time.NewTimer(time.Duration(ctx.Duration * int(time.Second))) + return BenchmarkData{ + backgroundCtx: backgroundCtx, + cancel: cancel, + timer: timer, + } +} + +func waitBenchEnd(benchData *BenchmarkData) { + // Sends "signal" to all "requestsLoop" and waits for them to complete. + <-benchData.timer.C + benchData.cancel() + benchData.waitGroup.Wait() +} diff --git a/cli/commands/bench.go b/cli/commands/bench.go index 93ba4bb40..6c191b692 100644 --- a/cli/commands/bench.go +++ b/cli/commands/bench.go @@ -36,4 +36,6 @@ func init() { benchCmd.Flags().IntVar(&ctx.Bench.UpdateCount, "update", 0, "percentage of updates") benchCmd.Flags().IntVar(&ctx.Bench.PreFillingCount, "fill", bench.PreFillingCount, "number of records to pre-fill the space") + ctx.Bench.Leaders = benchCmd.Flags().StringSlice("leader", []string{}, "") + ctx.Bench.Replicas = benchCmd.Flags().StringSlice("replica", []string{}, "") } diff --git a/cli/context/context.go b/cli/context/context.go index ae32d632f..bc63ef4f6 100644 --- a/cli/context/context.go +++ b/cli/context/context.go @@ -180,16 +180,18 @@ type FailoverCtx struct { } type BenchCtx struct { - URL string // URL - the URL of the tarantool used for testing - User string // User - username to connect to the tarantool. - Password string // Password to connect to the tarantool. - Connections int // Connections describes the number of connection to be used in the test. - SimultaneousRequests int // SimultaneousRequests describes the number of parallel requests from one connection. - Duration int // Duration describes test duration in seconds. - KeySize int // DataSize describes the size of key part of benchmark data (bytes). - DataSize int // DataSize describes the size of value part of benchmark data (bytes). - InsertCount int // InsertCount describes the number of insert operations as a percentage. - SelectCount int // SelectCount describes the number of select operations as a percentage. - UpdateCount int // UpdateCount describes the number of update operations as a percentage. - PreFillingCount int // PreFillingCount describes the number of records to pre-fill the space. + URL string // URL - the URL of the tarantool used for testing + User string // User - username to connect to the tarantool. + Password string // Password to connect to the tarantool. + Connections int // Connections describes the number of connection to be used in the test. + SimultaneousRequests int // SimultaneousRequests describes the number of parallel requests from one connection. + Duration int // Duration describes test duration in seconds. + KeySize int // DataSize describes the size of key part of benchmark data (bytes). + DataSize int // DataSize describes the size of value part of benchmark data (bytes). + InsertCount int // InsertCount describes the number of insert operations as a percentage. + SelectCount int // SelectCount describes the number of select operations as a percentage. + UpdateCount int // UpdateCount describes the number of update operations as a percentage. + PreFillingCount int // PreFillingCount describes the number of records to pre-fill the space. + Leaders *[]string // URL's of leaders in cluster + Replicas *[]string // URL's of replicas in cluster }