diff --git a/storage/internal/benchmarks/client_pool.go b/storage/internal/benchmarks/client_pool.go index dc504907f1d2..2e0d180adc86 100644 --- a/storage/internal/benchmarks/client_pool.go +++ b/storage/internal/benchmarks/client_pool.go @@ -16,6 +16,7 @@ package main import ( "context" + "crypto/tls" "log" "net/http" "os" @@ -87,7 +88,10 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() { nonBenchmarkingClients, closeNonBenchmarking = newClientPool( func() (*storage.Client, error) { - return initializeHTTPClient(ctx, useDefault, useDefault, false) + return initializeHTTPClient(ctx, clientConfig{ + writeBufferSize: useDefault, + readBufferSize: useDefault, + }) }, 1, ) @@ -96,7 +100,12 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() { if opts.api == mixedAPIs || opts.api == xmlAPI { xmlClients, closeXML = newClientPool( func() (*storage.Client, error) { - return initializeHTTPClient(ctx, opts.writeBufferSize, opts.readBufferSize, false) + return initializeHTTPClient(ctx, clientConfig{ + writeBufferSize: opts.writeBufferSize, + readBufferSize: opts.readBufferSize, + useJSON: false, + setGCSFuseOpts: opts.useGCSFuseConfig, + }) }, opts.numClients, ) @@ -108,7 +117,12 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() { if opts.api == mixedAPIs || opts.api == jsonAPI || opts.api == xmlAPI { jsonClients, closeJSON = newClientPool( func() (*storage.Client, error) { - return initializeHTTPClient(ctx, opts.writeBufferSize, opts.readBufferSize, true) + return initializeHTTPClient(ctx, clientConfig{ + writeBufferSize: opts.writeBufferSize, + readBufferSize: opts.readBufferSize, + useJSON: true, + setGCSFuseOpts: opts.useGCSFuseConfig, + }) }, opts.numClients, ) @@ -118,7 +132,11 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() { if opts.api == mixedAPIs || opts.api == grpcAPI || opts.api == directPath { gRPCClients, closeGRPC = newClientPool( func() (*storage.Client, error) { - return initializeGRPCClient(context.Background(), opts.writeBufferSize, opts.readBufferSize, opts.connPoolSize) + return initializeGRPCClient(context.Background(), clientConfig{ + writeBufferSize: opts.writeBufferSize, + readBufferSize: opts.readBufferSize, + connectionPoolSize: opts.connPoolSize, + }) }, opts.numClients, ) @@ -156,22 +174,44 @@ func getClient(ctx context.Context, api benchmarkAPI) *storage.Client { // mutex on starting a client so that we can set an env variable for GRPC clients var clientMu sync.Mutex -func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize int, json bool) (*storage.Client, error) { +// Client config +type clientConfig struct { + writeBufferSize, readBufferSize int + useJSON bool // only applicable to HTTP Clients + setGCSFuseOpts bool // only applicable to HTTP Clients + connectionPoolSize int // only applicable to GRPC Clients +} + +func initializeHTTPClient(ctx context.Context, config clientConfig) (*storage.Client, error) { opts := []option.ClientOption{} - if writeBufferSize != useDefault || readBufferSize != useDefault { + if config.writeBufferSize != useDefault || config.readBufferSize != useDefault || config.setGCSFuseOpts { // We need to modify the underlying HTTP client - base := http.DefaultTransport.(*http.Transport).Clone() - base.MaxIdleConnsPerHost = 100 // this is set in Storage as well - base.WriteBufferSize = writeBufferSize - base.ReadBufferSize = readBufferSize - http2Trans, err := http2.ConfigureTransports(base) - if err == nil { - http2Trans.ReadIdleTimeout = time.Second * 31 + // Set MaxIdleConnsPerHost for parity with the Storage library, as it + // sets this as well + base.MaxIdleConnsPerHost = 100 + + if config.setGCSFuseOpts { + base = &http.Transport{ + MaxConnsPerHost: 100, + MaxIdleConnsPerHost: 100, + // This disables HTTP/2 in transport. + TLSNextProto: make( + map[string]func(string, *tls.Conn) http.RoundTripper, + ), + } + } else { + http2Trans, err := http2.ConfigureTransports(base) + if err == nil { + http2Trans.ReadIdleTimeout = time.Second * 31 + } } + base.WriteBufferSize = config.writeBufferSize + base.ReadBufferSize = config.readBufferSize + trans, err := htransport.NewTransport(ctx, base, option.WithScopes("https://www.googleapis.com/auth/devstorage.full_control")) if err != nil { @@ -181,7 +221,7 @@ func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize i opts = append(opts, option.WithHTTPClient(&http.Client{Transport: trans})) } - if json { + if config.useJSON { opts = append(opts, storage.WithJSONReads()) } @@ -193,14 +233,14 @@ func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize i return client, err } -func initializeGRPCClient(ctx context.Context, writeBufferSize, readBufferSize int, connectionPoolSize int) (*storage.Client, error) { - opts := []option.ClientOption{option.WithGRPCConnectionPool(connectionPoolSize)} +func initializeGRPCClient(ctx context.Context, config clientConfig) (*storage.Client, error) { + opts := []option.ClientOption{option.WithGRPCConnectionPool(config.connectionPoolSize)} - if writeBufferSize != useDefault { - opts = append(opts, option.WithGRPCDialOption(grpc.WithWriteBufferSize(writeBufferSize))) + if config.writeBufferSize != useDefault { + opts = append(opts, option.WithGRPCDialOption(grpc.WithWriteBufferSize(config.writeBufferSize))) } - if readBufferSize != useDefault { - opts = append(opts, option.WithGRPCDialOption(grpc.WithReadBufferSize(readBufferSize))) + if config.readBufferSize != useDefault { + opts = append(opts, option.WithGRPCDialOption(grpc.WithReadBufferSize(config.readBufferSize))) } clientMu.Lock() diff --git a/storage/internal/benchmarks/main.go b/storage/internal/benchmarks/main.go index 649853ac9d8b..3ec08b57d838 100644 --- a/storage/internal/benchmarks/main.go +++ b/storage/internal/benchmarks/main.go @@ -80,6 +80,8 @@ type benchmarkOptions struct { numClients int workload int numObjectsPerDirectory int + + useGCSFuseConfig bool } func (b *benchmarkOptions) validate() error { @@ -151,6 +153,8 @@ func parseFlags() { flag.Int64Var(&opts.minReadOffset, "minimum_read_offset", 0, "minimum read offset in bytes") flag.Int64Var(&opts.maxReadOffset, "maximum_read_offset", 0, "maximum read offset in bytes") + flag.BoolVar(&opts.useGCSFuseConfig, "gcs_fuse", false, "use GCSFuse configs on HTTP client creation") + flag.IntVar(&opts.readBufferSize, "read_buffer_size", useDefault, "read buffer size in bytes") flag.IntVar(&opts.writeBufferSize, "write_buffer_size", useDefault, "write buffer size in bytes")