Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ocdav: upload file to storage provider after assembling chunks #1253

Merged
merged 9 commits into from
Oct 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions changelog/unreleased/ocdav-putchunked.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Bugfix: Upload file to storage provider after assembling chunks

In the PUT handler for chunked uploads in ocdav, we store the individual
chunks in temporary file but do not write the assembled file to storage.
This PR fixes that.

https://github.com/cs3org/reva/pull/1253
12 changes: 12 additions & 0 deletions cmd/reva/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ package main
import (
"flag"
"fmt"
"net/http"
"os"
"strings"
"time"

"github.com/c-bata/go-prompt"
"github.com/cs3org/reva/pkg/rhttp"
)

var (
Expand All @@ -37,6 +40,8 @@ var (

gitCommit, buildDate, version, goVersion string

client *http.Client

commands = []*command{
versionCommand(),
configureCommand(),
Expand Down Expand Up @@ -94,6 +99,13 @@ func main() {
}
}

client = rhttp.GetHTTPClient(
// TODO make insecure configurable
rhttp.Insecure(true),
// TODO make timeout configurable
rhttp.Timeout(time.Duration(24*int64(time.Hour))),
)

generateMainUsage()
executor := Executor{Timeout: timeout}
completer := Completer{DisableArgPrompt: disableargprompt}
Expand Down
45 changes: 5 additions & 40 deletions cmd/reva/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@ import (
"os"
"path/filepath"
"strconv"
"time"

"github.com/cs3org/reva/internal/http/services/datagateway"
"github.com/pkg/errors"

"github.com/cheggaaa/pb"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
Expand Down Expand Up @@ -84,7 +82,6 @@ func uploadCommand() *command {
if err != nil {
return err
}
defer fd.Close()

fmt.Printf("Local file size: %d bytes\n", md.Size())

Expand Down Expand Up @@ -149,14 +146,9 @@ func uploadCommand() *command {

dataServerURL := res.UploadEndpoint

bar := pb.New(int(md.Size())).SetUnits(pb.U_BYTES)
bar.Start()
reader := bar.NewProxyReader(fd)

if *disableTusFlag {
httpReq, err := rhttp.NewRequest(ctx, "PUT", dataServerURL, reader)
httpReq, err := rhttp.NewRequest(ctx, "PUT", dataServerURL, fd)
if err != nil {
bar.Finish()
return err
}

Expand All @@ -166,38 +158,21 @@ func uploadCommand() *command {
q.Add("xs_type", storageprovider.GRPC2PKGXS(xsType).String())
httpReq.URL.RawQuery = q.Encode()

httpClient := rhttp.GetHTTPClient(
rhttp.Context(ctx),
// TODO make insecure configurable
rhttp.Insecure(true),
// TODO make timeout configurable
rhttp.Timeout(time.Duration(24*int64(time.Hour))),
)

httpRes, err := httpClient.Do(httpReq)
httpRes, err := client.Do(httpReq)
if err != nil {
bar.Finish()
return err
}
defer httpRes.Body.Close()
if httpRes.StatusCode != http.StatusOK {
bar.Finish()
return err
}
} else {
// create the tus client.
c := tus.DefaultConfig()
c.Resume = true
c.HttpClient = rhttp.GetHTTPClient(
rhttp.Context(ctx),
// TODO make insecure configurable
rhttp.Insecure(true),
// TODO make timeout configurable
rhttp.Timeout(time.Duration(24*int64(time.Hour))),
)
c.HttpClient = client
c.Store, err = memorystore.NewMemoryStore()
if err != nil {
bar.Finish()
return err
}
if token, ok := tokenpkg.ContextGetToken(ctx); ok {
Expand All @@ -208,7 +183,6 @@ func uploadCommand() *command {
}
tusc, err := tus.NewClient(dataServerURL, c)
if err != nil {
bar.Finish()
return err
}

Expand All @@ -221,7 +195,7 @@ func uploadCommand() *command {
fingerprint := fmt.Sprintf("%s-%d-%s-%s", md.Name(), md.Size(), md.ModTime(), xs)

// create an upload from a file.
upload := tus.NewUpload(reader, md.Size(), metadata, fingerprint)
upload := tus.NewUpload(fd, md.Size(), metadata, fingerprint)

// create the uploader.
c.Store.Set(upload.Fingerprint, dataServerURL)
Expand All @@ -230,13 +204,10 @@ func uploadCommand() *command {
// start the uploading process.
err = uploader.Upload()
if err != nil {
bar.Finish()
return err
}
}

bar.Finish()

req2 := &provider.StatRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{
Expand Down Expand Up @@ -291,21 +262,15 @@ func checkUploadWebdavRef(endpoint string, opaque *typespb.Opaque, md os.FileInf
return errors.New("opaque entry decoder not recognized: " + fileOpaque.Decoder)
}

bar := pb.New(int(md.Size())).SetUnits(pb.U_BYTES)
bar.Start()
reader := bar.NewProxyReader(fd)

c := gowebdav.NewClient(endpoint, "", "")
c.SetHeader(tokenpkg.TokenHeader, token)
c.SetHeader("Upload-Length", strconv.FormatInt(md.Size(), 10))

err := c.WriteStream(filePath, reader, 0700)
err := c.WriteStream(filePath, fd, 0700)
if err != nil {
bar.Finish()
return err
}

bar.Finish()
fmt.Println("File uploaded")
return nil
}
Expand Down
20 changes: 6 additions & 14 deletions internal/grpc/services/appprovider/appprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func init() {

type service struct {
provider app.Provider
client *http.Client
conf *config
}

Expand Down Expand Up @@ -79,6 +80,9 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
service := &service{
conf: c,
provider: provider,
client: rhttp.GetHTTPClient(
rhttp.Timeout(5 * time.Second),
),
}

return service, nil
Expand Down Expand Up @@ -120,12 +124,6 @@ func getProvider(c *config) (app.Provider, error) {
}

func (s *service) getWopiAppEndpoints(ctx context.Context) (map[string]interface{}, error) {
httpClient := rhttp.GetHTTPClient(
rhttp.Context(ctx),
// calls to WOPI are expected to take a very short time, 5s (though hardcoded) ought to be far enough
rhttp.Timeout(time.Duration(5*int64(time.Second))),
)

// TODO this query will eventually be served by Reva.
// For the time being it is a remnant of the CERNBox-specific WOPI server, which justifies the /cbox path in the URL.
wopiurl, err := url.Parse(s.conf.WopiURL)
Expand All @@ -137,7 +135,7 @@ func (s *service) getWopiAppEndpoints(ctx context.Context) (map[string]interface
if err != nil {
return nil, err
}
appsRes, err := httpClient.Do(appsReq)
appsRes, err := s.client.Do(appsReq)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -165,12 +163,6 @@ func (s *service) OpenFileInAppProvider(ctx context.Context, req *providerpb.Ope

log := appctx.GetLogger(ctx)

httpClient := rhttp.GetHTTPClient(
rhttp.Context(ctx),
// calls to WOPI are expected to take a very short time, 5s (though hardcoded) ought to be far enough
rhttp.Timeout(time.Duration(5*int64(time.Second))),
)

wopiurl, err := url.Parse(s.conf.WopiURL)
if err != nil {
return nil, err
Expand Down Expand Up @@ -203,7 +195,7 @@ func (s *service) OpenFileInAppProvider(ctx context.Context, req *providerpb.Ope

httpReq.URL.RawQuery = q.Encode()

openRes, err := httpClient.Do(httpReq)
openRes, err := s.client.Do(httpReq)

if err != nil {
res := &providerpb.OpenFileInAppProviderResponse{
Expand Down
5 changes: 4 additions & 1 deletion internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ type transferClaims struct {
}

func (s *svc) sign(_ context.Context, target string) (string, error) {
ttl := time.Duration(s.c.TransferExpires) * time.Second
// Tus sends a separate request to the datagateway service for every chunk.
// For large files, this can take a long time, so we extend the expiration
// for 10 minutes. TODO: Make this configurable.
ttl := time.Duration(s.c.TransferExpires) * 10 * time.Minute
claims := transferClaims{
StandardClaims: jwt.StandardClaims{
ExpiresAt: time.Now().Add(ttl).Unix(),
Expand Down
17 changes: 11 additions & 6 deletions internal/http/services/dataprovider/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ func init() {
}

type config struct {
Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"`
Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."`
Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"`
Timeout int64 `mapstructure:"timeout"`
Insecure bool `mapstructure:"insecure"`
DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."`
Prefix string `mapstructure:"prefix" docs:"data;The prefix to be used for this HTTP service"`
Driver string `mapstructure:"driver" docs:"localhome;The storage driver to be used."`
Drivers map[string]map[string]interface{} `mapstructure:"drivers" docs:"url:pkg/storage/fs/localhome/localhome.go;The configuration for the storage driver"`
Timeout int64 `mapstructure:"timeout"`
Insecure bool `mapstructure:"insecure"`
DisableTus bool `mapstructure:"disable_tus" docs:"false;Whether to disable TUS uploads."`
TempDirectory string `mapstructure:"temp_directory"`
}

func (c *config) init() {
Expand All @@ -53,6 +54,10 @@ func (c *config) init() {
c.Driver = "localhome"
}

if c.TempDirectory == "" {
c.TempDirectory = "/var/tmp/reva/tmp"
}

}

type svc struct {
Expand Down
17 changes: 16 additions & 1 deletion internal/http/services/dataprovider/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ package dataprovider

import (
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"strconv"
"strings"
Expand Down Expand Up @@ -73,6 +76,18 @@ func (s *svc) doTusPut(w http.ResponseWriter, r *http.Request) {
}
}

fd, err := ioutil.TempFile(fmt.Sprintf("/%s", s.conf.TempDirectory), "")
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
defer os.RemoveAll(fd.Name())
defer fd.Close()
if _, err := io.Copy(fd, r.Body); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

dataServerURL := fmt.Sprintf("http://%s%s", r.Host, r.RequestURI)

// create the tus client.
Expand Down Expand Up @@ -102,7 +117,7 @@ func (s *svc) doTusPut(w http.ResponseWriter, r *http.Request) {
"dir": path.Dir(fp),
}

upload := tus.NewUpload(r.Body, length, metadata, "")
upload := tus.NewUpload(fd, length, metadata, "")
defer r.Body.Close()

// create the uploader.
Expand Down
17 changes: 14 additions & 3 deletions internal/http/services/owncloud/ocdav/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"path"
"strings"

Expand Down Expand Up @@ -293,16 +294,26 @@ func (s *svc) descend(ctx context.Context, client gateway.GatewayAPIClient, src
return fmt.Errorf("status code %d", httpDownloadRes.StatusCode)
}

fileName, fd, err := s.createChunkTempFile()
Copy link
Contributor

@butonic butonic Oct 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above ... this copies the file to disk before sending it out again ... just read from the body

if err != nil {
return err
}
defer os.RemoveAll(fileName)
defer fd.Close()
if _, err := io.Copy(fd, httpDownloadRes.Body); err != nil {
return err
}

// do upload
err = s.tusUpload(ctx, uRes.UploadEndpoint, uRes.Token, dst, httpDownloadRes.Body, src.GetSize())
err = s.tusUpload(ctx, uRes.UploadEndpoint, uRes.Token, dst, fd, int64(src.GetSize()))
if err != nil {
return err
}
}
return nil
}

func (s *svc) tusUpload(ctx context.Context, dataServerURL string, transferToken string, fn string, body io.Reader, length uint64) error {
func (s *svc) tusUpload(ctx context.Context, dataServerURL string, transferToken string, fn string, body io.ReadSeeker, length int64) error {
var err error
log := appctx.GetLogger(ctx)

Expand Down Expand Up @@ -339,7 +350,7 @@ func (s *svc) tusUpload(ctx context.Context, dataServerURL string, transferToken
Str("dir", path.Dir(fn)).
Msg("tus.NewUpload")

upload := tus.NewUpload(body, int64(length), metadata, "")
upload := tus.NewUpload(body, length, metadata, "")

// create the uploader.
c.Store.Set(upload.Fingerprint, dataServerURL)
Expand Down
2 changes: 1 addition & 1 deletion internal/http/services/owncloud/ocdav/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/internal/http/utils"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/rhttp"
"github.com/cs3org/reva/pkg/utils"
)

func (s *svc) handleGet(w http.ResponseWriter, r *http.Request, ns string) {
Expand Down
2 changes: 1 addition & 1 deletion internal/http/services/owncloud/ocdav/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/internal/http/utils"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/utils"
)

func (s *svc) handleHead(w http.ResponseWriter, r *http.Request, ns string) {
Expand Down
2 changes: 1 addition & 1 deletion internal/http/services/owncloud/ocdav/propfind.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/internal/http/services/owncloud/ocs/conversions"
"github.com/cs3org/reva/internal/http/utils"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/utils"
"github.com/pkg/errors"
)

Expand Down
Loading