Skip to content

Commit

Permalink
update code due to br cli options change
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielZhangQD committed Dec 19, 2019
1 parent 414ad7c commit e8e1525
Show file tree
Hide file tree
Showing 6 changed files with 361 additions and 64 deletions.
101 changes: 50 additions & 51 deletions cmd/backup-manager/app/util/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package util
import (
"context"
"fmt"
"os"
"strings"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -25,6 +24,7 @@ import (
"gocloud.dev/blob/s3blob"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/backup/util"
)

const (
Expand All @@ -44,19 +44,23 @@ const (
)

type s3Query struct {
region string
endpoint string
oriEndpoint string
bucket string
prefix string
provider string
insecure string
region string
endpoint string
oriEndpoint string
bucket string
prefix string
provider string
sse string
acl string
storageClass string
forcePathStyle bool
}

// NewRemoteStorage creates new remote storage
func NewRemoteStorage(backup *v1alpha1.Backup) (*blob.Bucket, error) {
switch backup.Spec.StorageType {
case "s3":
st := util.GetStorageType(backup.Spec.StorageProvider)
switch st {
case v1alpha1.BackupStorageTypeS3:
qs, err := checkS3Config(backup, true)
if err != nil {
return nil, err
Expand All @@ -67,67 +71,66 @@ func NewRemoteStorage(backup *v1alpha1.Backup) (*blob.Bucket, error) {
}
return bucket, nil
default:
return nil, fmt.Errorf("storage %s not support yet", backup.Spec.StorageType)
return nil, fmt.Errorf("storage %s not support yet", st)
}
}

// getRemoteStorage returns the arg for --storage option and the remote path for br
func getRemoteStorage(backup *v1alpha1.Backup) (string, string, error) {
switch backup.Spec.StorageType {
func getRemoteStorage(backup *v1alpha1.Backup) ([]string, string, error) {
st := util.GetStorageType(backup.Spec.StorageProvider)
switch st {
case "s3":
qs, err := checkS3Config(backup, false)
if err != nil {
return "", "", err
return nil, "", err
}
s, path := newS3StorageOption(qs)
return s, path, nil
default:
return "", "", fmt.Errorf("storage %s not support yet", backup.Spec.StorageType)
return nil, "", fmt.Errorf("storage %s not support yet", st)
}
}

// newS3StorageOption constructs the arg for --storage option and the remote path for br
func newS3StorageOption(qs *s3Query) (string, string) {
path := fmt.Sprintf("s3://%s/%s", qs.bucket, qs.prefix)
s := fmt.Sprintf("s3://%s?%s=%s", qs.bucket, prefixKey, qs.prefix)
func newS3StorageOption(qs *s3Query) ([]string, string) {
var s3options []string
var path string
if qs.prefix == "/" {
path = fmt.Sprintf("s3://%s%s", qs.bucket, qs.prefix)
} else {
path = fmt.Sprintf("s3://%s/%s", qs.bucket, qs.prefix)
}
s3options = append(s3options, fmt.Sprintf("--storage=%s", path))
if qs.region != "" {
s += fmt.Sprintf("&%s=%s", regionKey, qs.region)
s3options = append(s3options, fmt.Sprintf("--s3.region=%s", qs.region))
}
if qs.provider != "" {
s += fmt.Sprintf("&%s=%s", providerKey, qs.provider)
s3options = append(s3options, fmt.Sprintf("--s3.provider=%s", qs.provider))
}
if qs.endpoint != "" {
s += fmt.Sprintf("&%s=%s", endpointKey, qs.endpoint)
s3options = append(s3options, fmt.Sprintf("--s3.endpoint=%s", qs.endpoint))
}
if qs.insecure != "" {
s += fmt.Sprintf("&%s=%s", insecureKey, qs.insecure)
if qs.sse != "" {
s3options = append(s3options, fmt.Sprintf("--s3.sse=%s", qs.sse))
}
ak := os.Getenv(accessKeyEnv)
if ak != "" {
s += fmt.Sprintf("&%s=%s", accessKey, ak)
if qs.acl != "" {
s3options = append(s3options, fmt.Sprintf("--s3.acl=%s", qs.acl))
}
sak := os.Getenv(secretAccessKeyEnv)
if sak != "" {
s += fmt.Sprintf("&%s=%s", secretAccessKey, sak)
if qs.storageClass != "" {
s3options = append(s3options, fmt.Sprintf("--s3.storage-class=%s", qs.storageClass))
}

return s, path
return s3options, path
}

// newS3Storage initialize a new s3 storage
func newS3Storage(qs *s3Query) (*blob.Bucket, error) {
awsConfig := aws.NewConfig().WithMaxRetries(maxRetries)
awsConfig := aws.NewConfig().WithMaxRetries(maxRetries).WithS3ForcePathStyle(qs.forcePathStyle)
if qs.region != "" {
awsConfig.WithRegion(qs.region)
}
if qs.oriEndpoint != "" {
awsConfig.WithEndpoint(qs.oriEndpoint)
}
forcePathStyle := true
if qs.provider == aliKey {
forcePathStyle = false
}
awsConfig.WithS3ForcePathStyle(forcePathStyle)
// awsConfig.WithLogLevel(aws.LogDebugWithSigning)
awsSessionOpts := session.Options{
Config: *awsConfig,
Expand Down Expand Up @@ -159,24 +162,20 @@ func checkS3Config(backup *v1alpha1.Backup, fakeRegion bool) (*s3Query, error) {
sqs.prefix = backup.Spec.S3.Prefix
sqs.endpoint = backup.Spec.S3.Endpoint
sqs.oriEndpoint = backup.Spec.S3.Endpoint

sqs.sse = backup.Spec.S3.SSE
sqs.acl = backup.Spec.S3.Acl
sqs.storageClass = backup.Spec.S3.StorageClass
sqs.forcePathStyle = true
// In some cases, we need to set ForcePathStyle to false.
// Refer to: https://rclone.org/s3/#s3-force-path-style
if sqs.provider == "alibaba" || sqs.provider == "netease" {
sqs.forcePathStyle = false
}
if fakeRegion && sqs.region == "" {
sqs.region = "us-east-1"
}
sqs.prefix = strings.Trim(sqs.prefix, "/")
sqs.prefix += "/"
if sqs.endpoint != "" {
if strings.HasPrefix(sqs.endpoint, "https://") {
sqs.insecure = "false"
sqs.endpoint = strings.TrimPrefix(sqs.endpoint, "https://")
} else if strings.HasPrefix(sqs.endpoint, "http://") {
sqs.insecure = "true"
sqs.endpoint = strings.TrimPrefix(sqs.endpoint, "http://")
} else {
sqs.insecure = "true"
sqs.oriEndpoint += "http://"
}
}

return &sqs, nil
}
Expand Down Expand Up @@ -211,6 +210,6 @@ func ConstructBRGlobalOptions(backup *v1alpha1.Backup) ([]string, string, error)
if err != nil {
return nil, "", err
}
args = append(args, fmt.Sprintf("--storage=%s", s))
args = append(args, s...)
return args, path, nil
}
Loading

0 comments on commit e8e1525

Please sign in to comment.