Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pikpak):fix webdav upload issue #7050

Merged
merged 1 commit into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 21 additions & 31 deletions drivers/pikpak/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,18 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/alist-org/alist/v3/internal/op"
"golang.org/x/oauth2"
"io"
"net/http"
"strconv"
"strings"

"github.com/alist-org/alist/v3/drivers/base"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/pkg/utils"
hash_extend "github.com/alist-org/alist/v3/pkg/utils/hash"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/go-resty/resty/v2"
log "github.com/sirupsen/logrus"
"golang.org/x/oauth2"
"net/http"
"strconv"
"strings"
)

type PikPak struct {
Expand Down Expand Up @@ -123,10 +117,16 @@ func (d *PikPak) Init(ctx context.Context) (err error) {
d.AccessToken = token.AccessToken

// 获取CaptchaToken
err = d.RefreshCaptchaTokenAtLogin(GetAction(http.MethodGet, "https://api-drive.mypikpak.com/drive/v1/files"), d.Common.UserID)
err = d.RefreshCaptchaTokenAtLogin(GetAction(http.MethodGet, "https://api-drive.mypikpak.com/drive/v1/files"), d.Username)
if err != nil {
return err
}

// 获取用户ID
userID := token.Extra("sub").(string)
if userID != "" {
d.Common.SetUserID(userID)
}
// 更新UserAgent
if d.Platform == "android" {
d.Common.UserAgent = BuildCustomUserAgent(utils.GetMD5EncodeStr(d.Username+d.Password), AndroidClientID, AndroidPackageName, AndroidSdkVersion, AndroidClientVersion, AndroidPackageName, d.Common.UserID)
Expand Down Expand Up @@ -271,27 +271,17 @@ func (d *PikPak) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
}

params := resp.Resumable.Params
endpoint := strings.Join(strings.Split(params.Endpoint, ".")[1:], ".")
cfg := &aws.Config{
Credentials: credentials.NewStaticCredentials(params.AccessKeyID, params.AccessKeySecret, params.SecurityToken),
Region: aws.String("pikpak"),
Endpoint: &endpoint,
//endpoint := strings.Join(strings.Split(params.Endpoint, ".")[1:], ".")
// web 端上传 返回的endpoint 为 `mypikpak.com` | android 端上传 返回的endpoint 为 `vip-lixian-07.mypikpak.com`·
if d.Addition.Platform == "android" {
params.Endpoint = "mypikpak.com"
}
ss, err := session.NewSession(cfg)
if err != nil {
return err
}
uploader := s3manager.NewUploader(ss)
if stream.GetSize() > s3manager.MaxUploadParts*s3manager.DefaultUploadPartSize {
uploader.PartSize = stream.GetSize() / (s3manager.MaxUploadParts - 1)
}
input := &s3manager.UploadInput{
Bucket: &params.Bucket,
Key: &params.Key,
Body: io.TeeReader(stream, driver.NewProgress(stream.GetSize(), up)),

if stream.GetSize() <= 10*utils.MB { // 文件大小 小于10MB,改用普通模式上传
return d.UploadByOSS(&params, stream, up)
}
_, err = uploader.UploadWithContext(ctx, input)
return err
// 分片上传
return d.UploadByMultipart(&params, stream.GetSize(), stream, up)
}

// 离线下载文件
Expand Down
24 changes: 13 additions & 11 deletions drivers/pikpak/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,24 @@ type UploadTaskData struct {
UploadType string `json:"upload_type"`
//UPLOAD_TYPE_RESUMABLE
Resumable *struct {
Kind string `json:"kind"`
Params struct {
AccessKeyID string `json:"access_key_id"`
AccessKeySecret string `json:"access_key_secret"`
Bucket string `json:"bucket"`
Endpoint string `json:"endpoint"`
Expiration time.Time `json:"expiration"`
Key string `json:"key"`
SecurityToken string `json:"security_token"`
} `json:"params"`
Provider string `json:"provider"`
Kind string `json:"kind"`
Params S3Params `json:"params"`
Provider string `json:"provider"`
} `json:"resumable"`

File File `json:"file"`
}

type S3Params struct {
AccessKeyID string `json:"access_key_id"`
AccessKeySecret string `json:"access_key_secret"`
Bucket string `json:"bucket"`
Endpoint string `json:"endpoint"`
Expiration time.Time `json:"expiration"`
Key string `json:"key"`
SecurityToken string `json:"security_token"`
}

// 添加离线下载响应
type OfflineDownloadResp struct {
File *string `json:"file"`
Expand Down
249 changes: 248 additions & 1 deletion drivers/pikpak/util.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
package pikpak

import (
"bytes"
"crypto/md5"
"crypto/sha1"
"encoding/hex"
"errors"
"fmt"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"io"
"net/http"
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/alist-org/alist/v3/drivers/base"
Expand Down Expand Up @@ -56,6 +63,12 @@ var WebAlgorithms = []string{
"NhXXU9rg4XXdzo7u5o",
}

const (
OSSUserAgent = "aliyun-sdk-android/2.9.13(Linux/Android 14/M2004j7ac;UKQ1.231108.001)"
OssSecurityTokenHeaderName = "X-OSS-Security-Token"
ThreadsNum = 10
)

const (
AndroidClientID = "YNxT9w7GMdWvEOKa"
AndroidClientSecret = "dbw2OtmVEeuUvIptb1Coyg"
Expand Down Expand Up @@ -393,3 +406,237 @@ func (d *PikPak) refreshCaptchaToken(action string, metas map[string]string) err
d.Common.SetCaptchaToken(resp.CaptchaToken)
return nil
}

func (d *PikPak) UploadByOSS(params *S3Params, stream model.FileStreamer, up driver.UpdateProgress) error {
ossClient, err := oss.New(params.Endpoint, params.AccessKeyID, params.AccessKeySecret)
if err != nil {
return err
}
bucket, err := ossClient.Bucket(params.Bucket)
if err != nil {
return err
}

err = bucket.PutObject(params.Key, stream, OssOption(params)...)
if err != nil {
return err
}
return nil
}

func (d *PikPak) UploadByMultipart(params *S3Params, fileSize int64, stream model.FileStreamer, up driver.UpdateProgress) error {
var (
chunks []oss.FileChunk
parts []oss.UploadPart
imur oss.InitiateMultipartUploadResult
ossClient *oss.Client
bucket *oss.Bucket
err error
)

tmpF, err := stream.CacheFullInTempFile()
if err != nil {
return err
}

if ossClient, err = oss.New(params.Endpoint, params.AccessKeyID, params.AccessKeySecret); err != nil {
return err
}

if bucket, err = ossClient.Bucket(params.Bucket); err != nil {
return err
}

ticker := time.NewTicker(time.Hour * 12)
defer ticker.Stop()
// 设置超时
timeout := time.NewTimer(time.Hour * 24)

if chunks, err = SplitFile(fileSize); err != nil {
return err
}

if imur, err = bucket.InitiateMultipartUpload(params.Key,
oss.SetHeader(OssSecurityTokenHeaderName, params.SecurityToken),
oss.UserAgentHeader(OSSUserAgent),
); err != nil {
return err
}

wg := sync.WaitGroup{}
wg.Add(len(chunks))

chunksCh := make(chan oss.FileChunk)
errCh := make(chan error)
UploadedPartsCh := make(chan oss.UploadPart)
quit := make(chan struct{})

// producer
go chunksProducer(chunksCh, chunks)
go func() {
wg.Wait()
quit <- struct{}{}
}()

// consumers
for i := 0; i < ThreadsNum; i++ {
go func(threadId int) {
defer func() {
if r := recover(); r != nil {
errCh <- fmt.Errorf("recovered in %v", r)
}
}()
for chunk := range chunksCh {
var part oss.UploadPart // 出现错误就继续尝试,共尝试3次
for retry := 0; retry < 3; retry++ {
select {
case <-ticker.C:
errCh <- errors.Wrap(err, "ossToken 过期")
default:
}

buf := make([]byte, chunk.Size)
if _, err = tmpF.ReadAt(buf, chunk.Offset); err != nil && !errors.Is(err, io.EOF) {
continue
}

b := bytes.NewBuffer(buf)
if part, err = bucket.UploadPart(imur, b, chunk.Size, chunk.Number, OssOption(params)...); err == nil {
break
}
}
if err != nil {
errCh <- errors.Wrap(err, fmt.Sprintf("上传 %s 的第%d个分片时出现错误:%v", stream.GetName(), chunk.Number, err))
}
UploadedPartsCh <- part
}
}(i)
}

go func() {
for part := range UploadedPartsCh {
parts = append(parts, part)
wg.Done()
}
}()
LOOP:
for {
select {
case <-ticker.C:
// ossToken 过期
return err
case <-quit:
break LOOP
case <-errCh:
return err
case <-timeout.C:
return fmt.Errorf("time out")
}
}

// EOF错误是xml的Unmarshal导致的,响应其实是json格式,所以实际上上传是成功的
if _, err = bucket.CompleteMultipartUpload(imur, parts, OssOption(params)...); err != nil && !errors.Is(err, io.EOF) {
// 当文件名含有 &< 这两个字符之一时响应的xml解析会出现错误,实际上上传是成功的
if filename := filepath.Base(stream.GetName()); !strings.ContainsAny(filename, "&<") {
return err
}
}
return nil
}

func chunksProducer(ch chan oss.FileChunk, chunks []oss.FileChunk) {
for _, chunk := range chunks {
ch <- chunk
}
}

func SplitFile(fileSize int64) (chunks []oss.FileChunk, err error) {
for i := int64(1); i < 10; i++ {
if fileSize < i*utils.GB { // 文件大小小于iGB时分为i*100片
if chunks, err = SplitFileByPartNum(fileSize, int(i*100)); err != nil {
return
}
break
}
}
if fileSize > 9*utils.GB { // 文件大小大于9GB时分为1000片
if chunks, err = SplitFileByPartNum(fileSize, 1000); err != nil {
return
}
}
// 单个分片大小不能小于1MB
if chunks[0].Size < 1*utils.MB {
if chunks, err = SplitFileByPartSize(fileSize, 1*utils.MB); err != nil {
return
}
}
return
}

// SplitFileByPartNum splits big file into parts by the num of parts.
// Split the file with specified parts count, returns the split result when error is nil.
func SplitFileByPartNum(fileSize int64, chunkNum int) ([]oss.FileChunk, error) {
if chunkNum <= 0 || chunkNum > 10000 {
return nil, errors.New("chunkNum invalid")
}

if int64(chunkNum) > fileSize {
return nil, errors.New("oss: chunkNum invalid")
}

var chunks []oss.FileChunk
chunk := oss.FileChunk{}
chunkN := (int64)(chunkNum)
for i := int64(0); i < chunkN; i++ {
chunk.Number = int(i + 1)
chunk.Offset = i * (fileSize / chunkN)
if i == chunkN-1 {
chunk.Size = fileSize/chunkN + fileSize%chunkN
} else {
chunk.Size = fileSize / chunkN
}
chunks = append(chunks, chunk)
}

return chunks, nil
}

// SplitFileByPartSize splits big file into parts by the size of parts.
// Splits the file by the part size. Returns the FileChunk when error is nil.
func SplitFileByPartSize(fileSize int64, chunkSize int64) ([]oss.FileChunk, error) {
if chunkSize <= 0 {
return nil, errors.New("chunkSize invalid")
}

chunkN := fileSize / chunkSize
if chunkN >= 10000 {
return nil, errors.New("Too many parts, please increase part size")
}

var chunks []oss.FileChunk
chunk := oss.FileChunk{}
for i := int64(0); i < chunkN; i++ {
chunk.Number = int(i + 1)
chunk.Offset = i * chunkSize
chunk.Size = chunkSize
chunks = append(chunks, chunk)
}

if fileSize%chunkSize > 0 {
chunk.Number = len(chunks) + 1
chunk.Offset = int64(len(chunks)) * chunkSize
chunk.Size = fileSize % chunkSize
chunks = append(chunks, chunk)
}

return chunks, nil
}

// OssOption get options
func OssOption(params *S3Params) []oss.Option {
options := []oss.Option{
oss.SetHeader(OssSecurityTokenHeaderName, params.SecurityToken),
oss.UserAgentHeader(OSSUserAgent),
}
return options
}
Loading