Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

benchmark: Add sleepBetweenRPCs and connections parameters #6299

Merged
merged 20 commits into from
Jun 6, 2023
Merged
Changes from 7 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
5c9f7a4
Add sleepBetweenRPCs parameter to benchmarks
s-matyukevich May 18, 2023
e4c5b51
Use random distribution
s-matyukevich May 18, 2023
65ccca3
Remove delay from the client receive function and add it to the serve…
s-matyukevich May 19, 2023
b3d96ed
Add parameter to control the number of connections
s-matyukevich May 19, 2023
0014165
fix linter error
s-matyukevich May 19, 2023
88afecc
rename parameter
s-matyukevich May 19, 2023
57a049a
fix linter error
s-matyukevich May 19, 2023
2bf89d6
Sleep before sending the first request
s-matyukevich May 23, 2023
ff36d9e
Merge branch 'master' of github.com:grpc/grpc-go into add-sleep-to-be…
s-matyukevich May 23, 2023
2aee78f
configure the number of connections
s-matyukevich May 23, 2023
659b192
Merge branch 'master' of github.com:grpc/grpc-go into add-sleep-to-be…
s-matyukevich May 24, 2023
23c1278
Skip warmup if SleepBetweenRPCs > 0, use graceful stop on the server.
s-matyukevich May 24, 2023
3615d73
fix warmup condition
s-matyukevich May 24, 2023
5a8d7a7
Revert s.GracefulStop() change
s-matyukevich May 24, 2023
60ac14b
Allow codes.Canceled in unconstrained run
s-matyukevich May 24, 2023
907b01d
Fix unconstrained mode with preloader=on
s-matyukevich May 25, 2023
1905a66
Merge branch 'master' of github.com:grpc/grpc-go into add-sleep-to-be…
s-matyukevich May 25, 2023
accfb30
Merge branch 'master' of github.com:grpc/grpc-go into add-sleep-to-be…
s-matyukevich Jun 6, 2023
fa1f079
rename index parameter and import name
s-matyukevich Jun 6, 2023
7d0cd0d
add messing period
s-matyukevich Jun 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 49 additions & 20 deletions benchmark/benchmain/main.go
Original file line number Diff line number Diff line change
@@ -111,6 +111,7 @@ var (
serverReadBufferSize = flags.IntSlice("serverReadBufferSize", []int{-1}, "Configures the server read buffer size in bytes. If negative, use the default - may be a a comma-separated list")
serverWriteBufferSize = flags.IntSlice("serverWriteBufferSize", []int{-1}, "Configures the server write buffer size in bytes. If negative, use the default - may be a a comma-separated list")
sleepBetweenRPCs = flags.DurationSlice("sleepBetweenRPCs", []time.Duration{0}, "Configures the maximum amount of time the client should sleep between consecutive RPCs - may be a a comma-separated list")
shareConnection = flag.Bool("shareConnection", true, "Controls whether a single connection or connection per `maxConcurrentCalls` should be used.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of this, how about a flag for the number of connections, where each connection does maxConcurrentCalls at a time? Hopefully that wouldn't be too different implementation-wise, and would allow for some more interesting testing options.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, done.


logger = grpclog.Component("benchmark")
)
@@ -241,15 +242,15 @@ func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Fea
go func(pos int) {
defer wg.Done()
for {
if maxSleep > 0 {
time.Sleep(time.Duration(math_rand.Intn(maxSleep)))
}
t := time.Now()
if t.After(bmEnd) {
return
}
sender(pos)
atomic.AddUint64(&req, 1)
if maxSleep > 0 {
time.Sleep(time.Duration(math_rand.Intn(maxSleep)))
}
}
}(i)
go func(pos int) {
@@ -261,21 +262,18 @@ func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Fea
}
recver(pos)
atomic.AddUint64(&resp, 1)
if maxSleep > 0 {
time.Sleep(time.Duration(math_rand.Intn(maxSleep)))
}
}
}(i)
}
wg.Wait()
stop(req, resp)
}

