Skip to content

Commit

Permalink
Run processor inside worker in separate process
Browse files Browse the repository at this point in the history
  • Loading branch information
medihack committed Nov 16, 2023
1 parent 702e335 commit 8f5a56d
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 152 deletions.
12 changes: 8 additions & 4 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

- Before new release
-- test job_utils
-- exclude autoreload when tests are saved (Custom Filter in server command watched files)
-- Test canceled task/job in test_workers.py

- Replace 7z with .zip format. Really save in patient folder?
- Figure out if favicon works in all browsers
- Rename C_STORE to C-STORE and so on in dimse connector
- Replace AssertionError with assert
Expand All @@ -16,14 +16,11 @@
-- Check aborted attribute every 10 seconds or so and the may be kill the process
-- Set task to FAILURE with message "Task manually / forcefully aborted"
- Make single task cancelable, retriable, resumeable, ...
- Upgrade REDIS server on RADIS
- Unfix pyright and its VS code extension
- Replace sherlock on RADIS
- Use DicomLogEntry during C-STORE
- Allow to restart or cancel specific dicom task
- Fix dicom explorer search over Accession Number
- Make warning when only one image fails
- Upgrade psycopg on RADIS
- Use django-stubs instead of django-types (also on RADIS)
- Exclude SR and PR when in pseudonymization mode
- Cancel processing tasks actively
Expand Down Expand Up @@ -112,6 +109,7 @@

## Maybe

- exclude test folders from autorelad in ServerCommand (maybe a custom filter is needed)
- Switch from Daphne to Uvicorn (maybe it has faster restart times during development)
- Switch from Celery to Huey
- Upgrade postgres server to v15, but we have to migrate the data then as the database files are incompatible a newer version
Expand Down Expand Up @@ -189,3 +187,9 @@
- Get rid of jQuery in ADIT and RADIS
- Get rid of Jumbotron
- Get rid of those not used accounts views and login form
- Move over to SVG sprites
- Get rid of RabbitMQ
- Port over ServerCommand
- Upgrade REDIS server on RADIS
- Remove sherlock on RADIS as no need for distributed lock there and we use Redis here for that
- Upgrade psycopg on RADIS
51 changes: 8 additions & 43 deletions adit/core/tests/test_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,20 @@ def dicom_worker(mocker: MockerFixture):

@pytest.mark.django_db
def test_worker_with_task_that_succeeds(mocker: MockerFixture, dicom_worker: DicomWorker):
# Arrange
dicom_job = ExampleTransferJobFactory.create(status=DicomJob.Status.PENDING)
dicom_task = ExampleTransferTaskFactory.create(
status=DicomTask.Status.PENDING,
job=dicom_job,
)
queued_task = QueuedTask.objects.create(content_object=dicom_task, priority=5)

queued_task_was_locked = False

def process_dicom_task(self, dicom_task):
nonlocal queued_task_was_locked
queued_task.refresh_from_db()
queued_task_was_locked = queued_task.locked
assert QueuedTask.objects.get(pk=queued_task.pk).locked
return (DicomTask.Status.SUCCESS, "Success!", [])

mocker.patch.object(ExampleProcessor, "process_dicom_task", process_dicom_task)

# Act
dicom_worker.process_next_task()

# Assert
assert queued_task_was_locked
dicom_worker.check_and_process_next_task()

dicom_job.refresh_from_db()
assert dicom_job.status == DicomJob.Status.SUCCESS
Expand All @@ -69,29 +60,20 @@ def process_dicom_task(self, dicom_task):

@pytest.mark.django_db
def test_worker_with_task_that_fails(mocker: MockerFixture, dicom_worker: DicomWorker):
# Arrange
dicom_job = ExampleTransferJobFactory.create(status=DicomJob.Status.PENDING)
dicom_task = ExampleTransferTaskFactory.create(
status=DicomTask.Status.PENDING,
job=dicom_job,
)
queued_task = QueuedTask.objects.create(content_object=dicom_task, priority=5)

queued_task_was_locked = False

def process_dicom_task(self, dicom_task):
nonlocal queued_task_was_locked
queued_task.refresh_from_db()
queued_task_was_locked = queued_task.locked
assert QueuedTask.objects.get(pk=queued_task.pk).locked
return (DicomTask.Status.FAILURE, "Failure!", [])

mocker.patch.object(ExampleProcessor, "process_dicom_task", process_dicom_task)

# Act
dicom_worker.process_next_task()

# Assert
assert queued_task_was_locked
dicom_worker.check_and_process_next_task()

dicom_job.refresh_from_db()
assert dicom_job.status == DicomJob.Status.FAILURE
Expand All @@ -109,29 +91,20 @@ def process_dicom_task(self, dicom_task):
def test_worker_with_task_that_raises_non_retriable_error(
mocker: MockerFixture, dicom_worker: DicomWorker
):
# Arrange
dicom_job = ExampleTransferJobFactory.create(status=DicomJob.Status.PENDING)
dicom_task = ExampleTransferTaskFactory.create(
status=DicomTask.Status.PENDING,
job=dicom_job,
)
queued_task = QueuedTask.objects.create(content_object=dicom_task, priority=5)

queued_task_was_locked = False

def process_dicom_task(self, dicom_task):
nonlocal queued_task_was_locked
queued_task.refresh_from_db()
queued_task_was_locked = queued_task.locked
assert QueuedTask.objects.get(pk=queued_task.pk).locked
raise Exception("Unexpected error!")

mocker.patch.object(ExampleProcessor, "process_dicom_task", process_dicom_task)

# Act
dicom_worker.process_next_task()

# Assert
assert queued_task_was_locked
dicom_worker.check_and_process_next_task()

dicom_job.refresh_from_db()
assert dicom_job.status == DicomJob.Status.FAILURE
Expand All @@ -149,30 +122,22 @@ def process_dicom_task(self, dicom_task):
def test_worker_with_task_that_raises_retriable_error(
mocker: MockerFixture, dicom_worker: DicomWorker
):
# Arrange
dicom_job = ExampleTransferJobFactory.create(status=DicomJob.Status.PENDING)
dicom_task = ExampleTransferTaskFactory.create(
status=DicomTask.Status.PENDING,
job=dicom_job,
)
queued_task = QueuedTask.objects.create(content_object=dicom_task, priority=5)

queued_task_was_locked = False

def process_dicom_task(self, dicom_task):
nonlocal queued_task_was_locked
queued_task.refresh_from_db()
queued_task_was_locked = queued_task.locked
assert QueuedTask.objects.get(pk=queued_task.pk).locked
raise RetriableDicomError("Retriable error!")

mocker.patch.object(ExampleProcessor, "process_dicom_task", process_dicom_task)

# Act
dicom_worker.process_next_task()
dicom_worker.check_and_process_next_task()

# Assert
queued_task.refresh_from_db()
assert queued_task_was_locked
assert not queued_task.locked

dicom_job.refresh_from_db()
Expand Down
2 changes: 1 addition & 1 deletion adit/core/utils/dicom_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ def _download_series_with_c_move(
# The requested images are sent to the receiver container (a C-STORE SCP server)
# by the C-MOVE operation. Then those are send via the transmitter (over TCP socket)
# which we consume in a separate thread.
with ThreadPoolExecutor() as executor:
with ThreadPoolExecutor(max_workers=1) as executor:
consume_future = executor.submit(
self._consume_from_receiver,
query.StudyInstanceUID,
Expand Down
Loading

0 comments on commit 8f5a56d

Please sign in to comment.