Skip to content

Commit

Permalink
set node status only through node pkg
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <jkoberg@owncloud.com>
  • Loading branch information
kobergj committed Jun 24, 2022
1 parent 0c56367 commit c2b518c
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 27 deletions.
9 changes: 5 additions & 4 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,16 +504,17 @@ func (fs *Decomposedfs) GetMD(ctx context.Context, ref *provider.Reference, mdKe
return
}

v, _ := node.GetMetadata("user.ocis.nodestatus")
if v != "processing" && !node.Exists {
isprocessed := node.IsProcessed()
if !isprocessed && !node.Exists {
err = errtypes.NotFound(filepath.Join(node.ParentID, node.Name))
return
}

rp, err := fs.p.AssemblePermissions(ctx, node)
switch {
case v == "processing":
// TODO: check permissions also for files while processing
case isprocessed:
// FIXME: how to check permissions for files while processing?
// the node is empty and holds no further information
case err != nil:
return nil, errtypes.InternalError(err.Error())
case !rp.Stat:
Expand Down
18 changes: 17 additions & 1 deletion pkg/storage/utils/decomposedfs/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi
ParentId: parentID,
}

if v, _ := n.GetMetadata("user.ocis.nodestatus"); v == "processing" {
if n.IsProcessed() {
ri.Opaque = utils.AppendPlainToOpaque(ri.Opaque, "status", "processing")
return ri, nil
}
Expand Down Expand Up @@ -1116,6 +1116,22 @@ func (n *Node) FindStorageSpaceRoot() error {
return nil
}

// MarkProcessing marks the node as being processed
func (n *Node) MarkProcessing() error {
return n.SetMetadata("user.ocis.nodestatus", "processing")
}

// UnmarkProcessing removes the processing flag from the node
func (n *Node) UnmarkProcessing() error {
return n.RemoveMetadata("user.ocis.nodestatus")
}

// IsProcessed returns true if the node is currently being processed
func (n *Node) IsProcessed() bool {
v, err := n.GetMetadata("user.ocis.nodestatus")
return err == nil && v == "processing"
}

// IsSpaceRoot checks if the node is a space root
func IsSpaceRoot(r *Node) bool {
path := r.InternalPath()
Expand Down
8 changes: 5 additions & 3 deletions pkg/storage/utils/decomposedfs/upload/postprocessing.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,24 @@ func configurePostprocessing(upload *Upload, o options.PostprocessingOptions) po

// set processing status
upload.node = n
return upload.node.SetMetadata("user.ocis.nodestatus", "processing")
return upload.node.MarkProcessing()
}, nil),
postprocessing.NewStep("assembling", upload.finishUpload, upload.cleanup, "initialize"),
},
WaitFor: waitfor,
Finish: func(m map[string]error) {
for alias, err := range m {
if err != nil {
upload.log.Info().Str("step", alias).Err(err).Msg("postprocessing failed")
upload.log.Info().Str("ID", upload.Info.ID).Str("step", alias).Err(err).Msg("postprocessing failed")
}

}

if upload.node != nil && !o.LockProcessing {
// unset processing status
_ = upload.node.RemoveMetadata("user.ocis.nodestatus")
if err := upload.node.UnmarkProcessing(); err != nil {
upload.log.Info().Str("path", upload.node.InternalPath()).Err(err).Msg("unmarking processing failed")
}
}
},
}
Expand Down
52 changes: 33 additions & 19 deletions pkg/storage/utils/decomposedfs/upload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,6 @@ func (upload *Upload) finishUpload() (err error) {
return errors.New("need node to finish upload")
}

_ = xattrs.Set(upload.binPath, "user.ocis.nodestatus", "processing")

spaceID := upload.Info.Storage["SpaceRoot"]
targetPath := n.InternalPath()
sublog := appctx.GetLogger(upload.Ctx).
Expand Down Expand Up @@ -274,6 +272,12 @@ func (upload *Upload) finishUpload() (err error) {
// versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries
versionsPath = upload.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano))

