Skip to content

Commit

Permalink
feat(crypt): force stream upload for supported drivers (#6270)
Browse files Browse the repository at this point in the history
  • Loading branch information
NewbieOrange authored Mar 29, 2024
1 parent d517add commit e37465e
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 17 deletions.
8 changes: 6 additions & 2 deletions drivers/189pc/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,17 @@ func (y *Cloud189PC) Remove(ctx context.Context, obj model.Obj) error {

func (y *Cloud189PC) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) {
// 响应时间长,按需启用
if y.Addition.RapidUpload {
if y.Addition.RapidUpload && !stream.IsForceStreamUpload() {
if newObj, err := y.RapidUpload(ctx, dstDir, stream); err == nil {
return newObj, nil
}
}

switch y.UploadMethod {
uploadMethod := y.UploadMethod
if stream.IsForceStreamUpload() {
uploadMethod = "stream"
}
switch uploadMethod {
case "old":
return y.OldUpload(ctx, dstDir, stream, up)
case "rapid":
Expand Down
15 changes: 9 additions & 6 deletions drivers/aliyundrive_open/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m
count := int(math.Ceil(float64(stream.GetSize()) / float64(partSize)))
createData["part_info_list"] = makePartInfos(count)
// rapid upload
rapidUpload := stream.GetSize() > 100*utils.KB && d.RapidUpload
rapidUpload := !stream.IsForceStreamUpload() && stream.GetSize() > 100*utils.KB && d.RapidUpload
if rapidUpload {
log.Debugf("[aliyundrive_open] start cal pre_hash")
// read 1024 bytes to calculate pre hash
Expand Down Expand Up @@ -242,13 +242,16 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m
if remain := stream.GetSize() - offset; length > remain {
length = remain
}
//rd := utils.NewMultiReadable(io.LimitReader(stream, partSize))
rd, err := stream.RangeRead(http_range.Range{Start: offset, Length: length})
if err != nil {
return nil, err
rd := utils.NewMultiReadable(io.LimitReader(stream, partSize))
if rapidUpload {
srd, err := stream.RangeRead(http_range.Range{Start: offset, Length: length})
if err != nil {
return nil, err
}
rd = utils.NewMultiReadable(srd)
}
err = retry.Do(func() error {
//rd.Reset()
rd.Reset()
return d.uploadPart(ctx, rd, createResp.PartInfoList[i])
},
retry.Attempts(3),
Expand Down
13 changes: 7 additions & 6 deletions drivers/crypt/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package crypt
import (
"context"
"fmt"
"github.com/alist-org/alist/v3/internal/stream"
"io"
stdpath "path"
"regexp"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/alist-org/alist/v3/internal/fs"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/internal/stream"
"github.com/alist-org/alist/v3/pkg/http_range"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/alist-org/alist/v3/server/common"
Expand Down Expand Up @@ -160,7 +160,7 @@ func (d *Crypt) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([
// discarding hash as it's encrypted
}
if d.Thumbnail && thumb == "" {
thumb = utils.EncodePath(common.GetApiUrl(nil) + stdpath.Join("/d", args.ReqPath, ".thumbnails", name+".webp"), true)
thumb = utils.EncodePath(common.GetApiUrl(nil)+stdpath.Join("/d", args.ReqPath, ".thumbnails", name+".webp"), true)
}
if !ok && !d.Thumbnail {
result = append(result, &objRes)
Expand Down Expand Up @@ -389,10 +389,11 @@ func (d *Crypt) Put(ctx context.Context, dstDir model.Obj, streamer model.FileSt
Modified: streamer.ModTime(),
IsFolder: streamer.IsDir(),
},
Reader: wrappedIn,
Mimetype: "application/octet-stream",
WebPutAsTask: streamer.NeedStore(),
Exist: streamer.GetExist(),
Reader: wrappedIn,
Mimetype: "application/octet-stream",
WebPutAsTask: streamer.NeedStore(),
ForceStreamUpload: true,
Exist: streamer.GetExist(),
}
err = op.Put(ctx, d.remoteStorage, dstDirActualPath, streamOut, up, false)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions internal/model/obj.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type FileStreamer interface {
GetMimetype() string
//SetReader(io.Reader)
NeedStore() bool
IsForceStreamUpload() bool
GetExist() Obj
SetExist(Obj)
//for a non-seekable Stream, RangeRead supports peeking some data, and CacheFullInTempFile still works
Expand Down
12 changes: 9 additions & 3 deletions internal/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ type FileStream struct {
Ctx context.Context
model.Obj
io.Reader
Mimetype string
WebPutAsTask bool
Exist model.Obj //the file existed in the destination, we can reuse some info since we wil overwrite it
Mimetype string
WebPutAsTask bool
ForceStreamUpload bool
Exist model.Obj //the file existed in the destination, we can reuse some info since we wil overwrite it
utils.Closers
tmpFile *os.File //if present, tmpFile has full content, it will be deleted at last
peekBuff *bytes.Reader
Expand All @@ -43,6 +44,11 @@ func (f *FileStream) GetMimetype() string {
func (f *FileStream) NeedStore() bool {
return f.WebPutAsTask
}

func (f *FileStream) IsForceStreamUpload() bool {
return f.ForceStreamUpload
}

func (f *FileStream) Close() error {
var err1, err2 error
err1 = f.Closers.Close()
Expand Down

0 comments on commit e37465e

Please sign in to comment.