Decomposedfs fix revision download #3473

Closed · wants to merge 24 commits

Changes shown from 17 of 24 commits.

Commits
fced7bd
rewrite finish upload to get atomic size diff
butonic Nov 17, 2022
c58f166
decomposedfs: make finish upload atomic
butonic Nov 17, 2022
150fe7d
add lock functions
butonic Nov 17, 2022
4dae4b6
allow locking non existing files, fix locking with existing lock
butonic Nov 17, 2022
3d0977a
make returned error recognizable
butonic Nov 17, 2022
11fc7c4
more lock fixes
butonic Nov 17, 2022
47f29a1
do not log nil error
butonic Nov 18, 2022
bf30d98
don't overwrite original error when deleting the blob fails
butonic Nov 18, 2022
786d0b3
always release node lock
butonic Nov 18, 2022
b955128
keep correct mtimes
butonic Nov 18, 2022
3db9243
fix adler checksum
butonic Nov 18, 2022
38c1d93
stat before closing
butonic Nov 18, 2022
cb990cd
fix lint ... and proper revision download is not covered by the CS3 api
butonic Nov 18, 2022
57aaca2
fix permissions when downloading grants
butonic Nov 18, 2022
82419a3
update changelog
butonic Nov 18, 2022
0686a7c
fix locks and revision restore
butonic Nov 18, 2022
b162894
assemble permissions on the node when checking a revision
butonic Nov 18, 2022
d9534ef
fix typos
butonic Nov 18, 2022
36b8a84
allow revision download when user has initiate download and list revi…
butonic Nov 18, 2022
c90d8c4
fix reading revision node
butonic Nov 21, 2022
8679103
do not forget revision delimiter
butonic Nov 21, 2022
f2d29bb
drop old revision
butonic Nov 21, 2022
2401c1f
remove unexpected failures
butonic Nov 21, 2022
19ee0cc
update changelog and unexpected passes
butonic Nov 21, 2022
5 changes: 5 additions & 0 deletions changelog/unreleased/decomposedfs-finish-upload-rewrite.md
@@ -0,0 +1,5 @@
Bugfix: decomposedfs fix revision download

We rewrote the finish upload code to use a write lock when creating and updating node metadata. This prevents some corner cases, allows us to calculate the size diff atomically and fixes downloading revisions.

https://github.com/cs3org/reva/pull/3473
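A minimal sketch of the pattern the changelog describes: the old size is read and the new metadata written under one exclusive lock, so the size diff cannot race with other writers. It uses the gofrs/flock package that the new SetXattrsWithLock helper below also accepts; the lock-file name and the read/write callbacks are stand-ins, not the actual decomposedfs internals.

```go
package main

import (
	"fmt"

	"github.com/gofrs/flock"
)

// finishUploadSketch reads the old size and writes the new one under a single
// exclusive lock, so the returned size diff cannot race with other writers.
func finishUploadSketch(nodePath string, newSize int64,
	readSize func() (int64, error), writeSize func(int64) error) (int64, error) {

	lock := flock.New(nodePath + ".lock") // assumed lock-file naming
	if err := lock.Lock(); err != nil {
		return 0, err
	}
	defer func() { _ = lock.Unlock() }()

	oldSize, err := readSize() // e.g. the blobsize xattr of the node
	if err != nil {
		return 0, err
	}
	if err := writeSize(newSize); err != nil { // e.g. a SetXattrsWithLock call
		return 0, err
	}
	return newSize - oldSize, nil // atomic because both steps happen under the lock
}

func main() {
	stored := int64(100) // toy stand-in for the persisted blobsize
	diff, err := finishUploadSketch("/tmp/node-1234", 250,
		func() (int64, error) { return stored, nil },
		func(s int64) error { stored = s; return nil })
	fmt.Println(diff, err) // 150 <nil>
}
```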
7 changes: 7 additions & 0 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
@@ -30,6 +30,7 @@ import (
"path"
"path/filepath"
"strconv"
"strings"
"syscall"

cs3permissions "github.com/cs3org/go-cs3apis/cs3/permissions/v1beta1"
@@ -591,6 +592,12 @@ func (fs *Decomposedfs) Delete(ctx context.Context, ref *provider.Reference) (er

// Download returns a reader to the specified resource
func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) {
// check if we are trying to download a revision
// TODO the CS3 api should allow initiating a revision download
if ref.ResourceId != nil && strings.Contains(ref.ResourceId.OpaqueId, node.RevisionIDDelimiter) {
return fs.DownloadRevision(ctx, ref, ref.ResourceId.OpaqueId)
}

node, err := fs.lu.NodeFromResource(ctx, ref)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error resolving ref")
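A small sketch of the routing check above: a revision is addressed by appending a delimiter and a timestamp to the node ID, so Download can recognize such a reference and hand it to DownloadRevision. The delimiter value and the IDs below are illustrative assumptions; the code only references node.RevisionIDDelimiter.

```go
package main

import (
	"fmt"
	"strings"
)

const revisionIDDelimiter = ".REV." // assumed value of node.RevisionIDDelimiter

func main() {
	// a revision reference carries "<node-id><delimiter><revision timestamp>"
	opaqueID := "4c510ada-0000-0000-0000-000000000000" + revisionIDDelimiter + "2022-11-18T09:15:04.000000001Z"

	if strings.Contains(opaqueID, revisionIDDelimiter) {
		kp := strings.SplitN(opaqueID, revisionIDDelimiter, 2)
		fmt.Println("node id: ", kp[0])  // the actual node, used for permission checks
		fmt.Println("revision:", kp[1])  // identifies the revision to stream
		// Download passes the full opaque ID on to DownloadRevision here.
	}
}
```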
6 changes: 2 additions & 4 deletions pkg/storage/utils/decomposedfs/metadata.go
@@ -73,8 +73,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.
if md.Metadata != nil {
if val, ok := md.Metadata["mtime"]; ok {
delete(md.Metadata, "mtime")
- err := n.SetMtime(ctx, val)
- if err != nil {
+ if err := n.SetMtimeString(val); err != nil {
errs = append(errs, errors.Wrap(err, "could not set mtime"))
}
}
@@ -85,8 +84,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.
// TODO unset when folder is updated or add timestamp to etag?
if val, ok := md.Metadata["etag"]; ok {
delete(md.Metadata, "etag")
- err := n.SetEtag(ctx, val)
- if err != nil {
+ if err := n.SetEtag(ctx, val); err != nil {
errs = append(errs, errors.Wrap(err, "could not set etag"))
}
}
40 changes: 22 additions & 18 deletions pkg/storage/utils/decomposedfs/node/node.go
@@ -495,25 +495,20 @@ func calculateEtag(nodeID string, tmTime time.Time) (string, error) {
return fmt.Sprintf(`"%x"`, h.Sum(nil)), nil
}

- // SetMtime sets the mtime and atime of a node
- func (n *Node) SetMtime(ctx context.Context, mtime string) error {
- sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger()
- if mt, err := parseMTime(mtime); err == nil {
- nodePath := n.InternalPath()
- // updating mtime also updates atime
- if err := os.Chtimes(nodePath, mt, mt); err != nil {
- sublog.Error().Err(err).
- Time("mtime", mt).
- Msg("could not set mtime")
- return errors.Wrap(err, "could not set mtime")
- }
- } else {
- sublog.Error().Err(err).
- Str("mtime", mtime).
- Msg("could not parse mtime")
- return errors.Wrap(err, "could not parse mtime")
+ // SetMtimeString sets the mtime and atime of a node to the unixtime parsed from the given string
+ func (n *Node) SetMtimeString(mtime string) error {
+ mt, err := parseMTime(mtime)
+ if err != nil {
+ return err
}
- return nil
+ return n.SetMtime(mt)
+ }

+ // SetMtime sets the mtime and atime of a node
+ func (n *Node) SetMtime(mtime time.Time) error {
+ nodePath := n.InternalPath()
+ // updating mtime also updates atime
+ return os.Chtimes(nodePath, mtime, mtime)
}

// SetEtag sets the temporary etag of a node if it differs from the current etag
@@ -929,6 +924,15 @@ func (n *Node) SetTreeSize(ts uint64) (err error) {
return n.SetXattr(xattrs.TreesizeAttr, strconv.FormatUint(ts, 10))
}

// GetBlobSize reads the blobsize from the extended attributes
func (n *Node) GetBlobSize() (blobsize uint64, err error) {
var b string
if b, err = n.Xattr(xattrs.BlobsizeAttr); err != nil {
return
}
return strconv.ParseUint(b, 10, 64)
}

// SetChecksum writes the checksum with the given checksum type to the extended attributes
func (n *Node) SetChecksum(csType string, h hash.Hash) (err error) {
return n.SetXattr(xattrs.ChecksumPrefix+csType, string(h.Sum(nil)))
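A rough usage sketch for the helpers introduced above. The accepted unixtime string format and the way the node would be obtained are assumptions for illustration.

```go
package sketch

import (
	"fmt"
	"time"

	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
)

// touchNode shows both entry points: callers that already hold a time.Time use
// SetMtime directly, callers fed with CS3 arbitrary metadata pass the raw
// string to SetMtimeString. It then reads the blobsize back from the xattrs.
func touchNode(n *node.Node) error {
	if err := n.SetMtime(time.Unix(1668764400, 0)); err != nil {
		return err
	}
	if err := n.SetMtimeString("1668764400.5"); err != nil { // assumed "<sec>.<nsec>" format
		return err
	}
	size, err := n.GetBlobSize()
	if err != nil {
		return err
	}
	fmt.Println("blobsize:", size)
	return nil
}
```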
6 changes: 3 additions & 3 deletions pkg/storage/utils/decomposedfs/node/node_test.go
@@ -89,18 +89,18 @@ var _ = Describe("Node", func() {
n, err := env.Lookup.NodeFromResource(env.Ctx, ref)
Expect(err).ToNot(HaveOccurred())

- blobsize := 239485734
+ blobsize := int64(239485734)
n.Name = "TestName"
n.BlobID = "TestBlobID"
- n.Blobsize = int64(blobsize)
+ n.Blobsize = blobsize

err = n.WriteAllNodeMetadata()
Expect(err).ToNot(HaveOccurred())
n2, err := env.Lookup.NodeFromResource(env.Ctx, ref)
Expect(err).ToNot(HaveOccurred())
Expect(n2.Name).To(Equal("TestName"))
Expect(n2.BlobID).To(Equal("TestBlobID"))
- Expect(n2.Blobsize).To(Equal(int64(blobsize)))
+ Expect(n2.Blobsize).To(Equal(blobsize))
})
})

12 changes: 12 additions & 0 deletions pkg/storage/utils/decomposedfs/node/permissions.go
@@ -20,10 +20,12 @@ package node

import (
"context"
"strings"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/pkg/errors"
)
@@ -91,6 +93,16 @@ func (p *Permissions) AssemblePermissions(ctx context.Context, n *Node) (ap prov
return NoPermissions(), nil
}

if strings.Contains(n.ID, RevisionIDDelimiter) {
// verify revision key format
kp := strings.SplitN(n.ID, RevisionIDDelimiter, 2)
if len(kp) != 2 {
return NoPermissions(), errtypes.NotFound(n.ID)
}
// use the actual node for the permission assembly
n.ID = kp[0]
}

// check if the current user is the owner
if utils.UserIDEqual(u.Id, n.Owner()) {
return OwnerPermissions(), nil
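With the revision suffix stripped, permissions are assembled against the actual node. The intent stated in the commit "allow revision download when user has initiate download and list revisions" can be sketched as the following predicate over the assembled CS3 permissions; the helper name is made up.

```go
package sketch

import (
	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
)

// mayDownloadRevision is an illustrative predicate: downloading a revision
// requires both the right to download the file and the right to list its
// versions on the node the revision belongs to.
func mayDownloadRevision(p *provider.ResourcePermissions) bool {
	return p != nil && p.InitiateFileDownload && p.ListFileVersions
}
```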
16 changes: 15 additions & 1 deletion pkg/storage/utils/decomposedfs/node/xattrs.go
@@ -20,6 +20,7 @@ package node

import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs"
"github.com/gofrs/flock"
"github.com/pkg/xattr"
)

@@ -34,6 +35,18 @@ func (n *Node) SetXattrs(attribs map[string]string) (err error) {
return xattrs.SetMultiple(n.InternalPath(), attribs)
}

// SetXattrsWithLock sets multiple extended attributes on the write-through cache/node with a given lock
func (n *Node) SetXattrsWithLock(attribs map[string]string, fileLock *flock.Flock) (err error) {
// TODO what if writing the lock fails?
if n.xattrsCache != nil {
for k, v := range attribs {
n.xattrsCache[k] = v
}
}

return xattrs.SetMultipleWithLock(n.InternalPath(), attribs, fileLock)
}

// SetXattr sets an extended attribute on the write-through cache/node
func (n *Node) SetXattr(key, val string) (err error) {
if n.xattrsCache != nil {
@@ -80,5 +93,6 @@ func (n *Node) Xattr(key string) (string, error) {
if val, ok := n.xattrsCache[key]; ok {
return val, nil
}
return "", xattr.ENOATTR
// wrap the error as xattr does
return "", &xattr.Error{Op: "xattr.get", Path: n.InternalPath(), Name: key, Err: xattr.ENOATTR}
}
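An illustrative caller for the new SetXattrsWithLock: the caller acquires the flock itself and passes it in, so several metadata writes can share a single lock acquisition. The lock-file path is an assumption, not reva's actual lock-file naming.

```go
package sketch

import (
	"github.com/gofrs/flock"

	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
)

// writeUploadMetadata writes a batch of attributes while holding an exclusive
// lock, updating the write-through cache and the on-disk xattrs together.
func writeUploadMetadata(n *node.Node, attribs map[string]string) error {
	lock := flock.New(n.InternalPath() + ".flock") // assumed lock-file name
	if err := lock.Lock(); err != nil {
		return err
	}
	defer func() { _ = lock.Unlock() }()

	return n.SetXattrsWithLock(attribs, lock)
}
```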
59 changes: 46 additions & 13 deletions pkg/storage/utils/decomposedfs/revisions.go
@@ -21,7 +21,6 @@
import (
"context"
"io"
iofs "io/fs"
"os"
"path/filepath"
"strings"
@@ -31,6 +30,7 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/pkg/errors"
)
@@ -83,7 +83,7 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen
}
blobSize, err := node.ReadBlobSizeAttr(items[i])
if err != nil {
return nil, errors.Wrapf(err, "error reading blobsize xattr")
appctx.GetLogger(ctx).Error().Err(err).Str("name", fi.Name()).Msg("error reading blobsize xattr, using 0")
}
rev.Size = uint64(blobSize)
etag, err := node.CalculateEtag(np, mtime)
@@ -99,6 +99,7 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen
}

// DownloadRevision returns a reader for the specified revision
// FIXME the CS3 api should explicitly allow initiating revision and trash download, a related issue is https://github.com/cs3org/reva/issues/1813
func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Reference, revisionKey string) (io.ReadCloser, error) {
log := appctx.GetLogger(ctx)

@@ -135,14 +136,18 @@

contentPath := fs.lu.InternalPath(spaceID, revisionKey)

- r, err := os.Open(contentPath)
+ blobid, err := node.ReadBlobIDAttr(contentPath)
if err != nil {
- if errors.Is(err, iofs.ErrNotExist) {
- return nil, errtypes.NotFound(contentPath)
- }
- return nil, errors.Wrap(err, "Decomposedfs: error opening revision "+revisionKey)
+ return nil, errors.Wrapf(err, "Decomposedfs: could not read blob id of revision '%s' for node '%s'", n.ID, revisionKey)
}

+ revisionNode := node.Node{SpaceID: spaceID, BlobID: blobid}

+ reader, err := fs.tp.ReadBlob(&revisionNode)
+ if err != nil {
+ return nil, errors.Wrapf(err, "Decomposedfs: could not download blob of revision '%s' for node '%s'", n.ID, revisionKey)
+ }
- return r, nil
+ return reader, nil
}

// RestoreRevision restores the specified revision of the resource
@@ -194,17 +199,45 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
// versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries
versionsPath := fs.lu.InternalPath(spaceID, kp[0]+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano))

- err = os.Rename(nodePath, versionsPath)
+ // touch version node
+ if file, err := os.Create(versionsPath); err != nil {
+ return err
+ } else if err := file.Close(); err != nil {
+ return err
+ }

+ // copy blob metadata to version node
+ err = xattrs.CopyMetadata(nodePath, versionsPath, func(attributeName string) bool {
+ return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || // for checksums
+ attributeName == xattrs.BlobIDAttr ||
+ attributeName == xattrs.BlobsizeAttr
+ })
if err != nil {
- return
+ return errtypes.InternalError("failed to copy blob xattrs to version node")
}

+ // keep mtime from previous version
+ if err := os.Chtimes(versionsPath, fi.ModTime(), fi.ModTime()); err != nil {
+ return errtypes.InternalError("failed to change mtime of version node")
+ }

// copy old revision to current location
// update blob id in node

+ // copy blob metadata from revision to node
revisionPath := fs.lu.InternalPath(spaceID, revisionKey)
+ err = xattrs.CopyMetadata(revisionPath, nodePath, func(attributeName string) bool {
+ return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) ||
+ attributeName == xattrs.BlobIDAttr ||
+ attributeName == xattrs.BlobsizeAttr
+ })
+ if err != nil {
+ return errtypes.InternalError("failed to copy blob xattrs to version node")
+ }

- if err = os.Rename(revisionPath, nodePath); err != nil {
- return
+ // explicitly update mtime of node as writing xattrs does not change mtime
+ now := time.Now()
+ if err := os.Chtimes(nodePath, now, now); err != nil {
+ return errtypes.InternalError("failed to change mtime of version node")
}

return fs.tp.Propagate(ctx, n)
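The on-disk naming that DownloadRevision and RestoreRevision rely on can be sketched from the versionsPath construction above: a revision sits next to its node under the node ID plus the revision delimiter plus the old mtime in RFC3339Nano, and DownloadRevision resolves that key, reads its blob ID xattr and streams the blob. The delimiter value and the IDs are illustrative assumptions.

```go
package main

import (
	"fmt"
	"time"
)

const revisionIDDelimiter = ".REV." // assumed value of node.RevisionIDDelimiter

// revisionKey builds the name a preserved version is stored under.
func revisionKey(nodeID string, mtime time.Time) string {
	return nodeID + revisionIDDelimiter + mtime.UTC().Format(time.RFC3339Nano)
}

func main() {
	key := revisionKey("4c510ada-0000-0000-0000-000000000000",
		time.Date(2022, 11, 18, 9, 15, 4, 123456789, time.UTC))
	fmt.Println(key)
	// 4c510ada-0000-0000-0000-000000000000.REV.2022-11-18T09:15:04.123456789Z
	// fs.lu.InternalPath(spaceID, key) then yields the revision file, whose
	// blob ID xattr points at the content to stream.
}
```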
6 changes: 3 additions & 3 deletions pkg/storage/utils/decomposedfs/tree/tree.go
@@ -692,7 +692,7 @@ func (t *Tree) removeNode(path string, n *node.Node) error {

// Propagate propagates changes to the root of the tree
func (t *Tree) Propagate(ctx context.Context, n *node.Node) (err error) {
- sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger()
+ sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger()
if !t.treeTimeAccounting && !t.treeSizeAccounting {
// no propagation enabled
sublog.Debug().Msg("propagation disabled")
@@ -713,7 +713,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node) (err error) {
break
}

- sublog = sublog.With().Interface("node", n).Logger()
+ sublog = sublog.With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger()

// TODO none, sync and async?
if !n.HasPropagation() {
@@ -757,7 +757,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node) (err error) {
}
}

- if err := n.UnsetTempEtag(); err != nil {
+ if err := n.UnsetTempEtag(); err != nil && !xattrs.IsAttrUnset(err) {
sublog.Error().Err(err).Msg("could not remove temporary etag attribute")
}
}