Skip to content

Commit

Permalink
Fixes for data transfer (tus and non-tus)
Browse files Browse the repository at this point in the history
  • Loading branch information
ishank011 authored Oct 27, 2020
1 parent 7c9a6b0 commit acf5ed6
Show file tree
Hide file tree
Showing 29 changed files with 194 additions and 270 deletions.
7 changes: 7 additions & 0 deletions changelog/unreleased/ocdav-putchunked.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Bugfix: Upload file to storage provider after assembling chunks

In the PUT handler for chunked uploads in ocdav, we store the individual
chunks in temporary file but do not write the assembled file to storage.
This PR fixes that.

https://github.com/cs3org/reva/pull/1253
12 changes: 12 additions & 0 deletions cmd/reva/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ package main
import (
"flag"
"fmt"
"net/http"
"os"
"strings"
"time"

"github.com/c-bata/go-prompt"
"github.com/cs3org/reva/pkg/rhttp"
)

var (
Expand All @@ -37,6 +40,8 @@ var (

gitCommit, buildDate, version, goVersion string

client *http.Client

commands = []*command{
versionCommand(),
configureCommand(),
Expand Down Expand Up @@ -94,6 +99,13 @@ func main() {
}
}

client = rhttp.GetHTTPClient(
// TODO make insecure configurable
rhttp.Insecure(true),
// TODO make timeout configurable
rhttp.Timeout(time.Duration(24*int64(time.Hour))),
)

generateMainUsage()
executor := Executor{Timeout: timeout}
completer := Completer{DisableArgPrompt: disableargprompt}
Expand Down
45 changes: 5 additions & 40 deletions cmd/reva/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@ import (
"os"
"path/filepath"
"strconv"
"time"

"github.com/cs3org/reva/internal/http/services/datagateway"
"github.com/pkg/errors"

"github.com/cheggaaa/pb"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
Expand Down Expand Up @@ -84,7 +82,6 @@ func uploadCommand() *command {
if err != nil {
return err
}
defer fd.Close()

fmt.Printf("Local file size: %d bytes\n", md.Size())

Expand Down Expand Up @@ -149,14 +146,9 @@ func uploadCommand() *command {

dataServerURL := res.UploadEndpoint

bar := pb.New(int(md.Size())).SetUnits(pb.U_BYTES)
bar.Start()
reader := bar.NewProxyReader(fd)

if *disableTusFlag {
httpReq, err := rhttp.NewRequest(ctx, "PUT", dataServerURL, reader)
httpReq, err := rhttp.NewRequest(ctx, "PUT", dataServerURL, fd)
if err != nil {
bar.Finish()
return err
}

Expand All @@ -166,38 +158,21 @@ func uploadCommand() *command {
q.Add("xs_type", storageprovider.GRPC2PKGXS(xsType).String())
httpReq.URL.RawQuery = q.Encode()

httpClient := rhttp.GetHTTPClient(
rhttp.Context(ctx),
// TODO make insecure configurable
rhttp.Insecure(true),
// TODO make timeout configurable
rhttp.Timeout(time.Duration(24*int64(time.Hour))),
)

httpRes, err := httpClient.Do(httpReq)
httpRes, err := client.Do(httpReq)
if err != nil {
bar.Finish()
return err
}
defer httpRes.Body.Close()
if httpRes.StatusCode != http.StatusOK {
bar.Finish()
return err
}
} else {
// create the tus client.
c := tus.DefaultConfig()
c.Resume = true
c.HttpClient = rhttp.GetHTTPClient(
rhttp.Context(ctx),
// TODO make insecure configurable
rhttp.Insecure(true),
// TODO make timeout configurable
rhttp.Timeout(time.Duration(24*int64(time.Hour))),
)
c.HttpClient = client
c.Store, err = memorystore.NewMemoryStore()
if err != nil {
bar.Finish()
return err
}
if token, ok := tokenpkg.ContextGetToken(ctx); ok {
Expand All @@ -208,7 +183,6 @@ func uploadCommand() *command {
}
tusc, err := tus.NewClient(dataServerURL, c)
if err != nil {
bar.Finish()
return err
}

Expand All @@ -221,7 +195,7 @@ func uploadCommand() *command {
fingerprint := fmt.Sprintf("%s-%d-%s-%s", md.Name(), md.Size(), md.ModTime(), xs)

// create an upload from a file.
upload := tus.NewUpload(reader, md.Size(), metadata, fingerprint)
upload := tus.NewUpload(fd, md.Size(), metadata, fingerprint)

// create the uploader.
c.Store.Set(upload.Fingerprint, dataServerURL)
Expand All @@ -230,13 +204,10 @@ func uploadCommand() *command {
// start the uploading process.
err = uploader.Upload()
if err != nil {
bar.Finish()
return err
}
}

bar.Finish()

req2 := &provider.StatRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{
Expand Down Expand Up @@ -291,21 +262,15 @@ func checkUploadWebdavRef(endpoint string, opaque *typespb.Opaque, md os.FileInf
return errors.New("opaque entry decoder not recognized: " + fileOpaque.Decoder)
}

bar := pb.New(int(md.Size())).SetUnits(pb.U_BYTES)
bar.Start()
reader := bar.NewProxyReader(fd)

c := gowebdav.NewClient(endpoint, "", "")
c.SetHeader(tokenpkg.TokenHeader, token)
c.SetHeader("Upload-Length", strconv.FormatInt(md.Size(), 10))

err := c.WriteStream(filePath, reader, 0700)
err := c.WriteStream(filePath, fd, 0700)
if err != nil {
bar.Finish()
return err
}

bar.Finish()
fmt.Println("File uploaded")
return nil
}
Expand Down
20 changes: 6 additions & 14 deletions internal/grpc/services/appprovider/appprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func init() {

type service struct {
provider app.Provider
client *http.Client
conf *config
}

Expand Down Expand Up @@ -79,6 +80,9 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
service := &service{
conf: c,
provider: provider,
client: rhttp.GetHTTPClient(
rhttp.Timeout(5 * time.Second),
),
}

return service, nil
Expand Down Expand Up @@ -120,12 +124,6 @@ func getProvider(c *config) (app.Provider, error) {
}

func (s *service) getWopiAppEndpoints(ctx context.Context) (map[string]interface{}, error) {
httpClient := rhttp.GetHTTPClient(
rhttp.Context(ctx),
// calls to WOPI are expected to take a very short time, 5s (though hardcoded) ought to be far enough
rhttp.Timeout(time.Duration(5*int64(time.Second))),
)

// TODO this query will eventually be served by Reva.
// For the time being it is a remnant of the CERNBox-specific WOPI server, which justifies the /cbox path in the URL.
wopiurl, err := url.Parse(s.conf.WopiURL)
Expand All @@ -137,7 +135,7 @@ func (s *service) getWopiAppEndpoints(ctx context.Context) (map[string]interface
if err != nil {
return nil, err
}
appsRes, err := httpClient.Do(appsReq)
appsRes, err := s.client.Do(appsReq)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -165,12 +163,6 @@ func (s *service) OpenFileInAppProvider(ctx context.Context, req *providerpb.Ope

log := appctx.GetLogger(ctx)

httpClient := rhttp.GetHTTPClient(
rhttp.Context(ctx),
// calls to WOPI are expected to take a very short time, 5s (though hardcoded) ought to be far enough
rhttp.Timeout(time.Duration(5*int64(time.Second))),
)

wopiurl, err := url.Parse(s.conf.WopiURL)
if err != nil {
return nil, err
Expand Down Expand Up @@ -203,7 +195,7 @@ func (s *service) OpenFileInAppProvider(ctx context.Context, req *providerpb.Ope

httpReq.URL.RawQuery = q.Encode()

openRes, err := httpClient.Do(httpReq)
openRes, err := s.client.Do(httpReq)

if err != nil {
res := &providerpb.OpenFileInAppProviderResponse{
Expand Down
5 changes: 4 additions & 1 deletion internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ type transferClaims struct {
}

func (s *svc) sign(_ context.Context, target string) (string, error) {
ttl := time.Duration(s.c.TransferExpires) * time.Second
// Tus sends a separate request to the datagateway service for every chunk.
// For large files, this can take a long time, so we extend the expiration
// for 10 minutes. TODO: Make this configurable.
ttl := time.Duration(s.c.TransferExpires) * 10 * time.Minute
claims := transferClaims{
StandardClaims: jwt.StandardClaims{
ExpiresAt: time.Now().Add(ttl).Unix(),
Expand Down
17 changes: 11 additions & 6 deletions internal/http/services/dataprovider/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ func init() {
}

type config struct {
Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"`
Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."`
Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"`
Timeout int64 `mapstructure:"timeout"`
Insecure bool `mapstructure:"insecure"`
DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."`
Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"`
Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."`
Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"`
Timeout int64 `mapstructure:"timeout"`
Insecure bool `mapstructure:"insecure"`
DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."`
TempDirectory string `mapstructure:"temp_directory"`
}

func (c *config) init() {
Expand All @@ -53,6 +54,10 @@ func (c *config) init() {
c.Driver = "localhome"
}

if c.TempDirectory == "" {
c.TempDirectory = "/var/tmp/reva/tmp"
}

}

type svc struct {
Expand Down
17 changes: 16 additions & 1 deletion internal/http/services/dataprovider/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ package dataprovider

import (
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"strconv"
"strings"
Expand Down Expand Up @@ -73,6 +76,18 @@ func (s *svc) doTusPut(w http.ResponseWriter, r *http.Request) {
}
}

fd, err := ioutil.TempFile(fmt.Sprintf("/%s", s.conf.TempDirectory), "")
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
defer os.RemoveAll(fd.Name())
defer fd.Close()
if _, err := io.Copy(fd, r.Body); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

dataServerURL := fmt.Sprintf("http://%s%s", r.Host, r.RequestURI)

// create the tus client.
Expand Down Expand Up @@ -102,7 +117,7 @@ func (s *svc) doTusPut(w http.ResponseWriter, r *http.Request) {
"dir": path.Dir(fp),
}

upload := tus.NewUpload(r.Body, length, metadata, "")
upload := tus.NewUpload(fd, length, metadata, "")
defer r.Body.Close()

// create the uploader.
Expand Down
17 changes: 14 additions & 3 deletions internal/http/services/owncloud/ocdav/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"path"
"strings"

Expand Down Expand Up @@ -293,16 +294,26 @@ func (s *svc) descend(ctx context.Context, client gateway.GatewayAPIClient, src
return fmt.Errorf("status code %d", httpDownloadRes.StatusCode)
}

fileName, fd, err := s.createChunkTempFile()
if err != nil {
return err
}
defer os.RemoveAll(fileName)
defer fd.Close()
if _, err := io.Copy(fd, httpDownloadRes.Body); err != nil {
return err
}

// do upload
err = s.tusUpload(ctx, uRes.UploadEndpoint, uRes.Token, dst, httpDownloadRes.Body, src.GetSize())
err = s.tusUpload(ctx, uRes.UploadEndpoint, uRes.Token, dst, fd, int64(src.GetSize()))
if err != nil {
return err
}
}
return nil
}

func (s *svc) tusUpload(ctx context.Context, dataServerURL string, transferToken string, fn string, body io.Reader, length uint64) error {
func (s *svc) tusUpload(ctx context.Context, dataServerURL string, transferToken string, fn string, body io.ReadSeeker, length int64) error {
var err error
log := appctx.GetLogger(ctx)

Expand Down Expand Up @@ -339,7 +350,7 @@ func (s *svc) tusUpload(ctx context.Context, dataServerURL string, transferToken
Str("dir", path.Dir(fn)).
Msg("tus.NewUpload")

upload := tus.NewUpload(body, int64(length), metadata, "")
upload := tus.NewUpload(body, length, metadata, "")

// create the uploader.
c.Store.Set(upload.Fingerprint, dataServerURL)
Expand Down
2 changes: 1 addition & 1 deletion internal/http/services/owncloud/ocdav/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/internal/http/utils"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/rhttp"
"github.com/cs3org/reva/pkg/utils"
)

func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
Expand Down
2 changes: 1 addition & 1 deletion internal/http/services/owncloud/ocdav/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/internal/http/utils"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/utils"
)

func (s *svc) handleHead(w http.ResponseWriter, r *http.Request, ns string) {
Expand Down
2 changes: 1 addition & 1 deletion internal/http/services/owncloud/ocdav/propfind.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/internal/http/services/owncloud/ocs/conversions"
"github.com/cs3org/reva/internal/http/utils"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/utils"
"github.com/pkg/errors"
)

Expand Down
Loading

0 comments on commit acf5ed6

Please sign in to comment.