Skip to content

Commit

Permalink
Rclone third-party copy push (#3491)
Browse files Browse the repository at this point in the history
* Transfer method depends on implementation or configuration.

* Support rclone tpc push.

* Support tpc push with different accounts at src and dest.

* Simplify; don't touch the target URIs.

* Support rclone tpc push.

* Enhancement: rclone tpc push

* Fix comment

* Configurable auth scheme for dest fs

* Method name changed and documented

Co-authored-by: Antoon P <antoon@redblom.com>
  • Loading branch information
redblom and antoonp authored Dec 2, 2022
1 parent 44329a5 commit 396dd74
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 72 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/rclone-tpc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: implement rclone third-party copy push option

This enhancement gives the option to use third-party copy push with rclone between two different user accounts.

https://github.com/cs3org/reva/pull/3491
39 changes: 28 additions & 11 deletions examples/datatx/datatx.toml
Original file line number Diff line number Diff line change
@@ -1,22 +1,39 @@
# example data transfer service configuration
# Example data transfer service configuration
[grpc.services.datatx]
# rclone is the default data transfer driver
# Rclone is the default data transfer driver
txdriver = "rclone"
# the shares,transfers db file (default: /var/tmp/reva/datatx-shares.json)
# The shares,transfers db file (default: /var/tmp/reva/datatx-shares.json)
tx_shares_file = ""
# base folder of the data transfers (default: /home/DataTransfers)
# Base folder of the data transfers (default: /home/DataTransfers)
data_transfers_folder = ""

# rclone data transfer driver
# Rclone data transfer driver
[grpc.services.datatx.txdrivers.rclone]
# rclone endpoint
# Rclone endpoint
endpoint = "http://..."
# basic auth is used
# Basic auth is used
auth_user = "...rcloneuser"
auth_pass = "...rcloneusersecret"
# the transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json)
# The authentication scheme to use in the src and dest requests by rclone (follows the endpoints' authentication methods)
# Valid values:
# "bearer" (default) will result in rclone using request header: Authorization: "Bearer ...token..."
# "x-access-token" will result in rclone using request header: X-Access-Token: "...token..."
# If not set, "bearer" is assumed
auth_header = "x-access-token"
# The transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json)
file = ""
# check status job interval in milliseconds
# Check status job interval in milliseconds
job_status_check_interval = 2000
# the job timeout in milliseconds (must be long enough for big transfers!)
job_timeout = 120000
# The job timeout in milliseconds (must be long enough for big transfers!)
job_timeout = 120000

[http.services.ocdav]
# Rclone supports third-party copy push; for that to work with reva enable this setting
enable_http_tpc = true
# The authentication scheme reva uses for the tpc push call (the call to Destination).
# Follows the destination endpoint authentication method.
# Valid values:
# "bearer" (default) will result in header: Authorization: "Bearer ...token..."
# "x-access-token" will result in header: X-Access-Token: "...token..."
# If not set, "bearer" is assumed
http_tpc_push_auth_header = "x-access-token"
52 changes: 2 additions & 50 deletions internal/grpc/services/datatx/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ package datatx
import (
"context"
"encoding/json"
"fmt"
"io"
"net/url"
"os"
"sync"

Expand Down Expand Up @@ -77,13 +75,6 @@ type txShare struct {
Opaque *types.Opaque `json:"opaque"`
}

// webdavEndpoint holds the individual parts of a transfer target URI,
// as split up by extractEndpointInfo.
type webdavEndpoint struct {
// filePath is the first value of the "name" query parameter of the target URI.
filePath string
// endpoint is the host of the target URI followed by its path.
endpoint string
// endpointScheme is the scheme of the target URI.
endpointScheme string
// token is the userinfo part of the target URI (empty when none is present).
token string
}

func (c *config) init() {
if c.TxDriver == "" {
c.TxDriver = "rclone"
Expand Down Expand Up @@ -156,23 +147,7 @@ func (s *service) UnprotectedEndpoints() []string {
}

func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) {
srcEp, err := s.extractEndpointInfo(ctx, req.SrcTargetUri)
if err != nil {
return nil, err
}
srcRemote := fmt.Sprintf("%s://%s", srcEp.endpointScheme, srcEp.endpoint)
srcPath := srcEp.filePath
srcToken := srcEp.token

destEp, err := s.extractEndpointInfo(ctx, req.DestTargetUri)
if err != nil {
return nil, err
}
dstRemote := fmt.Sprintf("%s://%s", destEp.endpointScheme, destEp.endpoint)
dstPath := destEp.filePath
dstToken := destEp.token

txInfo, startTransferErr := s.txManager.StartTransfer(ctx, srcRemote, srcPath, srcToken, dstRemote, dstPath, dstToken)
txInfo, startTransferErr := s.txManager.CreateTransfer(ctx, req.SrcTargetUri, req.DestTargetUri)

// we always save the transfer regardless of start transfer outcome
// only then, if starting fails, can we try to restart it
Expand Down Expand Up @@ -205,7 +180,7 @@ func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequ
return &datatx.PullTransferResponse{
Status: status.NewOK(ctx),
TxInfo: txInfo,
}, err
}, nil
}

func (s *service) GetTransferStatus(ctx context.Context, req *datatx.GetTransferStatusRequest) (*datatx.GetTransferStatusResponse, error) {
Expand Down Expand Up @@ -307,29 +282,6 @@ func (s *service) RetryTransfer(ctx context.Context, req *datatx.RetryTransferRe
}, nil
}

// extractEndpointInfo splits a target URI of the form
// scheme://userinfo@host:port/path?name={filePath} into its parts.
//
// It returns a bad-request error when targetURL is empty or when it carries no
// "name" query parameter, and a wrapped error when the URI or its query string
// cannot be parsed.
func (s *service) extractEndpointInfo(ctx context.Context, targetURL string) (*webdavEndpoint, error) {
	if targetURL == "" {
		return nil, errtypes.BadRequest("datatx service: ref target is an empty uri")
	}

	uri, err := url.Parse(targetURL)
	if err != nil {
		return nil, errors.Wrap(err, "datatx service: error parsing target uri: "+targetURL)
	}

	m, err := url.ParseQuery(uri.RawQuery)
	if err != nil {
		return nil, errors.Wrap(err, "datatx service: error parsing target resource name")
	}

	// Indexing m["name"][0] without this check panics (index out of range)
	// when the target URI has no "name" query parameter.
	names := m["name"]
	if len(names) == 0 {
		return nil, errtypes.BadRequest("datatx service: target uri has no name query parameter")
	}

	return &webdavEndpoint{
		filePath:       names[0],
		endpoint:       uri.Host + uri.Path,
		endpointScheme: uri.Scheme,
		// (*url.Userinfo).String is nil-safe and yields "" when there is no userinfo part.
		token: uri.User.String(),
	}, nil
}

func loadOrCreate(file string) (*txShareModel, error) {
_, err := os.Stat(file)
if os.IsNotExist(err) {
Expand Down
7 changes: 6 additions & 1 deletion internal/http/services/owncloud/ocdav/ocdav.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ type Config struct {
Timeout int64 `mapstructure:"timeout"`
Insecure bool `mapstructure:"insecure" docs:"false;Whether to skip certificate checks when sending requests."`
// If true, HTTP COPY will expect the HTTP-TPC (third-party copy) headers
EnableHTTPTpc bool `mapstructure:"enable_http_tpc"`
EnableHTTPTpc bool `mapstructure:"enable_http_tpc"`
// The authentication scheme to use for the tpc push call when userinfo part is specified in the Destination header uri. Default value is 'bearer'.
// Possible values:
// "bearer" results in header: Authorization: Bearer ...token...
// "x-access-token" results in header: X-Access-Token: ...token...
HTTPTpcPushAuthHeader string `mapstructure:"http_tpc_push_auth_header"`
PublicURL string `mapstructure:"public_url"`
FavoriteStorageDriver string `mapstructure:"favorite_storage_driver"`
FavoriteStorageDrivers map[string]map[string]interface{} `mapstructure:"favorite_storage_drivers"`
Expand Down
37 changes: 34 additions & 3 deletions internal/http/services/owncloud/ocdav/tpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"path"
"strconv"
"strings"
Expand Down Expand Up @@ -392,9 +393,28 @@ func (s *svc) performHTTPPush(ctx context.Context, client gateway.GatewayAPIClie
return err
}

// add authentication header and content length
bearerHeader := r.Header.Get(HeaderTransferAuth)
req.Header.Add("Authorization", bearerHeader)
// Check if there is userinfo to be found in the destination URI
// This should be the token to use in the HTTP push call
userInfo, err := s.extractUserInfo(ctx, dst)
if err != nil {
sublog.Debug().Msgf("tpc push: error: %v", err)
}
if len(userInfo) > 0 {
sublog.Debug().Msg("tpc push: userinfo part found in destination url, using userinfo as token for the HTTP push request authorization header")
if s.c.HTTPTpcPushAuthHeader == "x-access-token" {
req.Header.Add(s.c.HTTPTpcPushAuthHeader, userInfo)
sublog.Debug().Msgf("tpc push: using authentication scheme: %v", s.c.HTTPTpcPushAuthHeader)
} else { // Bearer is the default
req.Header.Add("Authorization", "Bearer "+userInfo)
sublog.Debug().Msg("tpc push: using authentication scheme: bearer")
}
} else {
sublog.Debug().Msg("tpc push: no userinfo part found in destination url, using token from the COPY request authorization header")
// add authorization header; single token tpc
bearerHeader := r.Header.Get(HeaderTransferAuth)
req.Header.Add("Authorization", bearerHeader)
}
// add content length
req.ContentLength = int64(srcInfo.GetSize())

// do Upload
Expand All @@ -412,3 +432,14 @@ func (s *svc) performHTTPPush(ctx context.Context, client gateway.GatewayAPIClie

return nil
}

// extractUserInfo parses targetURL and hands back its userinfo component,
// i.e. the part in front of the '@' in https://[userinfo]@www.example.com:123/...path.
// The result is the empty string when the URL has no userinfo component.
// A URL that cannot be parsed yields a bad-request error.
func (s *svc) extractUserInfo(ctx context.Context, targetURL string) (string, error) {
	u, parseErr := url.Parse(targetURL)
	if parseErr == nil {
		return u.User.String(), nil
	}

	return "", errtypes.BadRequest("tpc: error extracting userinfo part - error parsing url: " + targetURL)
}
5 changes: 3 additions & 2 deletions pkg/datatx/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (

// Manager the interface any transfer driver should implement.
type Manager interface {
// StartTransfer initiates a transfer job and returns a TxInfo object including a unique transfer id, and error if any.
StartTransfer(ctx context.Context, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error)
// CreateTransfer creates a transfer job and returns a TxInfo object that includes a unique transfer id.
// Specified target URIs are of form scheme://userinfo@host:port?name={path}
CreateTransfer(ctx context.Context, srcTargetURI string, dstTargetURI string) (*datatx.TxInfo, error)
// GetTransferStatus returns a TxInfo object including the current status, and error if any.
GetTransferStatus(ctx context.Context, transferID string) (*datatx.TxInfo, error)
// CancelTransfer cancels the transfer and returns a TxInfo object and error if any.
Expand Down
74 changes: 69 additions & 5 deletions pkg/datatx/manager/rclone/rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cs3org/reva/pkg/appctx"
txdriver "github.com/cs3org/reva/pkg/datatx"
registry "github.com/cs3org/reva/pkg/datatx/manager/registry"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/rhttp"
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -64,6 +65,7 @@ type config struct {
Endpoint string `mapstructure:"endpoint"`
AuthUser string `mapstructure:"auth_user"` // rclone basicauth user
AuthPass string `mapstructure:"auth_pass"` // rclone basicauth pass
AuthHeader string `mapstructure:"auth_header"`
File string `mapstructure:"file"`
JobStatusCheckInterval int `mapstructure:"job_status_check_interval"`
JobTimeout int `mapstructure:"job_timeout"`
Expand Down Expand Up @@ -118,6 +120,13 @@ var txEndStatuses = map[string]int32{
"STATUS_TRANSFER_EXPIRED": 10,
}

// endpoint holds the individual parts of a transfer target URI,
// as split up by extractEndpointInfo.
type endpoint struct {
// filePath is the first value of the "name" query parameter of the target URI.
filePath string
// endpoint is the host of the target URI followed by its path.
endpoint string
// endpointScheme is the scheme of the target URI.
endpointScheme string
// token is the userinfo part of the target URI (empty when none is present).
token string
}

// New returns a new rclone driver.
func New(m map[string]interface{}) (txdriver.Manager, error) {
c, err := parseConfig(m)
Expand Down Expand Up @@ -207,9 +216,32 @@ func (m *transferModel) saveTransfer(e error) error {
return e
}

// StartTransfer initiates a transfer job and returns a TxInfo object that includes a unique transfer id.
func (driver *rclone) StartTransfer(ctx context.Context, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) {
return driver.startJob(ctx, "", srcRemote, srcPath, srcToken, destRemote, destPath, destToken)
// CreateTransfer sets up a new transfer job from the source to the destination
// target URI and returns the resulting TxInfo, which carries the transfer id.
// Both target URIs are expected in the form scheme://userinfo@host:port?name={path}.
// An error from splitting either URI is returned as is, without starting a job.
func (driver *rclone) CreateTransfer(ctx context.Context, srcTargetURI string, dstTargetURI string) (*datatx.TxInfo, error) {
	zlog := appctx.GetLogger(ctx)

	src, err := driver.extractEndpointInfo(ctx, srcTargetURI)
	if err != nil {
		return nil, err
	}

	dst, err := driver.extractEndpointInfo(ctx, dstTargetURI)
	if err != nil {
		return nil, err
	}

	// The source remote is passed on without a userinfo part.
	srcRemote := fmt.Sprintf("%s://%s", src.endpointScheme, src.endpoint)
	// The destination remote always carries the userinfo part; rclone tpc push relies on it.
	dstRemote := fmt.Sprintf("%s://%s@%s", dst.endpointScheme, dst.token, dst.endpoint)

	zlog.Debug().Msgf("destination target URI: %v", dstTargetURI)
	zlog.Debug().Msgf("destination remote: %v", dstRemote)

	// An empty transfer id starts a new job (as opposed to retrying an existing one).
	return driver.startJob(ctx, "", srcRemote, src.filePath, src.token, dstRemote, dst.filePath, dst.token)
}

// startJob starts a transfer job. Retries a previous job if transferID is specified.
Expand Down Expand Up @@ -281,8 +313,17 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote
// DstToken string `json:"destToken"`
Async bool `json:"_async"`
}
srcFs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":%v", srcToken, srcRemote, srcPath)
dstFs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":%v", destToken, destRemote, destPath)
// bearer is the default authentication scheme for reva
srcAuthHeader := fmt.Sprintf("bearer_token=\"%v\"", srcToken)
if driver.config.AuthHeader == "x-access-token" {
srcAuthHeader = fmt.Sprintf("headers=\"x-access-token,%v\"", srcToken)
}
srcFs := fmt.Sprintf(":webdav,%v,url=\"%v\":%v", srcAuthHeader, srcRemote, srcPath)
destAuthHeader := fmt.Sprintf("bearer_token=\"%v\"", destToken)
if driver.config.AuthHeader == "x-access-token" {
destAuthHeader = fmt.Sprintf("headers=\"x-access-token,%v\"", destToken)
}
dstFs := fmt.Sprintf(":webdav,%v,url=\"%v\":%v", destAuthHeader, destRemote, destPath)
rcloneReq := &rcloneAsyncReqJSON{
SrcFs: srcFs,
DstFs: dstFs,
Expand Down Expand Up @@ -829,3 +870,26 @@ func (driver *rclone) remotePathIsFolder(remote string, remotePath string, remot
// in all other cases the remote path is a directory
return true, nil
}

// extractEndpointInfo splits a target URI of the form
// scheme://userinfo@host:port/path?name={filePath} into its parts.
//
// It returns a bad-request error when targetURL is empty or when it carries no
// "name" query parameter, and a wrapped error when the URI or its query string
// cannot be parsed.
func (driver *rclone) extractEndpointInfo(ctx context.Context, targetURL string) (*endpoint, error) {
	if targetURL == "" {
		return nil, errtypes.BadRequest("datatx service: ref target is an empty uri")
	}

	uri, err := url.Parse(targetURL)
	if err != nil {
		return nil, errors.Wrap(err, "datatx service: error parsing target uri: "+targetURL)
	}

	m, err := url.ParseQuery(uri.RawQuery)
	if err != nil {
		return nil, errors.Wrap(err, "datatx service: error parsing target resource name")
	}

	// Indexing m["name"][0] without this check panics (index out of range)
	// when the target URI has no "name" query parameter.
	names := m["name"]
	if len(names) == 0 {
		return nil, errtypes.BadRequest("datatx service: target uri has no name query parameter")
	}

	return &endpoint{
		filePath:       names[0],
		endpoint:       uri.Host + uri.Path,
		endpointScheme: uri.Scheme,
		// (*url.Userinfo).String is nil-safe and yields "" when there is no userinfo part.
		token: uri.User.String(),
	}, nil
}

0 comments on commit 396dd74

Please sign in to comment.