delegated operator fixes #4908

Merged
merged 1 commit into from
Oct 22, 2024
19 changes: 14 additions & 5 deletions fiftyone/factory/repos/delegated_operation.py
@@ -262,6 +262,8 @@ def update_run_state(
 else None
 )

+needs_pipeline_update = False
+
 if run_state == ExecutionRunState.COMPLETED:
 update = {
 "$set": {
@@ -272,10 +274,11 @@ def update_run_state(
 }
 }

-if outputs_schema:
-update["$set"]["metadata.outputs_schema"] = {
-"$ifNull": [outputs_schema, {}]
-}
+if outputs_schema:
+update["$set"]["metadata.outputs_schema"] = (
+outputs_schema or {}
+)
+needs_pipeline_update = True

 elif run_state == ExecutionRunState.FAILED:
 update = {
@@ -325,9 +328,15 @@ def update_run_state(
 if required_state is not None:
 collection_filter["run_state"] = required_state

+# Using pipeline update instead of a single update doc fixes a case
Contributor

Since the update can either be a pipeline or a single update, why do we need to only make it a pipeline under certain conditions and not just always like the current code? Is there a different bug that can happen by consistently using pipeline for all cases?

Contributor Author

It's nonstandard. I'd prefer to use the standard approach and deviate only when necessary.
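
The exchange above comes down to choosing the shape of the `update` argument. As a minimal sketch (the helper name `as_update_spec` and the sample documents are hypothetical, not part of FiftyOne or PyMongo), the conditional wrapping from the diff could be isolated like this:

```python
from typing import Any, Dict, List, Union

UpdateSpec = Union[Dict[str, Any], List[Dict[str, Any]]]


def as_update_spec(
    update: Dict[str, Any], needs_pipeline_update: bool
) -> UpdateSpec:
    """Return a plain update document, or a one-stage pipeline when required.

    Hypothetical helper: it mirrors only the `needs_pipeline_update` branch
    shown in the diff.
    """
    # A list is passed to the driver as an aggregation pipeline; a dict of
    # `$` operators is the conventional update document.
    return [update] if needs_pipeline_update else update


# Sample documents, invented for illustration.
failed = {"$set": {"run_state": "failed"}}
completed = {
    "$set": {"run_state": "completed", "metadata.outputs_schema": {}}
}

plain = as_update_spec(failed, needs_pipeline_update=False)
pipeline = as_update_spec(completed, needs_pipeline_update=True)
```

Here `plain` stays a dict and `pipeline` becomes a one-element list, which matches the author's preference for the standard form unless the dotted `metadata.outputs_schema` path is being written.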

+# where `metadata` is null and so accessing the dotted field
+# `metadata.output_schema` creates the document instead of erroring.
+if needs_pipeline_update:
+update = [update]
+
 doc = self._collection.find_one_and_update(
 filter=collection_filter,
-update=[update],
+update=update,
Contributor

I believe this should be an array (aggregation pipeline). Otherwise, passing a dictionary will be interpreted as the new doc, so you'd essentially be setting the doc to have a new field "$set" whose value is the dictionary.

Contributor Author

You may be thinking of find_one_and_replace(). This change works and tests pass.
It's more standard to pass a dict with the updates. It's possible, though uncommon, to use an agg pipeline, but I didn't see why that was necessary.

Contributor Author

Alternatively, if you don't specify $set and pass a replacement document instead, then it would behave as you say.

Contributor

@kaixi-wang kaixi-wang Oct 10, 2024

https://www.mongodb.com/docs/manual/reference/method/db.collection.findOneAndUpdate/

I'm just going off of the mongo docs...

Since the change didn't require any corresponding test changes, are there tests that would catch this?

Contributor

But yeah, the alternative is to remove $set.

Contributor Author

We have to go off of the pymongo docs here since that's what we use. https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.find_one_and_update

I noticed that if you call find_one_and_update it actually uses the deprecated MongoDB function findAndModify.
https://github.com/mongodb/mongo-python-driver/blob/master/pymongo/synchronous/collection.py#L3561

Contributor Author

And yes, the test test_updates_progress covers this. After talking with Ibrahim, I am also making an update and adding a test for a specific corner case that's broken.
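
To make the three argument shapes debated in this thread concrete, here is a small illustrative classifier. `classify_update_argument` is a hypothetical helper written for this note, not a PyMongo function, and it assumes string keys:

```python
from collections.abc import Mapping
from typing import Any


def classify_update_argument(update: Any) -> str:
    """Label the shape of an update argument (illustrative only)."""
    # A list of stages is the aggregation-pipeline form of an update.
    if isinstance(update, (list, tuple)):
        return "aggregation pipeline"
    if isinstance(update, Mapping):
        # Every top-level key is an operator such as `$set`: this is the
        # conventional update document.
        if update and all(key.startswith("$") for key in update):
            return "update operators"
        # No operators at the top level: the dict reads as a whole new
        # document, which is the replacement case raised in the review.
        return "replacement document"
    raise TypeError("expected a mapping or a list of pipeline stages")


shapes = [
    classify_update_argument({"$set": {"run_state": "completed"}}),
    classify_update_argument([{"$set": {"run_state": "completed"}}]),
    classify_update_argument({"run_state": "completed"}),
]
```

Under these assumptions, the dict passed in the merged change falls in the "update operators" case, while the behavior the reviewer described corresponds to the "replacement document" case.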

 return_document=pymongo.ReturnDocument.AFTER,
 )

30 changes: 30 additions & 0 deletions tests/unittests/operators/delegated_tests.py
@@ -11,6 +11,7 @@
 from unittest import mock
 from unittest.mock import patch

+import bson
 import pytest

 import fiftyone
@@ -484,6 +485,35 @@ def test_sets_progress(
 self.assertEqual(doc.status.label, "halfway there")
 self.assertIsNotNone(doc.status.updated_at)

+def test_output_schema_null_metadata(
+self, mock_get_operator, mock_operator_exists
+):
+mock_outputs = MockOutputs()
+doc = self.svc.queue_operation(
+operator="@voxelfiftyone/operator/foo",
+delegation_target="test_target",
+context=ExecutionContext(request_params={"foo": "bar"}),
+)
+
+# Set metadata to null instead of being unset, to test that corner case
+self.svc._repo._collection.find_one_and_update(
+{"_id": bson.ObjectId(doc.id)}, {"$set": {"metadata": None}}
+)
+
+self.svc.set_completed(
+doc.id,
+result=ExecutionResult(outputs_schema=mock_outputs.to_json()),
+)
+
+doc = self.svc.get(doc_id=doc.id)
+self.assertEqual(doc.run_state, ExecutionRunState.COMPLETED)
+self.assertEqual(
+doc.metadata,
+{
+"outputs_schema": mock_outputs.to_json(),
+},
+)
+
 @patch(
 "fiftyone.core.odm.utils.load_dataset",
 )
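
The corner case this test exercises can be pictured with a toy, in-memory model. The sketch below is not MongoDB or PyMongo code: both functions and the `CannotCreateField` exception are hypothetical, and they model only a two-level dotted path whose parent is missing, null, or an embedded document. It assumes, as the code comment in the diff states, that the pipeline form creates the embedded document where the regular form errors.

```python
import copy
from typing import Any, Dict


class CannotCreateField(Exception):
    """Hypothetical stand-in for the server-side write error."""


def set_with_update_operator(
    doc: Dict[str, Any], path: str, value: Any
) -> Dict[str, Any]:
    """Toy model of `$set` inside a regular update document."""
    parent_key, child_key = path.split(".")
    result = copy.deepcopy(doc)
    if parent_key not in result:
        # A missing parent is created on the fly.
        result[parent_key] = {}
    parent = result[parent_key]
    if not isinstance(parent, dict):
        # A null parent cannot gain a child field in this form.
        raise CannotCreateField(f"{child_key!r} in {parent_key}={parent!r}")
    parent[child_key] = value
    return result


def set_with_pipeline_stage(
    doc: Dict[str, Any], path: str, value: Any
) -> Dict[str, Any]:
    """Toy model of `$set` used as an aggregation pipeline stage."""
    parent_key, child_key = path.split(".")
    result = copy.deepcopy(doc)
    parent = result.get(parent_key)
    if parent is None:
        # Missing or null parent: start from an empty embedded document.
        parent = {}
    elif not isinstance(parent, dict):
        raise NotImplementedError("only null/missing/dict parents are modeled")
    parent[child_key] = value
    result[parent_key] = parent
    return result


null_metadata = {"_id": 1, "metadata": None}
fixed = set_with_pipeline_stage(null_metadata, "metadata.outputs_schema", {})
```

In this model `fixed["metadata"]` ends up as `{"outputs_schema": {}}`, the same shape the new test asserts on `doc.metadata`, whereas `set_with_update_operator` raises for the same input.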