Skip to content

Commit

Permalink
move unwrapping and wrapping of paths to the gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
David Christofas committed Jul 5, 2021
1 parent 54e2236 commit 1993a12
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 254 deletions.
2 changes: 1 addition & 1 deletion internal/grpc/services/gateway/ocmshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (s *svc) createOCMReference(ctx context.Context, share *ocm.Share) (*rpc.St
TargetUri: targetURI,
}

c, err := s.findByPath(ctx, refPath)
c, _, err := s.findByPath(ctx, refPath)
if err != nil {
if _, ok := err.(errtypes.IsNotFound); ok {
return status.NewNotFound(ctx, "storage provider not found"), nil
Expand Down
137 changes: 107 additions & 30 deletions internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *svc) CreateHome(ctx context.Context, req *provider.CreateHomeRequest) (
log := appctx.GetLogger(ctx)

home := s.getHome(ctx)
c, err := s.findByPath(ctx, home)
c, _, err := s.findByPath(ctx, home)
if err != nil {
return &provider.CreateHomeResponse{
Status: status.NewStatusFromErrType(ctx, "error finding home", err),
Expand All @@ -96,7 +96,7 @@ func (s *svc) CreateHome(ctx context.Context, req *provider.CreateHomeRequest) (
func (s *svc) CreateStorageSpace(ctx context.Context, req *provider.CreateStorageSpaceRequest) (*provider.CreateStorageSpaceResponse, error) {
log := appctx.GetLogger(ctx)
// TODO: needs to be fixed
c, err := s.findByPath(ctx, req.Type)
c, _, err := s.findByPath(ctx, req.Type)
if err != nil {
return &provider.CreateStorageSpaceResponse{
Status: status.NewStatusFromErrType(ctx, "error finding path", err),
Expand Down Expand Up @@ -128,7 +128,7 @@ func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSp
Status: status.NewInvalidArg(ctx, "space id must be separated by !"),
}, nil
}
c, err := s.find(ctx, &provider.Reference{ResourceId: &provider.ResourceId{
c, _, err := s.find(ctx, &provider.Reference{ResourceId: &provider.ResourceId{
StorageId: parts[0], // FIXME REFERENCE the StorageSpaceId is a storageid + a opaqueid
OpaqueId: parts[1],
}})
Expand All @@ -151,7 +151,7 @@ func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSp
func (s *svc) UpdateStorageSpace(ctx context.Context, req *provider.UpdateStorageSpaceRequest) (*provider.UpdateStorageSpaceResponse, error) {
log := appctx.GetLogger(ctx)
// TODO: needs to be fixed
c, err := s.find(ctx, &provider.Reference{ResourceId: req.StorageSpace.Root})
c, _, err := s.find(ctx, &provider.Reference{ResourceId: req.StorageSpace.Root})
if err != nil {
return &provider.UpdateStorageSpaceResponse{
Status: status.NewStatusFromErrType(ctx, "error finding ID", err),
Expand All @@ -177,7 +177,7 @@ func (s *svc) DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorag
Status: status.NewInvalidArg(ctx, "space id must be separated by !"),
}, nil
}
c, err := s.find(ctx, &provider.Reference{ResourceId: &provider.ResourceId{
c, _, err := s.find(ctx, &provider.Reference{ResourceId: &provider.ResourceId{
StorageId: parts[0], // FIXME REFERENCE the StorageSpaceId is a storageid + a opaqueid
OpaqueId: parts[1],
}})
Expand Down Expand Up @@ -368,13 +368,16 @@ func (s *svc) InitiateFileDownload(ctx context.Context, req *provider.InitiateFi

func (s *svc) initiateFileDownload(ctx context.Context, req *provider.InitiateFileDownloadRequest) (*gateway.InitiateFileDownloadResponse, error) {
// TODO(ishank011): enable downloading references spread across storage providers, eg. /eos
c, err := s.find(ctx, req.Ref)
c, p, err := s.find(ctx, req.Ref)
if err != nil {
return &gateway.InitiateFileDownloadResponse{
Status: status.NewStatusFromErrType(ctx, "error initiating download ref="+req.Ref.String(), err),
}, nil
}

if req.Ref, err = unwrap(req.Ref, p.ProviderPath); err != nil {
return nil, err
}
storageRes, err := c.InitiateFileDownload(ctx, req)
if err != nil {
return nil, errors.Wrap(err, "gateway: error calling InitiateFileDownload")
Expand Down Expand Up @@ -563,13 +566,16 @@ func (s *svc) InitiateFileUpload(ctx context.Context, req *provider.InitiateFile
}

func (s *svc) initiateFileUpload(ctx context.Context, req *provider.InitiateFileUploadRequest) (*gateway.InitiateFileUploadResponse, error) {
c, err := s.find(ctx, req.Ref)
c, p, err := s.find(ctx, req.Ref)
if err != nil {
return &gateway.InitiateFileUploadResponse{
Status: status.NewStatusFromErrType(ctx, "initiateFileUpload ref="+req.Ref.String(), err),
}, nil
}

if req.Ref, err = unwrap(req.Ref, p.ProviderPath); err != nil {
return nil, err
}
storageRes, err := c.InitiateFileUpload(ctx, req)
if err != nil {
return nil, errors.Wrap(err, "gateway: error calling InitiateFileUpload")
Expand Down Expand Up @@ -709,13 +715,16 @@ func (s *svc) CreateContainer(ctx context.Context, req *provider.CreateContainer
}

func (s *svc) createContainer(ctx context.Context, req *provider.CreateContainerRequest) (*provider.CreateContainerResponse, error) {
c, err := s.find(ctx, req.Ref)
c, p, err := s.find(ctx, req.Ref)
if err != nil {
return &provider.CreateContainerResponse{
Status: status.NewStatusFromErrType(ctx, "createContainer ref="+req.Ref.String(), err),
}, nil
}

if req.Ref, err = unwrap(req.Ref, p.ProviderPath); err != nil {
return nil, err
}
res, err := c.CreateContainer(ctx, req)
if err != nil {
return nil, errors.Wrap(err, "gateway: error calling CreateContainer")
Expand Down Expand Up @@ -812,13 +821,16 @@ func (s *svc) Delete(ctx context.Context, req *provider.DeleteRequest) (*provide

func (s *svc) delete(ctx context.Context, req *provider.DeleteRequest) (*provider.DeleteResponse, error) {
// TODO(ishank011): enable deleting references spread across storage providers, eg. /eos
c, err := s.find(ctx, req.Ref)
c, p, err := s.find(ctx, req.Ref)
if err != nil {
return &provider.DeleteResponse{
Status: status.NewStatusFromErrType(ctx, "delete ref="+req.Ref.String(), err),
}, nil
}

if req.Ref, err = unwrap(req.Ref, p.ProviderPath); err != nil {
return nil, err
}
res, err := c.Delete(ctx, req)
if err != nil {
return nil, errors.Wrap(err, "gateway: error calling Delete")
Expand Down Expand Up @@ -948,19 +960,28 @@ func (s *svc) move(ctx context.Context, req *provider.MoveRequest) (*provider.Mo
Status: status.NewInternal(ctx, err, "error connecting to storage provider="+srcP.Address),
}, nil
}
if req.Source, err = unwrap(req.Source, srcP.ProviderPath); err != nil {
return nil, err
}
if req.Destination, err = unwrap(req.Destination, dstP.ProviderPath); err != nil {
return nil, err
}

return c.Move(ctx, req)
}

func (s *svc) SetArbitraryMetadata(ctx context.Context, req *provider.SetArbitraryMetadataRequest) (*provider.SetArbitraryMetadataResponse, error) {
// TODO(ishank011): enable for references spread across storage providers, eg. /eos
c, err := s.find(ctx, req.Ref)
c, p, err := s.find(ctx, req.Ref)
if err != nil {
return &provider.SetArbitraryMetadataResponse{
Status: status.NewStatusFromErrType(ctx, "SetArbitraryMetadata ref="+req.Ref.String(), err),
}, nil
}

if req.Ref, err = unwrap(req.Ref, p.ProviderPath); err != nil {
return nil, err
}
res, err := c.SetArbitraryMetadata(ctx, req)
if err != nil {
return nil, errors.Wrap(err, "gateway: error calling Stat")
Expand All @@ -971,13 +992,16 @@ func (s *svc) SetArbitraryMetadata(ctx context.Context, req *provider.SetArbitra

func (s *svc) UnsetArbitraryMetadata(ctx context.Context, req *provider.UnsetArbitraryMetadataRequest) (*provider.UnsetArbitraryMetadataResponse, error) {
// TODO(ishank011): enable for references spread across storage providers, eg. /eos
c, err := s.find(ctx, req.Ref)
c, p, err := s.find(ctx, req.Ref)
if err != nil {
return &provider.UnsetArbitraryMetadataResponse{
Status: status.NewStatusFromErrType(ctx, "UnsetArbitraryMetadata ref="+req.Ref.String(), err),
}, nil
}

if req.Ref, err = unwrap(req.Ref, p.ProviderPath); err != nil {
return nil, err
}
res, err := c.UnsetArbitraryMetadata(ctx, req)
if err != nil {
return nil, errors.Wrap(err, "gateway: error calling Stat")
Expand Down Expand Up @@ -1093,7 +1117,22 @@ func (s *svc) stat(ctx context.Context, req *provider.StatRequest) (*provider.St
Status: status.NewInternal(ctx, err, "error connecting to storage provider="+providers[0].Address),
}, nil
}
return c.Stat(ctx, req)

ref, err := unwrap(req.Ref, providers[0].ProviderPath)
if err != nil {
return nil, err
}
// We need to copy the request because we don't want to overwrite the original request
sReq := &provider.StatRequest{
Opaque: req.Opaque,
Ref: ref,
ArbitraryMetadataKeys: req.ArbitraryMetadataKeys,
}
res, err := c.Stat(ctx, sReq)
if res != nil && res.Info != nil {
res.Info.Path = path.Join(providers[0].ProviderPath, res.Info.Path)
}
return res, err
}

infoFromProviders := make([]*provider.ResourceInfo, len(providers))
Expand Down Expand Up @@ -1142,18 +1181,26 @@ func (s *svc) statOnProvider(ctx context.Context, req *provider.StatRequest, res
}

resPath := path.Clean(req.Ref.GetPath())
newPath := req.Ref.GetPath()
if resPath != "" && !strings.HasPrefix(resPath, p.ProviderPath) {
newPath = p.ProviderPath
ref := &provider.Reference{
Path: req.Ref.Path,
}
if resPath != "." && !strings.HasPrefix(resPath, p.ProviderPath) {
ref.Path = p.ProviderPath
}
ref, err = unwrap(ref, p.ProviderPath)
if err != nil {
*e = err
return
}
r, err := c.Stat(ctx, &provider.StatRequest{Ref: &provider.Reference{Path: newPath}})
r, err := c.Stat(ctx, &provider.StatRequest{Ref: ref})
if err != nil {
*e = errors.Wrap(err, "gateway: error calling ListContainer")
return
}
if res == nil {
res = &provider.ResourceInfo{}
}
r.Info.Path = path.Join(p.ProviderPath, r.Info.Path)
*res = *r.Info
}

Expand Down Expand Up @@ -1503,15 +1550,25 @@ func (s *svc) listContainerOnProvider(ctx context.Context, req *provider.ListCon
}

resPath := path.Clean(req.Ref.GetPath())
newPath := req.Ref.GetPath()
if resPath != "" && !strings.HasPrefix(resPath, p.ProviderPath) {
newPath = p.ProviderPath
ref := &provider.Reference{
Path: req.Ref.Path,
}
if resPath != "." && !strings.HasPrefix(resPath, p.ProviderPath) {
ref.Path = p.ProviderPath
}
ref, err = unwrap(ref, p.ProviderPath)
if err != nil {
*e = err
return
}
r, err := c.ListContainer(ctx, &provider.ListContainerRequest{Ref: &provider.Reference{Path: newPath}})
r, err := c.ListContainer(ctx, &provider.ListContainerRequest{Ref: ref})
if err != nil {
*e = errors.Wrap(err, "gateway: error calling ListContainer")
return
}
for i := range r.Infos {
r.Infos[i].Path = path.Join(p.ProviderPath, r.Infos[i].Path)
}
*res = r.Infos
}

Expand Down Expand Up @@ -1787,7 +1844,7 @@ func (s *svc) CreateSymlink(ctx context.Context, req *provider.CreateSymlinkRequ
}

func (s *svc) ListFileVersions(ctx context.Context, req *provider.ListFileVersionsRequest) (*provider.ListFileVersionsResponse, error) {
c, err := s.find(ctx, req.Ref)
c, _, err := s.find(ctx, req.Ref)
if err != nil {
return &provider.ListFileVersionsResponse{
Status: status.NewStatusFromErrType(ctx, "ListFileVersions ref="+req.Ref.String(), err),
Expand All @@ -1803,7 +1860,7 @@ func (s *svc) ListFileVersions(ctx context.Context, req *provider.ListFileVersio
}

func (s *svc) RestoreFileVersion(ctx context.Context, req *provider.RestoreFileVersionRequest) (*provider.RestoreFileVersionResponse, error) {
c, err := s.find(ctx, req.Ref)
c, _, err := s.find(ctx, req.Ref)
if err != nil {
return &provider.RestoreFileVersionResponse{
Status: status.NewStatusFromErrType(ctx, "RestoreFileVersion ref="+req.Ref.String(), err),
Expand All @@ -1824,7 +1881,7 @@ func (s *svc) ListRecycleStream(_ *gateway.ListRecycleStreamRequest, _ gateway.G

// TODO use the ListRecycleRequest.Ref to only list the trash of a specific storage
func (s *svc) ListRecycle(ctx context.Context, req *gateway.ListRecycleRequest) (*provider.ListRecycleResponse, error) {
c, err := s.find(ctx, req.GetRef())
c, _, err := s.find(ctx, req.GetRef())
if err != nil {
return &provider.ListRecycleResponse{
Status: status.NewStatusFromErrType(ctx, "ListFileVersions ref="+req.Ref.String(), err),
Expand All @@ -1844,7 +1901,7 @@ func (s *svc) ListRecycle(ctx context.Context, req *gateway.ListRecycleRequest)
}

func (s *svc) RestoreRecycleItem(ctx context.Context, req *provider.RestoreRecycleItemRequest) (*provider.RestoreRecycleItemResponse, error) {
c, err := s.find(ctx, req.Ref)
c, _, err := s.find(ctx, req.Ref)
if err != nil {
return &provider.RestoreRecycleItemResponse{
Status: status.NewStatusFromErrType(ctx, "RestoreRecycleItem ref="+req.Ref.String(), err),
Expand All @@ -1861,7 +1918,7 @@ func (s *svc) RestoreRecycleItem(ctx context.Context, req *provider.RestoreRecyc

func (s *svc) PurgeRecycle(ctx context.Context, req *gateway.PurgeRecycleRequest) (*provider.PurgeRecycleResponse, error) {
// lookup storage by treating the key as a path. It has been prefixed with the storage path in ListRecycle
c, err := s.find(ctx, req.Ref)
c, _, err := s.find(ctx, req.Ref)
if err != nil {
return &provider.PurgeRecycleResponse{
Status: status.NewStatusFromErrType(ctx, "PurgeRecycle ref="+req.Ref.String(), err),
Expand All @@ -1879,7 +1936,7 @@ func (s *svc) PurgeRecycle(ctx context.Context, req *gateway.PurgeRecycleRequest
}

func (s *svc) GetQuota(ctx context.Context, req *gateway.GetQuotaRequest) (*provider.GetQuotaResponse, error) {
c, err := s.find(ctx, req.Ref)
c, _, err := s.find(ctx, req.Ref)
if err != nil {
return &provider.GetQuotaResponse{
Status: status.NewStatusFromErrType(ctx, "GetQuota ref="+req.Ref.String(), err),
Expand All @@ -1896,17 +1953,18 @@ func (s *svc) GetQuota(ctx context.Context, req *gateway.GetQuotaRequest) (*prov
return res, nil
}

func (s *svc) findByPath(ctx context.Context, path string) (provider.ProviderAPIClient, error) {
func (s *svc) findByPath(ctx context.Context, path string) (provider.ProviderAPIClient, *registry.ProviderInfo, error) {
ref := &provider.Reference{Path: path}
return s.find(ctx, ref)
}

func (s *svc) find(ctx context.Context, ref *provider.Reference) (provider.ProviderAPIClient, error) {
func (s *svc) find(ctx context.Context, ref *provider.Reference) (provider.ProviderAPIClient, *registry.ProviderInfo, error) {
p, err := s.findProviders(ctx, ref)
if err != nil {
return nil, err
return nil, nil, err
}
return s.getStorageProviderClient(ctx, p[0])
c, err := s.getStorageProviderClient(ctx, p[0])
return c, p[0], err
}

func (s *svc) getStorageProviderClient(_ context.Context, p *registry.ProviderInfo) (provider.ProviderAPIClient, error) {
Expand Down Expand Up @@ -1959,3 +2017,22 @@ type etagWithTS struct {
Etag string
Timestamp time.Time
}

func unwrap(ref *provider.Reference, providerPath string) (*provider.Reference, error) {
if ref.GetResourceId() != nil {
return ref, nil
}

if ref.GetPath() == "" {
// abort, no valid id nor path
return nil, errtypes.BadRequest("unwrap: ref is invalid: " + ref.String())
}

p := strings.TrimPrefix(ref.GetPath(), providerPath)
if p == "" {
p = "/"
}
pathRef := &provider.Reference{Path: p}

return pathRef, nil
}
Loading

0 comments on commit 1993a12

Please sign in to comment.