From 4cc7f25b4c82a50e39b30417e14724697ce5ca8b Mon Sep 17 00:00:00 2001 From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Date: Wed, 16 Aug 2023 13:51:30 +0800 Subject: [PATCH] This is an automated cherry-pick of #46040 Signed-off-by: ti-chi-bot --- br/pkg/storage/BUILD.bazel | 5 +++ br/pkg/storage/azblob.go | 65 ++++++++++++++++++++++++++++++++++ br/pkg/storage/storage.go | 18 ++++++++++ br/pkg/storage/storage_test.go | 26 ++++++++++++++ br/pkg/task/stream.go | 2 ++ 5 files changed, 116 insertions(+) create mode 100644 br/pkg/storage/storage_test.go diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel index 810585525db8d..3a4b56dea6f00 100644 --- a/br/pkg/storage/BUILD.bazel +++ b/br/pkg/storage/BUILD.bazel @@ -67,11 +67,16 @@ go_test( "memstore_test.go", "parse_test.go", "s3_test.go", + "storage_test.go", "writer_test.go", ], embed = [":storage"], flaky = True, +<<<<<<< HEAD shard_count = 43, +======= + shard_count = 47, +>>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040)) deps = [ "//br/pkg/mock", "@com_github_aws_aws_sdk_go//aws", diff --git a/br/pkg/storage/azblob.go b/br/pkg/storage/azblob.go index c3d734ebe9a12..dc5b414d3406b 100644 --- a/br/pkg/storage/azblob.go +++ b/br/pkg/storage/azblob.go @@ -107,25 +107,59 @@ type sharedKeyClientBuilder struct { cred *azblob.SharedKeyCredential accountName string serviceURL string + + clientOptions *azblob.ClientOptions } +<<<<<<< HEAD func (b *sharedKeyClientBuilder) GetServiceClient() (azblob.ServiceClient, error) { return azblob.NewServiceClientWithSharedKey(b.serviceURL, b.cred, getDefaultClientOptions()) +======= +func (b *sharedKeyClientBuilder) GetServiceClient() (*azblob.Client, error) { + return azblob.NewClientWithSharedKeyCredential(b.serviceURL, b.cred, b.clientOptions) +>>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040)) } func (b *sharedKeyClientBuilder) GetAccountName() string { return b.accountName } +<<<<<<< HEAD +======= +// use SAS to access azure blob storage +type sasClientBuilder struct { + accountName string + // Example of serviceURL: https://.blob.core.windows.net/? + serviceURL string + + clientOptions *azblob.ClientOptions +} + +func (b *sasClientBuilder) GetServiceClient() (*azblob.Client, error) { + return azblob.NewClientWithNoCredential(b.serviceURL, b.clientOptions) +} + +func (b *sasClientBuilder) GetAccountName() string { + return b.accountName +} + +>>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040)) // use token to access azure blob storage type tokenClientBuilder struct { cred *azidentity.ClientSecretCredential accountName string serviceURL string + + clientOptions *azblob.ClientOptions } +<<<<<<< HEAD func (b *tokenClientBuilder) GetServiceClient() (azblob.ServiceClient, error) { return azblob.NewServiceClient(b.serviceURL, b.cred, getDefaultClientOptions()) +======= +func (b *tokenClientBuilder) GetServiceClient() (*azblob.Client, error) { + return azblob.NewClient(b.serviceURL, b.cred, b.clientOptions) +>>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040)) } func (b *tokenClientBuilder) GetAccountName() string { @@ -144,6 +178,31 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte return nil, errors.New("bucket(container) cannot be empty to access azure blob storage") } +<<<<<<< HEAD +======= + clientOptions := getDefaultClientOptions() + if opts != nil && opts.HTTPClient != nil { + clientOptions.Transport = opts.HTTPClient + } + + if len(options.AccountName) > 0 && len(options.AccessSig) > 0 { + serviceURL := options.Endpoint + if len(serviceURL) == 0 { + if strings.HasPrefix(options.AccessSig, "?") { + serviceURL = fmt.Sprintf("https://%s.blob.core.windows.net/%s", options.AccountName, options.AccessSig) + } else { + serviceURL = fmt.Sprintf("https://%s.blob.core.windows.net/?%s", options.AccountName, options.AccessSig) + } + } + return &sasClientBuilder{ + options.AccountName, + serviceURL, + + clientOptions, + }, nil + } + +>>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040)) if len(options.AccountName) > 0 && len(options.SharedKey) > 0 { serviceURL := options.Endpoint if len(serviceURL) == 0 { @@ -157,6 +216,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte cred, options.AccountName, serviceURL, + + clientOptions, }, nil } @@ -185,6 +246,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte cred, accountName, serviceURL, + + clientOptions, }, nil } log.Warn("Failed to get azure token credential but environment variables exist, try to use shared key.", zap.String("tenantId", tenantID), zap.String("clientId", clientID), zap.String("clientSecret", "?")) @@ -212,6 +275,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte cred, accountName, serviceURL, + + clientOptions, }, nil } diff --git a/br/pkg/storage/storage.go b/br/pkg/storage/storage.go index 2ccc67d4eab20..40664ac11df06 100644 --- a/br/pkg/storage/storage.go +++ b/br/pkg/storage/storage.go @@ -27,6 +27,8 @@ const ( GetObject Permission = "GetObject" // PutObject represents PutObject permission PutObject Permission = "PutObject" + + DefaultRequestConcurrency uint = 128 ) // WalkOption is the option of storage.WalkDir. @@ -190,3 +192,19 @@ func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalSt return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "storage %T is not supported yet", backend) } } + +// Different from `http.DefaultTransport`, set the `MaxIdleConns` and `MaxIdleConnsPerHost` +// to the actual request concurrency to reuse tcp connection as much as possible. +func GetDefaultHttpClient(concurrency uint) *http.Client { + transport, _ := CloneDefaultHttpTransport() + transport.MaxIdleConns = int(concurrency) + transport.MaxIdleConnsPerHost = int(concurrency) + return &http.Client{ + Transport: transport, + } +} + +func CloneDefaultHttpTransport() (*http.Transport, bool) { + transport, ok := http.DefaultTransport.(*http.Transport) + return transport.Clone(), ok +} diff --git a/br/pkg/storage/storage_test.go b/br/pkg/storage/storage_test.go new file mode 100644 index 0000000000000..c6ca5c39b6a02 --- /dev/null +++ b/br/pkg/storage/storage_test.go @@ -0,0 +1,26 @@ +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. + +package storage_test + +import ( + "net/http" + "testing" + + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/stretchr/testify/require" +) + +func TestDefaultHttpTransport(t *testing.T) { + transport, ok := storage.CloneDefaultHttpTransport() + require.True(t, ok) + require.True(t, transport.MaxConnsPerHost == 0) + require.True(t, transport.MaxIdleConns > 0) +} + +func TestDefaultHttpClient(t *testing.T) { + var concurrency uint = 128 + transport, ok := storage.GetDefaultHttpClient(concurrency).Transport.(*http.Transport) + require.True(t, ok) + require.Equal(t, int(concurrency), transport.MaxIdleConnsPerHost) + require.Equal(t, int(concurrency), transport.MaxIdleConns) +} diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 8c1e3490fc6d7..eec02d1adce14 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -135,6 +135,7 @@ func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStora opts := storage.ExternalStorageOptions{ NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, + HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize), } storage, err := storage.New(ctx, u, &opts) if err != nil { @@ -1470,6 +1471,7 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m opts := storage.ExternalStorageOptions{ NoCredentials: cfg.NoCreds, SendCredentials: cfg.SendCreds, + HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize), } if err = client.SetStorage(ctx, u, &opts); err != nil { return nil, errors.Trace(err)