Skip to content

Commit

Permalink
chore(storage): add GCSFuse client config [benchmarking] (#8225)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennaEpp authored Jul 7, 2023
1 parent b63d02b commit e1f52db
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 20 deletions.
80 changes: 60 additions & 20 deletions storage/internal/benchmarks/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"context"
"crypto/tls"
"log"
"net/http"
"os"
Expand Down Expand Up @@ -87,7 +88,10 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() {

nonBenchmarkingClients, closeNonBenchmarking = newClientPool(
func() (*storage.Client, error) {
return initializeHTTPClient(ctx, useDefault, useDefault, false)
return initializeHTTPClient(ctx, clientConfig{
writeBufferSize: useDefault,
readBufferSize: useDefault,
})
},
1,
)
Expand All @@ -96,7 +100,12 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() {
if opts.api == mixedAPIs || opts.api == xmlAPI {
xmlClients, closeXML = newClientPool(
func() (*storage.Client, error) {
return initializeHTTPClient(ctx, opts.writeBufferSize, opts.readBufferSize, false)
return initializeHTTPClient(ctx, clientConfig{
writeBufferSize: opts.writeBufferSize,
readBufferSize: opts.readBufferSize,
useJSON: false,
setGCSFuseOpts: opts.useGCSFuseConfig,
})
},
opts.numClients,
)
Expand All @@ -108,7 +117,12 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() {
if opts.api == mixedAPIs || opts.api == jsonAPI || opts.api == xmlAPI {
jsonClients, closeJSON = newClientPool(
func() (*storage.Client, error) {
return initializeHTTPClient(ctx, opts.writeBufferSize, opts.readBufferSize, true)
return initializeHTTPClient(ctx, clientConfig{
writeBufferSize: opts.writeBufferSize,
readBufferSize: opts.readBufferSize,
useJSON: true,
setGCSFuseOpts: opts.useGCSFuseConfig,
})
},
opts.numClients,
)
Expand All @@ -118,7 +132,11 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() {
if opts.api == mixedAPIs || opts.api == grpcAPI || opts.api == directPath {
gRPCClients, closeGRPC = newClientPool(
func() (*storage.Client, error) {
return initializeGRPCClient(context.Background(), opts.writeBufferSize, opts.readBufferSize, opts.connPoolSize)
return initializeGRPCClient(context.Background(), clientConfig{
writeBufferSize: opts.writeBufferSize,
readBufferSize: opts.readBufferSize,
connectionPoolSize: opts.connPoolSize,
})
},
opts.numClients,
)
Expand Down Expand Up @@ -156,22 +174,44 @@ func getClient(ctx context.Context, api benchmarkAPI) *storage.Client {
// mutex on starting a client so that we can set an env variable for GRPC clients
var clientMu sync.Mutex

func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize int, json bool) (*storage.Client, error) {
// Client config
type clientConfig struct {
writeBufferSize, readBufferSize int
useJSON bool // only applicable to HTTP Clients
setGCSFuseOpts bool // only applicable to HTTP Clients
connectionPoolSize int // only applicable to GRPC Clients
}

func initializeHTTPClient(ctx context.Context, config clientConfig) (*storage.Client, error) {
opts := []option.ClientOption{}

if writeBufferSize != useDefault || readBufferSize != useDefault {
if config.writeBufferSize != useDefault || config.readBufferSize != useDefault || config.setGCSFuseOpts {
// We need to modify the underlying HTTP client

base := http.DefaultTransport.(*http.Transport).Clone()
base.MaxIdleConnsPerHost = 100 // this is set in Storage as well
base.WriteBufferSize = writeBufferSize
base.ReadBufferSize = readBufferSize

http2Trans, err := http2.ConfigureTransports(base)
if err == nil {
http2Trans.ReadIdleTimeout = time.Second * 31
// Set MaxIdleConnsPerHost for parity with the Storage library, as it
// sets this as well
base.MaxIdleConnsPerHost = 100

if config.setGCSFuseOpts {
base = &http.Transport{
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 100,
// This disables HTTP/2 in transport.
TLSNextProto: make(
map[string]func(string, *tls.Conn) http.RoundTripper,
),
}
} else {
http2Trans, err := http2.ConfigureTransports(base)
if err == nil {
http2Trans.ReadIdleTimeout = time.Second * 31
}
}

base.WriteBufferSize = config.writeBufferSize
base.ReadBufferSize = config.readBufferSize

trans, err := htransport.NewTransport(ctx, base,
option.WithScopes("https://www.googleapis.com/auth/devstorage.full_control"))
if err != nil {
Expand All @@ -181,7 +221,7 @@ func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize i
opts = append(opts, option.WithHTTPClient(&http.Client{Transport: trans}))
}

if json {
if config.useJSON {
opts = append(opts, storage.WithJSONReads())
}

Expand All @@ -193,14 +233,14 @@ func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize i
return client, err
}

func initializeGRPCClient(ctx context.Context, writeBufferSize, readBufferSize int, connectionPoolSize int) (*storage.Client, error) {
opts := []option.ClientOption{option.WithGRPCConnectionPool(connectionPoolSize)}
func initializeGRPCClient(ctx context.Context, config clientConfig) (*storage.Client, error) {
opts := []option.ClientOption{option.WithGRPCConnectionPool(config.connectionPoolSize)}

if writeBufferSize != useDefault {
opts = append(opts, option.WithGRPCDialOption(grpc.WithWriteBufferSize(writeBufferSize)))
if config.writeBufferSize != useDefault {
opts = append(opts, option.WithGRPCDialOption(grpc.WithWriteBufferSize(config.writeBufferSize)))
}
if readBufferSize != useDefault {
opts = append(opts, option.WithGRPCDialOption(grpc.WithReadBufferSize(readBufferSize)))
if config.readBufferSize != useDefault {
opts = append(opts, option.WithGRPCDialOption(grpc.WithReadBufferSize(config.readBufferSize)))
}

clientMu.Lock()
Expand Down
4 changes: 4 additions & 0 deletions storage/internal/benchmarks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ type benchmarkOptions struct {
numClients int
workload int
numObjectsPerDirectory int

useGCSFuseConfig bool
}

func (b *benchmarkOptions) validate() error {
Expand Down Expand Up @@ -151,6 +153,8 @@ func parseFlags() {
flag.Int64Var(&opts.minReadOffset, "minimum_read_offset", 0, "minimum read offset in bytes")
flag.Int64Var(&opts.maxReadOffset, "maximum_read_offset", 0, "maximum read offset in bytes")

flag.BoolVar(&opts.useGCSFuseConfig, "gcs_fuse", false, "use GCSFuse configs on HTTP client creation")

flag.IntVar(&opts.readBufferSize, "read_buffer_size", useDefault, "read buffer size in bytes")
flag.IntVar(&opts.writeBufferSize, "write_buffer_size", useDefault, "write buffer size in bytes")

Expand Down

0 comments on commit e1f52db

Please sign in to comment.