Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#46040
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
Leavrth authored and ti-chi-bot committed Aug 16, 2023
1 parent 75a26f7 commit 4cc7f25
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 0 deletions.
5 changes: 5 additions & 0 deletions br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,16 @@ go_test(
"memstore_test.go",
"parse_test.go",
"s3_test.go",
"storage_test.go",
"writer_test.go",
],
embed = [":storage"],
flaky = True,
<<<<<<< HEAD
shard_count = 43,
=======
shard_count = 47,
>>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040))
deps = [
"//br/pkg/mock",
"@com_github_aws_aws_sdk_go//aws",
Expand Down
65 changes: 65 additions & 0 deletions br/pkg/storage/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,25 +107,59 @@ type sharedKeyClientBuilder struct {
cred *azblob.SharedKeyCredential
accountName string
serviceURL string

clientOptions *azblob.ClientOptions
}

<<<<<<< HEAD
func (b *sharedKeyClientBuilder) GetServiceClient() (azblob.ServiceClient, error) {
return azblob.NewServiceClientWithSharedKey(b.serviceURL, b.cred, getDefaultClientOptions())
=======
func (b *sharedKeyClientBuilder) GetServiceClient() (*azblob.Client, error) {
return azblob.NewClientWithSharedKeyCredential(b.serviceURL, b.cred, b.clientOptions)
>>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040))
}

func (b *sharedKeyClientBuilder) GetAccountName() string {
return b.accountName
}

<<<<<<< HEAD
=======
// use SAS to access azure blob storage
type sasClientBuilder struct {
accountName string
// Example of serviceURL: https://<account>.blob.core.windows.net/?<sas token>
serviceURL string

clientOptions *azblob.ClientOptions
}

func (b *sasClientBuilder) GetServiceClient() (*azblob.Client, error) {
return azblob.NewClientWithNoCredential(b.serviceURL, b.clientOptions)
}

func (b *sasClientBuilder) GetAccountName() string {
return b.accountName
}

>>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040))
// use token to access azure blob storage
type tokenClientBuilder struct {
cred *azidentity.ClientSecretCredential
accountName string
serviceURL string

clientOptions *azblob.ClientOptions
}

<<<<<<< HEAD
func (b *tokenClientBuilder) GetServiceClient() (azblob.ServiceClient, error) {
return azblob.NewServiceClient(b.serviceURL, b.cred, getDefaultClientOptions())
=======
func (b *tokenClientBuilder) GetServiceClient() (*azblob.Client, error) {
return azblob.NewClient(b.serviceURL, b.cred, b.clientOptions)
>>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040))
}

func (b *tokenClientBuilder) GetAccountName() string {
Expand All @@ -144,6 +178,31 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte
return nil, errors.New("bucket(container) cannot be empty to access azure blob storage")
}

<<<<<<< HEAD
=======
clientOptions := getDefaultClientOptions()
if opts != nil && opts.HTTPClient != nil {
clientOptions.Transport = opts.HTTPClient
}

if len(options.AccountName) > 0 && len(options.AccessSig) > 0 {
serviceURL := options.Endpoint
if len(serviceURL) == 0 {
if strings.HasPrefix(options.AccessSig, "?") {
serviceURL = fmt.Sprintf("https://%s.blob.core.windows.net/%s", options.AccountName, options.AccessSig)
} else {
serviceURL = fmt.Sprintf("https://%s.blob.core.windows.net/?%s", options.AccountName, options.AccessSig)
}
}
return &sasClientBuilder{
options.AccountName,
serviceURL,

clientOptions,
}, nil
}

>>>>>>> 88225787f3c (br: configure the httpclient for external storage (#46040))
if len(options.AccountName) > 0 && len(options.SharedKey) > 0 {
serviceURL := options.Endpoint
if len(serviceURL) == 0 {
Expand All @@ -157,6 +216,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte
cred,
options.AccountName,
serviceURL,

clientOptions,
}, nil
}

Expand Down Expand Up @@ -185,6 +246,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte
cred,
accountName,
serviceURL,

clientOptions,
}, nil
}
log.Warn("Failed to get azure token credential but environment variables exist, try to use shared key.", zap.String("tenantId", tenantID), zap.String("clientId", clientID), zap.String("clientSecret", "?"))
Expand Down Expand Up @@ -212,6 +275,8 @@ func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *Exte
cred,
accountName,
serviceURL,

clientOptions,
}, nil
}

Expand Down
18 changes: 18 additions & 0 deletions br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
GetObject Permission = "GetObject"
// PutObject represents PutObject permission
PutObject Permission = "PutObject"

DefaultRequestConcurrency uint = 128
)

// WalkOption is the option of storage.WalkDir.
Expand Down Expand Up @@ -190,3 +192,19 @@ func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalSt
return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "storage %T is not supported yet", backend)
}
}

// Different from `http.DefaultTransport`, set the `MaxIdleConns` and `MaxIdleConnsPerHost`
// to the actual request concurrency to reuse tcp connection as much as possible.
func GetDefaultHttpClient(concurrency uint) *http.Client {
transport, _ := CloneDefaultHttpTransport()
transport.MaxIdleConns = int(concurrency)
transport.MaxIdleConnsPerHost = int(concurrency)
return &http.Client{
Transport: transport,
}
}

func CloneDefaultHttpTransport() (*http.Transport, bool) {
transport, ok := http.DefaultTransport.(*http.Transport)
return transport.Clone(), ok
}
26 changes: 26 additions & 0 deletions br/pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package storage_test

import (
"net/http"
"testing"

"github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/require"
)

func TestDefaultHttpTransport(t *testing.T) {
transport, ok := storage.CloneDefaultHttpTransport()
require.True(t, ok)
require.True(t, transport.MaxConnsPerHost == 0)
require.True(t, transport.MaxIdleConns > 0)
}

func TestDefaultHttpClient(t *testing.T) {
var concurrency uint = 128
transport, ok := storage.GetDefaultHttpClient(concurrency).Transport.(*http.Transport)
require.True(t, ok)
require.Equal(t, int(concurrency), transport.MaxIdleConnsPerHost)
require.Equal(t, int(concurrency), transport.MaxIdleConns)
}
2 changes: 2 additions & 0 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStora
opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize),
}
storage, err := storage.New(ctx, u, &opts)
if err != nil {
Expand Down Expand Up @@ -1470,6 +1471,7 @@ func createRestoreClient(ctx context.Context, g glue.Glue, cfg *RestoreConfig, m
opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
HTTPClient: storage.GetDefaultHttpClient(cfg.MetadataDownloadBatchSize),
}
if err = client.SetStorage(ctx, u, &opts); err != nil {
return nil, errors.Trace(err)
Expand Down

0 comments on commit 4cc7f25

Please sign in to comment.