Skip to content

Commit

Permalink
Add user ID cache warmup to EOS storage driver (cs3org#1774)
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 authored Jun 11, 2021
1 parent 2c98edc commit a9556a3
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 26 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/eos-cache-warmup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Enhancement: Add user ID cache warmup to EOS storage driver

https://github.com/cs3org/reva/pull/1774
10 changes: 10 additions & 0 deletions pkg/storage/utils/eosfs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,14 @@ type Config struct {
// URI of the EOS MGM grpc server
// Default is empty
GrpcURI string `mapstructure:"master_grpc_uri"`

// Size of the cache used to store user ID and UID resolution.
// Default value is 1000000.
UserIDCacheSize int `mapstructure:"user_id_cache_size"`

// The depth, starting from root, that we'll parse directories to lookup the
// owner and warm up the cache. For example, for a layout of {{substr 0 1 .Username}}/{{.Username}}
// and a depth of 2, we'll lookup each user's home directory.
// Default value is 2.
UserIDCacheWarmupDepth int `mapstructure:"user_id_cache_warmup_depth"`
}
84 changes: 58 additions & 26 deletions pkg/storage/utils/eosfs/eosfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ import (
"net/url"
"os"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"

"github.com/bluele/gcache"
grouppb "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
Expand Down Expand Up @@ -119,6 +118,14 @@ func (c *Config) init() {
c.UserLayout = "{{.Username}}" // TODO set better layout
}

if c.UserIDCacheSize == 0 {
c.UserIDCacheSize = 1000000
}

if c.UserIDCacheWarmupDepth == 0 {
c.UserIDCacheWarmupDepth = 2
}

c.GatewaySvc = sharedconf.GetGatewaySVC(c.GatewaySvc)
}

Expand All @@ -128,7 +135,7 @@ type eosfs struct {
chunkHandler *chunking.ChunkHandler
singleUserUID string
singleUserGID string
userIDCache sync.Map
userIDCache gcache.Cache
}

// NewEOSFS returns a storage.FS interface implementation that connects to an EOS instance
Expand Down Expand Up @@ -179,12 +186,35 @@ func NewEOSFS(c *Config) (storage.FS, error) {
c: eosClient,
conf: c,
chunkHandler: chunking.NewChunkHandler(c.CacheDirectory),
userIDCache: sync.Map{},
userIDCache: gcache.New(c.UserIDCacheSize).LFU().Build(),
}

go eosfs.userIDcacheWarmup()

return eosfs, nil
}

func (fs *eosfs) userIDcacheWarmup() {
if !fs.conf.EnableHome {
ctx := context.Background()
paths := []string{fs.wrap(ctx, "/")}
uid, gid, _ := fs.getRootUIDAndGID(ctx)

for i := 0; i < fs.conf.UserIDCacheWarmupDepth; i++ {
var newPaths []string
for _, fn := range paths {
if eosFileInfos, err := fs.c.List(ctx, uid, gid, fn); err == nil {
for _, f := range eosFileInfos {
_, _ = fs.getUserIDGateway(ctx, strconv.FormatUint(f.UID, 10))
newPaths = append(newPaths, f.File)
}
}
}
paths = newPaths
}
}
}

func (fs *eosfs) Shutdown(ctx context.Context) error {
// TODO(labkode): in a grpc implementation we can close connections.
return nil
Expand Down Expand Up @@ -579,7 +609,7 @@ func (fs *eosfs) GetMD(ctx context.Context, ref *provider.Reference, mdKeys []st
return nil, err
}

return fs.convertToResourceInfo(ctx, eosFileInfo, false)
return fs.convertToResourceInfo(ctx, eosFileInfo)
}

