fix(pikpak): webdav upload issue (#7050)
Three-taile-dragon authored Aug 21, 2024
1 parent 489b28b commit ef5e192
Showing 3 changed files with 282 additions and 43 deletions.
52 changes: 21 additions & 31 deletions drivers/pikpak/driver.go
@@ -4,24 +4,18 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/alist-org/alist/v3/internal/op"
"golang.org/x/oauth2"
"io"
"net/http"
"strconv"
"strings"

"github.com/alist-org/alist/v3/drivers/base"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/pkg/utils"
hash_extend "github.com/alist-org/alist/v3/pkg/utils/hash"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/go-resty/resty/v2"
log "github.com/sirupsen/logrus"
"golang.org/x/oauth2"
"net/http"
"strconv"
"strings"
)

type PikPak struct {
@@ -123,10 +117,16 @@ func (d *PikPak) Init(ctx context.Context) (err error) {
d.AccessToken = token.AccessToken

// Get the CaptchaToken
err = d.RefreshCaptchaTokenAtLogin(GetAction(http.MethodGet, "https://api-drive.mypikpak.com/drive/v1/files"), d.Common.UserID)
err = d.RefreshCaptchaTokenAtLogin(GetAction(http.MethodGet, "https://api-drive.mypikpak.com/drive/v1/files"), d.Username)
if err != nil {
return err
}

// Get the user ID
userID := token.Extra("sub").(string)
if userID != "" {
d.Common.SetUserID(userID)
}
// Update the UserAgent
if d.Platform == "android" {
d.Common.UserAgent = BuildCustomUserAgent(utils.GetMD5EncodeStr(d.Username+d.Password), AndroidClientID, AndroidPackageName, AndroidSdkVersion, AndroidClientVersion, AndroidPackageName, d.Common.UserID)
@@ -271,27 +271,17 @@ func (d *PikPak) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr
}

params := resp.Resumable.Params
endpoint := strings.Join(strings.Split(params.Endpoint, ".")[1:], ".")
cfg := &aws.Config{
Credentials: credentials.NewStaticCredentials(params.AccessKeyID, params.AccessKeySecret, params.SecurityToken),
Region: aws.String("pikpak"),
Endpoint: &endpoint,
//endpoint := strings.Join(strings.Split(params.Endpoint, ".")[1:], ".")
// the endpoint returned for a web upload is `mypikpak.com` | the endpoint returned for an android upload is `vip-lixian-07.mypikpak.com`
if d.Addition.Platform == "android" {
params.Endpoint = "mypikpak.com"
}
ss, err := session.NewSession(cfg)
if err != nil {
return err
}
uploader := s3manager.NewUploader(ss)
if stream.GetSize() > s3manager.MaxUploadParts*s3manager.DefaultUploadPartSize {
uploader.PartSize = stream.GetSize() / (s3manager.MaxUploadParts - 1)
}
input := &s3manager.UploadInput{
Bucket: &params.Bucket,
Key: &params.Key,
Body: io.TeeReader(stream, driver.NewProgress(stream.GetSize(), up)),

if stream.GetSize() <= 10*utils.MB { // for files smaller than 10MB, fall back to the simple upload mode
return d.UploadByOSS(&params, stream, up)
}
_, err = uploader.UploadWithContext(ctx, input)
return err
// multipart upload
return d.UploadByMultipart(&params, stream.GetSize(), stream, up)
}

// Offline download of files
24 changes: 13 additions & 11 deletions drivers/pikpak/types.go
@@ -80,22 +80,24 @@ type UploadTaskData struct {
UploadType string `json:"upload_type"`
//UPLOAD_TYPE_RESUMABLE
Resumable *struct {
Kind string `json:"kind"`
Params struct {
AccessKeyID string `json:"access_key_id"`
AccessKeySecret string `json:"access_key_secret"`
Bucket string `json:"bucket"`
Endpoint string `json:"endpoint"`
Expiration time.Time `json:"expiration"`
Key string `json:"key"`
SecurityToken string `json:"security_token"`
} `json:"params"`
Provider string `json:"provider"`
Kind string `json:"kind"`
Params S3Params `json:"params"`
Provider string `json:"provider"`
} `json:"resumable"`

File File `json:"file"`
}

type S3Params struct {
AccessKeyID string `json:"access_key_id"`
AccessKeySecret string `json:"access_key_secret"`
Bucket string `json:"bucket"`
Endpoint string `json:"endpoint"`
Expiration time.Time `json:"expiration"`
Key string `json:"key"`
SecurityToken string `json:"security_token"`
}
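
For reference, a minimal standalone sketch of how a resumable-upload params payload would decode into S3Params with encoding/json (the JSON below is purely illustrative; every value is a placeholder, not a real credential):

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type S3Params struct {
	AccessKeyID     string    `json:"access_key_id"`
	AccessKeySecret string    `json:"access_key_secret"`
	Bucket          string    `json:"bucket"`
	Endpoint        string    `json:"endpoint"`
	Expiration      time.Time `json:"expiration"`
	Key             string    `json:"key"`
	SecurityToken   string    `json:"security_token"`
}

func main() {
	// Placeholder payload shaped like resp.Resumable.Params; all values are fake.
	raw := `{"access_key_id":"AK_PLACEHOLDER","access_key_secret":"SK_PLACEHOLDER","bucket":"example-bucket","endpoint":"vip-lixian-07.mypikpak.com","expiration":"2024-08-21T00:00:00Z","key":"example/object/key","security_token":"TOKEN_PLACEHOLDER"}`

	var p S3Params
	if err := json.Unmarshal([]byte(raw), &p); err != nil {
		panic(err)
	}
	fmt.Println(p.Bucket, p.Endpoint, p.Key, p.Expiration.UTC())
}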

// Response to adding an offline download
type OfflineDownloadResp struct {
File *string `json:"file"`
249 changes: 248 additions & 1 deletion drivers/pikpak/util.go
@@ -1,17 +1,24 @@
package pikpak

import (
"bytes"
"crypto/md5"
"crypto/sha1"
"encoding/hex"
"errors"
"fmt"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"io"
"net/http"
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/alist-org/alist/v3/drivers/base"
@@ -56,6 +63,12 @@ var WebAlgorithms = []string{
"NhXXU9rg4XXdzo7u5o",
}

const (
OSSUserAgent = "aliyun-sdk-android/2.9.13(Linux/Android 14/M2004j7ac;UKQ1.231108.001)"
OssSecurityTokenHeaderName = "X-OSS-Security-Token"
ThreadsNum = 10
)

const (
AndroidClientID = "YNxT9w7GMdWvEOKa"
AndroidClientSecret = "dbw2OtmVEeuUvIptb1Coyg"
@@ -393,3 +406,237 @@ func (d *PikPak) refreshCaptchaToken(action string, metas map[string]string) err
d.Common.SetCaptchaToken(resp.CaptchaToken)
return nil
}

func (d *PikPak) UploadByOSS(params *S3Params, stream model.FileStreamer, up driver.UpdateProgress) error {
ossClient, err := oss.New(params.Endpoint, params.AccessKeyID, params.AccessKeySecret)
if err != nil {
return err
}
bucket, err := ossClient.Bucket(params.Bucket)
if err != nil {
return err
}

err = bucket.PutObject(params.Key, stream, OssOption(params)...)
if err != nil {
return err
}
return nil
}

func (d *PikPak) UploadByMultipart(params *S3Params, fileSize int64, stream model.FileStreamer, up driver.UpdateProgress) error {
var (
chunks []oss.FileChunk
parts []oss.UploadPart
imur oss.InitiateMultipartUploadResult
ossClient *oss.Client
bucket *oss.Bucket
err error
)

tmpF, err := stream.CacheFullInTempFile()
if err != nil {
return err
}

if ossClient, err = oss.New(params.Endpoint, params.AccessKeyID, params.AccessKeySecret); err != nil {
return err
}

if bucket, err = ossClient.Bucket(params.Bucket); err != nil {
return err
}

ticker := time.NewTicker(time.Hour * 12)
defer ticker.Stop()
// set a timeout
timeout := time.NewTimer(time.Hour * 24)

if chunks, err = SplitFile(fileSize); err != nil {
return err
}

if imur, err = bucket.InitiateMultipartUpload(params.Key,
oss.SetHeader(OssSecurityTokenHeaderName, params.SecurityToken),
oss.UserAgentHeader(OSSUserAgent),
); err != nil {
return err
}

wg := sync.WaitGroup{}
wg.Add(len(chunks))

chunksCh := make(chan oss.FileChunk)
errCh := make(chan error)
UploadedPartsCh := make(chan oss.UploadPart)
quit := make(chan struct{})

// producer
go chunksProducer(chunksCh, chunks)
go func() {
wg.Wait()
quit <- struct{}{}
}()

// consumers
for i := 0; i < ThreadsNum; i++ {
go func(threadId int) {
defer func() {
if r := recover(); r != nil {
errCh <- fmt.Errorf("recovered in %v", r)
}
}()
for chunk := range chunksCh {
var part oss.UploadPart // keep retrying on error, 3 attempts in total
for retry := 0; retry < 3; retry++ {
select {
case <-ticker.C:
errCh <- errors.Wrap(err, "ossToken expired")
default:
}

buf := make([]byte, chunk.Size)
if _, err = tmpF.ReadAt(buf, chunk.Offset); err != nil && !errors.Is(err, io.EOF) {
continue
}

b := bytes.NewBuffer(buf)
if part, err = bucket.UploadPart(imur, b, chunk.Size, chunk.Number, OssOption(params)...); err == nil {
break
}
}
if err != nil {
errCh <- errors.Wrap(err, fmt.Sprintf("error uploading %s, part %d: %v", stream.GetName(), chunk.Number, err))
}
UploadedPartsCh <- part
}
}(i)
}

go func() {
for part := range UploadedPartsCh {
parts = append(parts, part)
wg.Done()
}
}()
LOOP:
for {
select {
case <-ticker.C:
// ossToken expired
return err
case <-quit:
break LOOP
case <-errCh:
return err
case <-timeout.C:
return fmt.Errorf("time out")
}
}

// The EOF error comes from the xml Unmarshal; the response is actually JSON, so the upload did in fact succeed
if _, err = bucket.CompleteMultipartUpload(imur, parts, OssOption(params)...); err != nil && !errors.Is(err, io.EOF) {
// When the filename contains either of the characters & or <, parsing the XML response fails even though the upload succeeded
if filename := filepath.Base(stream.GetName()); !strings.ContainsAny(filename, "&<") {
return err
}
}
return nil
}
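
The function above fans chunks out to ThreadsNum worker goroutines over channels and collects the finished parts on a separate goroutine. A stripped-down, self-contained sketch of that producer/consumer shape (dummy work only; the names are illustrative, and the retry, ticker and timeout handling are omitted):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const jobCount = 10
	const workers = 3

	jobs := make(chan int)
	results := make(chan int)
	quit := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(jobCount) // one Done per collected result, mirroring wg.Add(len(chunks))

	// producer: feeds the jobs, like chunksProducer above
	go func() {
		for i := 1; i <= jobCount; i++ {
			jobs <- i
		}
		close(jobs)
	}()

	// consumers: stand-ins for the goroutines calling bucket.UploadPart
	for w := 0; w < workers; w++ {
		go func() {
			for j := range jobs {
				results <- j * j // pretend "upload"
			}
		}()
	}

	// collector: gathers results, like the goroutine appending to parts
	var collected []int
	go func() {
		for r := range results {
			collected = append(collected, r)
			wg.Done()
		}
	}()

	// signal completion once every job has been collected
	go func() {
		wg.Wait()
		quit <- struct{}{}
	}()

	<-quit
	fmt.Println("collected", len(collected), "results")
}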

func chunksProducer(ch chan oss.FileChunk, chunks []oss.FileChunk) {
for _, chunk := range chunks {
ch <- chunk
}
}

func SplitFile(fileSize int64) (chunks []oss.FileChunk, err error) {
for i := int64(1); i < 10; i++ {
if fileSize < i*utils.GB { // a file smaller than i GB is split into i*100 parts
if chunks, err = SplitFileByPartNum(fileSize, int(i*100)); err != nil {
return
}
break
}
}
if fileSize > 9*utils.GB { // a file larger than 9 GB is split into 1000 parts
if chunks, err = SplitFileByPartNum(fileSize, 1000); err != nil {
return
}
}
// a single part must not be smaller than 1MB
if chunks[0].Size < 1*utils.MB {
if chunks, err = SplitFileByPartSize(fileSize, 1*utils.MB); err != nil {
return
}
}
return
}
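
A worked example of the tiering above, as a standalone sketch (it mirrors only the tier selection, not the 1MB minimum re-split, and assumes 1 GB = 1<<30 bytes as in utils): a 2.5 GB file falls into the i = 3 tier and gets 300 parts of roughly 8.5 MB, while a 12 GB file gets 1000 parts of about 12 MB.

package main

import "fmt"

// partsFor mirrors the tier selection in SplitFile above (illustrative only):
// a file smaller than i GB gets i*100 parts, anything at or above 9 GB gets 1000 parts.
func partsFor(fileSize, gb int64) int64 {
	for i := int64(1); i < 10; i++ {
		if fileSize < i*gb {
			return i * 100
		}
	}
	return 1000
}

func main() {
	const GB = int64(1 << 30)
	const MB = int64(1 << 20)
	for _, size := range []int64{512 * MB, 5 * GB / 2, 12 * GB} {
		n := partsFor(size, GB)
		fmt.Printf("%5d MB -> %4d parts of ~%.1f MB\n", size/MB, n, float64(size)/float64(n)/float64(MB))
	}
}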

// SplitFileByPartNum splits a big file into parts by the number of parts.
// It splits the file into the specified number of parts and returns the result when error is nil.
func SplitFileByPartNum(fileSize int64, chunkNum int) ([]oss.FileChunk, error) {
if chunkNum <= 0 || chunkNum > 10000 {
return nil, errors.New("chunkNum invalid")
}

if int64(chunkNum) > fileSize {
return nil, errors.New("oss: chunkNum invalid")
}

var chunks []oss.FileChunk
chunk := oss.FileChunk{}
chunkN := (int64)(chunkNum)
for i := int64(0); i < chunkN; i++ {
chunk.Number = int(i + 1)
chunk.Offset = i * (fileSize / chunkN)
if i == chunkN-1 {
chunk.Size = fileSize/chunkN + fileSize%chunkN
} else {
chunk.Size = fileSize / chunkN
}
chunks = append(chunks, chunk)
}

return chunks, nil
}

// SplitFileByPartSize splits a big file into parts by the size of parts.
// It splits the file by the given part size and returns the FileChunk slice when error is nil.
func SplitFileByPartSize(fileSize int64, chunkSize int64) ([]oss.FileChunk, error) {
if chunkSize <= 0 {
return nil, errors.New("chunkSize invalid")
}

chunkN := fileSize / chunkSize
if chunkN >= 10000 {
return nil, errors.New("Too many parts, please increase part size")
}

var chunks []oss.FileChunk
chunk := oss.FileChunk{}
for i := int64(0); i < chunkN; i++ {
chunk.Number = int(i + 1)
chunk.Offset = i * chunkSize
chunk.Size = chunkSize
chunks = append(chunks, chunk)
}

if fileSize%chunkSize > 0 {
chunk.Number = len(chunks) + 1
chunk.Offset = int64(len(chunks)) * chunkSize
chunk.Size = fileSize % chunkSize
chunks = append(chunks, chunk)
}

return chunks, nil
}

// OssOption builds the common request options (security-token and user-agent headers)
func OssOption(params *S3Params) []oss.Option {
options := []oss.Option{
oss.SetHeader(OssSecurityTokenHeaderName, params.SecurityToken),
oss.UserAgentHeader(OSSUserAgent),
}
return options
}
