From 161dd8dd4dc9aca37a48246217ded6a628dbf2aa Mon Sep 17 00:00:00 2001 From: Hugo Gonzalez Labrador Date: Thu, 24 Oct 2024 13:43:51 +0200 Subject: [PATCH] start refactoring --- pkg/storage/fs/cephfs/cephfs.go | 187 ++++++++++++--------------- pkg/storage/fs/cephfs/chunking.go | 3 - pkg/storage/fs/cephfs/connections.go | 23 +++- pkg/storage/fs/cephfs/upload.go | 4 - pkg/storage/fs/cephfs/user.go | 17 ++- 5 files changed, 110 insertions(+), 124 deletions(-) diff --git a/pkg/storage/fs/cephfs/cephfs.go b/pkg/storage/fs/cephfs/cephfs.go index 76a0467e2f..83b13495dc 100644 --- a/pkg/storage/fs/cephfs/cephfs.go +++ b/pkg/storage/fs/cephfs/cephfs.go @@ -90,9 +90,9 @@ func New(ctx context.Context, m map[string]interface{}) (fs storage.FS, err erro func (fs *cephfs) GetHome(ctx context.Context) (string, error) { log := appctx.GetLogger(ctx) - user := fs.makeUser(ctx) - log.Debug().Interface("user", user).Msg("GetHome for user") - return user.home, nil + mount := fs.getMount(ctx) + log.Debug().Interface("mount", mount).Msg("GetHome") + return mount.GetHome(), nil } func (fs *cephfs) CreateHome(ctx context.Context) (err error) { @@ -134,58 +134,53 @@ func (fs *cephfs) CreateHome(ctx context.Context) (err error) { } func (fs *cephfs) CreateDir(ctx context.Context, ref *provider.Reference) error { - user := fs.makeUser(ctx) - path, err := user.resolveRef(ref) + mount := fs.getMount(ctx) + path, err := fs.resolveRef(ref) if err != nil { return getRevaError(err) } - user.op(func(cv *cacheVal) { - if err = cv.mount.MakeDir(path, fs.conf.DirPerms); err != nil { - return - } - }) - - return getRevaError(err) + if err = mount.MakeDir(path, fs.conf.DirPerms); err != nil { + return getRevaError(err) + } + return nil } -func (fs *cephfs) Delete(ctx context.Context, ref *provider.Reference) (err error) { - var path string - user := fs.makeUser(ctx) - path, err = user.resolveRef(ref) +func (fs *cephfs) Delete(ctx context.Context, ref *provider.Reference) error { + path, err := fs.resolveRef(ref) if err != nil { - return err + return getRevaError(err) } - user.op(func(cv *cacheVal) { - if err = cv.mount.Unlink(path); err != nil && err.Error() == errIsADirectory { - err = cv.mount.RemoveDir(path) - } - }) + mount := fs.getMount(ctx) + err = mount.Unlink(path) + if err != nil && err.Error() == errIsADirectory { + err = mount.RemoveDir(path) + } - //has already been deleted by direct mount - if err != nil && err.Error() == errNotFound { + if err != nil && err.Error() != errNotFound { return nil } - return getRevaError(err) + // has already been deleted by direct mount + return nil } -func (fs *cephfs) Move(ctx context.Context, oldRef, newRef *provider.Reference) (err error) { - var oldPath, newPath string - user := fs.makeUser(ctx) - if oldPath, err = user.resolveRef(oldRef); err != nil { - return +func (fs *cephfs) Move(ctx context.Context, oldRef, newRef *provider.Reference) error { + oldPath, err := fs.resolveRef(oldRef) + if err != nil { + return getRevaError(err) } - if newPath, err = user.resolveRef(newRef); err != nil { - return + + newPath, err := fs.resolveRef(newRef) + if err != nil { + return getRevaError(err) } - user.op(func(cv *cacheVal) { - if err = cv.mount.Rename(oldPath, newPath); err != nil { - return - } - }) + mount := fs.getMount(ctx) + if err := mount.Rename(oldPath, newPath); err != nil { + return getRevaError(err) + } // has already been moved by direct mount if err != nil && err.Error() == errNotFound { @@ -195,80 +190,60 @@ func (fs *cephfs) Move(ctx context.Context, oldRef, newRef *provider.Reference) return getRevaError(err) } -func (fs *cephfs) GetMD(ctx context.Context, ref *provider.Reference, mdKeys []string) (ri *provider.ResourceInfo, err error) { - if ref == nil { - return nil, errors.New("error: ref is nil") +func (fs *cephfs) GetMD(ctx context.Context, ref *provider.Reference, mdKeys []string) (*provider.ResourceInfo, error) { + path, err := fs.resolveRef(ref) + if err != nil { + return nil, getRevaError(err) } - var path string - user := fs.makeUser(ctx) + mount := fs.getMount(ctx) - if path, err = user.resolveRef(ref); err != nil { - return nil, err + stat, err := mount.Statx(path, goceph.StatxBasicStats, 0) + if err != nil { + return nil, getRevaError(err) } - user.op(func(cv *cacheVal) { - var stat Statx - if stat, err = cv.mount.Statx(path, goceph.StatxBasicStats, 0); err != nil { - return - } - ri, err = user.fileAsResourceInfo(cv, path, stat, mdKeys) - }) + ri, err := fs.fileAsResourceInfo(mount, path, stat, mdKeys) + if err != nil { + return nil, err + } return ri, getRevaError(err) } -func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKeys []string) (files []*provider.ResourceInfo, err error) { - if ref == nil { - return nil, errors.New("error: ref is nil") +func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKeys []string) ([]*provider.ResourceInfo, error) { + path, err := fs.resolveRef(ref) + if err != nil { + return nil, getRevaError(err) } - log := appctx.GetLogger(ctx) - log.Debug().Interface("ref", ref) - fmt.Println("debugging: listing folder", ref) - user := fs.makeUser(ctx) - fmt.Println("debugging: user", user) - - fmt.Println("debugging: ceph got", ref) - var path string - if path, err = user.resolveRef(ref); err != nil { - return nil, err + mount := fs.getMount(ctx) + dir, err := mount.OpenDir(path) + if err != nil { + return nil, getRevaError(err) } - fmt.Println("debugging: listing folder after user resolv ref", path) - - user.op(func(cv *cacheVal) { - var dir *goceph.Directory - if dir, err = cv.mount.OpenDir(path); err != nil { - fmt.Println(err) - return - } - defer closeDir(dir) + defer closeDir(dir) - fmt.Println("debugging: dir obtained ", dir) + var entry *goceph.DirEntryPlus + var ri *provider.ResourceInfo - var entry *goceph.DirEntryPlus - var ri *provider.ResourceInfo - - for entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0) { - if fs.conf.HiddenDirs[entry.Name()] { - continue - } + for entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0) { + if fs.conf.HiddenDirs[entry.Name()] { + continue + } - fmt.Println("debugging: inside ReadDirPlus, before user.fileAsResourceInfo", cv, filepath.Join(path, entry.Name()), entry.Statx(), mdKeys) - ri, err = user.fileAsResourceInfo(cv, filepath.Join(path, entry.Name()), entry.Statx(), mdKeys) - fmt.Println("debugging: inside ReadDirPlus, after user.fileAsResourceInfo", cv, filepath.Join(path, entry.Name()), entry.Statx(), mdKeys) - if ri == nil || err != nil { - if err != nil { - log := appctx.GetLogger(ctx) - log.Err(err).Msg("cephfs: error in file as resource info") - } - err = nil - continue + ri, err = fs.fileAsResourceInfo(mount, filepath.Join(path, entry.Name()), entry.Statx(), mdKeys) + if ri == nil || err != nil { + if err != nil { + log := appctx.GetLogger(ctx) + log.Err(err).Msg("cephfs: error in file as resource info") } - - files = append(files, ri) + err = nil + continue } - }) + + files = append(files, ri) + } return files, getRevaError(err) } @@ -276,7 +251,7 @@ func (fs *cephfs) ListFolder(ctx context.Context, ref *provider.Reference, mdKey func (fs *cephfs) Download(ctx context.Context, ref *provider.Reference) (rc io.ReadCloser, err error) { var path string user := fs.makeUser(ctx) - if path, err = user.resolveRef(ref); err != nil { + if path, err = fs.resolveRef(ref); err != nil { return nil, errors.Wrap(err, "cephfs: error resolving ref") } @@ -306,7 +281,7 @@ func (fs *cephfs) GetPathByID(ctx context.Context, id *provider.ResourceId) (str func (fs *cephfs) AddGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) { var path string user := fs.makeUser(ctx) - if path, err = user.resolveRef(ref); err != nil { + if path, err = fs.resolveRef(ref); err != nil { return } @@ -320,7 +295,7 @@ func (fs *cephfs) AddGrant(ctx context.Context, ref *provider.Reference, g *prov func (fs *cephfs) RemoveGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) { var path string user := fs.makeUser(ctx) - if path, err = user.resolveRef(ref); err != nil { + if path, err = fs.resolveRef(ref); err != nil { return } @@ -334,7 +309,7 @@ func (fs *cephfs) RemoveGrant(ctx context.Context, ref *provider.Reference, g *p func (fs *cephfs) UpdateGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) (err error) { var path string user := fs.makeUser(ctx) - if path, err = user.resolveRef(ref); err != nil { + if path, err = fs.resolveRef(ref); err != nil { return } @@ -348,7 +323,7 @@ func (fs *cephfs) UpdateGrant(ctx context.Context, ref *provider.Reference, g *p func (fs *cephfs) DenyGrant(ctx context.Context, ref *provider.Reference, g *provider.Grantee) (err error) { var path string user := fs.makeUser(ctx) - if path, err = user.resolveRef(ref); err != nil { + if path, err = fs.resolveRef(ref); err != nil { return } @@ -363,7 +338,7 @@ func (fs *cephfs) DenyGrant(ctx context.Context, ref *provider.Reference, g *pro func (fs *cephfs) ListGrants(ctx context.Context, ref *provider.Reference) (glist []*provider.Grant, err error) { var path string user := fs.makeUser(ctx) - if path, err = user.resolveRef(ref); err != nil { + if path, err = fs.resolveRef(ref); err != nil { return } @@ -418,7 +393,7 @@ func (fs *cephfs) Shutdown(ctx context.Context) (err error) { func (fs *cephfs) SetArbitraryMetadata(ctx context.Context, ref *provider.Reference, md *provider.ArbitraryMetadata) (err error) { var path string user := fs.makeUser(ctx) - if path, err = user.resolveRef(ref); err != nil { + if path, err = fs.resolveRef(ref); err != nil { return err } @@ -440,7 +415,7 @@ func (fs *cephfs) SetArbitraryMetadata(ctx context.Context, ref *provider.Refere func (fs *cephfs) UnsetArbitraryMetadata(ctx context.Context, ref *provider.Reference, keys []string) (err error) { var path string user := fs.makeUser(ctx) - if path, err = user.resolveRef(ref); err != nil { + if path, err = fs.resolveRef(ref); err != nil { return err } @@ -461,7 +436,7 @@ func (fs *cephfs) UnsetArbitraryMetadata(ctx context.Context, ref *provider.Refe func (fs *cephfs) TouchFile(ctx context.Context, ref *provider.Reference) error { user := fs.makeUser(ctx) - path, err := user.resolveRef(ref) + path, err := fs.resolveRef(ref) if err != nil { return getRevaError(err) } @@ -534,7 +509,7 @@ func decodeLock(content string) (*provider.Lock, error) { func (fs *cephfs) SetLock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error { user := fs.makeUser(ctx) - path, err := user.resolveRef(ref) + path, err := fs.resolveRef(ref) if err != nil { return getRevaError(err) } @@ -569,7 +544,7 @@ func (fs *cephfs) SetLock(ctx context.Context, ref *provider.Reference, lock *pr func (fs *cephfs) GetLock(ctx context.Context, ref *provider.Reference) (*provider.Lock, error) { user := fs.makeUser(ctx) - path, err := user.resolveRef(ref) + path, err := fs.resolveRef(ref) if err != nil { return nil, getRevaError(err) } @@ -665,7 +640,7 @@ func (fs *cephfs) RefreshLock(ctx context.Context, ref *provider.Reference, newL func (fs *cephfs) Unlock(ctx context.Context, ref *provider.Reference, lock *provider.Lock) error { user := fs.makeUser(ctx) - path, err := user.resolveRef(ref) + path, err := fs.resolveRef(ref) if err != nil { return getRevaError(err) } diff --git a/pkg/storage/fs/cephfs/chunking.go b/pkg/storage/fs/cephfs/chunking.go index 3380eeb2cc..1b5be2b4b0 100644 --- a/pkg/storage/fs/cephfs/chunking.go +++ b/pkg/storage/fs/cephfs/chunking.go @@ -93,7 +93,6 @@ type ChunkHandler struct { // NewChunkHandler creates a handler for chunked uploads. func NewChunkHandler(ctx context.Context, fs *cephfs) *ChunkHandler { - fmt.Println("debugging NewChunkHandler", fs.makeUser(ctx), fs.conf.UploadFolder) u := fs.makeUser(ctx) return &ChunkHandler{u, path.Join(u.home, fs.conf.UploadFolder)} } @@ -125,7 +124,6 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu // err = fmt.Errorf("error getting transfer folder anme", err) return } - fmt.Println("debugging: transferfoldername", transferFolderName) // here we write a temporary file that will be renamed to the transfer folder // with the correct sequence number filename. @@ -135,7 +133,6 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu c.user.op(func(cv *cacheVal) { var tmpFile *goceph.File target := filepath.Join(c.uploadFolder, tmpFilename) - fmt.Println("debugging savechunk, target: ", target) tmpFile, err = cv.mount.Open(target, os.O_CREATE|os.O_WRONLY, c.user.fs.conf.FilePerms) defer closeFile(tmpFile) if err != nil { diff --git a/pkg/storage/fs/cephfs/connections.go b/pkg/storage/fs/cephfs/connections.go index 6a04748e85..13238ce080 100644 --- a/pkg/storage/fs/cephfs/connections.go +++ b/pkg/storage/fs/cephfs/connections.go @@ -24,6 +24,7 @@ package cephfs import ( "context" "fmt" + "path/filepath" "time" "github.com/ceph/go-ceph/cephfs/admin" @@ -31,7 +32,9 @@ import ( grouppb "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/rgrpc/todo/pool" + "github.com/cs3org/reva/pkg/storage/utils/templates" "github.com/pkg/errors" goceph "github.com/ceph/go-ceph/cephfs" @@ -185,7 +188,6 @@ func newAdminConn(conf *Options) (*adminConn, error) { } func newConn(user *User) *cacheVal { - fmt.Printf("debugging user: %+v\n", user) var perm *goceph.UserPerm mount, err := goceph.CreateMountWithId(user.fs.conf.ClientID) if err != nil { @@ -204,7 +206,6 @@ func newConn(user *User) *cacheVal { } if user != nil { //nil creates admin conn - fmt.Println("creating admin connection: debugging new connection for user: ", user.UidNumber) perm = goceph.NewUserPerm(int(user.UidNumber), int(user.GidNumber), []int{}) if err = mount.SetMountPerms(perm); err != nil { return destroyCephConn(mount, perm) @@ -332,3 +333,21 @@ func (fs *cephfs) getGroupByOpaqueID(ctx context.Context, oid string) (*grouppb. return getGroupResp.Group, nil } + +type mount struct { + *goceph.MountInfo + userHomePath string +} + +func (m *mount) GetHome() string { + return m.userHomePath +} + +func (fs *cephfs) getMount(ctx context.Context) *mount { + u := appctx.ContextMustGetUser(ctx) + userHomePath := filepath.Join(fs.conf.Root, templates.WithUser(u, fs.conf.UserLayout)) + m := &mount{ + userHomePath: userHomePath, + } + return m +} diff --git a/pkg/storage/fs/cephfs/upload.go b/pkg/storage/fs/cephfs/upload.go index 0c9694f5d9..44639881a7 100644 --- a/pkg/storage/fs/cephfs/upload.go +++ b/pkg/storage/fs/cephfs/upload.go @@ -23,7 +23,6 @@ package cephfs import ( "context" - "fmt" "io" "os" @@ -44,7 +43,6 @@ func (fs *cephfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read } if !ok { - fmt.Println("debugging: upload is not chunked", p) var file io.WriteCloser user.op(func(cv *cacheVal) { file, err = cv.mount.Open(p, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, fs.conf.FilePerms) @@ -65,13 +63,11 @@ func (fs *cephfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read } // upload is chunked - fmt.Println("debugging: upload is chunked", p) var assembledFile string // iniate the chunk handler originalFilename, assembledFile, err := NewChunkHandler(ctx, fs).WriteChunk(p, r) - fmt.Println("debugging: assembly file", originalFilename, assembledFile, r) if err != nil { return errors.Wrapf(err, "error writing chunk %v %v %v", p, r, assembledFile) } diff --git a/pkg/storage/fs/cephfs/user.go b/pkg/storage/fs/cephfs/user.go index a5b393b329..5a7a867ef7 100644 --- a/pkg/storage/fs/cephfs/user.go +++ b/pkg/storage/fs/cephfs/user.go @@ -55,7 +55,6 @@ func (fs *cephfs) makeUser(ctx context.Context) *User { u := appctx.ContextMustGetUser(ctx) // home := fs.conf.Root home := filepath.Join(fs.conf.Root, templates.WithUser(u, fs.conf.UserLayout)) - fmt.Println("debugging makeUser", home) return &User{u, fs, ctx, home} } @@ -90,7 +89,7 @@ func (user *User) op(cb callBack) { cb(val.(*cacheVal)) } -func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *goceph.CephStatx, mdKeys []string) (ri *provider.ResourceInfo, err error) { +func (fs *cephfs) fileAsResourceInfo(mount *mount, path string, stat *goceph.CephStatx, mdKeys []string) (ri *provider.ResourceInfo, err error) { var ( _type provider.ResourceType target string @@ -101,7 +100,7 @@ func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *goceph.Cep switch int(stat.Mode) & syscall.S_IFMT { case syscall.S_IFDIR: _type = provider.ResourceType_RESOURCE_TYPE_CONTAINER - if buf, err = cv.mount.GetXattr(path, "ceph.dir.rbytes"); err == nil { + if buf, err = mount.GetXattr(path, "ceph.dir.rbytes"); err == nil { size, err = strconv.ParseUint(string(buf), 10, 64) } else if err.Error() == errPermissionDenied { // Ignore permission denied errors so ListFolder does not fail because of them. @@ -109,7 +108,7 @@ func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *goceph.Cep } case syscall.S_IFLNK: _type = provider.ResourceType_RESOURCE_TYPE_SYMLINK - target, err = cv.mount.Readlink(path) + target, err = mount.Readlink(path) case syscall.S_IFREG: _type = provider.ResourceType_RESOURCE_TYPE_FILE size = stat.Size @@ -127,10 +126,10 @@ func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *goceph.Cep keys = map[string]bool{} } mx := make(map[string]string) - if xattrs, err = cv.mount.ListXattr(path); err == nil { + if xattrs, err = mount.ListXattr(path); err == nil { for _, xattr := range xattrs { if len(mdKeys) == 0 || keys[xattr] { - if buf, err := cv.mount.GetXattr(path, xattr); err == nil { + if buf, err := mount.GetXattr(path, xattr); err == nil { mx[xattr] = string(buf) } } @@ -139,7 +138,7 @@ func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *goceph.Cep var etag string if isDir(_type) { - rctime, _ := cv.mount.GetXattr(path, "ceph.dir.rctime") + rctime, _ := mount.GetXattr(path, "ceph.dir.rctime") etag = fmt.Sprint(stat.Inode) + ":" + string(rctime) } else { etag = fmt.Sprint(stat.Inode) + ":" + strconv.FormatInt(stat.Ctime.Sec, 10) @@ -150,7 +149,7 @@ func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *goceph.Cep Nanos: uint32(stat.Mtime.Nsec), } - perms := getPermissionSet(user, stat, cv.mount, path) + perms := getPermissionSet(user, stat, mount, path) for key := range mx { if !strings.HasPrefix(key, xattrUserNs) { @@ -200,7 +199,7 @@ func (user *User) fileAsResourceInfo(cv *cacheVal, path string, stat *goceph.Cep return } -func (user *User) resolveRef(ref *provider.Reference) (string, error) { +func (fs *cephfs) resolveRef(ref *provider.Reference) (string, error) { if ref == nil { return "", fmt.Errorf("cephfs: nil reference provided") }