func (fs *eosfs) getMDShareFolder(ctx context.Context, p string, mdKeys []string) (*provider.ResourceInfo, error) {
Expand All @@ -602,7 +632,7 @@ func (fs *eosfs) getMDShareFolder(ctx context.Context, p string, mdKeys []string
// TODO(labkode): diff between root (dir) and children (ref)

if fs.isShareFolderRoot(ctx, p) {
return fs.convertToResourceInfo(ctx, eosFileInfo, false)
return fs.convertToResourceInfo(ctx, eosFileInfo)
}
return fs.convertToFileReference(ctx, eosFileInfo)
}
Expand Down Expand Up @@ -644,10 +674,6 @@ func (fs *eosfs) listWithNominalHome(ctx context.Context, p string) (finfos []*p
}

fn := fs.wrap(ctx, p)
virtualView := false
if !fs.conf.EnableHome && filepath.Dir(fn) == filepath.Clean(fs.conf.Namespace) {
virtualView = true
}

eosFileInfos, err := fs.c.List(ctx, uid, gid, fn)
if err != nil {
Expand All @@ -665,7 +691,7 @@ func (fs *eosfs) listWithNominalHome(ctx context.Context, p string) (finfos []*p
}

// Remove the hidden folders in the topmost directory
if finfo, err := fs.convertToResourceInfo(ctx, eosFileInfo, virtualView); err == nil && finfo.Path != "/" && !strings.HasPrefix(finfo.Path, "/.") {
if finfo, err := fs.convertToResourceInfo(ctx, eosFileInfo); err == nil && finfo.Path != "/" && !strings.HasPrefix(finfo.Path, "/.") {
finfos = append(finfos, finfo)
}
}
Expand Down Expand Up @@ -719,7 +745,7 @@ func (fs *eosfs) listHome(ctx context.Context, home string) ([]*provider.Resourc
}
}

if finfo, err := fs.convertToResourceInfo(ctx, eosFileInfo, false); err == nil && finfo.Path != "/" && !strings.HasPrefix(finfo.Path, "/.") {
if finfo, err := fs.convertToResourceInfo(ctx, eosFileInfo); err == nil && finfo.Path != "/" && !strings.HasPrefix(finfo.Path, "/.") {
finfos = append(finfos, finfo)
}
}
Expand Down Expand Up @@ -1324,7 +1350,7 @@ func (fs *eosfs) convertToRecycleItem(ctx context.Context, eosDeletedItem *eoscl
}

func (fs *eosfs) convertToRevision(ctx context.Context, eosFileInfo *eosclient.FileInfo) (*provider.FileVersion, error) {
md, err := fs.convertToResourceInfo(ctx, eosFileInfo, false)
md, err := fs.convertToResourceInfo(ctx, eosFileInfo)
if err != nil {
return nil, err
}
Expand All @@ -1337,12 +1363,12 @@ func (fs *eosfs) convertToRevision(ctx context.Context, eosFileInfo *eosclient.F
return revision, nil
}

func (fs *eosfs) convertToResourceInfo(ctx context.Context, eosFileInfo *eosclient.FileInfo, virtualView bool) (*provider.ResourceInfo, error) {
return fs.convert(ctx, eosFileInfo, virtualView)
func (fs *eosfs) convertToResourceInfo(ctx context.Context, eosFileInfo *eosclient.FileInfo) (*provider.ResourceInfo, error) {
return fs.convert(ctx, eosFileInfo)
}

func (fs *eosfs) convertToFileReference(ctx context.Context, eosFileInfo *eosclient.FileInfo) (*provider.ResourceInfo, error) {
info, err := fs.convert(ctx, eosFileInfo, false)
info, err := fs.convert(ctx, eosFileInfo)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1432,7 +1458,7 @@ func mergePermissions(l *provider.ResourcePermissions, r *provider.ResourcePermi
l.UpdateGrant = l.UpdateGrant || r.UpdateGrant
}

func (fs *eosfs) convert(ctx context.Context, eosFileInfo *eosclient.FileInfo, virtualView bool) (*provider.ResourceInfo, error) {
func (fs *eosfs) convert(ctx context.Context, eosFileInfo *eosclient.FileInfo) (*provider.ResourceInfo, error) {
path, err := fs.unwrap(ctx, eosFileInfo.File)
if err != nil {
return nil, err
Expand All @@ -1443,13 +1469,10 @@ func (fs *eosfs) convert(ctx context.Context, eosFileInfo *eosclient.FileInfo, v
size = eosFileInfo.TreeSize
}

owner := &userpb.UserId{}
if !virtualView {
owner, err = fs.getUserIDGateway(ctx, strconv.FormatUint(eosFileInfo.UID, 10))
if err != nil {
sublog := appctx.GetLogger(ctx).With().Logger()
sublog.Warn().Uint64("uid", eosFileInfo.UID).Msg("could not lookup userid, leaving empty")
}
owner, err := fs.getUserIDGateway(ctx, strconv.FormatUint(eosFileInfo.UID, 10))
if err != nil {
sublog := appctx.GetLogger(ctx).With().Logger()
sublog.Warn().Uint64("uid", eosFileInfo.UID).Msg("could not lookup userid, leaving empty")
}

var xs provider.ResourceChecksum
Expand Down Expand Up @@ -1542,9 +1565,18 @@ func (fs *eosfs) getUIDGateway(ctx context.Context, u *userpb.UserId) (string, s
}

func (fs *eosfs) getUserIDGateway(ctx context.Context, uid string) (*userpb.UserId, error) {
if userIDInterface, ok := fs.userIDCache.Load(uid); ok {
log := appctx.GetLogger(ctx)
// Handle the case of root
if uid == "0" {
return nil, errtypes.BadRequest("eosfs: cannot return root user")
}

if userIDInterface, err := fs.userIDCache.Get(uid); err == nil {
log.Debug().Msg("eosfs: found cached uid " + uid)
return userIDInterface.(*userpb.UserId), nil
}

log.Debug().Msg("eosfs: retrieving user from gateway for uid " + uid)
client, err := pool.GetGatewayServiceClient(fs.conf.GatewaySvc)
if err != nil {
return nil, errors.Wrap(err, "eos: error getting gateway grpc client")
Expand All @@ -1560,7 +1592,7 @@ func (fs *eosfs) getUserIDGateway(ctx context.Context, uid string) (*userpb.User
return nil, errors.Wrap(err, "eos: grpc get user failed")
}

fs.userIDCache.Store(uid, getUserResp.User.Id)
_ = fs.userIDCache.Set(uid, getUserResp.User.Id)
return getUserResp.User.Id, nil
}

Expand Down

0 comments on commit a9556a3

Please sign in to comment.