[spaces 1/2] initial ListStorageSpaces implementation #1802

Merged
merged 5 commits into from
Jul 8, 2021
6 changes: 6 additions & 0 deletions changelog/unreleased/list-spaces.md
@@ -0,0 +1,6 @@
Enhancement: Introduce list spaces

The ListStorageSpaces call now allows listing all user homes and shared resources using a storage space id. The gateway will forward requests to a specific storage provider when a filter by id is given. Otherwise it will query all storage providers. Results will be deduplicated. Currently, only the decomposed fs storage driver implements the necessary logic to demonstrate the implementation. A new `/dav/spaces` WebDAV endpoint to directly access a storage space is introduced in a separate PR.

https://github.com/cs3org/reva/pull/1802
https://github.com/cs3org/reva/pull/1803
127 changes: 110 additions & 17 deletions internal/grpc/services/gateway/storageprovider.go
@@ -115,37 +115,130 @@ func (s *svc) CreateStorageSpace(ctx context.Context, req *provider.CreateStorag

func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSpacesRequest) (*provider.ListStorageSpacesResponse, error) {
log := appctx.GetLogger(ctx)
// TODO: needs to be fixed
var id *provider.StorageSpaceId
for _, f := range req.Filters {
if f.Type == provider.ListStorageSpacesRequest_Filter_TYPE_ID {
id = f.GetId()
}
}
parts := strings.SplitN(id.OpaqueId, "!", 2)
if len(parts) != 2 {

var (
providers []*registry.ProviderInfo
err error
)
c, err := pool.GetStorageRegistryClient(s.c.StorageRegistryEndpoint)
if err != nil {
return nil, errors.Wrap(err, "gateway: error getting storage registry client")
}

if id != nil {
// query that specific storage provider
parts := strings.SplitN(id.OpaqueId, "!", 2)
if len(parts) != 2 {
return &provider.ListStorageSpacesResponse{
Status: status.NewInvalidArg(ctx, "space id must be separated by !"),
}, nil
}
Comment on lines +136 to +141
Member
I'd refactor this to something with more semantic meaning. All we see here is "something has to be split in 2 using ! as a delimiter", and this will probably be used wherever we use the spaces API.
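
One way to give the split a name is a small helper, sketched below. This is not code from this PR: `splitStorageSpaceID`, `spaceIDDelimiter`, and `errInvalidSpaceID` are hypothetical names, and plain strings stand in for the CS3 types. It keeps the same rule as the diff (split at the first `!`, require two parts).

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// spaceIDDelimiter separates the storage id from the node id.
const spaceIDDelimiter = "!"

// errInvalidSpaceID is a hypothetical sentinel error for malformed ids.
var errInvalidSpaceID = errors.New("space id must be separated by " + spaceIDDelimiter)

// splitStorageSpaceID is a hypothetical helper. It splits
// "<storageid>!<nodeid>" at the first delimiter, mirroring the
// strings.SplitN call in the gateway code under review.
func splitStorageSpaceID(id string) (storageID, nodeID string, err error) {
	parts := strings.SplitN(id, spaceIDDelimiter, 2)
	if len(parts) != 2 {
		return "", "", errInvalidSpaceID
	}
	return parts[0], parts[1], nil
}

func main() {
	storageID, nodeID, err := splitStorageSpaceID("storage-1!node-42")
	fmt.Println(storageID, nodeID, err) // storage-1 node-42 <nil>
}
```

With such a helper, each call site would only map `errInvalidSpaceID` to an invalid-argument status instead of repeating the delimiter logic.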

res, err := c.GetStorageProviders(ctx, &registry.GetStorageProvidersRequest{
Ref: &provider.Reference{ResourceId: &provider.ResourceId{
StorageId: parts[0], // FIXME REFERENCE the StorageSpaceId is a storageid + an opaqueid
OpaqueId: parts[1],
}},
})
if err != nil {
return &provider.ListStorageSpacesResponse{
Status: status.NewStatusFromErrType(ctx, "ListStorageSpaces filters: req "+req.String(), err),
}, nil
}
if res.Status.Code != rpc.Code_CODE_OK {
return &provider.ListStorageSpacesResponse{
Status: res.Status,
}, nil
}
providers = res.Providers
} else {
// get list of all storage providers
res, err := c.ListStorageProviders(ctx, &registry.ListStorageProvidersRequest{})

if err != nil {
return &provider.ListStorageSpacesResponse{
Status: status.NewStatusFromErrType(ctx, "error listing providers", err),
}, nil
}
if res.Status.Code != rpc.Code_CODE_OK {
return &provider.ListStorageSpacesResponse{
Status: res.Status,
}, nil
}

providers = make([]*registry.ProviderInfo, 0, len(res.Providers))
// FIXME filter only providers that have an id set ... currently none have?
// bug? only ProviderPath is set
for i := range res.Providers {
// use only providers whose path does not start with a /?
if strings.HasPrefix(res.Providers[i].ProviderPath, "/") {
continue
}
providers = append(providers, res.Providers[i])
}
}

spacesFromProviders := make([][]*provider.StorageSpace, len(providers))
errors := make([]error, len(providers))

var wg sync.WaitGroup
for i, p := range providers {
wg.Add(1)
go s.listStorageSpacesOnProvider(ctx, req, &spacesFromProviders[i], p, &errors[i], &wg)
}
wg.Wait()

uniqueSpaces := map[string]*provider.StorageSpace{}
for i := range providers {
if errors[i] != nil {
Member
Why would an error condition mean we skip the provider? Should we check the error type? Could it be a "mission critical" error that should halt this operation? Maybe I'm overthinking 🦖

Contributor Author
When a provider returns an unsupported error we should continue with the others. Expect failure.

Contributor
If only one provider matches the request, we won't raise any errors and will return an empty slice. Maybe we can handle that case separately?
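
The policy discussed in this thread can be sketched in isolation as follows. This is a simplified illustration, not the gateway code: `listFunc`, `fanOut`, `collect`, `okProvider`, and `failingProvider` are hypothetical names, and strings stand in for storage spaces. Each goroutine writes only to its own slot, so no mutex is needed. When several providers were queried a failing one is skipped, while a single failing provider surfaces its error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// listFunc stands in for one provider's ListStorageSpaces call (hypothetical).
type listFunc func() ([]string, error)

// okProvider and failingProvider build fake providers for the example.
func okProvider(ids ...string) listFunc {
	return func() ([]string, error) { return ids, nil }
}

func failingProvider(msg string) listFunc {
	return func() ([]string, error) { return nil, errors.New(msg) }
}

// fanOut queries all providers concurrently. Goroutine i writes only to
// results[i] and errs[i], so the slices need no extra locking.
func fanOut(providers []listFunc) ([][]string, []error) {
	results := make([][]string, len(providers))
	errs := make([]error, len(providers))
	var wg sync.WaitGroup
	for i, p := range providers {
		wg.Add(1)
		go func(i int, p listFunc) {
			defer wg.Done()
			results[i], errs[i] = p()
		}(i, p)
	}
	wg.Wait()
	return results, errs
}

// collect skips failing providers when more than one was queried and
// returns the error when the only provider failed.
func collect(results [][]string, errs []error) ([]string, error) {
	var spaces []string
	for i := range results {
		if errs[i] != nil {
			if len(results) > 1 {
				continue
			}
			return nil, errs[i]
		}
		spaces = append(spaces, results[i]...)
	}
	return spaces, nil
}

func main() {
	results, errs := fanOut([]listFunc{
		okProvider("home-space"),
		failingProvider("list storage spaces not supported"),
	})
	spaces, err := collect(results, errs)
	fmt.Println(spaces, err) // [home-space] <nil>
}
```

A stricter variant could inspect the error type and only skip "not supported" errors, which is the question raised at the start of this thread.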

if len(providers) > 1 {
log.Debug().Err(errors[i]).Msg("skipping provider")
continue
}
return &provider.ListStorageSpacesResponse{
Status: status.NewStatusFromErrType(ctx, "error listing space", errors[i]),
}, nil
}
for j := range spacesFromProviders[i] {
uniqueSpaces[spacesFromProviders[i][j].Id.OpaqueId] = spacesFromProviders[i][j]
Member
when I first saw uniqueSpaces I thought you dedup the slice, but it wasn't the case. could we rename uniqueSpaces to spaces or knownSpaces or even abstract it to a function and call it in L#211?

Contributor Author
actually it does dedupe: the home and user storage provider both return the same storage id ... we only need it once
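
To make the dedupe step explicit, it could be pulled into a function along these lines. This is only a sketch: `space` and `dedupeByID` are hypothetical stand-ins for `provider.StorageSpace` and whatever name fits the gateway. Unlike the map-then-range approach in the diff, which keeps the last duplicate and yields an unspecified order, this variant keeps the first occurrence and preserves provider order.

```go
package main

import "fmt"

// space is a simplified stand-in for provider.StorageSpace (hypothetical).
type space struct {
	ID   string
	Name string
}

// dedupeByID flattens the per-provider results and keeps the first space
// seen for every id, preserving the order in which spaces were returned.
func dedupeByID(perProvider [][]space) []space {
	seen := make(map[string]struct{})
	var out []space
	for _, spaces := range perProvider {
		for _, sp := range spaces {
			if _, ok := seen[sp.ID]; ok {
				continue // another provider already returned this space
			}
			seen[sp.ID] = struct{}{}
			out = append(out, sp)
		}
	}
	return out
}

func main() {
	// The home and users providers both report the same space.
	fromHome := []space{{ID: "s1!n1", Name: "einstein"}}
	fromUsers := []space{{ID: "s1!n1", Name: "einstein"}, {ID: "s1!n2", Name: "marie"}}
	fmt.Println(len(dedupeByID([][]space{fromHome, fromUsers}))) // 2
}
```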

}
}
spaces := make([]*provider.StorageSpace, 0, len(uniqueSpaces))
for spaceID := range uniqueSpaces {
spaces = append(spaces, uniqueSpaces[spaceID])
}
if len(spaces) == 0 {
return &provider.ListStorageSpacesResponse{
Status: status.NewInvalidArg(ctx, "space id must be separated by !"),
Status: status.NewNotFound(ctx, "space not found"),
}, nil
}
c, err := s.find(ctx, &provider.Reference{ResourceId: &provider.ResourceId{
StorageId: parts[0], // FIXME REFERENCE the StorageSpaceId is a storageid + a opaqueid
OpaqueId: parts[1],
}})

return &provider.ListStorageSpacesResponse{
Status: status.NewOK(ctx),
StorageSpaces: spaces,
}, nil
}

func (s *svc) listStorageSpacesOnProvider(ctx context.Context, req *provider.ListStorageSpacesRequest, res *[]*provider.StorageSpace, p *registry.ProviderInfo, e *error, wg *sync.WaitGroup) {
defer wg.Done()
c, err := s.getStorageProviderClient(ctx, p)
if err != nil {
return &provider.ListStorageSpacesResponse{
Status: status.NewStatusFromErrType(ctx, "error finding path", err),
}, nil
*e = errors.Wrap(err, "error connecting to storage provider="+p.Address)
return
}

res, err := c.ListStorageSpaces(ctx, req)
r, err := c.ListStorageSpaces(ctx, req)
if err != nil {
log.Err(err).Msg("gateway: error listing storage space on storage provider")
return &provider.ListStorageSpacesResponse{
Status: status.NewInternal(ctx, err, "error calling ListStorageSpaces"),
}, nil
*e = errors.Wrap(err, "gateway: error calling ListStorageSpaces")
return
}
return res, nil

*res = r.StorageSpaces
}

func (s *svc) UpdateStorageSpace(ctx context.Context, req *provider.UpdateStorageSpaceRequest) (*provider.UpdateStorageSpaceResponse, error) {
41 changes: 40 additions & 1 deletion internal/grpc/services/storageprovider/storageprovider.go
@@ -428,9 +428,48 @@ func (s *service) CreateStorageSpace(ctx context.Context, req *provider.CreateSt
}, nil
}

func hasNodeID(s *provider.StorageSpace) bool {
return s != nil && s.Root != nil && s.Root.OpaqueId != ""
}

func (s *service) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSpacesRequest) (*provider.ListStorageSpacesResponse, error) {
log := appctx.GetLogger(ctx)
spaces, err := s.storage.ListStorageSpaces(ctx, req.Filters)
if err != nil {
var st *rpc.Status
switch err.(type) {
case errtypes.IsNotFound:
st = status.NewNotFound(ctx, "not found when listing spaces")
case errtypes.PermissionDenied:
st = status.NewPermissionDenied(ctx, err, "permission denied")
case errtypes.NotSupported:
st = status.NewUnimplemented(ctx, err, "not implemented")
default:
st = status.NewInternal(ctx, err, "error listing spaces")
Member
Suggested change
st = status.NewInternal(ctx, err, "error listing spaces")
st = status.NewInternal(ctx, err, "could not list spaces")

Member
we don't mention the fact that we errored in any of the previous messages

Contributor Author
because the other cases are known errors. this is a catchall in case it is none of the above errors. These huge error handling blocks are ugly and I hope they can be refactored ... in a dedicated PR.
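
As a rough idea of what such a refactoring might look like, the mapping from error to status code could move into one function. The sketch below is self-contained and does not use the reva `errtypes` or `status` packages: `notFound`, `permissionDenied`, `notSupported`, `code`, and `codeFromErr` are all hypothetical. It also uses `errors.As` instead of a type switch, so wrapped errors are matched too, which is a behaviour change compared to the diff.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical error types standing in for the known errtypes cases.
type notFound string

func (e notFound) Error() string { return "not found: " + string(e) }

type permissionDenied string

func (e permissionDenied) Error() string { return "permission denied: " + string(e) }

type notSupported string

func (e notSupported) Error() string { return "not supported: " + string(e) }

// code is a simplified stand-in for an rpc status code.
type code string

const (
	codeOK               code = "OK"
	codeNotFound         code = "NOT_FOUND"
	codePermissionDenied code = "PERMISSION_DENIED"
	codeUnimplemented    code = "UNIMPLEMENTED"
	codeInternal         code = "INTERNAL"
)

// codeFromErr maps known error types to a code and falls back to
// codeInternal as the catch-all for everything else.
func codeFromErr(err error) code {
	if err == nil {
		return codeOK
	}
	var (
		nf notFound
		pd permissionDenied
		ns notSupported
	)
	switch {
	case errors.As(err, &nf):
		return codeNotFound
	case errors.As(err, &pd):
		return codePermissionDenied
	case errors.As(err, &ns):
		return codeUnimplemented
	default:
		return codeInternal
	}
}

func main() {
	wrapped := fmt.Errorf("listing spaces: %w", notSupported("list storage spaces"))
	fmt.Println(codeFromErr(wrapped)) // UNIMPLEMENTED
}
```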

}
return &provider.ListStorageSpacesResponse{
Status: st,
}, nil
}

for i := range spaces {
if hasNodeID(spaces[i]) {
// fill in storagespace id if it is not set
if spaces[i].Id == nil || spaces[i].Id.OpaqueId == "" {
spaces[i].Id = &provider.StorageSpaceId{OpaqueId: s.mountID + "!" + spaces[i].Root.OpaqueId}
Member
Abstract this into its own method with some defensive enforcement of the format, perhaps using a simple regex.

Contributor Author
which would make migration scenarios harder...
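
For reference, the suggestion could look roughly like the sketch below. The helper name `formatStorageSpaceID` and the pattern are assumptions, not part of this PR. The pattern requires exactly one `!` with non-empty halves, which is stricter than the `SplitN`-based parsing in the gateway, and that strictness is the kind of constraint that could get in the way of migrating existing ids.

```go
package main

import (
	"fmt"
	"regexp"
)

// spaceIDPattern is a hypothetical format check: a non-empty mount id,
// one "!" delimiter, and a non-empty node id, neither containing "!".
var spaceIDPattern = regexp.MustCompile(`^[^!]+![^!]+$`)

// formatStorageSpaceID is a hypothetical helper that joins the mount id
// and the node id and rejects results that do not match the pattern.
func formatStorageSpaceID(mountID, nodeID string) (string, error) {
	id := mountID + "!" + nodeID
	if !spaceIDPattern.MatchString(id) {
		return "", fmt.Errorf("invalid storage space id %q", id)
	}
	return id, nil
}

func main() {
	id, err := formatStorageSpaceID("mount-1", "node-7")
	fmt.Println(id, err) // mount-1!node-7 <nil>
}
```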

}
// fill in storage id if it is not set
if spaces[i].Root.StorageId == "" {
spaces[i].Root.StorageId = s.mountID
}
} else if spaces[i].Id == nil || spaces[i].Id.OpaqueId == "" {
log.Warn().Str("service", "storageprovider").Str("driver", s.conf.Driver).Interface("space", spaces[i]).Msg("space is missing space id and root id")
}
}

return &provider.ListStorageSpacesResponse{
Status: status.NewUnimplemented(ctx, errtypes.NotSupported("ListStorageSpaces not implemented"), "ListStorageSpaces not implemented"),
Status: status.NewOK(ctx),
StorageSpaces: spaces,
}, nil
}

4 changes: 4 additions & 0 deletions pkg/storage/fs/owncloud/owncloud.go
@@ -2221,6 +2221,10 @@ func (fs *ocfs) RestoreRecycleItem(ctx context.Context, key string, restoreRef *
return fs.propagate(ctx, tgt)
}

func (fs *ocfs) ListStorageSpaces(ctx context.Context, filter []*provider.ListStorageSpacesRequest_Filter) ([]*provider.StorageSpace, error) {
return nil, errtypes.NotSupported("list storage spaces")
}

func (fs *ocfs) propagate(ctx context.Context, leafPath string) error {
var root string
if fs.c.EnableHome {
5 changes: 5 additions & 0 deletions pkg/storage/fs/owncloudsql/owncloudsql.go
@@ -2132,6 +2132,11 @@ func (fs *ocfs) HashFile(path string) (string, string, string, error) {
}
}

func (fs *ocfs) ListStorageSpaces(ctx context.Context, filter []*provider.ListStorageSpacesRequest_Filter) ([]*provider.StorageSpace, error) {
// TODO(corby): Implement
return nil, errtypes.NotSupported("list storage spaces")
}

func readChecksumIntoResourceChecksum(ctx context.Context, checksums, algo string, ri *provider.ResourceInfo) {
re := regexp.MustCompile(strings.ToUpper(algo) + `:(.*)`)
matches := re.FindStringSubmatch(checksums)
4 changes: 4 additions & 0 deletions pkg/storage/fs/s3/s3.go
@@ -661,3 +661,7 @@ func (fs *s3FS) ListRecycle(ctx context.Context) ([]*provider.RecycleItem, error
func (fs *s3FS) RestoreRecycleItem(ctx context.Context, key string, restoreRef *provider.Reference) error {
return errtypes.NotSupported("restore recycle")
}

func (fs *s3FS) ListStorageSpaces(ctx context.Context, filter []*provider.ListStorageSpacesRequest_Filter) ([]*provider.StorageSpace, error) {
return nil, errtypes.NotSupported("list storage spaces")
}
29 changes: 18 additions & 11 deletions pkg/storage/registry/static/static.go
@@ -145,18 +145,25 @@ func (b *reg) FindProviders(ctx context.Context, ref *provider.Reference) ([]*re

// If the reference has a resource id set, use it to route
if ref.ResourceId != nil {
for prefix, rule := range b.c.Rules {
addr := getProviderAddr(ctx, rule)
r, err := regexp.Compile("^" + prefix + "$")
if err != nil {
continue
if ref.ResourceId.StorageId != "" {
for prefix, rule := range b.c.Rules {
addr := getProviderAddr(ctx, rule)
r, err := regexp.Compile("^" + prefix + "$")
if err != nil {
continue
}
// TODO(labkode): fill path info based on provider id, if path and storage id points to same id, take that.
if m := r.FindString(ref.ResourceId.StorageId); m != "" {
return []*registrypb.ProviderInfo{{
ProviderId: ref.ResourceId.StorageId,
Address: addr,
}}, nil
}
}
// TODO(labkode): fill path info based on provider id, if path and storage id points to same id, take that.
if m := r.FindString(ref.ResourceId.StorageId); m != "" {
return []*registrypb.ProviderInfo{{
ProviderId: ref.ResourceId.StorageId,
Address: addr,
}}, nil
// TODO if the storage id is not set but node id is set we could poll all storage providers to check if the node is known there
// for now, say the reference is invalid
Comment on lines +163 to +164
Member
It'd be interesting to follow this up with a benchmark. But of course real-life scenarios are a problem in and of themselves, with storages potentially located on different continents.

Contributor Author
It would be more interesting to get this to work with eos first ;-)

Contributor
Not sure about this one 😅

if ref.ResourceId.OpaqueId != "" {
return nil, errtypes.BadRequest("invalid reference " + ref.String())
}
}
}
1 change: 1 addition & 0 deletions pkg/storage/storage.go
@@ -56,6 +56,7 @@ type FS interface {
Shutdown(ctx context.Context) error
SetArbitraryMetadata(ctx context.Context, ref *provider.Reference, md *provider.ArbitraryMetadata) error
UnsetArbitraryMetadata(ctx context.Context, ref *provider.Reference, keys []string) error
ListStorageSpaces(ctx context.Context, filter []*provider.ListStorageSpacesRequest_Filter) ([]*provider.StorageSpace, error)
}

// Registry is the interface that storage registries implement