ZocaloResults: add parameter to use results from GPU #763

Open · wants to merge 21 commits into base: main (changes shown from 6 commits)

Commits (21):
22e7263 ZocaloResults: add parameter to use results from GPU (olliesilvester, Aug 29, 2024)
92af2e9 Warn if CPU results arrived before GPU results (olliesilvester, Aug 29, 2024)
90e8385 Update src/dodal/devices/zocalo/zocalo_results.py (olliesilvester, Aug 29, 2024)
4871d73 Correct typo (olliesilvester, Aug 29, 2024)
f181325 Update with new criteria and add tests (olliesilvester, Sep 2, 2024)
0632f5a more tests for codecov (olliesilvester, Sep 2, 2024)
9b1b4c9 remove extra comments (olliesilvester, Sep 3, 2024)
99b8f49 Change toggle name (olliesilvester, Sep 3, 2024)
149c2bd use deepdiff to get differences between gpu and cpu results (olliesilvester, Sep 5, 2024)
984a134 Review response and simplify deepdiff (olliesilvester, Sep 6, 2024)
560c600 Spell better (olliesilvester, Sep 6, 2024)
fc3bd5c improve test (olliesilvester, Sep 6, 2024)
7d6c7f8 Merge branch 'main' into 559_zocalo_results_multiple_sources (olliesilvester, Sep 6, 2024)
bd35cab better comment (olliesilvester, Sep 6, 2024)
71f6e13 Merge remote-tracking branch 'origin/main' into 559_zocalo_results_mu… (olliesilvester, Sep 6, 2024)
0c0b45d fix linting (olliesilvester, Sep 6, 2024)
ad506a1 Move deepdiff to regular dependancy (olliesilvester, Sep 6, 2024)
362a89e Always use GPU results (olliesilvester, Sep 19, 2024)
286b7bc Merge remote-tracking branch 'origin/main' into 559_zocalo_results_mu… (olliesilvester, Sep 19, 2024)
de5cc4b Add new test (olliesilvester, Sep 19, 2024)
ff848a6 Merge branch 'main' into 559_zocalo_results_multiple_sources (olliesilvester, Sep 20, 2024)
66 changes: 61 additions & 5 deletions src/dodal/devices/zocalo/zocalo_results.py
@@ -56,12 +56,25 @@
]


# TODO this may give unnecessary warnings if dicts differ only within rounding error
def get_dict_differences(
dict1: dict, dict1_source: str, dict2: dict, dict2_source: str
) -> str:
differences_str = ""
for key in dict1.keys():
if not dict1[key] == dict2[key]:
differences_str += f"Results differed in {key}: {dict1_source} contains {dict1[key]} while {dict2_source} contains {dict2[key]} \n"
if differences_str:
LOGGER.warning(differences_str)
return differences_str


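A standalone run of the helper above, with the `LOGGER` call dropped so it executes outside dodal (the `cpu`/`gpu` dicts are hypothetical XrcResult-style inputs, not real Zocalo output):

```python
def get_dict_differences(dict1, dict1_source, dict2, dict2_source):
    # Same comparison as the helper above, minus the logging side effect.
    differences_str = ""
    for key in dict1.keys():
        if not dict1[key] == dict2[key]:
            differences_str += (
                f"Results differed in {key}: {dict1_source} contains "
                f"{dict1[key]} while {dict2_source} contains {dict2[key]} \n"
            )
    return differences_str

cpu = {"max_count": 702.0, "n_voxels": 12}
gpu = {"max_count": 702.0, "n_voxels": 11}
diff = get_dict_differences(cpu, "CPU", gpu, "GPU")
# diff == "Results differed in n_voxels: CPU contains 12 while GPU contains 11 \n"
```

Only keys whose values differ appear in the string; matching keys such as `max_count` are skipped.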
class ZocaloResults(StandardReadable, Triggerable):
"""An ophyd device which can wait for results from a Zocalo job. These jobs should
be triggered from a plan-subscribed callback using the run_start() and run_end()
methods on dodal.devices.zocalo.ZocaloTrigger.

See https://github.com/DiamondLightSource/dodal/wiki/How-to-Interact-with-Zocalo"""
See https://diamondlightsource.github.io/dodal/main/how-to/zocalo.html"""

def __init__(
self,
@@ -71,6 +84,7 @@
sort_key: str = DEFAULT_SORT_KEY.value,
timeout_s: float = DEFAULT_TIMEOUT,
prefix: str = "",
use_fastest_zocalo_result: bool = False,
) -> None:
self.zocalo_environment = zocalo_environment
self.sort_key = SortKeys[sort_key]
@@ -79,6 +93,7 @@
self._prefix = prefix
self._raw_results_received: Queue = Queue()
self.transport: CommonTransport | None = None
self.use_fastest_zocalo_result = use_fastest_zocalo_result

self.results, self._results_setter = soft_signal_r_and_setter(
list[XrcResult], name="results"
@@ -165,7 +180,40 @@
)

raw_results = self._raw_results_received.get(timeout=self.timeout_s)
LOGGER.info(f"Zocalo: found {len(raw_results['results'])} crystals.")
source_of_first_results, source_of_second_results = (
("CPU", "GPU")
if not raw_results["ispyb_ids"].get("gpu")
else ("GPU", "CPU")
)

# Wait for results from CPU and GPU, warn and continue if one timed out, error if both time out
if self.use_fastest_zocalo_result:
if source_of_first_results == "CPU":
LOGGER.warning("Recieved zocalo results from CPU before GPU")
Contributor commented:

Nit:

Suggested change:
-            LOGGER.warning("Recieved zocalo results from CPU before GPU")
+            LOGGER.warning("Received zocalo results from CPU before GPU")

Collaborator (Author) commented:

@dperl-dls and I combined have misspelt this word 10 times across mx_bluesky and dodal!

Contributor commented:

So have I tbf
raw_results_two_sources = [raw_results]
try:
raw_results_two_sources.append(
self._raw_results_received.get(timeout=self.timeout_s / 2)
)

# Compare results from both sources and warn if they aren't the same
differences_str = get_dict_differences(
raw_results_two_sources[0]["results"][0],
source_of_first_results,
raw_results_two_sources[1]["results"][0],
source_of_second_results,
)
if differences_str:
LOGGER.warning(differences_str)

except Empty:
LOGGER.warning(
f"Zocalo results from {source_of_second_results} timed out. Using results from {source_of_first_results}"
Contributor commented:

@neil-i03 To confirm here, this will mean that if we have the GPU results turned on and we do not get a result from CPU zocalo within half the usual timeout time (180/2=90s), we will use the GPU result. Do we feel confident enough in trusting this, or do we still want to only ever use the CPU result for now, even if it means we will time out?
)

LOGGER.info(
f"Zocalo results from {source_of_first_results} processing: found {len(raw_results['results'])} crystals."
)
# Sort from strongest to weakest in case of multiple crystals
await self._put_results(
sorted(
@@ -237,9 +285,17 @@
self.transport.ack(header) # type: ignore # we create transport here

results = message.get("results", [])
self._raw_results_received.put(
{"results": results, "ispyb_ids": recipe_parameters}
)

if self.use_fastest_zocalo_result:
self._raw_results_received.put(
{"results": results, "ispyb_ids": recipe_parameters}
Contributor commented:

Could: ispyb_ids isn't a great name any more as it contains other stuff.
)
else:
# Only add to queue if results are from CPU
if not recipe_parameters.get("gpu"):
Contributor commented:

Should: A unit test covering this would be good.
self._raw_results_received.put(

Codecov / codecov/patch annotation on src/dodal/devices/zocalo/zocalo_results.py#L296: Added line #L296 was not covered by tests
{"results": results, "ispyb_ids": recipe_parameters}
)

subscription = workflows.recipe.wrap_subscribe(
self.transport,
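The fastest-result flow that `trigger` implements above can be sketched outside ophyd as a plain queue consumer. `TIMEOUT_S`, `get_fastest_results`, and the dict shapes are illustrative assumptions, not the dodal API:

```python
import queue

TIMEOUT_S = 2.0  # stand-in for the device's timeout_s

def get_fastest_results(raw_results_received: queue.Queue) -> dict:
    # Take whichever source (CPU or GPU) delivered first...
    first = raw_results_received.get(timeout=TIMEOUT_S)
    first_source = "GPU" if first["ispyb_ids"].get("gpu") else "CPU"
    try:
        # ...then wait half the timeout for the slower source, as the PR does.
        second = raw_results_received.get(timeout=TIMEOUT_S / 2)
        # The real device compares the two result sets here and logs a
        # warning if they differ.
        _ = second
    except queue.Empty:
        print(f"Second set of results timed out; using {first_source}")
    return first

q = queue.Queue()
q.put({"ispyb_ids": {"gpu": True}, "results": [{"n_voxels": 12}]})  # GPU first
q.put({"ispyb_ids": {}, "results": [{"n_voxels": 12}]})             # CPU second
fastest = get_fastest_results(q)  # the GPU results, since they arrived first
```

The `Empty` branch mirrors the warn-and-continue behaviour discussed in the review: one slow source degrades to a warning rather than a hard failure.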
210 changes: 206 additions & 4 deletions tests/devices/unit_tests/test_zocalo_results.py
@@ -1,4 +1,5 @@
from functools import partial
from queue import Empty
from unittest.mock import AsyncMock, MagicMock, call, patch

import bluesky.plan_stubs as bps
@@ -7,13 +8,14 @@
from bluesky.run_engine import RunEngine
from bluesky.utils import FailedStatus
from ophyd_async.core.async_status import AsyncStatus
from workflows.recipe import RecipeWrapper

from dodal.devices.zocalo.zocalo_results import (
ZOCALO_READING_PLAN_NAME,
NoResultsFromZocalo,
NoZocaloSubscription,
XrcResult,
ZocaloResults,
get_dict_differences,
get_processing_result,
)

@@ -204,14 +206,20 @@ async def test_zocalo_results_trigger_log_message(
mock_wrap_subscribe, mock_logger, RE: RunEngine
):
zocalo_results = ZocaloResults(
name="zocalo", zocalo_environment="dev_artemis", timeout_s=2
name="zocalo",
zocalo_environment="dev_artemis",
timeout_s=2,
Contributor commented:

Could: It's not great that we're waiting on this timeout for the test. Ideally we would mock it out so there's no wait on it, but if not could we at least drop it down to as low as possible?

use_fastest_zocalo_result=True,
)

recipe_wrapper = MagicMock()
recipe_wrapper.recipe_step = {"parameters": {}}

def zocalo_plan():
yield from bps.stage(zocalo_results)
receive_result = mock_wrap_subscribe.mock_calls[0].args[2]
receive_result(
MagicMock(autospec=RecipeWrapper),
recipe_wrapper,
{},
{
"results": [
@@ -235,7 +243,9 @@ def zocalo_plan():
yield from bps.trigger(zocalo_results)

RE(zocalo_plan())
mock_logger.info.assert_has_calls([call("Zocalo: found 1 crystals.")])
mock_logger.info.assert_has_calls(
[call("Zocalo results from CPU processing: found 1 crystals.")]
)


@patch("dodal.devices.zocalo.zocalo_results._get_zocalo_connection", autospec=True)
@@ -250,3 +260,195 @@ async def test_when_exception_caused_by_zocalo_message_then_exception_propagated
RE(bps.trigger(zocalo_results, wait=True))

assert isinstance(e.value.__cause__, NoZocaloSubscription)


@patch("dodal.devices.zocalo.zocalo_results._get_zocalo_connection", autospec=True)
async def test_if_use_fastest_zocalo_results_then_wait_twice_for_results(
mock_connection, RE: RunEngine
):
zocalo_results = ZocaloResults(
name="zocalo", zocalo_environment="dev_artemis", use_fastest_zocalo_result=True
)

await zocalo_results.connect()
await zocalo_results.stage()
zocalo_results._raw_results_received.put([])
zocalo_results._raw_results_received.put([])
zocalo_results._raw_results_received.get = MagicMock()
RE(bps.trigger(zocalo_results, wait=False))
assert zocalo_results._raw_results_received.get.call_count == 2


@patch("dodal.devices.zocalo.zocalo_results.LOGGER")
@patch("dodal.devices.zocalo.zocalo_results._get_zocalo_connection", autospec=True)
async def test_source_of_zocalo_results_correctly_identified(
mock_connection, mock_logger, RE: RunEngine
):
zocalo_results = ZocaloResults(
name="zocalo", zocalo_environment="dev_artemis", use_fastest_zocalo_result=False
)
await zocalo_results.connect()
await zocalo_results.stage()

zocalo_results._raw_results_received.get = MagicMock(
return_value={"ispyb_ids": {"test": 0}, "results": []}
)
RE(bps.trigger(zocalo_results, wait=False))
mock_logger.info.assert_has_calls(
[call("Zocalo results from CPU processing: found 0 crystals.")]
)

zocalo_results._raw_results_received.get = MagicMock(
return_value={"ispyb_ids": {"gpu": True}, "results": []}
)
RE(bps.trigger(zocalo_results, wait=False))
mock_logger.info.assert_has_calls(
[call("Zocalo results from GPU processing: found 0 crystals.")]
)


# TODO figure out how to test the transport bit
# @patch("dodal.devices.zocalo.zocalo_results._get_zocalo_connection", autospec=True)
# async def test_if_not_use_fastest_zocalo_results_then_only_wait_for_cpu_results(mock_connection, RE: RunEngine):
# zocalo_results = ZocaloResults(
# name="zocalo", zocalo_environment="dev_artemis", use_fastest_zocalo_result=False
# )
# await zocalo_results.connect()
# await zocalo_results.stage()
# zocalo_results._raw_results_received.put([])
# pass


def test_compare_cpu_and_gpu_results_warns_correctly():
dict1 = {"key1": [1, 2, 3], "key2": "test", "key3": [[1, 2], [1, 2]]}
dict2 = {"key1": [1, 2, 3], "key2": "test", "key3": [[1, 2], [1, 2]]}
assert not get_dict_differences(dict1, "dict1", dict2, "dict2")
dict1 = {"key1": [2, 2, 3], "key2": "test", "key3": [[1, 2], [1, 4]]}
dict2 = {"key1": [1, 2, 3], "key2": "test", "key3": [[1, 2], [1, 3]]}
assert (
get_dict_differences(dict1, "dict1", dict2, "dict2")
== f"Results differed in key1: dict1 contains {dict1['key1']} while dict2 contains {dict2['key1']} \nResults differed in key3: dict1 contains {dict1['key3']} while dict2 contains {dict2['key3']} \n"
)
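The TODO on `get_dict_differences` notes that exact equality may warn when CPU and GPU results differ only by rounding. A tolerance-aware comparison is one possible fix; `values_close` below is a hypothetical helper (not part of this PR) built on `math.isclose`:

```python
import math

def values_close(a, b, rel_tol=1e-6):
    # Treat nearly-equal floats as equal; recurse into lists such as
    # centre_of_mass or bounding_box; fall back to == for everything else.
    if isinstance(a, float) and isinstance(b, float):
        return math.isclose(a, b, rel_tol=rel_tol)
    if isinstance(a, (list, tuple)) and isinstance(b, (list, tuple)):
        return len(a) == len(b) and all(
            values_close(x, y, rel_tol) for x, y in zip(a, b)
        )
    return a == b

# Differ only in the 10th decimal place: no warning wanted.
same = values_close([2.2071330589, 1.4175240054], [2.2071330590, 1.4175240055])
# Genuinely different voxel counts: still flagged.
different = values_close(12, 11)
```

Swapping such a helper into the `dict1[key] == dict2[key]` check would silence rounding-noise warnings while still catching real disagreements between the two pipelines.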


@patch("dodal.devices.zocalo.zocalo_results.LOGGER")
@patch("dodal.devices.zocalo.zocalo_results._get_zocalo_connection", autospec=True)
async def test_if_zocalo_results_timeout_from_one_source_then_warn(
mock_connection, mock_logger, RE: RunEngine
):
zocalo_results = ZocaloResults(
name="zocalo", zocalo_environment="dev_artemis", use_fastest_zocalo_result=True
)
await zocalo_results.connect()
await zocalo_results.stage()
zocalo_results._raw_results_received.get = MagicMock(
side_effect=[{"ispyb_ids": {"test": 0}, "results": []}, Empty]
)
RE(bps.trigger(zocalo_results, wait=False))
mock_logger.warning.assert_called_with(
"Zocalo results from GPU timed out. Using results from CPU"
)


@patch("dodal.devices.zocalo.zocalo_results.LOGGER")
@patch("dodal.devices.zocalo.zocalo_results._get_zocalo_connection", autospec=True)
async def test_if_cpu_results_arrive_before_gpu_then_warn(
mock_connection, mock_logger, RE: RunEngine
):
zocalo_results = ZocaloResults(
name="zocalo", zocalo_environment="dev_artemis", use_fastest_zocalo_result=True
)
await zocalo_results.connect()
await zocalo_results.stage()
zocalo_results._raw_results_received.get = MagicMock(
return_value={"ispyb_ids": {"test": 0}, "results": []}
)
RE(bps.trigger(zocalo_results, wait=False))
mock_logger.warning.assert_called_with(
"Recieved zocalo results from CPU before GPU"
)


@patch("dodal.devices.zocalo.zocalo_results.LOGGER")
@patch("dodal.devices.zocalo.zocalo_results._get_zocalo_connection", autospec=True)
async def test_warning_if_results_are_different(
mock_connection, mock_logger, RE: RunEngine
):
zocalo_results = ZocaloResults(
name="zocalo", zocalo_environment="dev_artemis", use_fastest_zocalo_result=True
)
await zocalo_results.connect()
await zocalo_results.stage()
zocalo_results._raw_results_received.get = MagicMock(
side_effect=[
{"ispyb_ids": {}, "results": [{"test": 0}]},
{"ispyb_ids": {}, "results": [{"test": 1}]},
]
)
RE(bps.trigger(zocalo_results, wait=False))
mock_logger.warning.assert_called_with(
"Results differed in test: CPU contains 0 while GPU contains 1 \n"
)


@patch("dodal.devices.zocalo.zocalo_results._get_zocalo_connection", autospec=True)
async def test_if_zocalo_results_timeout_before_any_results_then_error(
mock_connection, RE: RunEngine
):
zocalo_results = ZocaloResults(
name="zocalo", zocalo_environment="dev_artemis", use_fastest_zocalo_result=True
)
await zocalo_results.connect()
await zocalo_results.stage()
zocalo_results._raw_results_received.get = MagicMock(side_effect=Empty)
with pytest.raises(NoResultsFromZocalo):
await zocalo_results.trigger()
Collaborator (Author) commented:

There was a strange thing here where this test would fail if I used RE(bps.trigger(zocalo_results)) instead, even though debugging the pytest showed the exception was being raised

Contributor commented:

The RE might swallow the exception or transform it?
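The reply above is on the right track: the RunEngine transforms the exception, which is why the earlier test asserts on `e.value.__cause__`. The chaining itself is ordinary Python `raise ... from ...`, shown here with stand-in classes rather than the real bluesky/dodal ones:

```python
class NoZocaloSubscription(Exception):
    """Stand-in for the dodal exception of the same name."""

class FailedStatus(Exception):
    """Stand-in for bluesky.utils.FailedStatus."""

def trigger():
    raise NoZocaloSubscription("stage() the device before triggering")

try:
    try:
        trigger()
    except NoZocaloSubscription as err:
        # The RunEngine surfaces a failed trigger as FailedStatus; `raise
        # ... from ...` attaches the original exception as __cause__.
        raise FailedStatus() from err
except FailedStatus as e:
    cause = e.__cause__  # the original NoZocaloSubscription
```

This is why asserting on the outer exception type alone is not enough: the original error survives only on the `__cause__` chain.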



@patch("dodal.devices.zocalo.zocalo_results.LOGGER")
@patch(
"dodal.devices.zocalo.zocalo_results.workflows.recipe.wrap_subscribe", autospec=True
)
@patch("dodal.devices.zocalo.zocalo_results._get_zocalo_connection", new=MagicMock())
async def test_gpu_results_ignored_if_toggle_disabled(
mock_wrap_subscribe, mock_logger, RE: RunEngine
):
zocalo_results = ZocaloResults(
name="zocalo",
zocalo_environment="dev_artemis",
timeout_s=2,
Contributor commented:

As above, not ideal to be actually waiting on this timeout in tests

use_fastest_zocalo_result=False,
)

recipe_wrapper = MagicMock()
recipe_wrapper.recipe_step = {"parameters": {"gpu": True}}

def zocalo_plan():
yield from bps.stage(zocalo_results)
receive_result = mock_wrap_subscribe.mock_calls[0].args[2]
receive_result(
recipe_wrapper,
{},
{
"results": [
{
"centre_of_mass": [
2.207133058984911,
1.4175240054869684,
13.317215363511659,
],
"max_voxel": [2, 1, 13],
"max_count": 702.0,
"n_voxels": 12,
"total_count": 5832.0,
"bounding_box": [[1, 0, 12], [4, 3, 15]],
}
],
"status": "success",
"type": "3d",
},
)
yield from bps.trigger(zocalo_results)
mock_logger.warning.assert_called_with("Timed out waiting for zocalo results!")

RE(zocalo_plan())