// makeClient returns a gRPC client for the grpc.testing.BenchmarkService
// makeClients returns one or more gRPC clients for the
// grpc.testing.BenchmarkService service. The clients are configured using the
// different options in the passed 'bf'. Also returns a cleanup function to
// close the clients and release resources.
func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) {
func makeClients(bf stats.Features) ([]testpb.BenchmarkServiceClient, func()) {
nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU}
opts := []grpc.DialOption{}
sopts := []grpc.ServerOption{}
@@ -355,16 +353,41 @@ func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) {
}
lis = nw.Listener(lis)
stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
conn := bm.NewClientConn("" /* target not used */, opts...)
return testgrpc.NewBenchmarkServiceClient(conn), func() {
conn.Close()
var conns []*grpc.ClientConn
var clients []testpb.BenchmarkServiceClient
if bf.ShareConnection {
conns = []*grpc.ClientConn{bm.NewClientConn("" /* target not used */, opts...)}
clients = []testpb.BenchmarkServiceClient{testgrpc.NewBenchmarkServiceClient(conns[0])}
} else {
conns = make([]*grpc.ClientConn, bf.MaxConcurrentCalls)
clients = make([]testpb.BenchmarkServiceClient, bf.MaxConcurrentCalls)
for i := 0; i < bf.MaxConcurrentCalls; i++ {
conns[i] = bm.NewClientConn("" /* target not used */, opts...)
clients[i] = testgrpc.NewBenchmarkServiceClient(conns[i])
}
}

return clients, func() {
for _, conn := range conns {
conn.Close()
}
stopper()
}
}

// selectClient returns the benchmark client that the worker at position
// index should use. With a single shared connection there is only one
// client, which every worker receives; otherwise each worker gets the
// dedicated client at its own index.
func selectClient(clients []testpb.BenchmarkServiceClient, index int) testpb.BenchmarkServiceClient {
	if len(clients) == 1 {
		return clients[0]
	}
	return clients[index]
}
s-matyukevich marked this conversation as resolved.
Show resolved Hide resolved

