diff --git a/drivers/123/upload.go b/drivers/123/upload.go index a5b65fef0c4..ae28d6aa519 100644 --- a/drivers/123/upload.go +++ b/drivers/123/upload.go @@ -34,6 +34,25 @@ func (d *Pan123) getS3PreSignedUrls(ctx context.Context, upReq *UploadResp, star return &s3PreSignedUrls, nil } +func (d *Pan123) getS3Auth(ctx context.Context, upReq *UploadResp, start, end int) (*S3PreSignedURLs, error) { + data := base.Json{ + "StorageNode": upReq.Data.StorageNode, + "bucket": upReq.Data.Bucket, + "key": upReq.Data.Key, + "partNumberEnd": end, + "partNumberStart": start, + "uploadId": upReq.Data.UploadId, + } + var s3PreSignedUrls S3PreSignedURLs + _, err := d.request(S3Auth, http.MethodPost, func(req *resty.Request) { + req.SetBody(data).SetContext(ctx) + }, &s3PreSignedUrls) + if err != nil { + return nil, err + } + return &s3PreSignedUrls, nil +} + func (d *Pan123) completeS3(ctx context.Context, upReq *UploadResp, file model.FileStreamer, isMultipart bool) error { data := base.Json{ "StorageNode": upReq.Data.StorageNode, @@ -51,11 +70,17 @@ func (d *Pan123) completeS3(ctx context.Context, upReq *UploadResp, file model.F } func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.FileStreamer, reader io.Reader, up driver.UpdateProgress) error { - chunkSize := int64(1024 * 1024 * 5) + chunkSize := int64(1024 * 1024 * 16) // fetch s3 pre signed urls chunkCount := int(math.Ceil(float64(file.GetSize()) / float64(chunkSize))) - // upload 10 chunks each batch - batchSize := 10 + // only 1 batch is allowed + isMultipart := chunkCount > 1 + batchSize := 1 + getS3UploadUrl := d.getS3Auth + if isMultipart { + batchSize = 10 + getS3UploadUrl = d.getS3PreSignedUrls + } for i := 1; i <= chunkCount; i += batchSize { if utils.IsCanceled(ctx) { return ctx.Err() @@ -65,7 +90,7 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi if end > chunkCount+1 { end = chunkCount + 1 } - s3PreSignedUrls, err := d.getS3PreSignedUrls(ctx, upReq, start, end) + s3PreSignedUrls, err := getS3UploadUrl(ctx, upReq, start, end) if err != nil { return err } @@ -78,7 +103,7 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi if j == chunkCount { curSize = file.GetSize() - (int64(chunkCount)-1)*chunkSize } - err = d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, j, end, io.LimitReader(reader, chunkSize), curSize, false) + err = d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, j, end, io.LimitReader(reader, chunkSize), curSize, false, getS3UploadUrl) if err != nil { return err } @@ -89,7 +114,7 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi return d.completeS3(ctx, upReq, file, chunkCount > 1) } -func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSignedUrls *S3PreSignedURLs, cur, end int, reader io.Reader, curSize int64, retry bool) error { +func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSignedUrls *S3PreSignedURLs, cur, end int, reader io.Reader, curSize int64, retry bool, getS3UploadUrl func(ctx context.Context, upReq *UploadResp, start int, end int) (*S3PreSignedURLs, error)) error { uploadUrl := s3PreSignedUrls.Data.PreSignedUrls[strconv.Itoa(cur)] if uploadUrl == "" { return fmt.Errorf("upload url is empty, s3PreSignedUrls: %+v", s3PreSignedUrls) @@ -111,13 +136,13 @@ func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSign return fmt.Errorf("upload s3 chunk %d failed, status code: %d", cur, res.StatusCode) } // refresh s3 pre signed urls - newS3PreSignedUrls, err := d.getS3PreSignedUrls(ctx, upReq, cur, end) + newS3PreSignedUrls, err := getS3UploadUrl(ctx, upReq, cur, end) if err != nil { return err } s3PreSignedUrls.Data.PreSignedUrls = newS3PreSignedUrls.Data.PreSignedUrls // retry - return d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, cur, end, reader, curSize, true) + return d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, cur, end, reader, curSize, true, getS3UploadUrl) } if res.StatusCode != http.StatusOK { body, err := io.ReadAll(res.Body)