From 396dd741bcb4c86ab6be8165451b6ccdfdfcba75 Mon Sep 17 00:00:00 2001 From: Antoon P Date: Fri, 2 Dec 2022 09:12:30 +0100 Subject: [PATCH] Rclone third-party copy push (#3491) * Transfer method depends on implementation or configuration. * Support rclone tpc push. * Support tpc push with different accounts at src and dest. * Simplify; don't touch the target URIs. * Support rclone tpc push. * Enhancement: rclone tpc push * Fix comment * Configurable auth scheme for dest fs * Method name changed and documented Co-authored-by: Antoon P --- changelog/unreleased/rclone-tpc.md | 5 ++ examples/datatx/datatx.toml | 39 +++++++--- internal/grpc/services/datatx/datatx.go | 52 +------------ .../http/services/owncloud/ocdav/ocdav.go | 7 +- internal/http/services/owncloud/ocdav/tpc.go | 37 +++++++++- pkg/datatx/datatx.go | 5 +- pkg/datatx/manager/rclone/rclone.go | 74 +++++++++++++++++-- 7 files changed, 147 insertions(+), 72 deletions(-) create mode 100644 changelog/unreleased/rclone-tpc.md diff --git a/changelog/unreleased/rclone-tpc.md b/changelog/unreleased/rclone-tpc.md new file mode 100644 index 0000000000..b6f7582f89 --- /dev/null +++ b/changelog/unreleased/rclone-tpc.md @@ -0,0 +1,5 @@ +Enhancement: implement rclone third-party copy push option + +This enhancement gives the option to use third-party copy push with rclone between two different user accounts. + +https://github.com/cs3org/reva/pull/3491 \ No newline at end of file diff --git a/examples/datatx/datatx.toml b/examples/datatx/datatx.toml index cb50385005..f1271b3483 100644 --- a/examples/datatx/datatx.toml +++ b/examples/datatx/datatx.toml @@ -1,22 +1,39 @@ -# example data transfer service configuration +# Example data transfer service configuration [grpc.services.datatx] -# rclone is the default data transfer driver +# Rclone is the default data transfer driver txdriver = "rclone" -# the shares,transfers db file (default: /var/tmp/reva/datatx-shares.json) +# The shares,transfers db file (default: /var/tmp/reva/datatx-shares.json) tx_shares_file = "" -# base folder of the data transfers (default: /home/DataTransfers) +# Base folder of the data transfers (default: /home/DataTransfers) data_transfers_folder = "" -# rclone data transfer driver +# Rclone data transfer driver [grpc.services.datatx.txdrivers.rclone] -# rclone endpoint +# Rclone endpoint endpoint = "http://..." -# basic auth is used +# Basic auth is used auth_user = "...rcloneuser" auth_pass = "...rcloneusersecret" -# the transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json) +# The authentication scheme to use in the src and dest requests by rclone (follows the endpoints' authentication methods) +# Valid values: +# "bearer" (default) will result in rclone using request header: Authorization: "Bearer ...token..." +# "x-access-token" will result in rclone using request header: X-Access-Token: "...token..." +# If not set "bearer" is assumed +auth_header = "x-access-token" +# The transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json) file = "" -# check status job interval in milliseconds +# Check status job interval in milliseconds job_status_check_interval = 2000 -# the job timeout in milliseconds (must be long enough for big transfers!) -job_timeout = 120000 \ No newline at end of file +# The job timeout in milliseconds (must be long enough for big transfers!) +job_timeout = 120000 + +[http.services.ocdav] +# Rclone supports third-party copy push; for that to work with reva enable this setting +enable_http_tpc = true +# The authentication scheme reva uses for the tpc push call (the call to Destination). +# Follows the destination endpoint authentication method. +# Valid values: +# "bearer" (default) will result in header: Authorization: "Bearer ...token..." +# "x-access-token" will result in header: X-Access-Token: "...token..." +# If not set "bearer" is assumed +http_tpc_push_auth_header = "x-access-token" diff --git a/internal/grpc/services/datatx/datatx.go b/internal/grpc/services/datatx/datatx.go index 6aa48749d2..38e1d24979 100644 --- a/internal/grpc/services/datatx/datatx.go +++ b/internal/grpc/services/datatx/datatx.go @@ -21,9 +21,7 @@ package datatx import ( "context" "encoding/json" - "fmt" "io" - "net/url" "os" "sync" @@ -77,13 +75,6 @@ type txShare struct { Opaque *types.Opaque `json:"opaque"` } -type webdavEndpoint struct { - filePath string - endpoint string - endpointScheme string - token string -} - func (c *config) init() { if c.TxDriver == "" { c.TxDriver = "rclone" @@ -156,23 +147,7 @@ func (s *service) UnprotectedEndpoints() []string { } func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) { - srcEp, err := s.extractEndpointInfo(ctx, req.SrcTargetUri) - if err != nil { - return nil, err - } - srcRemote := fmt.Sprintf("%s://%s", srcEp.endpointScheme, srcEp.endpoint) - srcPath := srcEp.filePath - srcToken := srcEp.token - - destEp, err := s.extractEndpointInfo(ctx, req.DestTargetUri) - if err != nil { - return nil, err - } - dstRemote := fmt.Sprintf("%s://%s", destEp.endpointScheme, destEp.endpoint) - dstPath := destEp.filePath - dstToken := destEp.token - - txInfo, startTransferErr := s.txManager.StartTransfer(ctx, srcRemote, srcPath, srcToken, dstRemote, dstPath, dstToken) + txInfo, startTransferErr := s.txManager.CreateTransfer(ctx, req.SrcTargetUri, req.DestTargetUri) // we always save the transfer regardless of start transfer outcome // only then, if starting fails, can we try to restart it @@ -205,7 +180,7 @@ func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequ return &datatx.PullTransferResponse{ Status: status.NewOK(ctx), TxInfo: txInfo, - }, err + }, nil } func (s *service) GetTransferStatus(ctx context.Context, req *datatx.GetTransferStatusRequest) (*datatx.GetTransferStatusResponse, error) { @@ -307,29 +282,6 @@ func (s *service) RetryTransfer(ctx context.Context, req *datatx.RetryTransferRe }, nil } -func (s *service) extractEndpointInfo(ctx context.Context, targetURL string) (*webdavEndpoint, error) { - if targetURL == "" { - return nil, errtypes.BadRequest("datatx service: ref target is an empty uri") - } - - uri, err := url.Parse(targetURL) - if err != nil { - return nil, errors.Wrap(err, "datatx service: error parsing target uri: "+targetURL) - } - - m, err := url.ParseQuery(uri.RawQuery) - if err != nil { - return nil, errors.Wrap(err, "datatx service: error parsing target resource name") - } - - return &webdavEndpoint{ - filePath: m["name"][0], - endpoint: uri.Host + uri.Path, - endpointScheme: uri.Scheme, - token: uri.User.String(), - }, nil -} - func loadOrCreate(file string) (*txShareModel, error) { _, err := os.Stat(file) if os.IsNotExist(err) { diff --git a/internal/http/services/owncloud/ocdav/ocdav.go b/internal/http/services/owncloud/ocdav/ocdav.go index 905ecc7513..30e11e39c7 100644 --- a/internal/http/services/owncloud/ocdav/ocdav.go +++ b/internal/http/services/owncloud/ocdav/ocdav.go @@ -100,7 +100,12 @@ type Config struct { Timeout int64 `mapstructure:"timeout"` Insecure bool `mapstructure:"insecure" docs:"false;Whether to skip certificate checks when sending requests."` // If true, HTTP COPY will expect the HTTP-TPC (third-party copy) headers - EnableHTTPTpc bool `mapstructure:"enable_http_tpc"` + EnableHTTPTpc bool `mapstructure:"enable_http_tpc"` + // The authentication scheme to use for the tpc push call when userinfo part is specified in the Destination header uri. Default value is 'bearer'. + // Possible values: + // "bearer" results in header: Authorization: Bearer ...token... + // "x-access-token": results in header: X-Access-Token: ...token... + HTTPTpcPushAuthHeader string `mapstructure:"http_tpc_push_auth_header"` PublicURL string `mapstructure:"public_url"` FavoriteStorageDriver string `mapstructure:"favorite_storage_driver"` FavoriteStorageDrivers map[string]map[string]interface{} `mapstructure:"favorite_storage_drivers"` diff --git a/internal/http/services/owncloud/ocdav/tpc.go b/internal/http/services/owncloud/ocdav/tpc.go index ca3db2e798..739c12cf73 100644 --- a/internal/http/services/owncloud/ocdav/tpc.go +++ b/internal/http/services/owncloud/ocdav/tpc.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "net/http" + "net/url" "path" "strconv" "strings" @@ -392,9 +393,28 @@ func (s *svc) performHTTPPush(ctx context.Context, client gateway.GatewayAPIClie return err } - // add authentication header and content length - bearerHeader := r.Header.Get(HeaderTransferAuth) - req.Header.Add("Authorization", bearerHeader) + // Check if there is userinfo to be found in the destination URI + // This should be the token to use in the HTTP push call + userInfo, err := s.extractUserInfo(ctx, dst) + if err != nil { + sublog.Debug().Msgf("tpc push: error: %v", err) + } + if len(userInfo) > 0 { + sublog.Debug().Msg("tpc push: userinfo part found in destination url, using userinfo as token for the HTTP push request authorization header") + if s.c.HTTPTpcPushAuthHeader == "x-access-token" { + req.Header.Add(s.c.HTTPTpcPushAuthHeader, userInfo) + sublog.Debug().Msgf("tpc push: using authentication scheme: %v", s.c.HTTPTpcPushAuthHeader) + } else { // Bearer is the default + req.Header.Add("Authorization", "Bearer "+userInfo) + sublog.Debug().Msg("tpc push: using authentication scheme: bearer") + } + } else { + sublog.Debug().Msg("tpc push: no userinfo part found in destination url, using token from the COPY request authorization header") + // add authorization header; single token tpc + bearerHeader := r.Header.Get(HeaderTransferAuth) + req.Header.Add("Authorization", bearerHeader) + } + // add content length req.ContentLength = int64(srcInfo.GetSize()) // do Upload @@ -412,3 +432,14 @@ func (s *svc) performHTTPPush(ctx context.Context, client gateway.GatewayAPIClie return nil } + +// Extracts and returns the userinfo part of the specified target URL (https://[userinfo]@www.example.com:123/...path). +// Returns an empty string if no userinfo part is found. +func (s *svc) extractUserInfo(ctx context.Context, targetURL string) (string, error) { + parsedURL, err := url.Parse(targetURL) + if err != nil { + return "", errtypes.BadRequest("tpc: error extracting userinfo part - error parsing url: " + targetURL) + } + + return parsedURL.User.String(), nil +} diff --git a/pkg/datatx/datatx.go b/pkg/datatx/datatx.go index ece6b472f4..34ead136de 100644 --- a/pkg/datatx/datatx.go +++ b/pkg/datatx/datatx.go @@ -26,8 +26,9 @@ import ( // Manager the interface any transfer driver should implement. type Manager interface { - // StartTransfer initiates a transfer job and returns a TxInfo object including a unique transfer id, and error if any. - StartTransfer(ctx context.Context, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) + // CreateTransfer creates a transfer job and returns a TxInfo object that includes a unique transfer id. + // Specified target URIs are of form scheme://userinfo@host:port?name={path} + CreateTransfer(ctx context.Context, srcTargetURI string, dstTargetURI string) (*datatx.TxInfo, error) // GetTransferStatus returns a TxInfo object including the current status, and error if any. GetTransferStatus(ctx context.Context, transferID string) (*datatx.TxInfo, error) // CancelTransfer cancels the transfer and returns a TxInfo object and error if any. diff --git a/pkg/datatx/manager/rclone/rclone.go b/pkg/datatx/manager/rclone/rclone.go index 72d2e7e8ef..f0b83bf8ae 100644 --- a/pkg/datatx/manager/rclone/rclone.go +++ b/pkg/datatx/manager/rclone/rclone.go @@ -37,6 +37,7 @@ import ( "github.com/cs3org/reva/pkg/appctx" txdriver "github.com/cs3org/reva/pkg/datatx" registry "github.com/cs3org/reva/pkg/datatx/manager/registry" + "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/rhttp" "github.com/google/uuid" "github.com/mitchellh/mapstructure" @@ -64,6 +65,7 @@ type config struct { Endpoint string `mapstructure:"endpoint"` AuthUser string `mapstructure:"auth_user"` // rclone basicauth user AuthPass string `mapstructure:"auth_pass"` // rclone basicauth pass + AuthHeader string `mapstructure:"auth_header"` File string `mapstructure:"file"` JobStatusCheckInterval int `mapstructure:"job_status_check_interval"` JobTimeout int `mapstructure:"job_timeout"` @@ -118,6 +120,13 @@ var txEndStatuses = map[string]int32{ "STATUS_TRANSFER_EXPIRED": 10, } +type endpoint struct { + filePath string + endpoint string + endpointScheme string + token string +} + // New returns a new rclone driver. func New(m map[string]interface{}) (txdriver.Manager, error) { c, err := parseConfig(m) @@ -207,9 +216,32 @@ func (m *transferModel) saveTransfer(e error) error { return e } -// StartTransfer initiates a transfer job and returns a TxInfo object that includes a unique transfer id. -func (driver *rclone) StartTransfer(ctx context.Context, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) { - return driver.startJob(ctx, "", srcRemote, srcPath, srcToken, destRemote, destPath, destToken) +// CreateTransfer creates a transfer job and returns a TxInfo object that includes a unique transfer id. +// Specified target URIs are of form scheme://userinfo@host:port?name={path} +func (driver *rclone) CreateTransfer(ctx context.Context, srcTargetURI string, dstTargetURI string) (*datatx.TxInfo, error) { + logger := appctx.GetLogger(ctx) + + srcEp, err := driver.extractEndpointInfo(ctx, srcTargetURI) + if err != nil { + return nil, err + } + srcRemote := fmt.Sprintf("%s://%s", srcEp.endpointScheme, srcEp.endpoint) + srcPath := srcEp.filePath + srcToken := srcEp.token + + destEp, err := driver.extractEndpointInfo(ctx, dstTargetURI) + if err != nil { + return nil, err + } + dstPath := destEp.filePath + dstToken := destEp.token + // we always set the userinfo part of the destination url for rclone tpc push support + dstRemote := fmt.Sprintf("%s://%s@%s", destEp.endpointScheme, dstToken, destEp.endpoint) + + logger.Debug().Msgf("destination target URI: %v", dstTargetURI) + logger.Debug().Msgf("destination remote: %v", dstRemote) + + return driver.startJob(ctx, "", srcRemote, srcPath, srcToken, dstRemote, dstPath, dstToken) } // startJob starts a transfer job. Retries a previous job if transferID is specified. @@ -281,8 +313,17 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote // DstToken string `json:"destToken"` Async bool `json:"_async"` } - srcFs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":%v", srcToken, srcRemote, srcPath) - dstFs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":%v", destToken, destRemote, destPath) + // bearer is the default authentication scheme for reva + srcAuthHeader := fmt.Sprintf("bearer_token=\"%v\"", srcToken) + if driver.config.AuthHeader == "x-access-token" { + srcAuthHeader = fmt.Sprintf("headers=\"x-access-token,%v\"", srcToken) + } + srcFs := fmt.Sprintf(":webdav,%v,url=\"%v\":%v", srcAuthHeader, srcRemote, srcPath) + destAuthHeader := fmt.Sprintf("bearer_token=\"%v\"", destToken) + if driver.config.AuthHeader == "x-access-token" { + destAuthHeader = fmt.Sprintf("headers=\"x-access-token,%v\"", destToken) + } + dstFs := fmt.Sprintf(":webdav,%v,url=\"%v\":%v", destAuthHeader, destRemote, destPath) rcloneReq := &rcloneAsyncReqJSON{ SrcFs: srcFs, DstFs: dstFs, @@ -829,3 +870,26 @@ func (driver *rclone) remotePathIsFolder(remote string, remotePath string, remot // in all other cases the remote path is a directory return true, nil } + +func (driver *rclone) extractEndpointInfo(ctx context.Context, targetURL string) (*endpoint, error) { + if targetURL == "" { + return nil, errtypes.BadRequest("datatx service: ref target is an empty uri") + } + + uri, err := url.Parse(targetURL) + if err != nil { + return nil, errors.Wrap(err, "datatx service: error parsing target uri: "+targetURL) + } + + m, err := url.ParseQuery(uri.RawQuery) + if err != nil { + return nil, errors.Wrap(err, "datatx service: error parsing target resource name") + } + + return &endpoint{ + filePath: m["name"][0], + endpoint: uri.Host + uri.Path, + endpointScheme: uri.Scheme, + token: uri.User.String(), + }, nil +}