Skip to content

Commit

Permalink
fix: storage used double calc for the netdisk mode
Browse files Browse the repository at this point in the history
  • Loading branch information
saltbo committed Jul 18, 2021
1 parent 2c4ae11 commit 06d3076
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 76 deletions.
2 changes: 1 addition & 1 deletion internal/app/api/matter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (rs *FileResource) Register(router *gin.RouterGroup) {
router.PATCH("/matters/:alias/location", rs.move)
router.PATCH("/matters/:alias/duplicate", rs.copy)
router.DELETE("/matters/:alias", rs.delete)
rs.fs.Start()
rs.fs.StartFileAutoDoneWorker()
}

func (rs *FileResource) findAll(c *gin.Context) {
Expand Down
8 changes: 2 additions & 6 deletions internal/app/dao/matter.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (ms *Matter) FindUserMatter(uid int64, alias string) (*model.Matter, error)

func (ms *Matter) Uploaded(matter *model.Matter, incrUsed bool) error {
fc := func(tx *gorm.DB) error {
if err := tx.First(matter).Update("uploaded_at", time.Now()).Error; err != nil {
if err := tx.First(matter).Where("uploaded_at is null").Update("uploaded_at", time.Now()).Error; err != nil {
return err
}

Expand All @@ -103,11 +103,7 @@ func (ms *Matter) Uploaded(matter *model.Matter, incrUsed bool) error {

// update the storage used of the user
expr := gorm.Expr("used+?", matter.Size)
if err := tx.Model(&model.UserStorage{}).Where("uid=?", matter.Uid).Update("used", expr).Error; err != nil {
return err
}

return nil
return tx.Model(&model.UserStorage{}).Where("uid=?", matter.Uid).Update("used", expr).Error
}

return gdb.Transaction(fc)
Expand Down
7 changes: 2 additions & 5 deletions internal/pkg/fakefs/ffs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
)

type FakeFS struct {
worker *Worker
dMatter *dao.Matter

sFile *File
Expand All @@ -20,16 +19,15 @@ type FakeFS struct {

func New() *FakeFS {
return &FakeFS{
worker: NewWorker(),
dMatter: dao.NewMatter(),

sFile: NewFile(),
sFolder: NewFolder(),
}
}

func (fs *FakeFS) Start() {
go fs.worker.Run()
func (fs *FakeFS) StartFileAutoDoneWorker() {
go fs.sFile.RunFileAutoDoneWorker()
}

func (fs *FakeFS) List(uid int64, qp *bind.QueryFiles) (list []model.Matter, total int64, err error) {
Expand Down Expand Up @@ -128,7 +126,6 @@ func (fs *FakeFS) CreateFile(m *model.Matter) (interface{}, error) {
return nil, err
}

fs.worker.WaitDone(m, fs.TagUploadDone)
return gin.H{
"alias": m.Alias,
"object": m.Object,
Expand Down
15 changes: 13 additions & 2 deletions internal/pkg/fakefs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@ import (
type File struct {
dMatter *dao.Matter

sStorage *service.Storage
sStorage *service.Storage
fileWaiter *FileWaiter
}

func NewFile() *File {
return &File{
dMatter: dao.NewMatter(),

sStorage: service.NewStorage(),
sStorage: service.NewStorage(),
fileWaiter: NewFileWaiter(),
}
}

func (f *File) RunFileAutoDoneWorker() error {
return f.fileWaiter.Run()
}

func (f *File) PreSignPutURL(matter *model.Matter) (url string, headers http.Header, err error) {
if !f.dMatter.ParentExist(matter.Uid, matter.Parent) {
return "", nil, fmt.Errorf("dir does not exists")
Expand All @@ -52,6 +58,11 @@ func (f *File) PreSignPutURL(matter *model.Matter) (url string, headers http.Hea
return "", nil, err
}

// 只有外链盘才有自动标记上传完成的逻辑
if storage.Mode == model.StorageModeOutline {
f.fileWaiter.Wait(provider, matter, f.UploadDone)
}

url, headers, err = provider.SignedPutURL(matter.Object, matter.Type, matter.Size, storage.PublicRead())
if err != nil {
return
Expand Down
65 changes: 65 additions & 0 deletions internal/pkg/fakefs/filewaiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package fakefs

import (
"fmt"
"math/rand"
"time"

"github.com/saltbo/zpan/internal/app/model"
"github.com/saltbo/zpan/internal/pkg/provider"
)

const maxHeadIntervalSec = 5

type AutoDoneMsg struct {
Provider provider.Provider
Matter *model.Matter
Handler func(uid int64, alias string) (*model.Matter, error)
}

type FileWaiter struct {
ch chan *AutoDoneMsg
}

func NewFileWaiter() *FileWaiter {
return &FileWaiter{
ch: make(chan *AutoDoneMsg),
}
}

func (w *FileWaiter) Run() error {
for m := range w.ch {
go w.runWait(m)
}

return nil
}

// fixme: 如果在外链上传期间服务重启了,将永远无法标记上传完成

func (w *FileWaiter) Wait(p provider.Provider, m *model.Matter, f func(uid int64, alias string) (*model.Matter, error)) {
w.ch <- &AutoDoneMsg{Provider: p, Matter: m, Handler: f}
}

func (w *FileWaiter) runWait(adm *AutoDoneMsg) {
startAt := time.Now()
for {
// 如果超过上传有效期仍然没有上传完成则判定为失败,不再等待
if startAt.Sub(time.Now()) > time.Hour {
break
}

s := time.Now()
if _, err := adm.Provider.Head(adm.Matter.Object); err != nil {
// 加一个时间限制,控制请求频率
if time.Now().Sub(s).Seconds() < maxHeadIntervalSec {
time.Sleep(time.Second * time.Duration(rand.Intn(maxHeadIntervalSec)))
}
continue
}

adm.Handler(adm.Matter.Uid, adm.Matter.Alias)
fmt.Printf("object %s uploaed\n", adm.Matter.Object)
return
}
}
62 changes: 0 additions & 62 deletions internal/pkg/fakefs/worker.go

This file was deleted.

0 comments on commit 06d3076

Please sign in to comment.