Skip to content

Commit

Permalink
fix(cloudreve): support upload to remote and OneDrive storage (#7632 c…
Browse files Browse the repository at this point in the history
…lose #6882)

- Add support for remote and OneDrive storage types
- Implement new upload methods for different storage types
- Update driver to handle various storage policies
- Add error handling and session cleanup for failed uploads
  • Loading branch information
xrgzs authored Dec 9, 2024
1 parent 016e169 commit 2a03530
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 30 deletions.
73 changes: 46 additions & 27 deletions drivers/cloudreve/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/alist-org/alist/v3/drivers/base"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/errs"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/go-resty/resty/v2"
Expand Down Expand Up @@ -134,6 +135,8 @@ func (d *Cloudreve) Put(ctx context.Context, dstDir model.Obj, stream model.File
if io.ReadCloser(stream) == http.NoBody {
return d.create(ctx, dstDir, stream)
}

// 获取存储策略
var r DirectoryResp
err := d.request(http.MethodGet, "/directory"+dstDir.GetPath(), nil, &r)
if err != nil {
Expand All @@ -146,43 +149,59 @@ func (d *Cloudreve) Put(ctx context.Context, dstDir model.Obj, stream model.File
"policy_id": r.Policy.Id,
"last_modified": stream.ModTime().Unix(),
}

// 获取上传会话信息
var u UploadInfo
err = d.request(http.MethodPut, "/file/upload", func(req *resty.Request) {
req.SetBody(uploadBody)
}, &u)
if err != nil {
return err
}
var chunkSize = u.ChunkSize
var buf []byte
var chunk int
for {
var n int
buf = make([]byte, chunkSize)
n, err = io.ReadAtLeast(stream, buf, chunkSize)
if err != nil && err != io.ErrUnexpectedEOF {
if err == io.EOF {
return nil
}
return err
}

if n == 0 {
break
}
buf = buf[:n]
err = d.request(http.MethodPost, "/file/upload/"+u.SessionID+"/"+strconv.Itoa(chunk), func(req *resty.Request) {
req.SetHeader("Content-Type", "application/octet-stream")
req.SetHeader("Content-Length", strconv.Itoa(n))
req.SetBody(buf)
}, nil)
if err != nil {
break
// 根据存储方式选择分片上传的方法
switch r.Policy.Type {
case "onedrive":
err = d.upOneDrive(ctx, stream, u, up)
case "remote": // 从机存储
err = d.upRemote(ctx, stream, u, up)
case "local": // 本机存储
var chunkSize = u.ChunkSize
var buf []byte
var chunk int
for {
var n int
buf = make([]byte, chunkSize)
n, err = io.ReadAtLeast(stream, buf, chunkSize)
if err != nil && err != io.ErrUnexpectedEOF {
if err == io.EOF {
return nil
}
return err
}
if n == 0 {
break
}
buf = buf[:n]
err = d.request(http.MethodPost, "/file/upload/"+u.SessionID+"/"+strconv.Itoa(chunk), func(req *resty.Request) {
req.SetHeader("Content-Type", "application/octet-stream")
req.SetHeader("Content-Length", strconv.Itoa(n))
req.SetBody(buf)
}, nil)
if err != nil {
break
}
chunk++
}
chunk++

default:
err = errs.NotImplement
}
return err
if err != nil {
// 删除失败的会话
err = d.request(http.MethodDelete, "/file/upload/"+u.SessionID, nil, nil)
return err
}
return nil
}

func (d *Cloudreve) create(ctx context.Context, dir model.Obj, file model.Obj) error {
Expand Down
8 changes: 5 additions & 3 deletions drivers/cloudreve/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ type Policy struct {
}

type UploadInfo struct {
SessionID string `json:"sessionID"`
ChunkSize int `json:"chunkSize"`
Expires int `json:"expires"`
SessionID string `json:"sessionID"`
ChunkSize int `json:"chunkSize"`
Expires int `json:"expires"`
UploadURLs []string `json:"uploadURLs"`
Credential string `json:"credential,omitempty"`
}

type DirectoryResp struct {
Expand Down
99 changes: 99 additions & 0 deletions drivers/cloudreve/util.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package cloudreve

import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"

"github.com/alist-org/alist/v3/drivers/base"
"github.com/alist-org/alist/v3/internal/conf"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/setting"
"github.com/alist-org/alist/v3/pkg/cookie"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/go-resty/resty/v2"
json "github.com/json-iterator/go"
jsoniter "github.com/json-iterator/go"
Expand Down Expand Up @@ -172,3 +179,95 @@ func (d *Cloudreve) GetThumb(file Object) (model.Thumbnail, error) {
Thumbnail: resp.Header().Get("Location"),
}, nil
}

func (d *Cloudreve) upRemote(ctx context.Context, stream model.FileStreamer, u UploadInfo, up driver.UpdateProgress) error {
uploadUrl := u.UploadURLs[0]
credential := u.Credential
var finish int64 = 0
var chunk int = 0
DEFAULT := int64(u.ChunkSize)
for finish < stream.GetSize() {
if utils.IsCanceled(ctx) {
return ctx.Err()
}
utils.Log.Debugf("[Cloudreve-Remote] upload: %d", finish)
var byteSize = DEFAULT
left := stream.GetSize() - finish
if left < DEFAULT {
byteSize = left
}
byteData := make([]byte, byteSize)
n, err := io.ReadFull(stream, byteData)
utils.Log.Debug(err, n)
if err != nil {
return err
}
req, err := http.NewRequest("POST", uploadUrl+"?chunk="+strconv.Itoa(chunk), bytes.NewBuffer(byteData))
if err != nil {
return err
}
req = req.WithContext(ctx)
req.Header.Set("Content-Length", strconv.Itoa(int(byteSize)))
req.Header.Set("Authorization", fmt.Sprint(credential))
finish += byteSize
res, err := base.HttpClient.Do(req)
if err != nil {
return err
}
res.Body.Close()
up(float64(finish) * 100 / float64(stream.GetSize()))
chunk++
}
return nil
}

func (d *Cloudreve) upOneDrive(ctx context.Context, stream model.FileStreamer, u UploadInfo, up driver.UpdateProgress) error {
uploadUrl := u.UploadURLs[0]
var finish int64 = 0
DEFAULT := int64(u.ChunkSize)
for finish < stream.GetSize() {
if utils.IsCanceled(ctx) {
return ctx.Err()
}
utils.Log.Debugf("[Cloudreve-OneDrive] upload: %d", finish)
var byteSize = DEFAULT
left := stream.GetSize() - finish
if left < DEFAULT {
byteSize = left
}
byteData := make([]byte, byteSize)
n, err := io.ReadFull(stream, byteData)
utils.Log.Debug(err, n)
if err != nil {
return err
}
req, err := http.NewRequest("PUT", uploadUrl, bytes.NewBuffer(byteData))
if err != nil {
return err
}
req = req.WithContext(ctx)
req.Header.Set("Content-Length", strconv.Itoa(int(byteSize)))
req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", finish, finish+byteSize-1, stream.GetSize()))
finish += byteSize
res, err := base.HttpClient.Do(req)
if err != nil {
return err
}
// https://learn.microsoft.com/zh-cn/onedrive/developer/rest-api/api/driveitem_createuploadsession
if res.StatusCode != 201 && res.StatusCode != 202 && res.StatusCode != 200 {
data, _ := io.ReadAll(res.Body)
res.Body.Close()
return errors.New(string(data))
}
res.Body.Close()
up(float64(finish) * 100 / float64(stream.GetSize()))
}
// 上传成功发送回调请求
err := d.request(http.MethodPost, "/callback/onedrive/finish/"+u.SessionID, func(req *resty.Request) {
req.SetBody("{}")
}, nil)
if err != nil {
return err
}
return nil
}

0 comments on commit 2a03530

Please sign in to comment.