Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielZhangQD committed Dec 13, 2019
1 parent 296c6cf commit a0898c7
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 30 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.13

require (
cloud.google.com/go/storage v1.4.0
github.com/aws/aws-sdk-go v1.26.1
github.com/cheggaaa/pb/v3 v3.0.1
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmx
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/aws/aws-sdk-go v1.26.1 h1:JGQggXhOiNJIqsmbYUl3cYtJZUffeOWlHtxfzGK7WPI=
github.com/aws/aws-sdk-go v1.26.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down Expand Up @@ -171,6 +173,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,20 @@ func (r *testStorageSuite) TestCreateStorage(c *C) {
fakeCredentialsFile.Close()
os.Remove(fakeCredentialsFile.Name())
}()
gcsOpt := &storage.BackendOptions{
GCS: storage.GCSBackendOptions{
gcsOpt := &BackendOptions{
GCS: GCSBackendOptions{
Endpoint: "https://gcs.example.com/",
CredentialsFile: fakeCredentialsFile.Name(),
},
}
s, err = storage.ParseBackend("gcs://bucket2/prefix/", gcsOpt)
s, err = ParseBackend("gcs://bucket2/prefix/", gcsOpt)
c.Assert(err, IsNil)
gcs := s.GetGcs()
c.Assert(gcs, NotNil)
c.Assert(gcs.Bucket, Equals, "bucket2")
c.Assert(gcs.Prefix, Equals, "prefix/")
c.Assert(gcs.Endpoint, Equals, "https://gcs.example.com/")
s, err = storage.ParseBackend("gcs://bucket/more/prefix/", gcsOpt)
s, err = ParseBackend("gcs://bucket/more/prefix/", gcsOpt)
c.Assert(err, IsNil)
gcs = s.GetGcs()
c.Assert(gcs, NotNil)
Expand Down Expand Up @@ -106,7 +106,7 @@ func (r *testStorageSuite) TestFormatBackendURL(c *C) {
})
c.Assert(url.String(), Equals, "s3://bucket/some%20prefix/")

url = storage.FormatBackendURL(&backup.StorageBackend{
url = FormatBackendURL(&backup.StorageBackend{
Backend: &backup.StorageBackend_Gcs{
Gcs: &backup.GCS{
Bucket: "bucket",
Expand Down
30 changes: 14 additions & 16 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package storage

import (
"bytes"
"context"
"io/ioutil"
"net/url"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/spf13/pflag"
Expand All @@ -34,11 +36,11 @@ const (

// s3Handlers make it easy to inject test functions
type s3Handlers interface {
HeadObject(*s3.HeadObjectInput) (*s3.HeadObjectOutput, error)
GetObject(*s3.GetObjectInput) (*s3.GetObjectOutput, error)
PutObject(*s3.PutObjectInput) (*s3.PutObjectOutput, error)
HeadBucket(*s3.HeadBucketInput) (*s3.HeadBucketOutput, error)
WaitUntilObjectExists(*s3.HeadObjectInput) error
HeadObjectWithContext(context.Context, *s3.HeadObjectInput, ...request.Option) (*s3.HeadObjectOutput, error)
GetObjectWithContext(context.Context, *s3.GetObjectInput, ...request.Option) (*s3.GetObjectOutput, error)
PutObjectWithContext(context.Context, *s3.PutObjectInput, ...request.Option) (*s3.PutObjectOutput, error)
HeadBucketWithContext(context.Context, *s3.HeadBucketInput, ...request.Option) (*s3.HeadBucketOutput, error)
WaitUntilObjectExistsWithContext(context.Context, *s3.HeadObjectInput, ...request.WaiterOption) error
}

// S3Storage info for s3 storage
Expand Down Expand Up @@ -214,7 +216,7 @@ var checkS3Bucket = func(svc *s3.S3, bucket string) error {
}

// Write write to s3 storage
func (rs *S3Storage) Write(file string, data []byte) error {
func (rs *S3Storage) Write(ctx context.Context, file string, data []byte) error {
input := &s3.PutObjectInput{
Body: aws.ReadSeekCloser(bytes.NewReader(data)),
Bucket: aws.String(rs.options.Bucket),
Expand All @@ -230,29 +232,26 @@ func (rs *S3Storage) Write(file string, data []byte) error {
input = input.SetStorageClass(rs.options.StorageClass)
}

// TODO: PutObjectWithContext
_, err := rs.svc.PutObject(input)
_, err := rs.svc.PutObjectWithContext(ctx, input)
if err != nil {
return err
}
hinput := &s3.HeadObjectInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + file),
}
// TODO: WaitUntilObjectExistsWithContext
err = rs.svc.WaitUntilObjectExists(hinput)
err = rs.svc.WaitUntilObjectExistsWithContext(ctx, hinput)
return err
}

// Read read file from s3
func (rs *S3Storage) Read(file string) ([]byte, error) {
func (rs *S3Storage) Read(ctx context.Context, file string) ([]byte, error) {
input := &s3.GetObjectInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + file),
}

// TODO: GetObjectWithContext
result, err := rs.svc.GetObject(input)
result, err := rs.svc.GetObjectWithContext(ctx, input)
if err != nil {
return nil, err
}
Expand All @@ -265,14 +264,13 @@ func (rs *S3Storage) Read(file string) ([]byte, error) {
}

// FileExists check if file exists on s3 storage
func (rs *S3Storage) FileExists(file string) (bool, error) {
func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) {
input := &s3.HeadObjectInput{
Bucket: aws.String(rs.options.Bucket),
Key: aws.String(rs.options.Prefix + file),
}

// TODO: HeadObjectWithContext
_, err := rs.svc.HeadObject(input)
_, err := rs.svc.HeadObjectWithContext(ctx, input)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
Expand Down
28 changes: 19 additions & 9 deletions pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package storage

import (
"context"
"io/ioutil"
"os"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/s3"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -231,6 +234,7 @@ func (r *testStorageSuite) TestS3Storage(c *C) {
}
testFn := func(test *testcase, c *C) {
c.Log(test.name)
ctx := aws.BackgroundContext()
sendCredential = true
if test.hackCheck {
checkS3Bucket = func(svc *s3.S3, bucket string) error { return nil }
Expand All @@ -240,7 +244,7 @@ func (r *testStorageSuite) TestS3Storage(c *C) {
S3: test.s3,
},
}
_, err := Create(s3)
_, err := Create(ctx, s3)
if test.errReturn {
c.Assert(err, NotNil)
return
Expand Down Expand Up @@ -348,15 +352,16 @@ func (r *testStorageSuite) TestS3Handlers(c *C) {

testFn := func(test *testcase, c *C) {
c.Log(test.name)
ctx := aws.BackgroundContext()
ms3 := S3Storage{
svc: test.mh,
options: test.options,
}
err := ms3.Write("file", []byte("test"))
err := ms3.Write(ctx, "file", []byte("test"))
c.Assert(err, Equals, test.mh.err)
_, err = ms3.Read("file")
_, err = ms3.Read(ctx, "file")
c.Assert(err, Equals, test.mh.err)
_, err = ms3.FileExists("file")
_, err = ms3.FileExists(ctx, "file")
if err != nil {
c.Assert(err, Equals, test.mh.err)
}
Expand Down Expand Up @@ -423,23 +428,28 @@ type mockS3Handler struct {
err error
}

func (c *mockS3Handler) HeadObject(input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
func (c *mockS3Handler) HeadObjectWithContext(ctx context.Context,
input *s3.HeadObjectInput, opts ...request.Option) (*s3.HeadObjectOutput, error) {
return nil, c.err
}
func (c *mockS3Handler) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) {
func (c *mockS3Handler) GetObjectWithContext(ctx context.Context,
input *s3.GetObjectInput, opts ...request.Option) (*s3.GetObjectOutput, error) {
if c.err != nil {
return nil, c.err
}
return &s3.GetObjectOutput{
Body: ioutil.NopCloser(strings.NewReader("HappyFace.jpg")),
}, nil
}
func (c *mockS3Handler) PutObject(input *s3.PutObjectInput) (*s3.PutObjectOutput, error) {
func (c *mockS3Handler) PutObjectWithContext(ctx context.Context,
input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error) {
return nil, c.err
}
func (c *mockS3Handler) HeadBucket(input *s3.HeadBucketInput) (*s3.HeadBucketOutput, error) {
func (c *mockS3Handler) HeadBucketWithContext(ctx context.Context,
input *s3.HeadBucketInput, opts ...request.Option) (*s3.HeadBucketOutput, error) {
return nil, c.err
}
func (c *mockS3Handler) WaitUntilObjectExists(input *s3.HeadObjectInput) error {
func (c *mockS3Handler) WaitUntilObjectExistsWithContext(ctx context.Context,
input *s3.HeadObjectInput, opts ...request.WaiterOption) error {
return c.err
}

0 comments on commit a0898c7

Please sign in to comment.