Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make object count a multiple of concurrency #323

Merged
merged 3 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cli/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ func checkGetSyntax(ctx *cli.Context) {
if ctx.Int("versions") < 1 {
console.Fatal("At least one version must be tested")
}
if ctx.Int("objects") < 1 {
console.Fatal("At least one object must be tested")
}
checkAnalyze(ctx)
checkBenchmark(ctx)
}
5 changes: 4 additions & 1 deletion cli/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var listFlags = []cli.Flag{
cli.IntFlag{
Name: "objects",
Value: 10000,
Usage: "Number of objects to upload. Rounded to have equal concurrent objects.",
Usage: "Number of objects to upload. Rounded up to have equal concurrent objects.",
},
cli.IntFlag{
Name: "versions",
Expand Down Expand Up @@ -84,6 +84,9 @@ func checkListSyntax(ctx *cli.Context) {
if ctx.Int("versions") < 1 {
console.Fatal("At least one version must be tested")
}
if ctx.Int("objects") < 1 {
console.Fatal("At least one object must be tested")
}

checkAnalyze(ctx)
checkBenchmark(ctx)
Expand Down
4 changes: 3 additions & 1 deletion cli/mixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func checkMixedSyntax(ctx *cli.Context) {
if ctx.NArg() > 0 {
console.Fatal("Command takes no arguments")
}

if ctx.Int("objects") < 1 {
console.Fatal("At least one object must be tested")
}
checkAnalyze(ctx)
checkBenchmark(ctx)
}
1 change: 1 addition & 0 deletions cli/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func checkMultipartSyntax(ctx *cli.Context) {
console.Fatal("part.size must be >= 5MiB")
}
}

checkAnalyze(ctx)
checkBenchmark(ctx)
}
4 changes: 4 additions & 0 deletions cli/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package cli
import (
"github.com/minio/cli"
"github.com/minio/minio-go/v7"
"github.com/minio/pkg/v2/console"
"github.com/minio/warp/pkg/bench"
)

Expand Down Expand Up @@ -91,6 +92,9 @@ func mainSelect(ctx *cli.Context) error {
}

func checkSelectSyntax(ctx *cli.Context) {
if ctx.Int("objects") < 1 {
console.Fatal("At least one object must be tested")
}
checkAnalyze(ctx)
checkBenchmark(ctx)
}
3 changes: 3 additions & 0 deletions cli/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func checkStatSyntax(ctx *cli.Context) {
if ctx.Int("versions") < 1 {
console.Fatal("At least one version must be tested")
}
if ctx.Int("objects") < 1 {
console.Fatal("At least one object must be tested")
}
checkAnalyze(ctx)
checkBenchmark(ctx)
}
4 changes: 3 additions & 1 deletion cli/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ func checkVersionedSyntax(ctx *cli.Context) {
if ctx.NArg() > 0 {
console.Fatal("Command takes no arguments")
}

if ctx.Int("objects") < 1 {
console.Fatal("At least one object must be tested")
}
checkAnalyze(ctx)
checkBenchmark(ctx)
}
10 changes: 10 additions & 0 deletions pkg/bench/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,13 @@ func (c *Common) rpsLimit(ctx context.Context) error {

return c.RpsLimiter.Wait(ctx)
}

func splitObjs(objects, concurrency int) [][]struct{} {
res := make([][]struct{}, concurrency)
// Round up if not cleanly divisible
inEach := (objects + concurrency - 1) / concurrency
for i := range res {
res[i] = make([]struct{}, inEach)
}
return res
}
13 changes: 5 additions & 8 deletions pkg/bench/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,11 @@ func (d *Delete) Prepare(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(d.Concurrency)
d.addCollector()
obj := make(chan struct{}, d.CreateObjects)
for i := 0; i < d.CreateObjects; i++ {
obj <- struct{}{}
}
close(obj)
objs := splitObjs(d.CreateObjects, d.Concurrency)

var mu sync.Mutex
for i := 0; i < d.Concurrency; i++ {
go func(i int) {
for i, obj := range objs {
go func(i int, obj []struct{}) {
defer wg.Done()
src := d.Source()

Expand Down Expand Up @@ -179,7 +176,7 @@ func (d *Delete) Prepare(ctx context.Context) error {
mu.Unlock()
rcv <- op
}
}(i)
}(i, obj)
}
wg.Wait()

Expand Down
12 changes: 4 additions & 8 deletions pkg/bench/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,13 @@ func (g *Get) Prepare(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(g.Concurrency)

obj := make(chan struct{}, g.CreateObjects)
for i := 0; i < g.CreateObjects; i++ {
obj <- struct{}{}
}
objs := splitObjs(g.CreateObjects, g.Concurrency)
rcv := g.Collector.rcv
close(obj)
var groupErr error
var mu sync.Mutex

for i := 0; i < g.Concurrency; i++ {
go func(i int) {
for i, obj := range objs {
go func(i int, obj []struct{}) {
defer wg.Done()
src := g.Source()
opts := g.PutOpts
Expand Down Expand Up @@ -219,7 +215,7 @@ func (g *Get) Prepare(ctx context.Context) error {
rcv <- op
}
}
}(i)
}(i, obj)
}
wg.Wait()
return groupErr
Expand Down
3 changes: 2 additions & 1 deletion pkg/bench/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (d *List) Prepare(ctx context.Context) error {
done()
}

objPerPrefix := d.CreateObjects / d.Concurrency
objPerPrefix := (d.CreateObjects + d.Concurrency - 1) / d.Concurrency
console.Eraseline()
x := ""
if d.Versions > 1 {
Expand Down Expand Up @@ -224,6 +224,7 @@ func (d *List) Start(ctx context.Context, wait chan struct{}) (Operations, error
Prefix: objs[0].Prefix,
Recursive: true,
WithVersions: d.Versions > 1,
MaxKeys: 100,
})

// Wait for errCh to close.
Expand Down
13 changes: 5 additions & 8 deletions pkg/bench/mixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,12 @@ func (g *Mixed) Prepare(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(g.Concurrency)
g.addCollector()
obj := make(chan struct{}, g.CreateObjects)
for i := 0; i < g.CreateObjects; i++ {
obj <- struct{}{}
}
close(obj)
var groupErr error

objs := splitObjs(g.CreateObjects, g.Concurrency)
var mu sync.Mutex
for i := 0; i < g.Concurrency; i++ {
go func() {
for _, obj := range objs {
go func(obj []struct{}) {
defer wg.Done()
src := g.Source()

Expand Down Expand Up @@ -218,7 +215,7 @@ func (g *Mixed) Prepare(ctx context.Context) error {
g.Dist.addObj(*obj)
g.prepareProgress(float64(len(g.Dist.objects)) / float64(g.CreateObjects))
}
}()
}(obj)
}
wg.Wait()
return groupErr
Expand Down
13 changes: 5 additions & 8 deletions pkg/bench/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,17 @@ func (g *Multipart) Prepare(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(g.Concurrency)
g.addCollector()
obj := make(chan int, g.CreateParts)
for i := 0; i < g.CreateParts; i++ {
obj <- i + g.PartStart
}
objs := splitObjs(g.CreateParts, g.Concurrency)

rcv := g.Collector.rcv
close(obj)
var groupErr error
var mu sync.Mutex

if g.Custom == nil {
g.Custom = make(map[string]string, g.CreateParts)
}
for i := 0; i < g.Concurrency; i++ {
go func(i int) {
for i, obj := range objs {
go func(i int, obj []struct{}) {
defer wg.Done()
src := g.Source()
opts := g.PutOpts
Expand Down Expand Up @@ -164,7 +161,7 @@ func (g *Multipart) Prepare(ctx context.Context) error {
mu.Unlock()
rcv <- op
}
}(i)
}(i, obj)
}
wg.Wait()
return groupErr
Expand Down
12 changes: 4 additions & 8 deletions pkg/bench/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,12 @@ func (g *Retention) Prepare(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(g.Concurrency)
g.addCollector()
obj := make(chan struct{}, g.CreateObjects)
for i := 0; i < g.CreateObjects; i++ {
obj <- struct{}{}
}
close(obj)
objs := splitObjs(g.CreateObjects, g.Concurrency)
var groupErr error
var mu sync.Mutex

for i := 0; i < g.Concurrency; i++ {
go func(i int) {
for i, obj := range objs {
go func(i int, obj []struct{}) {
defer wg.Done()
src := g.Source()

Expand Down Expand Up @@ -139,7 +135,7 @@ func (g *Retention) Prepare(ctx context.Context) error {
rcv <- op
}
}
}(i)
}(i, obj)
}
wg.Wait()
return groupErr
Expand Down
13 changes: 5 additions & 8 deletions pkg/bench/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,12 @@ func (g *Select) Prepare(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(g.Concurrency)
g.addCollector()
obj := make(chan struct{}, g.CreateObjects)
for i := 0; i < g.CreateObjects; i++ {
obj <- struct{}{}
}
close(obj)
objs := splitObjs(g.CreateObjects, g.Concurrency)

var groupErr error
var mu sync.Mutex
for i := 0; i < g.Concurrency; i++ {
go func(i int) {
for i, obj := range objs {
go func(i int, obj []struct{}) {
defer wg.Done()
src := g.Source()
for range obj {
Expand Down Expand Up @@ -124,7 +121,7 @@ func (g *Select) Prepare(ctx context.Context) error {
mu.Unlock()
rcv <- op
}
}(i)
}(i, obj)
}
wg.Wait()
return groupErr
Expand Down
12 changes: 4 additions & 8 deletions pkg/bench/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,13 @@ func (g *Stat) Prepare(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(g.Concurrency)
g.addCollector()
obj := make(chan struct{}, g.CreateObjects)
for i := 0; i < g.CreateObjects; i++ {
obj <- struct{}{}
}
objs := splitObjs(g.CreateObjects, g.Concurrency)
rcv := g.Collector.rcv
close(obj)
var groupErr error
var mu sync.Mutex

for i := 0; i < g.Concurrency; i++ {
go func(i int) {
for i, obj := range objs {
go func(i int, obj []struct{}) {
defer wg.Done()
src := g.Source()
opts := g.PutOpts
Expand Down Expand Up @@ -145,7 +141,7 @@ func (g *Stat) Prepare(ctx context.Context) error {
rcv <- op
}
}
}(i)
}(i, obj)
}
wg.Wait()
return groupErr
Expand Down
13 changes: 5 additions & 8 deletions pkg/bench/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,12 @@ func (g *Versioned) Prepare(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(g.Concurrency)
g.addCollector()
obj := make(chan struct{}, g.CreateObjects)
for i := 0; i < g.CreateObjects; i++ {
obj <- struct{}{}
}
close(obj)
objs := splitObjs(g.CreateObjects, g.Concurrency)

var groupErr error
var mu sync.Mutex
for i := 0; i < g.Concurrency; i++ {
go func() {
for _, obj := range objs {
go func(obj []struct{}) {
defer wg.Done()
src := g.Source()

Expand Down Expand Up @@ -122,7 +119,7 @@ func (g *Versioned) Prepare(ctx context.Context) error {
g.Dist.addObj(*obj)
g.prepareProgress(float64(len(g.Dist.objects)) / float64(g.CreateObjects))
}
}()
}(obj)
}
wg.Wait()
return groupErr
Expand Down
Loading