func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
tc, cleanup := makeClient(bf)
return func(int) {
clients, cleanup := makeClients(bf)
return func(index int) {
reqSizeBytes := bf.ReqSizeBytes
respSizeBytes := bf.RespSizeBytes
if bf.ReqPayloadCurve != nil {
@@ -373,15 +396,16 @@ func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
if bf.RespPayloadCurve != nil {
respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
}
unaryCaller(tc, reqSizeBytes, respSizeBytes)
unaryCaller(selectClient(clients, index), reqSizeBytes, respSizeBytes)
}, cleanup
}

func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
tc, cleanup := makeClient(bf)
clients, cleanup := makeClients(bf)

streams := make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
for i := 0; i < bf.MaxConcurrentCalls; i++ {
tc := selectClient(clients, i)
stream, err := tc.StreamingCall(context.Background())
if err != nil {
logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
@@ -432,12 +456,14 @@ func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, r
}

func setupUnconstrainedStream(bf stats.Features) ([]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
tc, cleanup := makeClient(bf)
clients, cleanup := makeClients(bf)

streams := make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1")
md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1",
benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String())
ctx := metadata.NewOutgoingContext(context.Background(), md)
for i := 0; i < bf.MaxConcurrentCalls; i++ {
tc := selectClient(clients, i)
stream, err := tc.StreamingCall(ctx)
if err != nil {
logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
@@ -486,6 +512,9 @@ func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.F
go func(pos int) {
defer wg.Done()
for {
if maxSleep > 0 {
time.Sleep(time.Duration(math_rand.Intn(maxSleep)))
}
t := time.Now()
if t.After(bmEnd) {
return
@@ -495,9 +524,6 @@ func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.F
elapse := time.Since(start)
atomic.AddUint64(&count, 1)
s.AddDuration(elapse)
if maxSleep > 0 {
time.Sleep(time.Duration(math_rand.Intn(maxSleep)))
}
}
}(i)
}
@@ -517,6 +543,7 @@ type benchOpts struct {
benchmarkResultFile string
useBufconn bool
enableKeepalive bool
shareConnection bool
features *featureOpts
}

@@ -641,6 +668,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
UseBufConn: b.useBufconn,
EnableKeepalive: b.enableKeepalive,
BenchTime: b.benchTime,
ShareConnection: b.shareConnection,
// These features can potentially change for each iteration.
EnableTrace: b.features.enableTrace[curPos[stats.EnableTraceIndex]],
Latency: b.features.readLatencies[curPos[stats.ReadLatenciesIndex]],
@@ -710,6 +738,7 @@ func processFlags() *benchOpts {
benchmarkResultFile: *benchmarkResultFile,
useBufconn: *useBufconn,
enableKeepalive: *enableKeepalive,
shareConnection: *shareConnection,
features: &featureOpts{
enableTrace: setToggleMode(*traceMode),
readLatencies: append([]time.Duration(nil), *readLatency...),
20 changes: 20 additions & 0 deletions benchmark/benchmark.go
Original file line number Diff line number Diff line change
@@ -27,6 +27,9 @@ import (
"io"
"log"
"net"
"time"

"math/rand"
s-matyukevich marked this conversation as resolved.
Show resolved Hide resolved

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -77,6 +80,10 @@ func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*
// of ping-pong.
const UnconstrainedStreamingHeader = "unconstrained-streaming"

// UnconstrainedStreamingDelayHeader is used to pass the maximum amount of time
// the server should sleep between consecutive RPC responses.
const UnconstrainedStreamingDelayHeader = "unconstrained-streaming-delay"

func (s *testServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error {
if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[UnconstrainedStreamingHeader]) != 0 {
return s.UnconstrainedStreamingCall(stream)
@@ -103,6 +110,16 @@ func (s *testServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCal
}

func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error {
maxSleep := 0
if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[UnconstrainedStreamingDelayHeader]) != 0 {
val := md[UnconstrainedStreamingDelayHeader][0]
d, err := time.ParseDuration(val)
if err != nil {
return fmt.Errorf("can't parse %q header: %s", UnconstrainedStreamingDelayHeader, err)
}
maxSleep = int(d)
}

in := new(testpb.SimpleRequest)
// Receive a message to learn response type and size.
err := stream.RecvMsg(in)
@@ -135,6 +152,9 @@ func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService

go func() {
for {
if maxSleep > 0 {
time.Sleep(time.Duration(rand.Intn(maxSleep)))
}
err := stream.Send(response)
switch status.Code(err) {
case codes.Unavailable:
6 changes: 4 additions & 2 deletions benchmark/stats/stats.go
Original file line number Diff line number Diff line change
@@ -79,6 +79,8 @@ type Features struct {
EnableKeepalive bool
// BenchTime indicates the duration of the benchmark run.
BenchTime time.Duration
// ShareConnection indicates whether a single connection should be used.
ShareConnection bool

// Features defined above are usually the same for all benchmark runs in a
// particular invocation, while the features defined below could vary from
@@ -143,12 +145,12 @@ func (f Features) String() string {
"trace_%v-latency_%v-kbps_%v-MTU_%v-maxConcurrentCalls_%v-%s-%s-"+
"compressor_%v-channelz_%v-preloader_%v-clientReadBufferSize_%v-"+
"clientWriteBufferSize_%v-serverReadBufferSize_%v-serverWriteBufferSize_%v-"+
"sleepBetweenRPCs %v-",
"sleepBetweenRPCs_%v-shareConnection_%v-",
f.NetworkMode, f.UseBufConn, f.EnableKeepalive, f.BenchTime, f.EnableTrace,
f.Latency, f.Kbps, f.MTU, f.MaxConcurrentCalls, reqPayloadString,
respPayloadString, f.ModeCompressor, f.EnableChannelz, f.EnablePreloader,
f.ClientReadBufferSize, f.ClientWriteBufferSize, f.ServerReadBufferSize,
f.ServerWriteBufferSize, f.SleepBetweenRPCs)
f.ServerWriteBufferSize, f.SleepBetweenRPCs, f.ShareConnection)
}

// SharedFeatures returns the shared features as a pretty printable string.