diff --git a/go.mod b/go.mod index 8e3a8bc0b..72e95007a 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.13 require ( cloud.google.com/go/storage v1.4.0 + github.com/aws/aws-sdk-go v1.26.1 github.com/cheggaaa/pb/v3 v3.0.1 github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect diff --git a/go.sum b/go.sum index 4e32aacf4..e143fb43e 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,8 @@ github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmx github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/aws/aws-sdk-go v1.26.1 h1:JGQggXhOiNJIqsmbYUl3cYtJZUffeOWlHtxfzGK7WPI= +github.com/aws/aws-sdk-go v1.26.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -171,6 +173,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= diff --git a/pkg/storage/parse_test.go b/pkg/storage/parse_test.go index 04e925963..c13216f46 100644 --- a/pkg/storage/parse_test.go +++ b/pkg/storage/parse_test.go @@ -58,20 +58,20 @@ func (r *testStorageSuite) TestCreateStorage(c *C) { fakeCredentialsFile.Close() os.Remove(fakeCredentialsFile.Name()) }() - gcsOpt := &storage.BackendOptions{ - GCS: storage.GCSBackendOptions{ + gcsOpt := &BackendOptions{ + GCS: GCSBackendOptions{ Endpoint: "https://gcs.example.com/", CredentialsFile: fakeCredentialsFile.Name(), }, } - s, err = storage.ParseBackend("gcs://bucket2/prefix/", gcsOpt) + s, err = ParseBackend("gcs://bucket2/prefix/", gcsOpt) c.Assert(err, IsNil) gcs := s.GetGcs() c.Assert(gcs, NotNil) c.Assert(gcs.Bucket, Equals, "bucket2") c.Assert(gcs.Prefix, Equals, "prefix/") c.Assert(gcs.Endpoint, Equals, "https://gcs.example.com/") - s, err = storage.ParseBackend("gcs://bucket/more/prefix/", gcsOpt) + s, err = ParseBackend("gcs://bucket/more/prefix/", gcsOpt) c.Assert(err, IsNil) gcs = s.GetGcs() c.Assert(gcs, NotNil) @@ -106,7 +106,7 @@ func (r *testStorageSuite) TestFormatBackendURL(c *C) { }) c.Assert(url.String(), Equals, "s3://bucket/some%20prefix/") - url = storage.FormatBackendURL(&backup.StorageBackend{ + url = FormatBackendURL(&backup.StorageBackend{ Backend: &backup.StorageBackend_Gcs{ Gcs: &backup.GCS{ Bucket: "bucket", diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index 85a7ed5a7..f1a5421ae 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -2,12 +2,14 @@ package storage import ( "bytes" + "context" "io/ioutil" "net/url" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" "github.com/spf13/pflag" @@ -34,11 +36,11 @@ const ( // s3Handlers make it easy to inject test functions type s3Handlers interface { - HeadObject(*s3.HeadObjectInput) (*s3.HeadObjectOutput, error) - GetObject(*s3.GetObjectInput) (*s3.GetObjectOutput, error) - PutObject(*s3.PutObjectInput) (*s3.PutObjectOutput, error) - HeadBucket(*s3.HeadBucketInput) (*s3.HeadBucketOutput, error) - WaitUntilObjectExists(*s3.HeadObjectInput) error + HeadObjectWithContext(context.Context, *s3.HeadObjectInput, ...request.Option) (*s3.HeadObjectOutput, error) + GetObjectWithContext(context.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error) + PutObjectWithContext(context.Context, *s3.PutObjectInput, ...request.Option) (*s3.PutObjectOutput, error) + HeadBucketWithContext(context.Context, *s3.HeadBucketInput, ...request.Option) (*s3.HeadBucketOutput, error) + WaitUntilObjectExistsWithContext(context.Context, *s3.HeadObjectInput, ...request.WaiterOption) error } // S3Storage info for s3 storage @@ -214,7 +216,7 @@ var checkS3Bucket = func(svc *s3.S3, bucket string) error { } // Write write to s3 storage -func (rs *S3Storage) Write(file string, data []byte) error { +func (rs *S3Storage) Write(ctx context.Context, file string, data []byte) error { input := &s3.PutObjectInput{ Body: aws.ReadSeekCloser(bytes.NewReader(data)), Bucket: aws.String(rs.options.Bucket), @@ -230,8 +232,7 @@ func (rs *S3Storage) Write(file string, data []byte) error { input = input.SetStorageClass(rs.options.StorageClass) } - // TODO: PutObjectWithContext - _, err := rs.svc.PutObject(input) + _, err := rs.svc.PutObjectWithContext(ctx, input) if err != nil { return err } @@ -239,20 +240,18 @@ func (rs *S3Storage) Write(file string, data []byte) error { Bucket: aws.String(rs.options.Bucket), Key: aws.String(rs.options.Prefix + file), } - // TODO: WaitUntilObjectExistsWithContext - err = rs.svc.WaitUntilObjectExists(hinput) + err = rs.svc.WaitUntilObjectExistsWithContext(ctx, hinput) return err } // Read read file from s3 -func (rs *S3Storage) Read(file string) ([]byte, error) { +func (rs *S3Storage) Read(ctx context.Context, file string) ([]byte, error) { input := &s3.GetObjectInput{ Bucket: aws.String(rs.options.Bucket), Key: aws.String(rs.options.Prefix + file), } - // TODO: GetObjectWithContext - result, err := rs.svc.GetObject(input) + result, err := rs.svc.GetObjectWithContext(ctx, input) if err != nil { return nil, err } @@ -265,14 +264,13 @@ func (rs *S3Storage) Read(file string) ([]byte, error) { } // FileExists check if file exists on s3 storage -func (rs *S3Storage) FileExists(file string) (bool, error) { +func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) { input := &s3.HeadObjectInput{ Bucket: aws.String(rs.options.Bucket), Key: aws.String(rs.options.Prefix + file), } - // TODO: HeadObjectWithContext - _, err := rs.svc.HeadObject(input) + _, err := rs.svc.HeadObjectWithContext(ctx, input) if err != nil { if aerr, ok := err.(awserr.Error); ok { switch aerr.Code() { diff --git a/pkg/storage/s3_test.go b/pkg/storage/s3_test.go index c84c62031..a5c5af61e 100644 --- a/pkg/storage/s3_test.go +++ b/pkg/storage/s3_test.go @@ -1,11 +1,14 @@ package storage import ( + "context" "io/ioutil" "os" "strings" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/s3" . "github.com/pingcap/check" "github.com/pingcap/errors" @@ -231,6 +234,7 @@ func (r *testStorageSuite) TestS3Storage(c *C) { } testFn := func(test *testcase, c *C) { c.Log(test.name) + ctx := aws.BackgroundContext() sendCredential = true if test.hackCheck { checkS3Bucket = func(svc *s3.S3, bucket string) error { return nil } @@ -240,7 +244,7 @@ func (r *testStorageSuite) TestS3Storage(c *C) { S3: test.s3, }, } - _, err := Create(s3) + _, err := Create(ctx, s3) if test.errReturn { c.Assert(err, NotNil) return @@ -348,15 +352,16 @@ func (r *testStorageSuite) TestS3Handlers(c *C) { testFn := func(test *testcase, c *C) { c.Log(test.name) + ctx := aws.BackgroundContext() ms3 := S3Storage{ svc: test.mh, options: test.options, } - err := ms3.Write("file", []byte("test")) + err := ms3.Write(ctx, "file", []byte("test")) c.Assert(err, Equals, test.mh.err) - _, err = ms3.Read("file") + _, err = ms3.Read(ctx, "file") c.Assert(err, Equals, test.mh.err) - _, err = ms3.FileExists("file") + _, err = ms3.FileExists(ctx, "file") if err != nil { c.Assert(err, Equals, test.mh.err) } @@ -423,10 +428,12 @@ type mockS3Handler struct { err error } -func (c *mockS3Handler) HeadObject(input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) { +func (c *mockS3Handler) HeadObjectWithContext(ctx context.Context, + input *s3.HeadObjectInput, opts ...request.Option) (*s3.HeadObjectOutput, error) { return nil, c.err } -func (c *mockS3Handler) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) { +func (c *mockS3Handler) GetObjectWithContext(ctx context.Context, + input *s3.GetObjectInput, opts ...request.Option) (*s3.GetObjectOutput, error) { if c.err != nil { return nil, c.err } @@ -434,12 +441,15 @@ func (c *mockS3Handler) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput Body: ioutil.NopCloser(strings.NewReader("HappyFace.jpg")), }, nil } -func (c *mockS3Handler) PutObject(input *s3.PutObjectInput) (*s3.PutObjectOutput, error) { +func (c *mockS3Handler) PutObjectWithContext(ctx context.Context, + input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error) { return nil, c.err } -func (c *mockS3Handler) HeadBucket(input *s3.HeadBucketInput) (*s3.HeadBucketOutput, error) { +func (c *mockS3Handler) HeadBucketWithContext(ctx context.Context, + input *s3.HeadBucketInput, opts ...request.Option) (*s3.HeadBucketOutput, error) { return nil, c.err } -func (c *mockS3Handler) WaitUntilObjectExists(input *s3.HeadObjectInput) error { +func (c *mockS3Handler) WaitUntilObjectExistsWithContext(ctx context.Context, + input *s3.HeadObjectInput, opts ...request.WaiterOption) error { return c.err }