// we unset the processing flag before moving
// NOTE: this leads to a minimal time the node has no processing flag
if err := n.UnmarkProcessing(); err != nil {
sublog.Error().Err(err).Msg("Decomposedfs: could not unmark processing")
return err
}
// This move drops all metadata!!! We copy it below with CopyMetadata
// FIXME the node must remain the same. otherwise we might restore share metadata
if err = os.Rename(targetPath, versionsPath); err != nil {
Expand All @@ -284,20 +288,20 @@ func (upload *Upload) finishUpload() (err error) {
return err
}

// NOTE: In case there is an existing version we have
// - a processing flag on the version
// - a processing flag on the binPath
// - NO processing flag on the targetPath, as we just moved that file
// so we remove the processing flag from version,
_ = xattrs.Remove(versionsPath, "user.ocis.nodestatus")
// create an empty file instead,
_, _ = os.Create(targetPath)
// and set the processing flag on this
_ = xattrs.Set(targetPath, "user.ocis.nodestatus", "processing")
// TODO: that means that there is a short amount of time when there is no targetPath
// NOTE: In case there is an existing version we have no processing flag on the targetPath,
// as we just moved that file. We need to create an empty file again
// TODO: that means that there is a short amount of time when there is no targetPath or no processing flag
// If clients query in exactly that moment the file will be gone from their PROPFIND
// How can we omit this issue? How critical is it?

if _, err := os.Create(targetPath); err != nil {
sublog.Error().Err(err).Msg("Decomposedfs: could not create file")
return err
}
// and set the processing flag on this
if err := n.MarkProcessing(); err != nil {
sublog.Error().Err(err).Msg("Decomposedfs: could not mark processing")
return err
}
}

// upload the data to the blobstore
Expand All @@ -322,6 +326,11 @@ func (upload *Upload) finishUpload() (err error) {
sublog.Error().Err(err).Msg("Decomposedfs: could not rename")
return err
}
// the rename dropped the "processing" status - we need to set it again
if err := n.MarkProcessing(); err != nil {
sublog.Error().Err(err).Msg("Decomposedfs: could not mark processing")
return err
}
if versionsPath != "" {
// copy grant and arbitrary metadata
// FIXME ... now restoring an older revision might bring back a grant that was removed!
Expand Down Expand Up @@ -402,23 +411,28 @@ func (upload *Upload) checkHash(expected string, h hash.Hash) error {
}

// cleanup cleans up after the upload is finished
// TODO: error handling?
func (upload *Upload) cleanup(err error) {
if upload.node != nil {
// NOTE: this should not be part of the upload. The upload doesn't know
// when the processing is finshed. It just cares about the actual upload
// However, when not removing it here the testsuite will fail as it
// can't handle processing status at the moment.
// TODO: adjust testsuite, remove this if case and adjust PostProcessing to not wait for "assembling"
_ = upload.node.RemoveMetadata("user.ocis.nodestatus")
_ = upload.node.UnmarkProcessing()
}

if upload.node != nil && err != nil && upload.oldsize == nil {
_ = utils.RemoveItem(upload.node.InternalPath())
if err := utils.RemoveItem(upload.node.InternalPath()); err != nil {
upload.log.Info().Str("path", upload.node.InternalPath()).Err(err).Msg("removing node failed")
}
}

_ = os.Remove(upload.binPath)
_ = os.Remove(upload.infoPath)
if err := os.Remove(upload.binPath); err != nil {
upload.log.Error().Str("path", upload.binPath).Err(err).Msg("removing upload failed")
}
if err := os.Remove(upload.infoPath); err != nil {
upload.log.Error().Str("path", upload.infoPath).Err(err).Msg("removing upload info failed")
}
}

func tryWritingChecksum(log *zerolog.Logger, n *node.Node, algo string, h hash.Hash) {
Expand Down

0 comments on commit c2b518c

Please sign in to comment.