Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Persistant Task #6925

Merged
merged 1 commit into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ require (
github.com/u2takey/ffmpeg-go v0.5.0
github.com/upyun/go-sdk/v3 v3.0.4
github.com/winfsp/cgofuse v1.5.1-0.20230130140708-f87f5db493b5
github.com/xhofe/tache v0.1.1
github.com/xhofe/tache v0.1.2
github.com/xhofe/wopan-sdk-go v0.1.3
github.com/zzzhr1990/go-common-entity v0.0.0-20221216044934-fd1c571e3a22
golang.org/x/crypto v0.25.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ github.com/xhofe/gsync v0.0.0-20230917091818-2111ceb38a25 h1:eDfebW/yfq9DtG9RO3K
github.com/xhofe/gsync v0.0.0-20230917091818-2111ceb38a25/go.mod h1:fH4oNm5F9NfI5dLi0oIMtsLNKQOirUDbEMCIBb/7SU0=
github.com/xhofe/tache v0.1.1 h1:O5QY4cVjIGELx3UGh6LbVAc18MWGXgRNQjMt72x6w/8=
github.com/xhofe/tache v0.1.1/go.mod h1:iKumPFvywf30FRpAHHCt64G0JHLMzT0K+wyGedHsmTQ=
github.com/xhofe/tache v0.1.2 h1:pHrXlrWcbTb4G7hVUDW7Rc+YTUnLJvnLBrdktVE1Fqg=
github.com/xhofe/tache v0.1.2/go.mod h1:iKumPFvywf30FRpAHHCt64G0JHLMzT0K+wyGedHsmTQ=
github.com/xhofe/wopan-sdk-go v0.1.3 h1:J58X6v+n25ewBZjb05pKOr7AWGohb+Rdll4CThGh6+A=
github.com/xhofe/wopan-sdk-go v0.1.3/go.mod h1:dcY9yA28fnaoZPnXZiVTFSkcd7GnIPTpTIIlfSI5z5Q=
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no=
Expand Down
12 changes: 7 additions & 5 deletions internal/bootstrap/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,7 @@ func InitConfig() {
}
conf.Conf.TempDir = absPath
}
err := os.RemoveAll(filepath.Join(conf.Conf.TempDir))
if err != nil {
log.Errorln("failed delete temp file:", err)
}
err = os.MkdirAll(conf.Conf.TempDir, 0o777)
err := os.MkdirAll(conf.Conf.TempDir, 0o777)
if err != nil {
log.Fatalf("create temp dir error: %+v", err)
}
Expand Down Expand Up @@ -104,3 +100,9 @@ func initURL() {
}
conf.URL = u
}

