Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement CS3APIs datatx name change PullTransfer -> CreateTransfer #3553

Merged
merged 10 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/rclone-tpc-cs3apis.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Change: Rename PullTransfer to CreateTransfer

This change implements a CS3APIs name change in the datatx module (PullTransfer to CreateTransfer)

https://github.com/cs3org/reva/pull/3553
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/cheggaaa/pb v1.0.29
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e
github.com/cs3org/go-cs3apis v0.0.0-20230124153659-5dc78d32c9b7
github.com/cs3org/go-cs3apis v0.0.0-20230209081138-33f5a7d81cb2
github.com/dgraph-io/ristretto v0.1.1
github.com/eventials/go-tus v0.0.0-20200718001131-45c7ec8f5d59
github.com/gdexlab/go-render v1.0.1
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e h1:tqSPWQeueWTKnJVMJffz4pz0o1WuQxJ28+5x5JgaHD8=
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e/go.mod h1:XJEZ3/EQuI3BXTp/6DUzFr850vlxq11I6satRtz0YQ4=
github.com/cs3org/go-cs3apis v0.0.0-20230124153659-5dc78d32c9b7 h1:QShkOi9aBptnhYN4W0lueiWTlNtc7O69D6GRpYfZodg=
github.com/cs3org/go-cs3apis v0.0.0-20230124153659-5dc78d32c9b7/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/go-cs3apis v0.0.0-20230209081138-33f5a7d81cb2 h1:vJGHFm3lS7LC0XwsSit8ZMqIyE55Op2+X7p1xEH4Vt0=
github.com/cs3org/go-cs3apis v0.0.0-20230209081138-33f5a7d81cb2/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down Expand Up @@ -921,6 +921,8 @@ github.com/prometheus/statsd_exporter v0.22.7/go.mod h1:N/TevpjkIh9ccs6nuzY3jQn9
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rainycape/memcache v0.0.0-20150622160815-1031fa0ce2f2/go.mod h1:7tZKcyumwBO6qip7RNQ5r77yrssm9bfCowcLEBcU5IA=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redblom/go-cs3apis v0.0.0-20230130162347-1c3e4b532eac h1:GsyT76KNkNHftiTKPLY9qRkrU/Nl91G+x9uStQHBVZE=
github.com/redblom/go-cs3apis v0.0.0-20230130162347-1c3e4b532eac/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand Down
33 changes: 16 additions & 17 deletions internal/grpc/services/datatx/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"
datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
txdriver "github.com/cs3org/reva/pkg/datatx"
txregistry "github.com/cs3org/reva/pkg/datatx/manager/registry"
"github.com/cs3org/reva/pkg/errtypes"
Expand Down Expand Up @@ -72,7 +71,7 @@ type txShare struct {
TxID string
SrcTargetURI string
DestTargetURI string
Opaque *types.Opaque `json:"opaque"`
ShareID string
}

func (c *config) init() {
Expand Down Expand Up @@ -146,7 +145,7 @@ func (s *service) UnprotectedEndpoints() []string {
return []string{}
}

func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) {
func (s *service) CreateTransfer(ctx context.Context, req *datatx.CreateTransferRequest) (*datatx.CreateTransferResponse, error) {
txInfo, startTransferErr := s.txManager.CreateTransfer(ctx, req.SrcTargetUri, req.DestTargetUri)

// we always save the transfer regardless of start transfer outcome
Expand All @@ -155,29 +154,29 @@ func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequ
TxID: txInfo.GetId().OpaqueId,
SrcTargetURI: req.SrcTargetUri,
DestTargetURI: req.DestTargetUri,
Opaque: req.Opaque,
ShareID: req.GetShareId().OpaqueId,
}
s.txShareDriver.Lock()
defer s.txShareDriver.Unlock()

s.txShareDriver.model.TxShares[txInfo.GetId().OpaqueId] = txShare
if err := s.txShareDriver.model.saveTxShare(); err != nil {
err = errors.Wrap(err, "datatx service: error saving transfer share: "+datatx.Status_STATUS_INVALID.String())
return &datatx.PullTransferResponse{
Status: status.NewInvalid(ctx, "error pulling transfer"),
return &datatx.CreateTransferResponse{
Status: status.NewInvalid(ctx, "error creating transfer"),
}, err
}

// now check start transfer outcome
if startTransferErr != nil {
startTransferErr = errors.Wrap(startTransferErr, "datatx service: error starting transfer job")
return &datatx.PullTransferResponse{
Status: status.NewInvalid(ctx, "datatx service: error pulling transfer"),
return &datatx.CreateTransferResponse{
Status: status.NewInvalid(ctx, "datatx service: error creating transfer"),
TxInfo: txInfo,
}, startTransferErr
}

return &datatx.PullTransferResponse{
return &datatx.CreateTransferResponse{
Status: status.NewOK(ctx),
TxInfo: txInfo,
}, nil
Expand All @@ -198,7 +197,7 @@ func (s *service) GetTransferStatus(ctx context.Context, req *datatx.GetTransfer
}, err
}

txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)}
txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID}

