diff --git a/go.mod b/go.mod index 8bed00741b55..318da35057a1 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( github.com/u2takey/ffmpeg-go v0.5.0 github.com/upyun/go-sdk/v3 v3.0.4 github.com/winfsp/cgofuse v1.5.1-0.20230130140708-f87f5db493b5 - github.com/xhofe/tache v0.1.1 + github.com/xhofe/tache v0.1.2 github.com/xhofe/wopan-sdk-go v0.1.3 github.com/zzzhr1990/go-common-entity v0.0.0-20221216044934-fd1c571e3a22 golang.org/x/crypto v0.25.0 diff --git a/go.sum b/go.sum index d82af70ecf21..9de941c5e858 100644 --- a/go.sum +++ b/go.sum @@ -506,6 +506,8 @@ github.com/xhofe/gsync v0.0.0-20230917091818-2111ceb38a25 h1:eDfebW/yfq9DtG9RO3K github.com/xhofe/gsync v0.0.0-20230917091818-2111ceb38a25/go.mod h1:fH4oNm5F9NfI5dLi0oIMtsLNKQOirUDbEMCIBb/7SU0= github.com/xhofe/tache v0.1.1 h1:O5QY4cVjIGELx3UGh6LbVAc18MWGXgRNQjMt72x6w/8= github.com/xhofe/tache v0.1.1/go.mod h1:iKumPFvywf30FRpAHHCt64G0JHLMzT0K+wyGedHsmTQ= +github.com/xhofe/tache v0.1.2 h1:pHrXlrWcbTb4G7hVUDW7Rc+YTUnLJvnLBrdktVE1Fqg= +github.com/xhofe/tache v0.1.2/go.mod h1:iKumPFvywf30FRpAHHCt64G0JHLMzT0K+wyGedHsmTQ= github.com/xhofe/wopan-sdk-go v0.1.3 h1:J58X6v+n25ewBZjb05pKOr7AWGohb+Rdll4CThGh6+A= github.com/xhofe/wopan-sdk-go v0.1.3/go.mod h1:dcY9yA28fnaoZPnXZiVTFSkcd7GnIPTpTIIlfSI5z5Q= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= diff --git a/internal/bootstrap/config.go b/internal/bootstrap/config.go index 2b7e9e135062..ff36509cf5f9 100644 --- a/internal/bootstrap/config.go +++ b/internal/bootstrap/config.go @@ -68,11 +68,7 @@ func InitConfig() { } conf.Conf.TempDir = absPath } - err := os.RemoveAll(filepath.Join(conf.Conf.TempDir)) - if err != nil { - log.Errorln("failed delete temp file:", err) - } - err = os.MkdirAll(conf.Conf.TempDir, 0o777) + err := os.MkdirAll(conf.Conf.TempDir, 0o777) if err != nil { log.Fatalf("create temp dir error: %+v", err) } @@ -104,3 +100,9 @@ func initURL() { } conf.URL = u } + +func CleanTempDir() { + if err := os.RemoveAll(conf.Conf.TempDir); err != nil { + log.Errorln("failed delete temp file: ", err) + } +} diff --git a/internal/bootstrap/data/data.go b/internal/bootstrap/data/data.go index 6c77ebf23855..c2170d2f4795 100644 --- a/internal/bootstrap/data/data.go +++ b/internal/bootstrap/data/data.go @@ -5,6 +5,7 @@ import "github.com/alist-org/alist/v3/cmd/flags" func InitData() { initUser() initSettings() + initTasks() if flags.Dev { initDevData() initDevDo() diff --git a/internal/bootstrap/data/setting.go b/internal/bootstrap/data/setting.go index 75244d840275..0a29b52d10fd 100644 --- a/internal/bootstrap/data/setting.go +++ b/internal/bootstrap/data/setting.go @@ -144,6 +144,7 @@ func InitialSettings() []model.SettingItem { {Key: conf.ForwardDirectLinkParams, Value: "false", Type: conf.TypeBool, Group: model.GLOBAL}, {Key: conf.IgnoreDirectLinkParams, Value: "sign,alist_ts", Type: conf.TypeString, Group: model.GLOBAL}, {Key: conf.WebauthnLoginEnabled, Value: "false", Type: conf.TypeBool, Group: model.GLOBAL, Flag: model.PUBLIC}, + {Key: conf.IsTaskPersistant, Value: "true", Type: conf.TypeBool, Group: model.GLOBAL}, // single settings {Key: conf.Token, Value: token, Type: conf.TypeString, Group: model.SINGLE, Flag: model.PRIVATE}, diff --git a/internal/bootstrap/data/task.go b/internal/bootstrap/data/task.go new file mode 100644 index 000000000000..7100e2e25c11 --- /dev/null +++ b/internal/bootstrap/data/task.go @@ -0,0 +1,29 @@ +package data + +import ( + "github.com/alist-org/alist/v3/internal/db" + "github.com/alist-org/alist/v3/internal/model" +) + +var initialTaskItems []model.TaskItem + +func initTasks() { + InitialTasks() + + for i := range initialTaskItems { + item := &initialTaskItems[i] + taskitem, _ := db.GetTaskDataByType(item.Key) + if taskitem == nil { + db.CreateTaskData(item) + } + } +} + +func InitialTasks() []model.TaskItem { + initialTaskItems = []model.TaskItem{ + {Key: "copy", PersistData: "[]"}, + {Key: "download", PersistData: "[]"}, + {Key: "transfer", PersistData: "[]"}, + } + return initialTaskItems +} diff --git a/internal/bootstrap/task.go b/internal/bootstrap/task.go index 5d52e9d2ef88..04877f9a32f6 100644 --- a/internal/bootstrap/task.go +++ b/internal/bootstrap/task.go @@ -2,14 +2,20 @@ package bootstrap import ( "github.com/alist-org/alist/v3/internal/conf" + "github.com/alist-org/alist/v3/internal/db" "github.com/alist-org/alist/v3/internal/fs" "github.com/alist-org/alist/v3/internal/offline_download/tool" + "github.com/alist-org/alist/v3/internal/setting" "github.com/xhofe/tache" ) func InitTaskManager() { - fs.UploadTaskManager = tache.NewManager[*fs.UploadTask](tache.WithWorks(conf.Conf.Tasks.Upload.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Upload.MaxRetry)) - fs.CopyTaskManager = tache.NewManager[*fs.CopyTask](tache.WithWorks(conf.Conf.Tasks.Copy.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry)) - tool.DownloadTaskManager = tache.NewManager[*tool.DownloadTask](tache.WithWorks(conf.Conf.Tasks.Download.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Download.MaxRetry)) - tool.TransferTaskManager = tache.NewManager[*tool.TransferTask](tache.WithWorks(conf.Conf.Tasks.Transfer.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Transfer.MaxRetry)) + enabled := setting.GetBool(conf.IsTaskPersistant) + fs.UploadTaskManager = tache.NewManager[*fs.UploadTask](tache.WithWorks(conf.Conf.Tasks.Upload.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Upload.MaxRetry)) //upload will not support persist + fs.CopyTaskManager = tache.NewManager[*fs.CopyTask](tache.WithWorks(conf.Conf.Tasks.Copy.Workers), tache.WithPersistFunction(db.GetTaskDataFunc("copy", enabled), db.UpdateTaskDataFunc("copy", enabled)), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry)) + tool.DownloadTaskManager = tache.NewManager[*tool.DownloadTask](tache.WithWorks(conf.Conf.Tasks.Download.Workers), tache.WithPersistFunction(db.GetTaskDataFunc("download", enabled), db.UpdateTaskDataFunc("download", enabled)), tache.WithMaxRetry(conf.Conf.Tasks.Download.MaxRetry)) + tool.TransferTaskManager = tache.NewManager[*tool.TransferTask](tache.WithWorks(conf.Conf.Tasks.Transfer.Workers), tache.WithPersistFunction(db.GetTaskDataFunc("transfer", enabled), db.UpdateTaskDataFunc("transfer", enabled)), tache.WithMaxRetry(conf.Conf.Tasks.Transfer.MaxRetry)) + if len(tool.TransferTaskManager.GetAll()) == 0 { //prevent offline downloaded files from being deleted + CleanTempDir() + } } diff --git a/internal/conf/const.go b/internal/conf/const.go index 2d53702e91a0..565e52e7a6ed 100644 --- a/internal/conf/const.go +++ b/internal/conf/const.go @@ -30,6 +30,7 @@ const ( ProxyIgnoreHeaders = "proxy_ignore_headers" AudioAutoplay = "audio_autoplay" VideoAutoplay = "video_autoplay" + IsTaskPersistant = "enable_persistant_task" // global HideFiles = "hide_files" diff --git a/internal/db/db.go b/internal/db/db.go index cd3905ffc929..2df58d3760b0 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -12,7 +12,7 @@ var db *gorm.DB func Init(d *gorm.DB) { db = d - err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode)) + err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode), new(model.TaskItem)) if err != nil { log.Fatalf("failed migrate database: %s", err.Error()) } diff --git a/internal/db/tasks.go b/internal/db/tasks.go new file mode 100644 index 000000000000..b41b3974658b --- /dev/null +++ b/internal/db/tasks.go @@ -0,0 +1,52 @@ +package db + +import ( + "github.com/alist-org/alist/v3/internal/model" + "github.com/pkg/errors" +) + +func GetTaskDataByType(type_s string) (*model.TaskItem, error) { + task := model.TaskItem{Key: type_s} + if err := db.Where(task).First(&task).Error; err != nil { + return nil, errors.Wrapf(err, "failed find task") + } + return &task, nil +} + +func UpdateTaskData(t *model.TaskItem) error { + return errors.WithStack(db.Model(&model.TaskItem{}).Where("key = ?", t.Key).Update("persist_data", t.PersistData).Error) +} + +func CreateTaskData(t *model.TaskItem) error { + return errors.WithStack(db.Create(t).Error) +} + +func GetTaskDataFunc(type_s string, enabled bool) func() ([]byte, error) { + if !enabled { + return func() ([]byte, error) { + return []byte("[]"), nil + } + } + task, err := GetTaskDataByType(type_s) + if err != nil { + return nil + } + return func() ([]byte, error) { + return []byte(task.PersistData), nil + } +} + +func UpdateTaskDataFunc(type_s string, enabled bool) func([]byte) error { + if !enabled { + return func([]byte) error { + return nil + } + } + return func(data []byte) error { + s := string(data) + if s == "null" || s == "" { + s = "[]" + } + return UpdateTaskData(&model.TaskItem{Key: type_s, PersistData: s}) + } +} diff --git a/internal/fs/copy.go b/internal/fs/copy.go index 25f068f0c403..38407c9a8639 100644 --- a/internal/fs/copy.go +++ b/internal/fs/copy.go @@ -3,6 +3,9 @@ package fs import ( "context" "fmt" + "net/http" + stdpath "path" + "github.com/alist-org/alist/v3/internal/conf" "github.com/alist-org/alist/v3/internal/driver" "github.com/alist-org/alist/v3/internal/model" @@ -11,20 +14,21 @@ import ( "github.com/alist-org/alist/v3/pkg/utils" "github.com/pkg/errors" "github.com/xhofe/tache" - "net/http" - stdpath "path" ) type CopyTask struct { tache.Base - Status string `json:"status"` - srcStorage, dstStorage driver.Driver - srcObjPath, dstDirPath string + Status string `json:"-"` //don't save status to save space + SrcObjPath string `json:"src_path"` + DstDirPath string `json:"dst_path"` + srcStorage driver.Driver `json:"-"` + dstStorage driver.Driver `json:"-"` + SrcStorageMp string `json:"src_storage_mp"` + DstStorageMp string `json:"dst_storage_mp"` } func (t *CopyTask) GetName() string { - return fmt.Sprintf("copy [%s](%s) to [%s](%s)", - t.srcStorage.GetStorage().MountPath, t.srcObjPath, t.dstStorage.GetStorage().MountPath, t.dstDirPath) + return fmt.Sprintf("copy [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath) } func (t *CopyTask) GetStatus() string { @@ -32,7 +36,17 @@ func (t *CopyTask) GetStatus() string { } func (t *CopyTask) Run() error { - return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.srcObjPath, t.dstDirPath) + var err error + if t.srcStorage == nil { + t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp) + } + if t.dstStorage == nil { + t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp) + } + if err != nil { + return errors.WithMessage(err, "failed get storage") + } + return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath) } var CopyTaskManager *tache.Manager[*CopyTask] @@ -79,10 +93,12 @@ func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool } // not in the same storage t := &CopyTask{ - srcStorage: srcStorage, - dstStorage: dstStorage, - srcObjPath: srcObjActualPath, - dstDirPath: dstDirActualPath, + srcStorage: srcStorage, + dstStorage: dstStorage, + SrcObjPath: srcObjActualPath, + DstDirPath: dstDirActualPath, + SrcStorageMp: srcStorage.GetStorage().MountPath, + DstStorageMp: dstStorage.GetStorage().MountPath, } CopyTaskManager.Add(t) return t, nil @@ -107,10 +123,12 @@ func copyBetween2Storages(t *CopyTask, srcStorage, dstStorage driver.Driver, src srcObjPath := stdpath.Join(srcObjPath, obj.GetName()) dstObjPath := stdpath.Join(dstDirPath, srcObj.GetName()) CopyTaskManager.Add(&CopyTask{ - srcStorage: srcStorage, - dstStorage: dstStorage, - srcObjPath: srcObjPath, - dstDirPath: dstObjPath, + srcStorage: srcStorage, + dstStorage: dstStorage, + SrcObjPath: srcObjPath, + DstDirPath: dstObjPath, + SrcStorageMp: srcStorage.GetStorage().MountPath, + DstStorageMp: dstStorage.GetStorage().MountPath, }) } t.Status = "src object is dir, added all copy tasks of objs" diff --git a/internal/model/task.go b/internal/model/task.go new file mode 100644 index 000000000000..8a87c5a5062e --- /dev/null +++ b/internal/model/task.go @@ -0,0 +1,6 @@ +package model + +type TaskItem struct { + Key string `json:"key"` + PersistData string `gorm:"type:text" json:"persist_data"` +} diff --git a/internal/offline_download/tool/add.go b/internal/offline_download/tool/add.go index e9bcdc50e522..372c73a6cc78 100644 --- a/internal/offline_download/tool/add.go +++ b/internal/offline_download/tool/add.go @@ -76,6 +76,7 @@ func AddURL(ctx context.Context, args *AddURLArgs) (tache.TaskWithInfo, error) { DstDirPath: args.DstDirPath, TempDir: tempDir, DeletePolicy: deletePolicy, + Toolname: args.Tool, tool: tool, } DownloadTaskManager.Add(t) diff --git a/internal/offline_download/tool/download.go b/internal/offline_download/tool/download.go index 79a29ef0c9a9..c778d93632e9 100644 --- a/internal/offline_download/tool/download.go +++ b/internal/offline_download/tool/download.go @@ -14,19 +14,26 @@ import ( type DownloadTask struct { tache.Base - Url string `json:"url"` - DstDirPath string `json:"dst_dir_path"` - TempDir string `json:"temp_dir"` - DeletePolicy DeletePolicy `json:"delete_policy"` - - Status string `json:"status"` - Signal chan int `json:"-"` - GID string `json:"-"` + Url string `json:"url"` + DstDirPath string `json:"dst_dir_path"` + TempDir string `json:"temp_dir"` + DeletePolicy DeletePolicy `json:"delete_policy"` + Toolname string `json:"toolname"` + Status string `json:"-"` + Signal chan int `json:"-"` + GID string `json:"-"` tool Tool callStatusRetried int } func (t *DownloadTask) Run() error { + if t.tool == nil { + tool, err := Tools.Get(t.Toolname) + if err != nil { + return errors.WithMessage(err, "failed get tool") + } + t.tool = tool + } if err := t.tool.Run(t); !errs.IsNotSupportError(err) { if err == nil { return t.Complete() @@ -142,9 +149,10 @@ func (t *DownloadTask) Complete() error { file := files[i] TransferTaskManager.Add(&TransferTask{ file: file, - dstDirPath: t.DstDirPath, - tempDir: t.TempDir, - deletePolicy: t.DeletePolicy, + DstDirPath: t.DstDirPath, + TempDir: t.TempDir, + DeletePolicy: t.DeletePolicy, + FileDir: file.Path, }) } return nil diff --git a/internal/offline_download/tool/transfer.go b/internal/offline_download/tool/transfer.go index 0ef58df5019e..3744c7b500f7 100644 --- a/internal/offline_download/tool/transfer.go +++ b/internal/offline_download/tool/transfer.go @@ -2,6 +2,9 @@ package tool import ( "fmt" + "os" + "path/filepath" + "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/op" "github.com/alist-org/alist/v3/internal/stream" @@ -9,21 +12,27 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/xhofe/tache" - "os" - "path/filepath" ) type TransferTask struct { tache.Base + FileDir string `json:"file_dir"` + DstDirPath string `json:"dst_dir_path"` + TempDir string `json:"temp_dir"` + DeletePolicy DeletePolicy `json:"delete_policy"` file File - dstDirPath string - tempDir string - deletePolicy DeletePolicy } func (t *TransferTask) Run() error { // check dstDir again - storage, dstDirActualPath, err := op.GetStorageAndActualPath(t.dstDirPath) + var err error + if (t.file == File{}) { + t.file, err = GetFile(t.FileDir) + if err != nil { + return errors.Wrapf(err, "failed to get file %s", t.FileDir) + } + } + storage, dstDirActualPath, err := op.GetStorageAndActualPath(t.DstDirPath) if err != nil { return errors.WithMessage(err, "failed get storage") } @@ -44,7 +53,7 @@ func (t *TransferTask) Run() error { Mimetype: mimetype, Closers: utils.NewClosers(rc), } - relDir, err := filepath.Rel(t.tempDir, filepath.Dir(t.file.Path)) + relDir, err := filepath.Rel(t.TempDir, filepath.Dir(t.file.Path)) if err != nil { log.Errorf("find relation directory error: %v", err) } @@ -53,7 +62,7 @@ func (t *TransferTask) Run() error { } func (t *TransferTask) GetName() string { - return fmt.Sprintf("transfer %s to [%s]", t.file.Path, t.dstDirPath) + return fmt.Sprintf("transfer %s to [%s]", t.file.Path, t.DstDirPath) } func (t *TransferTask) GetStatus() string { @@ -61,7 +70,7 @@ func (t *TransferTask) GetStatus() string { } func (t *TransferTask) OnSucceeded() { - if t.deletePolicy == DeleteOnUploadSucceed || t.deletePolicy == DeleteAlways { + if t.DeletePolicy == DeleteOnUploadSucceed || t.DeletePolicy == DeleteAlways { err := os.Remove(t.file.Path) if err != nil { log.Errorf("failed to delete file %s, error: %s", t.file.Path, err.Error()) @@ -70,7 +79,7 @@ func (t *TransferTask) OnSucceeded() { } func (t *TransferTask) OnFailed() { - if t.deletePolicy == DeleteOnUploadFailed || t.deletePolicy == DeleteAlways { + if t.DeletePolicy == DeleteOnUploadFailed || t.DeletePolicy == DeleteAlways { err := os.Remove(t.file.Path) if err != nil { log.Errorf("failed to delete file %s, error: %s", t.file.Path, err.Error()) diff --git a/internal/offline_download/tool/util.go b/internal/offline_download/tool/util.go index 4258eff61e02..b2c6ec02bfa1 100644 --- a/internal/offline_download/tool/util.go +++ b/internal/offline_download/tool/util.go @@ -26,3 +26,16 @@ func GetFiles(dir string) ([]File, error) { } return files, nil } + +func GetFile(path string) (File, error) { + info, err := os.Stat(path) + if err != nil { + return File{}, err + } + return File{ + Name: info.Name(), + Size: info.Size(), + Path: path, + Modified: info.ModTime(), + }, nil +}