Skip to content

Commit

Permalink
start refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
labkode committed Oct 24, 2024
1 parent ec06a7d commit 161dd8d
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 124 deletions.
187 changes: 81 additions & 106 deletions pkg/storage/fs/cephfs/cephfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ func New(ctx context.Context, m map[string]interface{}) (fs storage.FS, err erro

func (fs *cephfs) GetHome(ctx context.Context) (string, error) {
log := appctx.GetLogger(ctx)
user := fs.makeUser(ctx)
log.Debug().Interface("user", user).Msg("GetHome for user")
return user.home, nil
mount := fs.getMount(ctx)
log.Debug().Interface("mount", mount).Msg("GetHome")
return mount.GetHome(), nil
}

func (fs *cephfs) CreateHome(ctx context.Context) (err error) {
Expand Down Expand Up @@ -134,58 +134,53 @@ func (fs *cephfs) CreateHome(ctx context.Context) (err error) {
}

func (fs *cephfs) CreateDir(ctx context.Context, ref *provider.Reference) error {
user := fs.makeUser(ctx)
path, err := user.resolveRef(ref)
mount := fs.getMount(ctx)
path, err := fs.resolveRef(ref)
if err != nil {
return getRevaError(err)
}

user.op(func(cv *cacheVal) {
if err = cv.mount.MakeDir(path, fs.conf.DirPerms); err != nil {
return
}
})

return getRevaError(err)
if err = mount.MakeDir(path, fs.conf.DirPerms); err != nil {
return getRevaError(err)
}
return nil
}

func (fs *cephfs) Delete(ctx context.Context, ref *provider.Reference) (err error) {
var path string
user := fs.makeUser(ctx)
path, err = user.resolveRef(ref)
func (fs *cephfs) Delete(ctx context.Context, ref *provider.Reference) error {
path, err := fs.resolveRef(ref)
if err != nil {
return err
return getRevaError(err)
}

user.op(func(cv *cacheVal) {
if err = cv.mount.Unlink(path); err != nil && err.Error() == errIsADirectory {
err = cv.mount.RemoveDir(path)
}
})
mount := fs.getMount(ctx)
err = mount.Unlink(path)
if err != nil && err.Error() == errIsADirectory {
err = mount.RemoveDir(path)
}

//has already been deleted by direct mount
if err != nil && err.Error() == errNotFound {
if err != nil && err.Error() != errNotFound {
return nil
}

return getRevaError(err)
// has already been deleted by direct mount
return nil
}

func (fs *cephfs) Move(ctx context.Context, oldRef, newRef *provider.Reference) (err error) {
var oldPath, newPath string
user := fs.makeUser(ctx)
if oldPath, err = user.resolveRef(oldRef); err != nil {
return
func (fs *cephfs) Move(ctx context.Context, oldRef, newRef *provider.Reference) error {
oldPath, err := fs.resolveRef(oldRef)
if err != nil {
return getRevaError(err)
}
if newPath, err = user.resolveRef(newRef); err != nil {
return

newPath, err := fs.resolveRef(newRef)
if err != nil {
return getRevaError(err)
}

user.op(func(cv *cacheVal) {
if err = cv.mount.Rename(oldPath, newPath); err != nil {
return
}
})
mount := fs.getMount(ctx)
if err := mount.Rename(oldPath, newPath); err != nil {
return getRevaError(err)
}

// has already been moved by direct mount
if err != nil && err.Error() == errNotFound {
Expand All @@ -195,88 +190,68 @@ func (fs *cephfs) Move(ctx context.Context, oldRef, newRef *provider.Reference)
return getRevaError(err)
}

func (fs *cephfs) GetMD(ctx context.Context, ref *provider.Reference, mdKeys []string) (ri *provider.ResourceInfo, err error) {
if ref == nil {
return nil, errors.New("error: ref is nil")
func (fs *cephfs) GetMD(ctx context.Context, ref *provider.Reference, mdKeys []string) (*provider.ResourceInfo, error) {
path, err := fs.resolveRef(ref)
if err != nil {
return nil, getRevaError(err)
}

var path string
user := fs.makeUser(ctx)
mount := fs.getMount(ctx)

if path, err = user.resolveRef(ref); err != nil {
return nil, err
stat, err := mount.Statx(path, goceph.StatxBasicStats, 0)
if err != nil {
return nil, getRevaError(err)
}

user.op(func(cv *cacheVal) {
var stat Statx
if stat, err = cv.mount.Statx(path, goceph.StatxBasicStats, 0); err != nil {
return
}
ri, err = user.fileAsResourceInfo(cv, path, stat, mdKeys)
})
ri, err := fs.fileAsResourceInfo(mount, path, stat, mdKeys)
if err != nil {
return nil, err
}

return ri, getRevaError(err)
}

func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKeys []string) (files []*provider.ResourceInfo, err error) {
if ref == nil {
return nil, errors.New("error: ref is nil")
func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKeys []string) ([]*provider.ResourceInfo, error) {
path, err := fs.resolveRef(ref)
if err != nil {
return nil, getRevaError(err)
}

log := appctx.GetLogger(ctx)
log.Debug().Interface("ref", ref)
fmt.Println("debugging: listing folder", ref)
user := fs.makeUser(ctx)
fmt.Println("debugging: user", user)

fmt.Println("debugging: ceph got", ref)
var path string
if path, err = user.resolveRef(ref); err != nil {
return nil, err
mount := fs.getMount(ctx)
dir, err := mount.OpenDir(path)
if err != nil {
return nil, getRevaError(err)
}
fmt.Println("debugging: listing folder after user resolv ref", path)

user.op(func(cv *cacheVal) {
var dir *goceph.Directory
if dir, err = cv.mount.OpenDir(path); err != nil {
fmt.Println(err)
return
}
defer closeDir(dir)
defer closeDir(dir)

fmt.Println("debugging: dir obtained ", dir)
var entry *goceph.DirEntryPlus
var ri *provider.ResourceInfo

var entry *goceph.DirEntryPlus
var ri *provider.ResourceInfo

for entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0) {
if fs.conf.HiddenDirs[entry.Name()] {
continue
}
for entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0) {
if fs.conf.HiddenDirs[entry.Name()] {
continue
}

fmt.Println("debugging: inside ReadDirPlus, before user.fileAsResourceInfo", cv, filepath.Join(path, entry.Name()), entry.Statx(), mdKeys)
ri, err = user.fileAsResourceInfo(cv, filepath.Join(path, entry.Name()), entry.Statx(), mdKeys)
fmt.Println("debugging: inside ReadDirPlus, after user.fileAsResourceInfo", cv, filepath.Join(path, entry.Name()), entry.Statx(), mdKeys)
if ri == nil || err != nil {
if err != nil {
log := appctx.GetLogger(ctx)
log.Err(err).Msg("cephfs: error in file as resource info")
}
err = nil
continue
ri, err = fs.fileAsResourceInfo(mount, filepath.Join(path, entry.Name()), entry.Statx(), mdKeys)
if ri == nil || err != nil {
if err != nil {
log := appctx.GetLogger(ctx)
log.Err(err).Msg("cephfs: error in file as resource info")
}

files = append(files, ri)
err = nil
continue
}
})

files = append(files, ri)
}

return files, getRevaError(err)
}

func (fs *cephfs) Download(ctx context.Context, ref *provider.Reference) (rc io.ReadCloser, err error) {
var path string
user := fs.makeUser(ctx)
if path, err = user.resolveRef(ref); err != nil {
if path, err = fs.resolveRef(ref); err != nil {
return nil, errors.Wrap(err, "cephfs: error resolving ref")
}

Expand Down Expand Up @@ -306,7 +281,7 @@ func (fs *cephfs) GetPathByID(ctx context.Context, id *provider.ResourceId) (str
func (fs *cephfs) AddGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) {
var path string
user := fs.makeUser(ctx)
if path, err = user.resolveRef(ref); err != nil {
if path, err = fs.resolveRef(ref); err != nil {
return
}

Expand All @@ -320,7 +295,7 @@ func (fs *cephfs) AddGrant(ctx context.Context, ref *provider.Reference, g *prov
func (fs *cephfs) RemoveGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) {
var path string
user := fs.makeUser(ctx)
if path, err = user.resolveRef(ref); err != nil {
if path, err = fs.resolveRef(ref); err != nil {
return
}

Expand All @@ -334,7 +309,7 @@ func (fs *cephfs) RemoveGrant(ctx context.Context, ref *provider.Reference, g *p
func (fs *cephfs) UpdateGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) {
var path string
user := fs.makeUser(ctx)
if path, err = user.resolveRef(ref); err != nil {
if path, err = fs.resolveRef(ref); err != nil {
return
}

Expand All @@ -348,7 +323,7 @@ func (fs *cephfs) UpdateGrant(ctx context.Context, ref *provider.Reference, g *p
func (fs *cephfs) DenyGrant(ctx context.Context, ref *provider.Reference, g *provider.Grantee) (err error) {
var path string
user := fs.makeUser(ctx)
if path, err = user.resolveRef(ref); err != nil {
if path, err = fs.resolveRef(ref); err != nil {
return
}

Expand All @@ -363,7 +338,7 @@ func (fs *cephfs) DenyGrant(ctx context.Context, ref *provider.Reference, g *pro
func (fs *cephfs) ListGrants(ctx context.Context, ref *provider.Reference) (glist []*provider.Grant, err error) {
var path string
user := fs.makeUser(ctx)
if path, err = user.resolveRef(ref); err != nil {
if path, err = fs.resolveRef(ref); err != nil {
return
}

Expand Down Expand Up @@ -418,7 +393,7 @@ func (fs *cephfs) Shutdown(ctx context.Context) (err error) {
func (fs *cephfs) SetArbitraryMetadata(ctx context.Context, ref *provider.Reference, md *provider.ArbitraryMetadata) (err error) {
var path string
user := fs.makeUser(ctx)
if path, err = user.resolveRef(ref); err != nil {
if path, err = fs.resolveRef(ref); err != nil {
return err
}

Expand All @@ -440,7 +415,7 @@ func (fs *cephfs) SetArbitraryMetadata(ctx context.Context, ref *provider.Refere
func (fs *cephfs) UnsetArbitraryMetadata(ctx context.Context, ref *provider.Reference, keys []string) (err error) {
var path string
user := fs.makeUser(ctx)
if path, err = user.resolveRef(ref); err != nil {
if path, err = fs.resolveRef(ref); err != nil {
return err
}

Expand All @@ -461,7 +436,7 @@ func (fs *cephfs) UnsetArbitraryMetadata(ctx context.Context, ref *provider.Refe

func (fs *cephfs) TouchFile(ctx context.Context, ref *provider.Reference) error {
user := fs.makeUser(ctx)
path, err := user.resolveRef(ref)
path, err := fs.resolveRef(ref)
if err != nil {
return getRevaError(err)
}
Expand Down Expand Up @@ -534,7 +509,7 @@ func decodeLock(content string) (*provider.Lock, error) {

func (fs *cephfs) SetLock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error {
user := fs.makeUser(ctx)
path, err := user.resolveRef(ref)
path, err := fs.resolveRef(ref)
if err != nil {
return getRevaError(err)
}
Expand Down Expand Up @@ -569,7 +544,7 @@ func (fs *cephfs) SetLock(ctx context.Context, ref *provider.Reference, lock *pr

func (fs *cephfs) GetLock(ctx context.Context, ref *provider.Reference) (*provider.Lock, error) {
user := fs.makeUser(ctx)
path, err := user.resolveRef(ref)
path, err := fs.resolveRef(ref)
if err != nil {
return nil, getRevaError(err)
}
Expand Down Expand Up @@ -665,7 +640,7 @@ func (fs *cephfs) RefreshLock(ctx context.Context, ref *provider.Reference, newL

func (fs *cephfs) Unlock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error {
user := fs.makeUser(ctx)
path, err := user.resolveRef(ref)
path, err := fs.resolveRef(ref)
if err != nil {
return getRevaError(err)
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/storage/fs/cephfs/chunking.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ type ChunkHandler struct {

// NewChunkHandler creates a handler for chunked uploads.
func NewChunkHandler(ctx context.Context, fs *cephfs) *ChunkHandler {
fmt.Println("debugging NewChunkHandler", fs.makeUser(ctx), fs.conf.UploadFolder)
u := fs.makeUser(ctx)
return &ChunkHandler{u, path.Join(u.home, fs.conf.UploadFolder)}
}
Expand Down Expand Up @@ -125,7 +124,6 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu
// err = fmt.Errorf("error getting transfer folder anme", err)
return
}
fmt.Println("debugging: transferfoldername", transferFolderName)

// here we write a temporary file that will be renamed to the transfer folder
// with the correct sequence number filename.
Expand All @@ -135,7 +133,6 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu
c.user.op(func(cv *cacheVal) {
var tmpFile *goceph.File
target := filepath.Join(c.uploadFolder, tmpFilename)
fmt.Println("debugging savechunk, target: ", target)
tmpFile, err = cv.mount.Open(target, os.O_CREATE|os.O_WRONLY, c.user.fs.conf.FilePerms)
defer closeFile(tmpFile)
if err != nil {
Expand Down
Loading

0 comments on commit 161dd8d

Please sign in to comment.