From 5cba10446e50c4dccc9697e6e5e185b479ac9f1b Mon Sep 17 00:00:00 2001 From: Andy Hsu Date: Fri, 14 Apr 2023 15:48:39 +0800 Subject: [PATCH] fix(123): adapt new upload method (close #4141) --- drivers/123/driver.go | 36 ++++++------ drivers/123/types.go | 8 +++ drivers/123/upload.go | 127 ++++++++++++++++++++++++++++++++++++++++++ drivers/123/util.go | 24 ++++---- 4 files changed, 168 insertions(+), 27 deletions(-) create mode 100644 drivers/123/upload.go diff --git a/drivers/123/driver.go b/drivers/123/driver.go index 8398097ffd5..a96907c4650 100644 --- a/drivers/123/driver.go +++ b/drivers/123/driver.go @@ -230,23 +230,27 @@ func (d *Pan123) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr if resp.Data.Reuse || resp.Data.Key == "" { return nil } - cfg := &aws.Config{ - Credentials: credentials.NewStaticCredentials(resp.Data.AccessKeyId, resp.Data.SecretAccessKey, resp.Data.SessionToken), - Region: aws.String("123pan"), - Endpoint: aws.String(resp.Data.EndPoint), - S3ForcePathStyle: aws.Bool(true), - } - s, err := session.NewSession(cfg) - if err != nil { - return err - } - uploader := s3manager.NewUploader(s) - input := &s3manager.UploadInput{ - Bucket: &resp.Data.Bucket, - Key: &resp.Data.Key, - Body: uploadFile, + if resp.Data.AccessKeyId == "" || resp.Data.SecretAccessKey == "" || resp.Data.SessionToken == "" { + err = d.newUpload(ctx, &resp, stream, uploadFile, up) + } else { + cfg := &aws.Config{ + Credentials: credentials.NewStaticCredentials(resp.Data.AccessKeyId, resp.Data.SecretAccessKey, resp.Data.SessionToken), + Region: aws.String("123pan"), + Endpoint: aws.String(resp.Data.EndPoint), + S3ForcePathStyle: aws.Bool(true), + } + s, err := session.NewSession(cfg) + if err != nil { + return err + } + uploader := s3manager.NewUploader(s) + input := &s3manager.UploadInput{ + Bucket: &resp.Data.Bucket, + Key: &resp.Data.Key, + Body: uploadFile, + } + _, err = uploader.UploadWithContext(ctx, input) } - _, err = uploader.UploadWithContext(ctx, input) if err != nil { return err } diff --git a/drivers/123/types.go b/drivers/123/types.go index 8d855fdfd66..424189a9a68 100644 --- a/drivers/123/types.go +++ b/drivers/123/types.go @@ -75,5 +75,13 @@ type UploadResp struct { FileId int64 `json:"FileId"` Reuse bool `json:"Reuse"` EndPoint string `json:"EndPoint"` + StorageNode string `json:"StorageNode"` + UploadId string `json:"UploadId"` + } `json:"data"` +} + +type S3PreSignedURLs struct { + Data struct { + PreSignedUrls map[string]string `json:"presignedUrls"` } `json:"data"` } diff --git a/drivers/123/upload.go b/drivers/123/upload.go new file mode 100644 index 00000000000..81aefb4150a --- /dev/null +++ b/drivers/123/upload.go @@ -0,0 +1,127 @@ +package _123 + +import ( + "context" + "fmt" + "io" + "math" + "net/http" + "strconv" + + "github.com/alist-org/alist/v3/drivers/base" + "github.com/alist-org/alist/v3/internal/driver" + "github.com/alist-org/alist/v3/internal/model" + "github.com/alist-org/alist/v3/pkg/utils" + "github.com/go-resty/resty/v2" +) + +func (d *Pan123) getS3PreSignedUrls(ctx context.Context, upReq *UploadResp, start, end int) (*S3PreSignedURLs, error) { + data := base.Json{ + "bucket": upReq.Data.Bucket, + "key": upReq.Data.Key, + "partNumberEnd": end, + "partNumberStart": start, + "uploadId": upReq.Data.UploadId, + "StorageNode": upReq.Data.StorageNode, + } + var s3PreSignedUrls S3PreSignedURLs + _, err := d.request(S3PreSignedUrls, http.MethodPost, func(req *resty.Request) { + req.SetBody(data).SetContext(ctx) + }, &s3PreSignedUrls) + if err != nil { + return nil, err + } + return &s3PreSignedUrls, nil +} + +func (d *Pan123) completeS3(ctx context.Context, upReq *UploadResp) error { + data := base.Json{ + "bucket": upReq.Data.Bucket, + "key": upReq.Data.Key, + "uploadId": upReq.Data.UploadId, + "StorageNode": upReq.Data.StorageNode, + } + _, err := d.request(S3Complete, http.MethodPost, func(req *resty.Request) { + req.SetBody(data).SetContext(ctx) + }, nil) + return err +} + +func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.FileStreamer, reader io.Reader, up driver.UpdateProgress) error { + chunkSize := int64(1024 * 1024 * 5) + // fetch s3 pre signed urls + chunkCount := int(math.Ceil(float64(file.GetSize()) / float64(chunkSize))) + // upload 10 chunks each batch + batchSize := 10 + for i := 1; i <= chunkCount; i += batchSize { + if utils.IsCanceled(ctx) { + return ctx.Err() + } + start := i + end := i + batchSize + if end > chunkCount+1 { + end = chunkCount + 1 + } + s3PreSignedUrls, err := d.getS3PreSignedUrls(ctx, upReq, start, end) + if err != nil { + return err + } + // upload each chunk + for j := start; j < end; j++ { + if utils.IsCanceled(ctx) { + return ctx.Err() + } + curSize := chunkSize + if j == chunkCount { + curSize = file.GetSize() - (int64(chunkCount)-1)*chunkSize + } + err = d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, j, end, io.LimitReader(reader, chunkSize), curSize, false) + if err != nil { + return err + } + up(j * 100 / chunkCount) + } + } + // complete s3 upload + return d.completeS3(ctx, upReq) +} + +func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSignedUrls *S3PreSignedURLs, cur, end int, reader io.Reader, curSize int64, retry bool) error { + uploadUrl := s3PreSignedUrls.Data.PreSignedUrls[strconv.Itoa(cur)] + if uploadUrl == "" { + return fmt.Errorf("upload url is empty, s3PreSignedUrls: %+v", s3PreSignedUrls) + } + req, err := http.NewRequest("PUT", uploadUrl, reader) + if err != nil { + return err + } + req = req.WithContext(ctx) + req.ContentLength = curSize + //req.Header.Set("Content-Length", strconv.FormatInt(curSize, 10)) + res, err := base.HttpClient.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode == http.StatusForbidden { + if retry { + return fmt.Errorf("upload s3 chunk %d failed, status code: %d", cur, res.StatusCode) + } + // refresh s3 pre signed urls + newS3PreSignedUrls, err := d.getS3PreSignedUrls(ctx, upReq, cur, end) + if err != nil { + return err + } + s3PreSignedUrls.Data.PreSignedUrls = newS3PreSignedUrls.Data.PreSignedUrls + // retry + return d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, cur, end, reader, curSize, true) + } + if res.StatusCode != http.StatusOK { + body, err := io.ReadAll(res.Body) + if err != nil { + return err + } + return fmt.Errorf("upload s3 chunk %d failed, status code: %d, body: %s", cur, res.StatusCode, body) + } + return nil +} diff --git a/drivers/123/util.go b/drivers/123/util.go index 24880c07e76..ffb888d036e 100644 --- a/drivers/123/util.go +++ b/drivers/123/util.go @@ -15,17 +15,19 @@ import ( // do others that not defined in Driver interface const ( - API = "https://www.123pan.com/b/api" - SignIn = API + "/user/sign_in" - UserInfo = API + "/user/info" - FileList = API + "/file/list/new" - DownloadInfo = API + "/file/download_info" - Mkdir = API + "/file/upload_request" - Move = API + "/file/mod_pid" - Rename = API + "/file/rename" - Trash = API + "/file/trash" - UploadRequest = API + "/file/upload_request" - UploadComplete = API + "/file/upload_complete" + API = "https://www.123pan.com/b/api" + SignIn = API + "/user/sign_in" + UserInfo = API + "/user/info" + FileList = API + "/file/list/new" + DownloadInfo = API + "/file/download_info" + Mkdir = API + "/file/upload_request" + Move = API + "/file/mod_pid" + Rename = API + "/file/rename" + Trash = API + "/file/trash" + UploadRequest = API + "/file/upload_request" + UploadComplete = API + "/file/upload_complete" + S3PreSignedUrls = API + "/file/s3_repare_upload_parts_batch" + S3Complete = API + "/file/s3_complete_multipart_upload" ) func (d *Pan123) login() error {