From c2b518c61bea45b11dab273da8e834c9a860d403 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Fri, 24 Jun 2022 16:05:41 +0200 Subject: [PATCH] set node status only through node pkg Signed-off-by: jkoberg --- .../utils/decomposedfs/decomposedfs.go | 9 ++-- pkg/storage/utils/decomposedfs/node/node.go | 18 ++++++- .../decomposedfs/upload/postprocessing.go | 8 +-- .../utils/decomposedfs/upload/upload.go | 52 ++++++++++++------- 4 files changed, 60 insertions(+), 27 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index d76e5ca7e30..16b2e605ef0 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -504,16 +504,17 @@ func (fs *Decomposedfs) GetMD(ctx context.Context, ref *provider.Reference, mdKe return } - v, _ := node.GetMetadata("user.ocis.nodestatus") - if v != "processing" && !node.Exists { + isprocessed := node.IsProcessed() + if !isprocessed && !node.Exists { err = errtypes.NotFound(filepath.Join(node.ParentID, node.Name)) return } rp, err := fs.p.AssemblePermissions(ctx, node) switch { - case v == "processing": - // TODO: check permissions also for files while processing + case isprocessed: + // FIXME: how to check permissions for files while processing? + // the node is empty and holds no further information case err != nil: return nil, errtypes.InternalError(err.Error()) case !rp.Stat: diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index 07c5bebb655..75acd83dce1 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -628,7 +628,7 @@ func (n *Node) AsResourceInfo(ctx context.Context, rp *provider.ResourcePermissi ParentId: parentID, } - if v, _ := n.GetMetadata("user.ocis.nodestatus"); v == "processing" { + if n.IsProcessed() { ri.Opaque = utils.AppendPlainToOpaque(ri.Opaque, "status", "processing") return ri, nil } @@ -1116,6 +1116,22 @@ func (n *Node) FindStorageSpaceRoot() error { return nil } +// MarkProcessing marks the node as being processed +func (n *Node) MarkProcessing() error { + return n.SetMetadata("user.ocis.nodestatus", "processing") +} + +// UnmarkProcessing removes the processing flag from the node +func (n *Node) UnmarkProcessing() error { + return n.RemoveMetadata("user.ocis.nodestatus") +} + +// IsProcessed returns true if the node is currently being processed +func (n *Node) IsProcessed() bool { + v, err := n.GetMetadata("user.ocis.nodestatus") + return err == nil && v == "processing" +} + // IsSpaceRoot checks if the node is a space root func IsSpaceRoot(r *Node) bool { path := r.InternalPath() diff --git a/pkg/storage/utils/decomposedfs/upload/postprocessing.go b/pkg/storage/utils/decomposedfs/upload/postprocessing.go index fe9065a0c5d..e1aaaa00bd7 100644 --- a/pkg/storage/utils/decomposedfs/upload/postprocessing.go +++ b/pkg/storage/utils/decomposedfs/upload/postprocessing.go @@ -40,7 +40,7 @@ func configurePostprocessing(upload *Upload, o options.PostprocessingOptions) po // set processing status upload.node = n - return upload.node.SetMetadata("user.ocis.nodestatus", "processing") + return upload.node.MarkProcessing() }, nil), postprocessing.NewStep("assembling", upload.finishUpload, upload.cleanup, "initialize"), }, @@ -48,14 +48,16 @@ func configurePostprocessing(upload *Upload, o options.PostprocessingOptions) po Finish: func(m map[string]error) { for alias, err := range m { if err != nil { - upload.log.Info().Str("step", alias).Err(err).Msg("postprocessing failed") + upload.log.Info().Str("ID", upload.Info.ID).Str("step", alias).Err(err).Msg("postprocessing failed") } } if upload.node != nil && !o.LockProcessing { // unset processing status - _ = upload.node.RemoveMetadata("user.ocis.nodestatus") + if err := upload.node.UnmarkProcessing(); err != nil { + upload.log.Info().Str("path", upload.node.InternalPath()).Err(err).Msg("unmarking processing failed") + } } }, } diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go index 3eb4590718d..857e08b58d0 100644 --- a/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -196,8 +196,6 @@ func (upload *Upload) finishUpload() (err error) { return errors.New("need node to finish upload") } - _ = xattrs.Set(upload.binPath, "user.ocis.nodestatus", "processing") - spaceID := upload.Info.Storage["SpaceRoot"] targetPath := n.InternalPath() sublog := appctx.GetLogger(upload.Ctx). @@ -274,6 +272,12 @@ func (upload *Upload) finishUpload() (err error) { // versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries versionsPath = upload.lu.InternalPath(spaceID, n.ID+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano)) + // we unset the processing flag before moving + // NOTE: this leads to a minimal time the node has no processing flag + if err := n.UnmarkProcessing(); err != nil { + sublog.Error().Err(err).Msg("Decomposedfs: could not unmark processing") + return err + } // This move drops all metadata!!! We copy it below with CopyMetadata // FIXME the node must remain the same. otherwise we might restore share metadata if err = os.Rename(targetPath, versionsPath); err != nil { @@ -284,20 +288,20 @@ func (upload *Upload) finishUpload() (err error) { return err } - // NOTE: In case there is an existing version we have - // - a processing flag on the version - // - a processing flag on the binPath - // - NO processing flag on the targetPath, as we just moved that file - // so we remove the processing flag from version, - _ = xattrs.Remove(versionsPath, "user.ocis.nodestatus") - // create an empty file instead, - _, _ = os.Create(targetPath) - // and set the processing flag on this - _ = xattrs.Set(targetPath, "user.ocis.nodestatus", "processing") - // TODO: that means that there is a short amount of time when there is no targetPath + // NOTE: In case there is an existing version we have no processing flag on the targetPath, + // as we just moved that file. We need to create an empty file again + // TODO: that means that there is a short amount of time when there is no targetPath or no processing flag // If clients query in exactly that moment the file will be gone from their PROPFIND // How can we omit this issue? How critical is it? - + if _, err := os.Create(targetPath); err != nil { + sublog.Error().Err(err).Msg("Decomposedfs: could not create file") + return err + } + // and set the processing flag on this + if err := n.MarkProcessing(); err != nil { + sublog.Error().Err(err).Msg("Decomposedfs: could not mark processing") + return err + } } // upload the data to the blobstore @@ -322,6 +326,11 @@ func (upload *Upload) finishUpload() (err error) { sublog.Error().Err(err).Msg("Decomposedfs: could not rename") return err } + // the rename dropped the "processing" status - we need to set it again + if err := n.MarkProcessing(); err != nil { + sublog.Error().Err(err).Msg("Decomposedfs: could not mark processing") + return err + } if versionsPath != "" { // copy grant and arbitrary metadata // FIXME ... now restoring an older revision might bring back a grant that was removed! @@ -402,7 +411,6 @@ func (upload *Upload) checkHash(expected string, h hash.Hash) error { } // cleanup cleans up after the upload is finished -// TODO: error handling? func (upload *Upload) cleanup(err error) { if upload.node != nil { // NOTE: this should not be part of the upload. The upload doesn't know @@ -410,15 +418,21 @@ func (upload *Upload) cleanup(err error) { // However, when not removing it here the testsuite will fail as it // can't handle processing status at the moment. // TODO: adjust testsuite, remove this if case and adjust PostProcessing to not wait for "assembling" - _ = upload.node.RemoveMetadata("user.ocis.nodestatus") + _ = upload.node.UnmarkProcessing() } if upload.node != nil && err != nil && upload.oldsize == nil { - _ = utils.RemoveItem(upload.node.InternalPath()) + if err := utils.RemoveItem(upload.node.InternalPath()); err != nil { + upload.log.Info().Str("path", upload.node.InternalPath()).Err(err).Msg("removing node failed") + } } - _ = os.Remove(upload.binPath) - _ = os.Remove(upload.infoPath) + if err := os.Remove(upload.binPath); err != nil { + upload.log.Error().Str("path", upload.binPath).Err(err).Msg("removing upload failed") + } + if err := os.Remove(upload.infoPath); err != nil { + upload.log.Error().Str("path", upload.infoPath).Err(err).Msg("removing upload info failed") + } } func tryWritingChecksum(log *zerolog.Logger, n *node.Node, algo string, h hash.Hash) {