return &datatx.GetTransferStatusResponse{
Status: status.NewOK(ctx),
Expand All @@ -207,22 +206,22 @@ func (s *service) GetTransferStatus(ctx context.Context, req *datatx.GetTransfer
}

func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransferRequest) (*datatx.CancelTransferResponse, error) {
txShare, ok := s.txShareDriver.model.TxShares[req.GetTxId().GetOpaqueId()]
txShare, ok := s.txShareDriver.model.TxShares[req.GetTxId().OpaqueId]
if !ok {
return nil, errtypes.InternalError("datatx service: transfer not found")
}

txInfo, err := s.txManager.CancelTransfer(ctx, req.GetTxId().OpaqueId)
if err != nil {
txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)}
txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID}
err = errors.Wrap(err, "datatx service: error cancelling transfer")
return &datatx.CancelTransferResponse{
Status: status.NewInternal(ctx, err, "error cancelling transfer"),
TxInfo: txInfo,
}, err
}

txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)}
txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID}

return &datatx.CancelTransferResponse{
Status: status.NewOK(ctx),
Expand All @@ -237,15 +236,15 @@ func (s *service) ListTransfers(ctx context.Context, req *datatx.ListTransfersRe
if len(filters) == 0 {
txInfos = append(txInfos, &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: txShare.TxID},
ShareId: &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)},
ShareId: &ocm.ShareId{OpaqueId: txShare.ShareID},
})
} else {
for _, f := range filters {
if f.Type == datatx.ListTransfersRequest_Filter_TYPE_SHARE_ID {
if f.GetShareId().GetOpaqueId() == string(txShare.Opaque.Map["shareId"].Value) {
if f.GetShareId().GetOpaqueId() == txShare.ShareID {
txInfos = append(txInfos, &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: txShare.TxID},
ShareId: &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)},
ShareId: &ocm.ShareId{OpaqueId: txShare.ShareID},
})
}
}
Expand Down Expand Up @@ -274,7 +273,7 @@ func (s *service) RetryTransfer(ctx context.Context, req *datatx.RetryTransferRe
}, err
}

txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)}
txInfo.ShareId = &ocm.ShareId{OpaqueId: txShare.ShareID}

return &datatx.RetryTransferResponse{
Status: status.NewOK(ctx),
Expand Down
8 changes: 4 additions & 4 deletions internal/grpc/services/gateway/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ import (
"github.com/pkg/errors"
)

func (s *svc) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) {
func (s *svc) CreateTransfer(ctx context.Context, req *datatx.CreateTransferRequest) (*datatx.CreateTransferResponse, error) {
c, err := pool.GetDataTxClient(pool.Endpoint(s.c.DataTxEndpoint))
if err != nil {
err = errors.Wrap(err, "gateway: error calling GetDataTxClient")
return &datatx.PullTransferResponse{
return &datatx.CreateTransferResponse{
Status: status.NewInternal(ctx, err, "error getting data transfer client"),
}, nil
}

res, err := c.PullTransfer(ctx, req)
res, err := c.CreateTransfer(ctx, req)
if err != nil {
return nil, errors.Wrap(err, "gateway: error calling PullTransfer")
return nil, errors.Wrap(err, "gateway: error calling CreateTransfer")
}

return res, nil
Expand Down
58 changes: 32 additions & 26 deletions internal/grpc/services/gateway/ocmshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
ctxpkg "github.com/cs3org/reva/pkg/ctx"
"github.com/cs3org/reva/pkg/errtypes"
Expand Down Expand Up @@ -285,25 +284,31 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
var srcEndpointScheme string
for _, s := range meshProvider.ProviderInfo.Services {
if strings.ToLower(s.Endpoint.Type.Name) == "webdav" {
endpointURL, err := url.Parse(s.Endpoint.Path)
if err != nil {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + s.Endpoint.Path)
srcWebdavEndpointURL, err := url.Parse(s.Endpoint.Path)
if err != nil || srcWebdavEndpointURL.Host == "" {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint \"" + s.Endpoint.Path + "\" into URL structure")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}
urlServiceHostFull, err := url.Parse(s.Host)
var srcWebdavHostURLString string
if strings.Contains(s.Host, "://") {
srcWebdavHostURLString = s.Host
} else {
srcWebdavHostURLString = "http://" + s.Host
}
srcWebdavHostURL, err := url.Parse(srcWebdavHostURLString)
if err != nil {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav service host " + s.Host)
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav service host \"" + s.Host + "\" into URL structure")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}
srcServiceHost = urlServiceHostFull.Host + urlServiceHostFull.Path
srcServiceHost = srcWebdavHostURL.Host + srcWebdavHostURL.Path
// optional prefix must only appear in target url path:
// http://...token...@reva.eu/prefix/?name=remote.php/webdav/home/...
srcEndpointPath = strings.TrimPrefix(endpointURL.Path, urlServiceHostFull.Path)
srcEndpointScheme = endpointURL.Scheme
srcEndpointPath = strings.TrimPrefix(srcWebdavEndpointURL.Path, srcWebdavHostURL.Path)
srcEndpointScheme = srcWebdavEndpointURL.Scheme
break
}
}
Expand Down Expand Up @@ -345,7 +350,7 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
}
destWebdavEndpointURL, err := url.Parse(destWebdavEndpoint)
if err != nil {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + destWebdavEndpoint)
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint \"" + destWebdavEndpoint + "\" into URL structure")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
Expand All @@ -357,17 +362,23 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}
urlServiceHostFull, err := url.Parse(destWebdavHost)
var dstWebdavURLString string
if strings.Contains(destWebdavHost, "://") {
dstWebdavURLString = destWebdavHost
} else {
dstWebdavURLString = "http://" + destWebdavHost
}
dstWebdavHostURL, err := url.Parse(dstWebdavURLString)
if err != nil {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav service host " + destWebdavHost)
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav service host \"" + dstWebdavURLString + "\" into URL structure")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}
destServiceHost := urlServiceHostFull.Host + urlServiceHostFull.Path
destServiceHost := dstWebdavHostURL.Host + dstWebdavHostURL.Path
// optional prefix must only appear in target url path:
// http://...token...@reva.eu/prefix/?name=remote.php/webdav/home/...
destEndpointPath := strings.TrimPrefix(destWebdavEndpointURL.Path, urlServiceHostFull.Path)
destEndpointPath := strings.TrimPrefix(destWebdavEndpointURL.Path, dstWebdavHostURL.Path)
destEndpointScheme := destWebdavEndpointURL.Scheme
destToken := ctxpkg.ContextMustGetToken(ctx)
homeRes, err := s.GetHome(ctx, &provider.GetHomeRequest{})
Expand All @@ -380,30 +391,25 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
destPath := path.Join(destEndpointPath, homeRes.Path, s.c.DataTransfersFolder, path.Base(share.GetShare().Name))
destTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", destEndpointScheme, destToken, destServiceHost, destPath)

