Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bench: cluster patch #752

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, rebase on master and resolve changelog conflict.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The commit message misses a description and a link to the #645 ticket.

Some things that may be useful to mention there (especially since bench doesn't have any doc):

  • what exactly does bench cluster do, how it treats replicas and leaders;
  • how to run the command (like in the PR description);
  • how to provide an array of leaders/replicas for a command.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commit title is also confusing: for me cluster patch sounds like we're already have something for cluster and patch this something. (Patch with what? It's also unclear.) Something like support cluster sounds more relevant.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have tests for the feature.

and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- Tarantool benchmark tool update (cluster bench):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I don't get a full understanding of what "cluster" is here.

It seems that one of the options for a cluster is a replicaset. No questions here.

But it's more complicated with a vshard cluster. Yeah, each storage in a vshard cluster has the same schema (if we're talking about the same vshard group), so you theoretically can send the same CRUD requests to each storage in a vshard cluster. But you never work with vshard storages directly, you use a router (especially since each record from a sharded space should have a bucket id, and it's computed on a router). So it doesn't make much sense to bench storages only, especially since stored procedures on a router may be complicated and the bottleneck of the vshard clusters is often the network requests between a router and a storage.

So the question is as follows. Is "cluster bench" actually a "replicaset bench"? What did you use it for?

* option --leader has been added - sest array of url's for leaders.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* option --leader has been added - sest array of url's for leaders.
* option --leader has been added - set array of url's for leaders,

* option --replica has been added - sest array of url's for replicas.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* option --replica has been added - sest array of url's for replicas.
* option --replica has been added - set array of url's for replicas.


## [2.12.4] - 2022-12-31

### Changed
Expand Down
201 changes: 37 additions & 164 deletions cli/bench/bench.go
Original file line number Diff line number Diff line change
@@ -1,115 +1,38 @@
package bench

import (
bctx "context"
"fmt"
"math/rand"
"sync"
"time"

"github.com/FZambia/tarantool"
"github.com/tarantool/cartridge-cli/cli/context"
)

// printResults outputs benchmark foramatted results.
func printResults(results Results) {
fmt.Printf("\nResults:\n")
fmt.Printf("\tSuccess operations: %d\n", results.successResultCount)
fmt.Printf("\tFailed operations: %d\n", results.failedResultCount)
fmt.Printf("\tRequest count: %d\n", results.handledRequestsCount)
fmt.Printf("\tTime (seconds): %f\n", results.duration)
fmt.Printf("\tRequests per second: %d\n\n", results.requestsPerSecond)
}

// verifyOperationsPercentage checks that the amount of operations percentage is 100.
func verifyOperationsPercentage(ctx *context.BenchCtx) error {
entire_percentage := ctx.InsertCount + ctx.SelectCount + ctx.UpdateCount
if entire_percentage != 100 {
return fmt.Errorf(
"The number of operations as a percentage should be equal to 100, " +
"note that by default the percentage of inserts is 100")
}
return nil
}

// spacePreset prepares space for a benchmark.
func spacePreset(tarantoolConnection *tarantool.Connection) error {
dropBenchmarkSpace(tarantoolConnection)
return createBenchmarkSpace(tarantoolConnection)
}

// incrementRequest increases the counter of successful/failed requests depending on the presence of an error.
func (results *Results) incrementRequestsCounters(err error) {
if err == nil {
results.successResultCount++
} else {
results.failedResultCount++
}
results.handledRequestsCount++
}
// Main benchmark function.
func Run(ctx context.BenchCtx) error {
rand.Seed(time.Now().UnixNano())

// requestsLoop continuously executes the insert query until the benchmark time runs out.
func requestsLoop(requestsSequence *RequestsSequence, backgroundCtx bctx.Context) {
for {
select {
case <-backgroundCtx.Done():
return
default:
request := requestsSequence.getNext()
request.operation(&request)
}
if err := verifyOperationsPercentage(&ctx); err != nil {
return err
}
}

// connectionLoop runs "ctx.SimultaneousRequests" requests execution goroutines
// through the same connection.
func connectionLoop(
ctx *context.BenchCtx,
requestsSequence *RequestsSequence,
backgroundCtx bctx.Context,
) {
var connectionWait sync.WaitGroup
for i := 0; i < ctx.SimultaneousRequests; i++ {
connectionWait.Add(1)
go func() {
defer connectionWait.Done()
requestsLoop(requestsSequence, backgroundCtx)
}()
// Check cluster topology for further actions.
cluster, err := isCluster(ctx)
if err != nil {
return err
}

connectionWait.Wait()
}

// preFillBenchmarkSpaceIfRequired fills benchmark space
// if insert count = 0 or PreFillingCount flag is explicitly specified.
func preFillBenchmarkSpaceIfRequired(ctx context.BenchCtx, connectionPool []*tarantool.Connection) error {
if ctx.InsertCount == 0 || ctx.PreFillingCount != PreFillingCount {
fmt.Println("\nThe pre-filling of the space has started,\n" +
"because the insert operation is not specified\n" +
"or there was an explicit instruction for pre-filling.")
fmt.Println("...")
filledCount, err := fillBenchmarkSpace(ctx, connectionPool)
if err != nil {
if cluster {
// Check cluster for wrong topology.
if err := verifyClusterTopology(ctx); err != nil {
return err
}
fmt.Printf("Pre-filling is finished. Number of records: %d\n\n", filledCount)
}
return nil
}

// Main benchmark function.
func Run(ctx context.BenchCtx) error {
rand.Seed(time.Now().UnixNano())

if err := verifyOperationsPercentage(&ctx); err != nil {
return err
// Get url of one of instances in cluster for space preset and prefill.
ctx.URL = (*ctx.Leaders)[0]
}

// Connect to tarantool and preset space for benchmark.
tarantoolConnection, err := tarantool.Connect(ctx.URL, tarantool.Opts{
User: ctx.User,
Password: ctx.Password,
})
tarantoolConnection, err := createConnection(ctx)
if err != nil {
return fmt.Errorf(
"Couldn't connect to Tarantool %s.",
Expand All @@ -123,87 +46,37 @@ func Run(ctx context.BenchCtx) error {
return err
}

/// Сreate a "connectionPool" before starting the benchmark to exclude the connection establishment time from measurements.
connectionPool := make([]*tarantool.Connection, ctx.Connections)
for i := 0; i < ctx.Connections; i++ {
connectionPool[i], err = tarantool.Connect(ctx.URL, tarantool.Opts{
User: ctx.User,
Password: ctx.Password,
})
if err != nil {
return err
}
defer connectionPool[i].Close()
}

if err := preFillBenchmarkSpaceIfRequired(ctx, connectionPool); err != nil {
if err := preFillBenchmarkSpaceIfRequired(ctx); err != nil {
return err
}

fmt.Println("Benchmark start")
fmt.Println("...")

// The "context" will be used to stop all "connectionLoop" when the time is out.
backgroundCtx, cancel := bctx.WithCancel(bctx.Background())
var waitGroup sync.WaitGroup
results := Results{}

startTime := time.Now()
timer := time.NewTimer(time.Duration(ctx.Duration * int(time.Second)))

// Start detached connections.
for i := 0; i < ctx.Connections; i++ {
waitGroup.Add(1)
go func(connection *tarantool.Connection) {
defer waitGroup.Done()
requestsSequence := RequestsSequence{
[]RequestsGenerator{
{
Request{
insertOperation,
ctx,
connection,
&results,
},
ctx.InsertCount,
},
{
Request{
selectOperation,
ctx,
connection,
&results,
},
ctx.SelectCount,
},
{
Request{
updateOperation,
ctx,
connection,
&results,
},
ctx.UpdateCount,
},
},
0,
ctx.InsertCount,
sync.Mutex{},
}
connectionLoop(&ctx, &requestsSequence, backgroundCtx)
}(connectionPool[i])
// Bench one instance by default.
benchStart := benchOneInstance
if cluster {
benchStart = benchCluster
}

// Prepare data for bench.
benchData := getBenchData(ctx)

// Start benching.
if err := benchStart(ctx, &benchData); err != nil {
return err
}
// Sends "signal" to all "connectionLoop" and waits for them to complete.
<-timer.C
cancel()
waitGroup.Wait()

results.duration = time.Since(startTime).Seconds()
results.requestsPerSecond = int(float64(results.handledRequestsCount) / results.duration)
// Calculate results.
benchData.results.duration = time.Since(benchData.startTime).Seconds()
benchData.results.requestsPerSecond = int(float64(benchData.results.handledRequestsCount) / benchData.results.duration)

dropBenchmarkSpace(tarantoolConnection)
fmt.Println("Benchmark stop")
// Benchmark space must exist after bench.
if err := dropBenchmarkSpace(tarantoolConnection); err != nil {
return err
}
fmt.Println("Benchmark stop.")

printResults(results)
printResults(benchData.results)
return nil
}
Loading