func CleanTempDir() {
if err := os.RemoveAll(conf.Conf.TempDir); err != nil {
log.Errorln("failed delete temp file: ", err)
}
}
1 change: 1 addition & 0 deletions internal/bootstrap/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import "github.com/alist-org/alist/v3/cmd/flags"
func InitData() {
initUser()
initSettings()
initTasks()
if flags.Dev {
initDevData()
initDevDo()
Expand Down
29 changes: 29 additions & 0 deletions internal/bootstrap/data/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package data

import (
"github.com/alist-org/alist/v3/internal/db"
"github.com/alist-org/alist/v3/internal/model"
)

var initialTaskItems []model.TaskItem

func initTasks() {
InitialTasks()

for i := range initialTaskItems {
item := &initialTaskItems[i]
taskitem, _ := db.GetTaskDataByType(item.Key)
if taskitem == nil {
db.CreateTaskData(item)
}
}
}

func InitialTasks() []model.TaskItem {
initialTaskItems = []model.TaskItem{
{Key: "copy", PersistData: "[]"},
{Key: "download", PersistData: "[]"},
{Key: "transfer", PersistData: "[]"},
}
return initialTaskItems
}
12 changes: 8 additions & 4 deletions internal/bootstrap/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package bootstrap

import (
"github.com/alist-org/alist/v3/internal/conf"
"github.com/alist-org/alist/v3/internal/db"
"github.com/alist-org/alist/v3/internal/fs"
"github.com/alist-org/alist/v3/internal/offline_download/tool"
"github.com/xhofe/tache"
)

func InitTaskManager() {
fs.UploadTaskManager = tache.NewManager[*fs.UploadTask](tache.WithWorks(conf.Conf.Tasks.Upload.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Upload.MaxRetry))
fs.CopyTaskManager = tache.NewManager[*fs.CopyTask](tache.WithWorks(conf.Conf.Tasks.Copy.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry))
tool.DownloadTaskManager = tache.NewManager[*tool.DownloadTask](tache.WithWorks(conf.Conf.Tasks.Download.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Download.MaxRetry))
tool.TransferTaskManager = tache.NewManager[*tool.TransferTask](tache.WithWorks(conf.Conf.Tasks.Transfer.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Transfer.MaxRetry))
fs.UploadTaskManager = tache.NewManager[*fs.UploadTask](tache.WithWorks(conf.Conf.Tasks.Upload.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Upload.MaxRetry)) //upload will not support persist
fs.CopyTaskManager = tache.NewManager[*fs.CopyTask](tache.WithWorks(conf.Conf.Tasks.Copy.Workers), tache.WithPersistFunction(db.GetTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant), db.UpdateTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry))
tool.DownloadTaskManager = tache.NewManager[*tool.DownloadTask](tache.WithWorks(conf.Conf.Tasks.Download.Workers), tache.WithPersistFunction(db.GetTaskDataFunc("download", conf.Conf.Tasks.Download.TaskPersistant), db.UpdateTaskDataFunc("download", conf.Conf.Tasks.Download.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Download.MaxRetry))
tool.TransferTaskManager = tache.NewManager[*tool.TransferTask](tache.WithWorks(conf.Conf.Tasks.Transfer.Workers), tache.WithPersistFunction(db.GetTaskDataFunc("transfer", conf.Conf.Tasks.Transfer.TaskPersistant), db.UpdateTaskDataFunc("transfer", conf.Conf.Tasks.Transfer.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Transfer.MaxRetry))
if len(tool.TransferTaskManager.GetAll()) == 0 { //prevent offline downloaded files from being deleted
CleanTempDir()
}
}
20 changes: 12 additions & 8 deletions internal/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ type LogConfig struct {
}

type TaskConfig struct {
Workers int `json:"workers" env:"WORKERS"`
MaxRetry int `json:"max_retry" env:"MAX_RETRY"`
Workers int `json:"workers" env:"WORKERS"`
MaxRetry int `json:"max_retry" env:"MAX_RETRY"`
TaskPersistant bool `json:"task_persistant" env:"TASK_PERSISTANT"`
}

type TasksConfig struct {
Expand Down Expand Up @@ -130,19 +131,22 @@ func DefaultConfig() *Config {
TlsInsecureSkipVerify: true,
Tasks: TasksConfig{
Download: TaskConfig{
Workers: 5,
MaxRetry: 1,
Workers: 5,
MaxRetry: 1,
TaskPersistant: true,
},
Transfer: TaskConfig{
Workers: 5,
MaxRetry: 2,
Workers: 5,
MaxRetry: 2,
TaskPersistant: true,
},
Upload: TaskConfig{
Workers: 5,
},
Copy: TaskConfig{
Workers: 5,
MaxRetry: 2,
Workers: 5,
MaxRetry: 2,
TaskPersistant: true,
},
},
Cors: Cors{
Expand Down
2 changes: 1 addition & 1 deletion internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var db *gorm.DB

func Init(d *gorm.DB) {
db = d
err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode))
err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode), new(model.TaskItem))
if err != nil {
log.Fatalf("failed migrate database: %s", err.Error())
}
Expand Down
48 changes: 48 additions & 0 deletions internal/db/tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package db

import (
"github.com/alist-org/alist/v3/internal/model"
"github.com/pkg/errors"
)

func GetTaskDataByType(type_s string) (*model.TaskItem, error) {
task := model.TaskItem{Key: type_s}
if err := db.Where(task).First(&task).Error; err != nil {
return nil, errors.Wrapf(err, "failed find task")
}
return &task, nil
}

func UpdateTaskData(t *model.TaskItem) error {
return errors.WithStack(db.Model(&model.TaskItem{}).Where("key = ?", t.Key).Update("persist_data", t.PersistData).Error)
}

func CreateTaskData(t *model.TaskItem) error {
return errors.WithStack(db.Create(t).Error)
}

func GetTaskDataFunc(type_s string, enabled bool) func() ([]byte, error) {
if !enabled {
return nil
}
task, err := GetTaskDataByType(type_s)
if err != nil {
return nil
}
return func() ([]byte, error) {
return []byte(task.PersistData), nil
}
}

func UpdateTaskDataFunc(type_s string, enabled bool) func([]byte) error {
if !enabled {
return nil
}
return func(data []byte) error {
s := string(data)
if s == "null" || s == "" {
s = "[]"
}
return UpdateTaskData(&model.TaskItem{Key: type_s, PersistData: s})
}
}
50 changes: 34 additions & 16 deletions internal/fs/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package fs
import (
"context"
"fmt"
"net/http"
stdpath "path"

"github.com/alist-org/alist/v3/internal/conf"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model"
Expand All @@ -11,28 +14,39 @@ import (
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/pkg/errors"
"github.com/xhofe/tache"
"net/http"
stdpath "path"
)

type CopyTask struct {
tache.Base
Status string `json:"status"`
srcStorage, dstStorage driver.Driver
srcObjPath, dstDirPath string
Status string `json:"-"` //don't save status to save space
SrcObjPath string `json:"src_path"`
DstDirPath string `json:"dst_path"`
srcStorage driver.Driver `json:"-"`
dstStorage driver.Driver `json:"-"`
SrcStorageMp string `json:"src_storage_mp"`
DstStorageMp string `json:"dst_storage_mp"`
}

func (t *CopyTask) GetName() string {
return fmt.Sprintf("copy [%s](%s) to [%s](%s)",
t.srcStorage.GetStorage().MountPath, t.srcObjPath, t.dstStorage.GetStorage().MountPath, t.dstDirPath)
return fmt.Sprintf("copy [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath)
}

func (t *CopyTask) GetStatus() string {
return t.Status
}

func (t *CopyTask) Run() error {
return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.srcObjPath, t.dstDirPath)
var err error
if t.srcStorage == nil {
t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
}
if t.dstStorage == nil {
t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp)
}
if err != nil {
return errors.WithMessage(err, "failed get storage")
}
return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath)
}

var CopyTaskManager *tache.Manager[*CopyTask]
Expand Down Expand Up @@ -79,10 +93,12 @@ func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool
}
// not in the same storage
t := &CopyTask{
srcStorage: srcStorage,
dstStorage: dstStorage,
srcObjPath: srcObjActualPath,
dstDirPath: dstDirActualPath,
srcStorage: srcStorage,
dstStorage: dstStorage,
SrcObjPath: srcObjActualPath,
DstDirPath: dstDirActualPath,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
}
CopyTaskManager.Add(t)
return t, nil
Expand All @@ -107,10 +123,12 @@ func copyBetween2Storages(t *CopyTask, srcStorage, dstStorage driver.Driver, src
srcObjPath := stdpath.Join(srcObjPath, obj.GetName())
dstObjPath := stdpath.Join(dstDirPath, srcObj.GetName())
CopyTaskManager.Add(&CopyTask{
srcStorage: srcStorage,
dstStorage: dstStorage,
srcObjPath: srcObjPath,
dstDirPath: dstObjPath,
srcStorage: srcStorage,
dstStorage: dstStorage,
SrcObjPath: srcObjPath,
DstDirPath: dstObjPath,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
})
}
t.Status = "src object is dir, added all copy tasks of objs"
Expand Down
6 changes: 6 additions & 0 deletions internal/model/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package model

type TaskItem struct {
Key string `json:"key"`
PersistData string `gorm:"type:text" json:"persist_data"`
}
1 change: 1 addition & 0 deletions internal/offline_download/tool/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func AddURL(ctx context.Context, args *AddURLArgs) (tache.TaskWithInfo, error) {
DstDirPath: args.DstDirPath,
TempDir: tempDir,
DeletePolicy: deletePolicy,
Toolname: args.Tool,
tool: tool,
}
DownloadTaskManager.Add(t)
Expand Down
30 changes: 19 additions & 11 deletions internal/offline_download/tool/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,26 @@ import (

type DownloadTask struct {
tache.Base
Url string `json:"url"`
DstDirPath string `json:"dst_dir_path"`
TempDir string `json:"temp_dir"`
DeletePolicy DeletePolicy `json:"delete_policy"`

Status string `json:"status"`
Signal chan int `json:"-"`
GID string `json:"-"`
Url string `json:"url"`
DstDirPath string `json:"dst_dir_path"`
TempDir string `json:"temp_dir"`
DeletePolicy DeletePolicy `json:"delete_policy"`
Toolname string `json:"toolname"`
Status string `json:"-"`
Signal chan int `json:"-"`
GID string `json:"-"`
tool Tool
callStatusRetried int
}

func (t *DownloadTask) Run() error {
if t.tool == nil {
tool, err := Tools.Get(t.Toolname)
if err != nil {
return errors.WithMessage(err, "failed get tool")
}
t.tool = tool
}
if err := t.tool.Run(t); !errs.IsNotSupportError(err) {
if err == nil {
return t.Complete()
Expand Down Expand Up @@ -142,9 +149,10 @@ func (t *DownloadTask) Complete() error {
file := files[i]
TransferTaskManager.Add(&TransferTask{
file: file,
dstDirPath: t.DstDirPath,
tempDir: t.TempDir,
deletePolicy: t.DeletePolicy,
DstDirPath: t.DstDirPath,
TempDir: t.TempDir,
DeletePolicy: t.DeletePolicy,
FileDir: file.Path,
})
}
return nil
Expand Down
Loading
Loading