opaqueObj := &types.Opaque{
Map: map[string]*types.OpaqueEntry{
"shareId": {
Decoder: "plain",
Value: []byte(share.GetShare().GetId().OpaqueId),
},
},
shareID := &ocm.ShareId{
OpaqueId: share.GetShare().GetId().OpaqueId,
}
req := &datatx.PullTransferRequest{
req := &datatx.CreateTransferRequest{
SrcTargetUri: srcTargetURI,
DestTargetUri: destTargetURI,
Opaque: opaqueObj,
ShareId: shareID,
}
res, err := s.PullTransfer(ctx, req)
res, err := s.CreateTransfer(ctx, req)
if err != nil {
log.Err(err).Msg("gateway: error calling PullTransfer")
log.Err(err).Msg("gateway: error calling CreateTransfer")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{
Code: rpc.Code_CODE_INTERNAL,
},
}, err
}

log.Info().Msgf("gateway: PullTransfer: %v", res.TxInfo)
log.Info().Msgf("gateway: CreateTransfer: %v", res.TxInfo)

// do not create an OCM reference, just return
return &ocm.UpdateReceivedOCMShareResponse{
Expand Down
9 changes: 1 addition & 8 deletions pkg/datatx/manager/rclone/rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,6 @@ func (m *transferModel) saveTransfer(e error) error {
// CreateTransfer creates a transfer job and returns a TxInfo object that includes a unique transfer id.
// Specified target URIs are of form scheme://userinfo@host:port?name={path}
func (driver *rclone) CreateTransfer(ctx context.Context, srcTargetURI string, dstTargetURI string) (*datatx.TxInfo, error) {
logger := appctx.GetLogger(ctx)

srcEp, err := driver.extractEndpointInfo(ctx, srcTargetURI)
if err != nil {
return nil, err
Expand All @@ -238,9 +236,6 @@ func (driver *rclone) CreateTransfer(ctx context.Context, srcTargetURI string, d
// we always set the userinfo part of the destination url for rclone tpc push support
dstRemote := fmt.Sprintf("%s://%s@%s", destEp.endpointScheme, dstToken, destEp.endpoint)

logger.Debug().Msgf("destination target URI: %v", dstTargetURI)
logger.Debug().Msgf("destination remote: %v", dstRemote)

return driver.startJob(ctx, "", srcRemote, srcPath, srcToken, dstRemote, dstPath, dstToken)
}

Expand Down Expand Up @@ -308,10 +303,8 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote

type rcloneAsyncReqJSON struct {
SrcFs string `json:"srcFs"`
// SrcToken string `json:"srcToken"`
DstFs string `json:"dstFs"`
// DstToken string `json:"destToken"`
Async bool `json:"_async"`
Async bool `json:"_async"`
}
// bearer is the default authentication scheme for reva
srcAuthHeader := fmt.Sprintf("bearer_token=\"%v\"", srcToken)
Expand Down