From bd3994f661f2ecbddb7144f81efa8e1b067b3ee4 Mon Sep 17 00:00:00 2001 From: wetzelj Date: Wed, 7 Dec 2022 10:03:15 -0500 Subject: [PATCH] Bug Fixes and Unit Tests for BLANK & REPLACE Actions & Sequences (#242) * Expand BLANK Action to other VRs 1. Expanded BLANK Action to other VRs. 2. Added unit tests for validating action interaction. 3. Added table of action interactions to help docs. * Update recipe-headers.md * Update test_action_interaction.py Correcting bad test method names. * Update deid-data install version * Update parser.py Reverted expand_field_expression to always return desired as a Tag. Instead convert to DataElement in blank_field. * Expanding blank to all VRs Updating BLANK test coverage to all VRs. * Updating field counts in unit tests. Updating unit tests failing due to field count changes in test image. * Update test_blank_action.py Restructured blank_action unit tests. * Fix for #244 - Replace fails for numeric VR Removing deid-data pin. * Update actions.py Committing review suggestions. * Header Sequence Updates Corrected issue #243 which prevented tags within sequences from being acted upon. Added unit tests for sequence processing * Update test_replace_identifiers.py Updated test_replace_identifiers to target ctbrain1.dcm by name. * Update test_dicom_funcs.py Updated test_user_provided_func to target ctbrain1.dcm --- .github/workflows/main.yaml | 2 +- CHANGELOG.md | 4 +- deid/dicom/parser.py | 19 +- deid/tests/test_action_interaction.py | 1519 ++++++++++++++++++++++++ deid/tests/test_blank_action.py | 167 +++ deid/tests/test_dicom_funcs.py | 5 +- deid/tests/test_replace_action.py | 190 +++ deid/tests/test_replace_identifiers.py | 122 +- deid/tests/test_sequence_blank.py | 221 ++++ deid/tests/test_sequence_jitter.py | 186 +++ deid/tests/test_sequence_remove.py | 227 ++++ deid/tests/test_sequence_replace.py | 233 ++++ deid/tests/test_utils_files.py | 4 +- deid/utils/actions.py | 63 +- deid/version.py | 2 +- docs/_docs/user-docs/recipe-headers.md | 74 ++ 16 files changed, 2951 insertions(+), 87 deletions(-) create mode 100644 deid/tests/test_action_interaction.py create mode 100644 deid/tests/test_blank_action.py create mode 100644 deid/tests/test_replace_action.py create mode 100644 deid/tests/test_sequence_blank.py create mode 100644 deid/tests/test_sequence_jitter.py create mode 100644 deid/tests/test_sequence_remove.py create mode 100644 deid/tests/test_sequence_replace.py diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 3f394af..131f107 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -40,5 +40,5 @@ jobs: run: | export PATH="/usr/share/miniconda/bin:$PATH" source activate testing - pip install deid-data==0.0.17 + pip install deid-data python -m unittest discover -s deid/tests/ -p '[t|T]est*.py' diff --git a/CHANGELOG.md b/CHANGELOG.md index db7af98..3c10acb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,9 @@ and **Merged pull requests**. Critical items to know are: Referenced versions in headers are tagged on Github, in parentheses are for pypi. ## [vxx](https://github.com/pydicom/deid/tree/master) (master) - +- Expand BLANK Action to additional VRs [#241](https://github.com/pydicom/deid/issues/241) (0.3.2) + - Correct issues with REPLACE action on numeric VRs [#244](https://github.com/pydicom/deid/issues/244) + - Correct issue with actions on fields within sequences [#243](https://github.com/pydicom/deid/issues/243) - pre-commit for linting and formatting (0.3.1) - Add `ctpcoordinates` and `ctpkeepcoordinates` to handle different formats (0.3.0) - Minimum Python required is 3.7, numpy 1.20 diff --git a/deid/dicom/parser.py b/deid/dicom/parser.py index 2eebceb..36de8b2 100644 --- a/deid/dicom/parser.py +++ b/deid/dicom/parser.py @@ -160,8 +160,9 @@ def get_nested_field(self, field, return_parent=False): # Otherwise it's an index into a sequence else: - # If the parent has been removed, we can't continue - if not int(uid) in parent: + # If the sequence is outside the bounds of the array of items + # within the sequence, we can't continue. + if int(uid) < 0 or int(uid) >= len(parent.value): return None, desired parent = parent[int(uid)] @@ -186,16 +187,10 @@ def blank_field(self, field): """ Blank a field """ - element = self.get_nested_field(field) - - # Assert we have a data element, and can blank a string - if element: - if not isinstance(element, DataElement): - bot.warning("Issue parsing %s as a DataElement, not blanked." % field) - elif element.VR in ["US", "SS"]: - element.value = "" - else: - bot.warning("Unrecognized VR for %s, skipping blank." % field) + # Returns the parent, and a DataElement (indexes into parent by tag) + parent, desired = self.get_nested_field(field, return_parent=True) + if parent and desired in parent: + parent[desired].value = None def replace_field(self, field, value): """ diff --git a/deid/tests/test_action_interaction.py b/deid/tests/test_action_interaction.py new file mode 100644 index 0000000..d9c844f --- /dev/null +++ b/deid/tests/test_action_interaction.py @@ -0,0 +1,1519 @@ +#!/usr/bin/env python + +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright 2016-2022, Vanessa Sochat" +__license__ = "MIT" + +import os +import shutil +import tempfile +import unittest + +from pydicom import read_file + +from deid.data import get_dataset +from deid.dicom import replace_identifiers +from deid.tests.common import create_recipe, get_file +from deid.utils import get_installdir + +global generate_uid + + +class TestRuleInteractions(unittest.TestCase): + def setUp(self): + self.pwd = get_installdir() + self.deid = os.path.abspath("%s/../examples/deid/deid.dicom" % self.pwd) + self.dataset = get_dataset("humans") + self.tmpdir = tempfile.mkdtemp() + print("\n######################START######################") + + def tearDown(self): + shutil.rmtree(self.tmpdir) + print("\n######################END########################") + + def test_add_add_should_have_second_value(self): + """RECIPE RULE + ADD PatientIdentityRemoved No + ADD PatientIdentityRemoved Yes + """ + + print("Test ADD/ADD Interaction") + dicom_file = get_file(self.dataset) + + field = "PatientIdentityRemoved" + + action1 = "ADD" + value1 = "No" + + action2 = "ADD" + value2 = "Yes" + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + with self.assertRaises(KeyError): + inputfile[field].value + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(value2, outputfile[field].value) + + def test_add_blank_should_be_blank(self): + """RECIPE RULE + ADD PregnancyStatus 1 + BLANK PregnancyStatus + """ + + print("Test ADD/BLANK Interaction") + dicom_file = get_file(self.dataset) + + field = "PregnancyStatus" + + action1 = "ADD" + value1 = "1" + + action2 = "BLANK" + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(1, currentValue) + self.assertNotEqual("", currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(None, outputfile[field].value) + + def test_add_jitter_should_combine(self): + """RECIPE RULE + ADD StudyDate 20221128 + JITTER StudyDate 5 + """ + + print("Test ADD/JITTER Interaction") + dicom_file = get_file(self.dataset) + + field = "StudyDate" + + action1 = "ADD" + value1 = "20221128" + + action2 = "JITTER" + value2 = "5" + + valueexpected = "20221203" + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(value1, currentValue) + self.assertNotEqual(valueexpected, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_add_keep_should_have_add_value(self): + """RECIPE RULE + ADD StudyDate 20221128 + KEEP StudyDate + """ + + print("Test ADD/KEEP Interaction") + dicom_file = get_file(self.dataset) + + field = "StudyDate" + + action1 = "ADD" + value1 = "20221128" + + action2 = "KEEP" + + valueexpected = "20221128" + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(value1, currentValue) + self.assertNotEqual(valueexpected, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_add_replace_should_have_replace_value(self): + """RECIPE RULE + ADD StudyDate 20221128 + REPLACE StudyDate 20221129 + """ + + print("Test ADD/REPLACE Interaction") + dicom_file = get_file(self.dataset) + + field = "StudyDate" + + action1 = "ADD" + value1 = "20221128" + + action2 = "REPLACE" + value2 = "20221129" + + valueexpected = "20221129" + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(value1, currentValue) + self.assertNotEqual(valueexpected, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_add_remove_should_be_removed(self): + """RECIPE RULE + ADD StudyDate 20221128 + REMOVE StudyDate + """ + + print("Test ADD/REMOVE Interaction") + dicom_file = get_file(self.dataset) + + field = "StudyDate" + + action1 = "ADD" + value1 = "20221128" + + action2 = "REMOVE" + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(value1, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + with self.assertRaises(KeyError): + _ = outputfile[field].value + + def test_blank_add_should_have_add_value(self): + """RECIPE RULE + BLANK Manufacturer + ADD Manufacturer Testing + """ + + print("Test BLANK/ADD Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "BLANK" + + action2 = "ADD" + value2 = "Testing" + + valueexpected = "Testing" + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + self.assertNotEqual(valueexpected, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_blank_blank_should_be_blank(self): + """This is a bit of a nonsensical test, but is included for completeness. + RECIPE RULE + BLANK Manufacturer + BLANK Manufacturer + """ + + print("Test BLANK/BLANK Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "BLANK" + action2 = "BLANK" + + valueexpected = "" + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + self.assertNotEqual(valueexpected, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_blank_jitter_should_be_blank(self): + """RECIPE RULE + BLANK StudyDate + JITTER StudyDate 1 + """ + + print("Test BLANK/JITTER Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "BLANK" + + action2 = "JITTER" + value2 = "1" + + valueexpected = "" + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + self.assertNotEqual(valueexpected, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_blank_keep_should_be_original_value(self): + """RECIPE RULE + BLANK Manufacturer + KEEP Manufacturer + """ + + print("Test BLANK/KEEP Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "BLANK" + action2 = "KEEP" + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + valueexpected = currentValue + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + self.assertEqual(valueexpected, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_blank_replace_should_be_replace_value(self): + """RECIPE RULE + BLANK Manufacturer + REPLACE Manufacturer Testing + """ + + print("Test BLANK/REPLACE Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "BLANK" + + action2 = "REPLACE" + value2 = "Testing" + + valueexpected = "Testing" + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + self.assertNotEqual(valueexpected, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_blank_remove_should_be_removed(self): + """RECIPE RULE + BLANK StudyDate + REMOVE StudyDate + """ + + print("Test BLANK/REMOVE Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "BLANK" + action2 = "REMOVE" + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + with self.assertRaises(KeyError): + _ = outputfile[field].value + + def test_jitter_add_should_have_add_value(self): + """RECIPE RULE + JITTER StudyDate 1 + ADD StudyDate 20221129 + """ + + print("Test JITTER/ADD Interaction") + dicom_file = get_file(self.dataset) + + field = "StudyDate" + + action1 = "JITTER" + value1 = "1" + + action2 = "ADD" + value2 = "20221129" + + valueexpected = value2 + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(valueexpected, currentValue) + self.assertNotEqual("20221130", currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_jitter_blank_should_be_blank(self): + """RECIPE RULE + JITTER StudyDate 1 + BLANK StudyDate + """ + + print("Test JITTER/BLANK Interaction") + dicom_file = get_file(self.dataset) + + field = "StudyDate" + + action1 = "JITTER" + value1 = "1" + + action2 = "BLANK" + + valueexpected = "" + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_jitter_jitter_should_combine(self): + """RECIPE RULE + JITTER StudyDate 1 + JITTER StudyDate 2 + """ + + print("Test JITTER/JITTER Interaction") + dicom_file = get_file(self.dataset) + + field = "StudyDate" + + action1 = "JITTER" + value1 = "1" + + action2 = "JITTER" + value2 = "2" + + valueexpected = "20230104" + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(valueexpected, currentValue) + self.assertEqual("20230101", currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_jitter_keep_should_be_original_value(self): + """RECIPE RULE + JITTER StudyDate 1 + KEEP StudyDate + """ + + print("Test JITTER/KEEP Interaction") + dicom_file = get_file(self.dataset) + + field = "StudyDate" + + action1 = "JITTER" + value1 = "1" + + action2 = "KEEP" + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + valueexpected = currentValue + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_jitter_replace_should_have_replace_value(self): + """RECIPE RULE + JITTER StudyDate 1 + REPLACE StudyDate 20221129 + """ + + print("Test JITTER/REPLACE Interaction") + dicom_file = get_file(self.dataset) + + field = "StudyDate" + + action1 = "JITTER" + value1 = "1" + + action2 = "REPLACE" + value2 = "20221129" + + valueexpected = value2 + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(valueexpected, currentValue) + self.assertEqual("20230101", currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_jitter_remove_should_ignore_remove(self): + """RECIPE RULE + JITTER StudyDate 1 + REMOVE StudyDate + """ + + print("Test JITTER/REMOVE Interaction") + dicom_file = get_file(self.dataset) + + field = "StudyDate" + + action1 = "JITTER" + value1 = "1" + + action2 = "REMOVE" + valueexpected = "20230102" + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertEqual("20230101", currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_keep_add_should_be_add_value(self): + """RECIPE RULE + KEEP Manufacturer + ADD Manufacturer Testing + """ + + print("Test KEEP/ADD Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "KEEP" + + action2 = "ADD" + value2 = "Testing" + + valueexpected = value2 + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + self.assertNotEqual(valueexpected, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_keep_blank_should_be_original_value(self): + """RECIPE RULE + KEEP Manufacturer + BLANK Manufacturer + """ + + print("Test KEEP/BLANK Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "KEEP" + action2 = "BLANK" + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + valueexpected = currentValue + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_keep_jitter_should_be_original_value(self): + """RECIPE RULE + KEEP StudyDate + JITTER StudyDate 1 + """ + + print("Test KEEP/JITTER Interaction") + dicom_file = get_file(self.dataset) + + field = "StudyDate" + + action1 = "KEEP" + + action2 = "JITTER" + value2 = "1" + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + valueexpected = currentValue + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("20230102", currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_keep_keep_should_be_original_value(self): + """This is a bit of a nonsensical test, but is included for completeness. + RECIPE RULE + KEEP Manufacturer + KEEP Manufacturer + """ + + print("Test KEEP/KEEP Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "KEEP" + action2 = "KEEP" + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + valueexpected = currentValue + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + self.assertEqual(valueexpected, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_keep_replace_should_be_original_value(self): + """RECIPE RULE + KEEP Manufacturer + REPLACE Manufacturer Testing + """ + + print("Test KEEP/REPLACE Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "KEEP" + + action2 = "REPLACE" + value2 = "Testing" + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + valueexpected = currentValue + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_keep_remove_should_be_original_value(self): + """RECIPE RULE + KEEP StudyDate + REMOVE StudyDate + """ + + print("Test KEEP/REMOVE Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "KEEP" + action2 = "REMOVE" + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + valueexpected = currentValue + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_replace_add_should_have_add_value(self): + """RECIPE RULE + REPLACE Manufacturer TestingReplace + ADD Manufacturer TestingAdd + """ + + print("Test REPLACE/ADD Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "REPLACE" + value1 = "TestingReplace" + + action2 = "ADD" + value2 = "TestingAdd" + + valueexpected = value2 + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + self.assertNotEqual(valueexpected, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_replace_blank_should_be_blank(self): + """RECIPE RULE + REPLACE Manufacturer TestingReplace + BLANK Manufacturer + """ + + print("Test REPLACE/BLANK Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "REPLACE" + value1 = "TestingReplace" + + action2 = "BLANK" + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(value1, currentValue) + self.assertNotEqual("", currentValue) + self.assertNotEqual(None, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual("", outputfile[field].value) + + def test_replace_jitter_should_combine(self): + """RECIPE RULE + REPLACE StudyDate 20221128 + JITTER StudyDate 5 + """ + + print("Test REPLACE/JITTER Interaction") + dicom_file = get_file(self.dataset) + + field = "StudyDate" + + action1 = "REPLACE" + value1 = "20221128" + + action2 = "JITTER" + value2 = "5" + + valueexpected = "20221203" + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(value1, currentValue) + self.assertNotEqual(valueexpected, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_replace_keep_should_have_original_value(self): + """RECIPE RULE + REPLACE StudyDate 20221128 + KEEP StudyDate + """ + + print("Test REPLACE/KEEP Interaction") + dicom_file = get_file(self.dataset) + + field = "StudyDate" + + action1 = "REPLACE" + value1 = "20221128" + + action2 = "KEEP" + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + valueexpected = currentValue + + self.assertNotEqual(value1, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_replace_replace_should_have_second_replace_value(self): + """RECIPE RULE + REPLACE StudyDate 20221128 + REPLACE StudyDate 20221129 + """ + + print("Test REPLACE/REPLACE Interaction") + dicom_file = get_file(self.dataset) + + field = "StudyDate" + + action1 = "REPLACE" + value1 = "20221128" + + action2 = "REPLACE" + value2 = "20221129" + + valueexpected = "20221129" + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(value1, currentValue) + self.assertNotEqual(valueexpected, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_replace_remove_should_be_replace_value(self): + """RECIPE RULE + REPLACE StudyDate 20221128 + REMOVE StudyDate + """ + + print("Test REPLACE/REMOVE Interaction") + dicom_file = get_file(self.dataset) + + field = "StudyDate" + + action1 = "REPLACE" + value1 = "20221128" + + action2 = "REMOVE" + + actions = [ + {"action": action1, "field": field, "value": value1}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(value1, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(value1, outputfile[field].value) + + def test_remove_add_should_be_add_value(self): + """RECIPE RULE + REMOVE Manufacturer + ADD Manufacturer Testing + """ + + print("Test REMOVE/ADD Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "REMOVE" + + action2 = "ADD" + value2 = "Testing" + + valueexpected = value2 + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + self.assertNotEqual(valueexpected, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_remove_blank_should_be_removed(self): + """RECIPE RULE + REMOVE Manufacturer + BLANK Manufacturer + """ + + print("Test REMOVE/BLANK Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "REMOVE" + action2 = "BLANK" + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + with self.assertRaises(KeyError): + _ = outputfile[field].value + + def test_remove_jitter_should_jittered_date(self): + """RECIPE RULE + REMOVE StudyDate + JITTER StudyDate 1 + """ + + print("Test REMOVE/JITTER Interaction") + dicom_file = get_file(self.dataset) + + field = "StudyDate" + + action1 = "REMOVE" + + action2 = "JITTER" + value2 = "1" + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("20230102", currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual("20230102", outputfile[field].value) + + def test_remove_keep_should_be_original_value(self): + """RECIPE RULE + REMOVE Manufacturer + KEEP Manufacturer + """ + + print("Test REMOVE/KEEP Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "REMOVE" + action2 = "KEEP" + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + valueexpected = currentValue + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + self.assertEqual(valueexpected, currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_remove_replace_should_be_replace_value(self): + """RECIPE RULE + REMOVE Manufacturer + REPLACE Manufacturer Testing + """ + + print("Test REMOVE/REPLACE Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "REMOVE" + + action2 = "REPLACE" + value2 = "Testing" + valueexpected = value2 + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field, "value": value2}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(valueexpected, outputfile[field].value) + + def test_remove_remove_should_remove(self): + """This is a bit of a nonsensical test, but is included for completeness. + RECIPE RULE + REMOVE StudyDate + REMOVE StudyDate + """ + + print("Test REMOVE/REMOVE Interaction") + dicom_file = get_file(self.dataset) + + field = "Manufacturer" + + action1 = "REMOVE" + action2 = "REMOVE" + + actions = [ + {"action": action1, "field": field}, + {"action": action2, "field": field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + with self.assertRaises(KeyError): + _ = outputfile[field].value + + +if __name__ == "__main__": + unittest.main() diff --git a/deid/tests/test_blank_action.py b/deid/tests/test_blank_action.py new file mode 100644 index 0000000..c3a6754 --- /dev/null +++ b/deid/tests/test_blank_action.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python + +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright 2016-2022, Vanessa Sochat" +__license__ = "MIT" + +import os +import shutil +import tempfile +import unittest + +from pydicom import read_file + +from deid.data import get_dataset +from deid.dicom import replace_identifiers +from deid.tests.common import create_recipe, get_file +from deid.utils import get_installdir + +global generate_uid + + +class TestBlankAction(unittest.TestCase): + def setUp(self): + self.pwd = get_installdir() + self.deid = os.path.abspath("%s/../examples/deid/deid.dicom" % self.pwd) + self.dataset = get_dataset("humans") + self.tmpdir = tempfile.mkdtemp() + print("\n######################START######################") + + def tearDown(self): + shutil.rmtree(self.tmpdir) + print("\n######################END########################") + + def run_blank_test(self, VR, Field, Expected): + print(f"Test BLANK {VR}") + dicom_file = get_file(self.dataset) + + actions = [ + {"action": "BLANK", "field": Field}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[Field].value + currentVR = inputfile[Field].VR + + self.assertNotEqual(None, currentValue) + self.assertNotEqual("", currentValue) + self.assertEqual(VR, currentVR) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(Expected, outputfile[Field].value) + + def test_blank_AE(self): + self.run_blank_test("AE", "NetworkID", "") + + def test_blank_AS(self): + self.run_blank_test("AS", "PatientAge", "") + + def test_blank_AT(self): + self.run_blank_test("AT", "00110004", None) + + def test_blank_CS(self): + self.run_blank_test("CS", "BodyPartExamined", "") + + def test_blank_DA(self): + self.run_blank_test("DA", "StudyDate", "") + + def test_blank_DS(self): + self.run_blank_test("DS", "PatientWeight", None) + + def test_blank_DT(self): + self.run_blank_test("DT", "AcquisitionDateTime", "") + + def test_blank_FD(self): + self.run_blank_test("FD", "SingleCollimationWidth", None) + + def test_blank_FL(self): + self.run_blank_test("FL", "CalciumScoringMassFactorDevice", None) + + def test_blank_IS(self): + self.run_blank_test("IS", "Exposure", None) + + def test_blank_LO(self): + self.run_blank_test("LO", "PatientID", "") + + def test_blank_LT(self): + self.run_blank_test("LT", "AdditionalPatientHistory", "") + + def test_blank_OB(self): + self.run_blank_test("OB", "00110011", None) + + def test_blank_OD(self): + self.run_blank_test("OD", "00110012", None) + + def test_blank_OF(self): + self.run_blank_test("OF", "00110013", None) + + def test_blank_OL(self): + self.run_blank_test("OL", "00110014", None) + + def test_blank_OV(self): + self.run_blank_test("OV", "00110016", None) + + def test_blank_OW(self): + self.run_blank_test("OW", "00110015", None) + + def test_blank_PN(self): + self.run_blank_test("PN", "ReferringPhysicianName", "") + + def test_blank_SH(self): + self.run_blank_test("SH", "AccessionNumber", "") + + def test_blank_SL(self): + self.run_blank_test("SL", "00110001", None) + + def test_blank_SQ(self): + self.run_blank_test("SQ", "ProcedureCodeSequence", []) + + def test_blank_SS(self): + self.run_blank_test("SS", "00110002", None) + + def test_blank_ST(self): + self.run_blank_test("ST", "InstitutionAddress", "") + + def test_blank_SV(self): + self.run_blank_test("SV", "00110007", None) + + def test_blank_TM(self): + self.run_blank_test("TM", "StudyTime", "") + + def test_blank_UC(self): + self.run_blank_test("UC", "00110009", "") + + def test_blank_UI(self): + self.run_blank_test("UI", "FrameOfReferenceUID", "") + + def test_blank_UL(self): + self.run_blank_test("UL", "00311101", None) + + def test_blank_UN(self): + self.run_blank_test("UN", "00110003", None) + + def test_blank_UR(self): + self.run_blank_test("UR", "00110008", "") + + def test_blank_US(self): + self.run_blank_test("US", "PregnancyStatus", None) + + def test_blank_UT(self): + self.run_blank_test("UT", "00291022", "") + + def test_blank_UV(self): + self.run_blank_test("UV", "00110010", None) + + +if __name__ == "__main__": + unittest.main() diff --git a/deid/tests/test_dicom_funcs.py b/deid/tests/test_dicom_funcs.py index 4145e7a..397cd5c 100644 --- a/deid/tests/test_dicom_funcs.py +++ b/deid/tests/test_dicom_funcs.py @@ -11,6 +11,7 @@ import unittest from deid.data import get_dataset +from deid.dicom import get_files from deid.dicom.parser import DicomParser from deid.tests.common import create_recipe, get_file, get_same_file from deid.utils import get_installdir @@ -35,7 +36,7 @@ def test_user_provided_func(self): REMOVE ALL func:myfunction """ print("Test user provided func") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) def myfunction(dicom, value, field, item): from pydicom.tag import Tag @@ -56,7 +57,7 @@ def myfunction(dicom, value, field, item): parser.define("myfunction", myfunction) parser.parse() - self.assertEqual(160, len(parser.dicom)) + self.assertEqual(174, len(parser.dicom)) with self.assertRaises(KeyError): parser.dicom["ReferringPhysicianName"].value with self.assertRaises(KeyError): diff --git a/deid/tests/test_replace_action.py b/deid/tests/test_replace_action.py new file mode 100644 index 0000000..eb4c671 --- /dev/null +++ b/deid/tests/test_replace_action.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python + +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright 2016-2022, Vanessa Sochat" +__license__ = "MIT" + +import os +import shutil +import tempfile +import unittest + +from pydicom import read_file + +from deid.data import get_dataset +from deid.dicom import replace_identifiers +from deid.tests.common import create_recipe, get_file +from deid.utils import get_installdir + +global generate_uid + + +class TestReplaceAction(unittest.TestCase): + def setUp(self): + self.pwd = get_installdir() + self.deid = os.path.abspath("%s/../examples/deid/deid.dicom" % self.pwd) + self.dataset = get_dataset("humans") + self.tmpdir = tempfile.mkdtemp() + print("\n######################START######################") + + def tearDown(self): + shutil.rmtree(self.tmpdir) + print("\n######################END########################") + + def run_replace_test(self, VR, Field, newValue, expected=None): + print(f"Test REPLACE {VR}") + dicom_file = get_file(self.dataset) + + if expected is None: + expected = newValue + + actions = [ + {"action": "REPLACE", "field": Field, "value": newValue}, + ] + recipe = create_recipe(actions) + + inputfile = read_file(dicom_file) + currentValue = inputfile[Field].value + currentVR = inputfile[Field].VR + + self.assertNotEqual(newValue, currentValue) + self.assertEqual(VR, currentVR) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual(expected, outputfile[Field].value) + + def test_replace_AE(self): + self.run_replace_test("AE", "NetworkID", "TEST_AE") + + def test_replace_AS(self): + self.run_replace_test("AS", "PatientAge", "TEST_AS") + + def test_replace_AT(self): + self.run_replace_test("AT", "00110004", "00110077") + + def test_replace_CS(self): + self.run_replace_test("CS", "BodyPartExamined", "TEST_CS") + + def test_replace_DA(self): + self.run_replace_test("DA", "StudyDate", "19000101") + + def test_replace_DS(self): + self.run_replace_test("DS", "PatientWeight", "501") + + def test_replace_DT(self): + self.run_replace_test("DT", "AcquisitionDateTime", "19000101012421.621000") + + def test_replace_FD(self): + self.run_replace_test("FD", "SingleCollimationWidth", "1.3", 1.3) + + def test_replace_FL(self): + self.run_replace_test( + "FL", + "CalciumScoringMassFactorDevice", + "0.7799999713897705", + float("0.7799999713897705"), + ) + + def test_replace_IS(self): + self.run_replace_test("IS", "Exposure", "400") + + def test_replace_LO(self): + self.run_replace_test("LO", "PatientID", "TEST_LO") + + def test_replace_LT(self): + self.run_replace_test("LT", "AdditionalPatientHistory", "TEST_LT") + + def test_replace_OB_fake_test(self): + # Should this be implemented or should this be excluded from REPLACE? + # self.run_replace_test("OB", "00110011", ??????) + self.assertTrue(True) + + def test_replace_OD_fake_test(self): + # Should this be implemented or should this be excluded from REPLACE? + # self.run_replace_test("OD", "00110012", ??????) + self.assertTrue(True) + + def test_replace_OF_fake_test(self): + # Should this be implemented or should this be excluded from REPLACE? + # self.run_replace_test("OF", "00110013", ??????) + self.assertTrue(True) + + def test_replace_OL_fake_test(self): + # Should this be implemented or should this be excluded from REPLACE? + # self.run_replace_test("OL", "00110014", ??????) + self.assertTrue(True) + + def test_replace_OV_fake_test(self): + # Should this be implemented or should this be excluded from REPLACE? + # self.run_replace_test("OV", "00110015", ??????) + self.assertTrue(True) + + def test_replace_OW_fake_test(self): + # Should this be implemented or should this be excluded from REPLACE? + # self.run_replace_test("OW", "00110016", ??????) + self.assertTrue(True) + + def test_replace_PN(self): + self.run_replace_test("PN", "ReferringPhysicianName", "TEST_PN") + + def test_replace_SH(self): + self.run_replace_test("SH", "AccessionNumber", "TEST_SH") + + def test_replace_SL(self): + self.run_replace_test("SL", "00110001", "112345", 112345) + + def test_replace_SQ_fake_test(self): + # Should this be implemented or should this be excluded from REPLACE? + # self.run_replace_test("SQ", "ProcedureCodeSequence, ??????) + self.assertTrue(True) + + def test_replace_SS(self): + self.run_replace_test("SS", "00110002", "1123", 1123) + + def test_replace_ST(self): + self.run_replace_test("ST", "InstitutionAddress", "TEST_ST") + + def test_replace_SV(self): + self.run_replace_test("SV", "00110007", "-12345677", -12345677) + + def test_replace_TM(self): + self.run_replace_test("TM", "StudyTime", "010101.621000") + + def test_replace_UC(self): + self.run_replace_test("UC", "00110009", "TEST_UC") + + def test_replace_UI(self): + self.run_replace_test("UI", "FrameOfReferenceUID", "1.2.840.10008.5.1.4.1.1.7") + + def test_replace_UL(self): + self.run_replace_test("UL", "00311101", "888888", 888888) + + def test_replace_UN(self): + self.run_replace_test( + "UN", "00110003", "x0000000001", bytes("x0000000001", "utf-8") + ) + + def test_replace_UR(self): + self.run_replace_test("UR", "00110008", "http://example.com?q=2") + + def test_replace_US(self): + self.run_replace_test("US", "PregnancyStatus", "410", 410) + + def test_replace_UT(self): + self.run_replace_test("UT", "00291022", "TEST_UT") + + def test_replace_UV(self): + self.run_replace_test("UV", "00110010", "1844674407", 1844674407) + + +if __name__ == "__main__": + unittest.main() diff --git a/deid/tests/test_replace_identifiers.py b/deid/tests/test_replace_identifiers.py index c1ff433..1a3f5d6 100644 --- a/deid/tests/test_replace_identifiers.py +++ b/deid/tests/test_replace_identifiers.py @@ -14,9 +14,9 @@ from pydicom.sequence import Sequence from deid.data import get_dataset -from deid.dicom import get_identifiers, replace_identifiers +from deid.dicom import get_files, get_identifiers, replace_identifiers from deid.dicom.parser import DicomParser -from deid.tests.common import create_recipe, get_file +from deid.tests.common import create_recipe from deid.utils import get_installdir global generate_uid @@ -39,7 +39,7 @@ def test_add_private_constant(self): ADD 11112221 SIMPSON """ print("Test add private tag constant value") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [{"action": "ADD", "field": "11112221", "value": "SIMPSON"}] recipe = create_recipe(actions) @@ -59,7 +59,7 @@ def test_add_private_constant_save_true(self): ADD 11112221 SIMPSON """ print("Test add private tag constant value") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [{"action": "ADD", "field": "11112221", "value": "SIMPSON"}] recipe = create_recipe(actions) @@ -83,7 +83,7 @@ def test_add_public_constant(self): """ print("Test add public tag constant value") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [{"action": "ADD", "field": "PatientIdentityRemoved", "value": "YES"}] recipe = create_recipe(actions) @@ -105,7 +105,7 @@ def test_replace_with_constant(self): """ print("Test replace tags with constant values") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) newfield1 = "AccessionNumber" newvalue1 = "987654321" @@ -118,16 +118,12 @@ def test_replace_with_constant(self): ] recipe = create_recipe(actions) - # Create a DicomParser to easily find fields - parser = DicomParser(dicom_file) - parser.parse() - - # The first in the list is the highest level - field1 = list(parser.find_by_name(newfield1).values())[0] - field2 = list(parser.find_by_name(newfield2).values())[0] + inputfile = read_file(dicom_file) + field1 = inputfile[newfield1].value + field2 = inputfile[newfield2].value - self.assertNotEqual(newvalue1, field1.element.value) - self.assertNotEqual(newvalue2, field2.element.value) + self.assertNotEqual(newvalue1, field1) + self.assertNotEqual(newvalue2, field2) result = replace_identifiers( dicom_files=dicom_file, @@ -148,7 +144,7 @@ def test_jitter_replace_compounding(self): """ print("Test replace tags with constant values") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) newfield1 = "AcquisitionDate" newvalue1 = "20210330" @@ -183,7 +179,7 @@ def test_remove(self): """ print("Test remove of public and private tags") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) field1name = "InstitutionName" field2name = "00190010" @@ -234,7 +230,7 @@ def test_add_tag_variable(self): """ print("Test add tag constant value from variable") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "ADD", "field": "11112221", "value": "var:myVar"}, @@ -264,7 +260,7 @@ def test_add_tag_variable_save_true(self): """ print("Test add tag constant value from variable") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "ADD", "field": "11112221", "value": "var:myVar"}, @@ -296,7 +292,7 @@ def test_jitter_date(self): """ print("Test date jitter") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [{"action": "JITTER", "field": "StudyDate", "value": "1"}] recipe = create_recipe(actions) @@ -318,7 +314,7 @@ def test_jitter_timestamp(self): """ print("Test timestamp jitter") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [{"action": "JITTER", "field": "AcquisitionDateTime", "value": "1"}] recipe = create_recipe(actions) @@ -343,7 +339,7 @@ def test_expanders(self): """ print("Test contains, endswith, and startswith expanders") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "REMOVE", "field": "contains:Collimation"}, @@ -360,7 +356,7 @@ def test_expanders(self): strip_sequences=False, ) self.assertEqual(1, len(result)) - self.assertEqual(157, len(result[0])) + self.assertEqual(171, len(result[0])) with self.assertRaises(KeyError): result[0]["ExposureTime"].value with self.assertRaises(KeyError): @@ -375,7 +371,7 @@ def test_expander_except(self): """ print("Test except expander") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [{"action": "REMOVE", "field": "except:Manufacturer"}] recipe = create_recipe(actions) @@ -409,7 +405,7 @@ def test_fieldset_remove(self): """ print("Test public tag fieldset") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [{"action": "REMOVE", "field": "fields:field_set1"}] fields = OrderedDict() @@ -461,7 +457,7 @@ def test_valueset_remove(self): """ print("Test public tag valueset") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [{"action": "REMOVE", "field": "values:value_set1"}] values = OrderedDict() @@ -507,7 +503,7 @@ def test_fieldset_remove_private(self): """ print("Test private tag fieldset") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [{"action": "REMOVE", "field": "fields:field_set2_private"}] fields = OrderedDict() @@ -522,7 +518,7 @@ def test_fieldset_remove_private(self): self.assertTrue("(0009, 0010)" in parser.lookup["field_set2_private"]) self.assertTrue("(0010, 0020)" in parser.lookup["field_set2_private"]) - self.assertEqual(162, len(parser.dicom)) + self.assertEqual(176, len(parser.dicom)) self.assertEqual("SIEMENS CT VA0 COAD", parser.dicom["00190010"].value) with self.assertRaises(KeyError): parser.dicom["00090010"].value @@ -539,7 +535,7 @@ def test_valueset_private(self): """ print("Test private tag valueset") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [{"action": "REMOVE", "field": "values:value_set2_private"}] values = OrderedDict() @@ -568,7 +564,7 @@ def test_tag_expanders_taggroup(self): REMOVE contains:0009 """ print("Test expanding tag by tag number part (matches group numbers only)") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [{"action": "REMOVE", "field": "contains:0009"}] recipe = create_recipe(actions) @@ -588,7 +584,7 @@ def test_tag_expanders_midtag(self): Should remove: (0008, 103e) Series Description """ - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [{"action": "REMOVE", "field": "contains:8103"}] recipe = create_recipe(actions) @@ -615,7 +611,7 @@ def test_tag_expanders_tagelement(self): print( "Test expanding tag by tag number part (matches groups and element numbers)" ) - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [{"action": "REMOVE", "field": "contains:0010"}] recipe = create_recipe(actions) @@ -628,7 +624,7 @@ def test_tag_expanders_tagelement(self): disable_skip=True, ) self.assertEqual(1, len(result)) - self.assertEqual(139, len(result[0])) + self.assertEqual(152, len(result[0])) with self.assertRaises(KeyError): result[0]["00090010"].value with self.assertRaises(KeyError): @@ -640,7 +636,7 @@ def test_remove_all_func(self): REMOVE ALL func:contains_hibbard """ print("Test tag removal by") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) def contains_hibbard(dicom, value, field, item): from pydicom.tag import Tag @@ -663,7 +659,7 @@ def contains_hibbard(dicom, value, field, item): parser.define("contains_hibbard", contains_hibbard) parser.parse() - self.assertEqual(160, len(parser.dicom)) + self.assertEqual(174, len(parser.dicom)) with self.assertRaises(KeyError): parser.dicom["ReferringPhysicianName"].value with self.assertRaises(KeyError): @@ -680,7 +676,7 @@ def test_remove_all_keep_field_compounding_should_keep(self): KEEP StudyDate ADD PatientIdentityRemoved Yes """ - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "REMOVE", "field": "ALL"}, @@ -703,7 +699,7 @@ def test_remove_except_field_keep_other_field_compounding_should_keep(self): REMOVE ALL ADD PatientIdentityRemoved Yes """ - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "REMOVE", "field": "except:Manufacturer"}, @@ -728,7 +724,7 @@ def test_remove_all_add_field_compounding_should_add(self): ADD PatientIdentityRemoved Yes ADD StudyDate 19700101 """ - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "REMOVE", "field": "ALL"}, @@ -751,7 +747,7 @@ def test_remove_all_blank_field_compounding_should_remove(self): ADD PatientIdentityRemoved Yes BLANK StudyDate """ - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "REMOVE", "field": "ALL"}, @@ -775,7 +771,7 @@ def test_blank_field_keep_field_compounding_should_keep(self): BLANK StudyDate KEEP StudyDate """ - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "ADD", "field": "PatientIdentityRemoved", "value": "Yes"}, @@ -799,7 +795,7 @@ def test_remove_keep_add_field_compounding_should_add(self): ADD StudyDate 19700101 ADD PatientIdentityRemoved Yes """ - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "REMOVE", "field": "ALL"}, @@ -823,7 +819,7 @@ def test_remove_all_replace_one_should_replace(self): REPLACE StudyDate 19700101 ADD PatientIdentityRemoved Yes """ - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "REMOVE", "field": "ALL"}, @@ -846,7 +842,7 @@ def test_remove_all_jitter_one_should_jitter(self): JITTER StudyDate 1 ADD PatientIdentityRemoved Yes """ - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "REMOVE", "field": "ALL"}, @@ -870,7 +866,7 @@ def test_remove_all_keep_one_replace_it_should_keep(self): REPLACE StudyDate 19700101 ADD PatientIdentityRemoved Yes """ - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "REMOVE", "field": "ALL"}, @@ -895,7 +891,7 @@ def test_remove_all_keep_one_jitter_it_should_keep(self): JITTER StudyDate 1 ADD PatientIdentityRemoved Yes """ - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "REMOVE", "field": "ALL"}, @@ -919,7 +915,7 @@ def test_remove_field_replace_it_should_replace(self): REPLACE StudyDate 19700101 ADD PatientIdentityRemoved Yes """ - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "REMOVE", "field": "StudyDate"}, @@ -942,7 +938,7 @@ def test_remove_field_jitter_it_should_jitter(self): JITTER StudyDate 1 ADD PatientIdentityRemoved Yes """ - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "REMOVE", "field": "StudyDate"}, @@ -965,7 +961,7 @@ def test_remove_field_keep_same_field_compounding_should_keep(self): KEEP StudyDate ADD PatientIdentityRemoved Yes """ - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "REMOVE", "field": "StudyDate"}, @@ -986,7 +982,7 @@ def test_remove_except_is_acting_as_substring(self): %header REMOVE except:Manufacturer """ - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "REMOVE", "field": "except:Manufacturer"}, @@ -1009,7 +1005,7 @@ def test_strip_sequences(self): ADD PatientIdentityRemoved YES """ print("Test strip_sequences") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [{"action": "ADD", "field": "PatientIdentityRemoved", "value": "YES"}] recipe = create_recipe(actions) @@ -1021,7 +1017,7 @@ def test_strip_sequences(self): strip_sequences=True, ) self.assertEqual(1, len(result)) - self.assertEqual(156, len(result[0])) + self.assertEqual(170, len(result[0])) with self.assertRaises(KeyError): result[0]["00081110"].value for tag in result[0]: @@ -1039,7 +1035,7 @@ def test_nested_replace(self): REPLACE contains:StudyInstanceUID var:new_val """ print("Test nested_replace") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ { @@ -1079,7 +1075,7 @@ def test_jitter_compounding(self): JITTER StudyDate 2 """ print("Test jitter compounding") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "JITTER", "field": "StudyDate", "value": "1"}, @@ -1095,7 +1091,7 @@ def test_jitter_compounding(self): ) self.assertEqual(1, len(result)) - self.assertEqual(155, len(result[0])) + self.assertEqual(169, len(result[0])) self.assertEqual("20230104", result[0]["StudyDate"].value) def test_addremove_compounding(self): @@ -1110,7 +1106,7 @@ def test_addremove_compounding(self): REMOVE PatientIdentityRemoved """ print("Test addremove compounding") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "ADD", "field": "PatientIdentityRemoved", "value": "YES"}, @@ -1126,7 +1122,7 @@ def test_addremove_compounding(self): ) self.assertEqual(1, len(result)) - self.assertEqual(155, len(result[0])) + self.assertEqual(169, len(result[0])) with self.assertRaises(KeyError): result[0]["PatientIdentityRemoved"].value @@ -1142,7 +1138,7 @@ def test_removeadd_compounding(self): ADD StudyDate 20200805 """ print("Test remove/add compounding") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) actions = [ {"action": "REMOVE", "field": "PatientID"}, @@ -1158,7 +1154,7 @@ def test_removeadd_compounding(self): ) self.assertEqual(1, len(result)) - self.assertEqual(155, len(result[0])) + self.assertEqual(169, len(result[0])) self.assertEqual("123456", result[0]["PatientID"].value) def test_valueset_empty_remove(self): @@ -1175,7 +1171,7 @@ def test_valueset_empty_remove(self): import pydicom print("Test empty value valueset") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) original_dataset = pydicom.dcmread(dicom_file) actions = [{"action": "REMOVE", "field": "values:value_set1"}] @@ -1216,7 +1212,7 @@ def test_valueset_remove_one_empty(self): import pydicom print("Test one empty value valueset") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) original_dataset = pydicom.dcmread(dicom_file) actions = [{"action": "REMOVE", "field": "values:value_set1"}] @@ -1261,7 +1257,7 @@ def test_jitter_values(self): import pydicom print("Test jitter from values list") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) original_dataset = pydicom.dcmread(dicom_file) actions = [{"action": "JITTER", "field": "values:value_set1", "value": "1"}] @@ -1304,7 +1300,7 @@ def test_jitter_private_tag(self): import pydicom print("Test jitter private tag") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) original_dataset = pydicom.dcmread(dicom_file) actions = [{"action": "JITTER", "field": "00291019", "value": "1"}] @@ -1333,7 +1329,7 @@ def test_jitter_blank_date(self): import pydicom print("Test jitter date field containing space") - dicom_file = get_file(self.dataset) + dicom_file = next(get_files(self.dataset, pattern="ctbrain1.dcm")) original_dataset = pydicom.dcmread(dicom_file) actions = [{"action": "JITTER", "field": "ContentDate", "value": "1"}] diff --git a/deid/tests/test_sequence_blank.py b/deid/tests/test_sequence_blank.py new file mode 100644 index 0000000..cd3ba55 --- /dev/null +++ b/deid/tests/test_sequence_blank.py @@ -0,0 +1,221 @@ +#!/usr/bin/env python + +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright 2016-2022, Vanessa Sochat" +__license__ = "MIT" + +import os +import shutil +import tempfile +import unittest + +from pydicom import read_file + +from deid.data import get_dataset +from deid.dicom import get_files, replace_identifiers +from deid.tests.common import create_recipe +from deid.utils import get_installdir + +global generate_uid + + +class TestSequenceBlank(unittest.TestCase): + def setUp(self): + self.pwd = get_installdir() + self.deid = os.path.abspath("%s/../examples/deid/deid.dicom" % self.pwd) + self.dataset = get_dataset("humans") + self.tmpdir = tempfile.mkdtemp() + print("\n######################START######################") + + def tearDown(self): + shutil.rmtree(self.tmpdir) + print("\n######################END########################") + + def test_blank_single_named_field(self): + print("Test BLANK on a single-occurrence named field.") + field = "PatientName" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "BLANK", "field": field}]) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + self.assertIsNotNone(currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual("", outputfile[field].value) + + def test_blank_single_tag_field(self): + print("Test BLANK on a single-occurrence private field.") + field = "00110009" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "BLANK", "field": field}]) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + self.assertIsNotNone(currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual("", outputfile[field].value) + + def test_blank_one_level_one_occurrence(self): + print("Test BLANK one level one occurrence") + field = "00150002" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "BLANK", "field": field}]) + + inputfile = read_file(dicom_file) + currentparent = inputfile["00070001"] + self.assertEqual(currentparent.VR, "SQ") + + # We know this is a single-occurrence sequence - just target first occurrence + sequencevalue = currentparent.value[0] + currentvalue = sequencevalue[field].value + + self.assertIsNotNone(currentvalue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + + outputparent = outputfile["00070001"] + self.assertEqual(outputparent.VR, "SQ") + + outputsequence = outputparent.value[0] + self.assertEqual("", outputsequence[field].value) + + def test_blank_one_level_multiple_occurrences(self): + print("Test BLANK one level multiple occurrences") + field = "00150003" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "BLANK", "field": field}]) + + inputfile = read_file(dicom_file) + currentparent = inputfile["00070002"] + self.assertEqual(currentparent.VR, "SQ") + + for sequencevalue in currentparent: + currentvalue = sequencevalue[field].value + self.assertIsNotNone(currentvalue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + + outputparent = outputfile["00070002"] + self.assertEqual(outputparent.VR, "SQ") + + for sequencevalue in outputparent: + self.assertEqual("", sequencevalue[field].value) + + def test_blank_multiple_levels_multiple_occurrences(self): + print("Test BLANK multiple levels multiple occurrences") + field = "00150006" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "BLANK", "field": field}]) + + inputfile = read_file(dicom_file) + level1parent = inputfile["00070003"] + self.assertEqual(level1parent.VR, "SQ") + + for sequenceoccurrence in level1parent: + for sequence2value in sequenceoccurrence: + self.assertEqual(sequence2value.VR, "SQ") # 0007,0004 + level2value = sequence2value.value + self.assertIsNotNone(level2value) + + for level2dataset in level2value: + self.assertIsNotNone(level2dataset[field].value) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + + outputparent = outputfile["00070003"] + self.assertEqual(outputparent.VR, "SQ") + + for sequenceoccurrence in outputparent: + for sequence2value in sequenceoccurrence: + self.assertEqual(sequence2value.VR, "SQ") # 0007,0004 + level2value = sequence2value.value + self.assertIsNotNone(level2value) + + for level2dataset in level2value: + self.assertEqual("", level2dataset[field].value) + + def test_blank_nested_named_field(self): + print("Test BLANK on a nested named field.") + field = "AccessionNumber" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "BLANK", "field": field}]) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + self.assertIsNotNone(currentValue) + + currentparent = inputfile["RequestAttributesSequence"] + self.assertEqual(currentparent.VR, "SQ") + + for sequencevalue in currentparent: + currentvalue = sequencevalue[field].value + self.assertIsNotNone(currentvalue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + + outputparent = outputfile["RequestAttributesSequence"] + self.assertEqual(outputparent.VR, "SQ") + outputsqvalue = outputparent.value + + for sequencevalue in outputsqvalue: + self.assertEqual("", sequencevalue[field].value) + + +if __name__ == "__main__": + unittest.main() diff --git a/deid/tests/test_sequence_jitter.py b/deid/tests/test_sequence_jitter.py new file mode 100644 index 0000000..3cf5a52 --- /dev/null +++ b/deid/tests/test_sequence_jitter.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python + +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright 2016-2022, Vanessa Sochat" +__license__ = "MIT" + +import os +import shutil +import tempfile +import unittest + +from pydicom import read_file + +from deid.data import get_dataset +from deid.dicom import get_files, replace_identifiers +from deid.tests.common import create_recipe +from deid.utils import get_installdir + +global generate_uid + + +class TestSequenceJitter(unittest.TestCase): + def setUp(self): + self.pwd = get_installdir() + self.deid = os.path.abspath("%s/../examples/deid/deid.dicom" % self.pwd) + self.dataset = get_dataset("humans") + self.tmpdir = tempfile.mkdtemp() + print("\n######################START######################") + + def tearDown(self): + shutil.rmtree(self.tmpdir) + print("\n######################END########################") + + def test_jitter_single_named_field(self): + print("Test JITTER on a single-occurrence named field.") + field = "StudyDate" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "JITTER", "field": field, "value": "1"}]) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + self.assertIsNotNone(currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual("20230102", outputfile[field].value) + + def test_jitter_single_tag_field(self): + print("Test JITTER on a single-occurrence private field.") + field = "00110017" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "JITTER", "field": field, "value": "1"}]) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + self.assertIsNotNone(currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual("20260318", outputfile[field].value) + + def test_jitter_one_level_one_occurrence(self): + print("Test JITTER one level one occurrence") + field = "00150012" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "JITTER", "field": field, "value": "1"}]) + + inputfile = read_file(dicom_file) + currentparent = inputfile["00070011"] + self.assertEqual(currentparent.VR, "SQ") + + # We know this is a single-occurrence sequence - just target first occurrence + sequencevalue = currentparent.value[0] + currentvalue = sequencevalue[field].value + + self.assertIsNotNone(currentvalue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + + outputparent = outputfile["00070011"] + self.assertEqual(outputparent.VR, "SQ") + + outputsequence = outputparent.value[0] + self.assertEqual("19900707", outputsequence[field].value) + + def test_jitter_one_level_multiple_occurrences(self): + print("Test JITTER one level multiple occurrences") + field = "00150013" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "JITTER", "field": field, "value": "1"}]) + + inputfile = read_file(dicom_file) + currentparent = inputfile["00070012"] + self.assertEqual(currentparent.VR, "SQ") + + for sequencevalue in currentparent: + currentvalue = sequencevalue[field].value + self.assertIsNotNone(currentvalue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + + outputparent = outputfile["00070012"] + self.assertEqual(outputparent.VR, "SQ") + + self.assertEqual("19850409", outputparent.value[0][field].value) + self.assertEqual("20050131", outputparent.value[1][field].value) + + def test_jitter_multiple_levels_multiple_occurrences(self): + print("Test JITTER multiple levels multiple occurrences") + field = "00150016" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "JITTER", "field": field, "value": "1"}]) + + inputfile = read_file(dicom_file) + level1parent = inputfile["00070013"] + self.assertEqual(level1parent.VR, "SQ") + + for sequenceoccurrence in level1parent: + for sequence2value in sequenceoccurrence: + self.assertEqual(sequence2value.VR, "SQ") # 0007,0004 + level2value = sequence2value.value + self.assertIsNotNone(level2value) + + for level2dataset in level2value: + self.assertIsNotNone(level2dataset[field].value) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + + outputparent = outputfile["00070013"] + self.assertEqual(outputparent.VR, "SQ") + + for sequenceoccurrence in outputparent: + for sequence2value in sequenceoccurrence: + self.assertEqual(sequence2value.VR, "SQ") # 0007,0004 + level2value = sequence2value.value + self.assertIsNotNone(level2value) + + for level2dataset in level2value: + self.assertIn(level2dataset[field].value, ["19380929", "20221207"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/deid/tests/test_sequence_remove.py b/deid/tests/test_sequence_remove.py new file mode 100644 index 0000000..34ea293 --- /dev/null +++ b/deid/tests/test_sequence_remove.py @@ -0,0 +1,227 @@ +#!/usr/bin/env python + +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright 2016-2022, Vanessa Sochat" +__license__ = "MIT" + +import os +import shutil +import tempfile +import unittest + +from pydicom import read_file + +from deid.data import get_dataset +from deid.dicom import get_files, replace_identifiers +from deid.tests.common import create_recipe +from deid.utils import get_installdir + +global generate_uid + + +class TestSequenceRemove(unittest.TestCase): + def setUp(self): + self.pwd = get_installdir() + self.deid = os.path.abspath("%s/../examples/deid/deid.dicom" % self.pwd) + self.dataset = get_dataset("humans") + self.tmpdir = tempfile.mkdtemp() + print("\n######################START######################") + + def tearDown(self): + shutil.rmtree(self.tmpdir) + print("\n######################END########################") + + def test_remove_single_named_field(self): + print("Test REMOVE on a single-occurrence named field.") + field = "PatientName" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "REMOVE", "field": field}]) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + self.assertIsNotNone(currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + with self.assertRaises(KeyError): + _ = outputfile[field].value + + def test_remove_single_tag_field(self): + print("Test REMOVE on a single-occurrence private field.") + field = "00110002" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "REMOVE", "field": field}]) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + self.assertIsNotNone(currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + with self.assertRaises(KeyError): + _ = outputfile[field].value + + def test_remove_one_level_one_occurrence(self): + print("Test REMOVE one level one occurrence") + field = "00150002" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "REMOVE", "field": field}]) + + inputfile = read_file(dicom_file) + currentparent = inputfile["00070001"] + self.assertEqual(currentparent.VR, "SQ") + + # We know this is a single-occurrence sequence - just target first occurrence + sequencevalue = currentparent.value[0] + currentvalue = sequencevalue[field].value + + self.assertIsNotNone(currentvalue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + + outputparent = outputfile["00070001"] + self.assertEqual(outputparent.VR, "SQ") + + outputsequence = outputparent.value[0] + with self.assertRaises(KeyError): + _ = outputsequence[field].value + + def test_remove_one_level_multiple_occurrences(self): + print("Test REMOVE one level multiple occurrences") + field = "00150003" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "REMOVE", "field": field}]) + + inputfile = read_file(dicom_file) + currentparent = inputfile["00070002"] + self.assertEqual(currentparent.VR, "SQ") + + for sequencevalue in currentparent: + currentvalue = sequencevalue[field].value + self.assertIsNotNone(currentvalue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + + outputparent = outputfile["00070002"] + self.assertEqual(outputparent.VR, "SQ") + + for sequencevalue in outputparent: + with self.assertRaises(KeyError): + _ = sequencevalue[field].value + + def test_remove_multiple_levels_multiple_occurrences(self): + print("Test REMOVE multiple levels multiple occurrences") + field = "00150006" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "REMOVE", "field": field}]) + + inputfile = read_file(dicom_file) + level1parent = inputfile["00070003"] + self.assertEqual(level1parent.VR, "SQ") + + for sequenceoccurrence in level1parent: + for sequence2value in sequenceoccurrence: + self.assertEqual(sequence2value.VR, "SQ") # 0007,0004 + level2value = sequence2value.value + self.assertIsNotNone(level2value) + + for level2dataset in level2value: + self.assertIsNotNone(level2dataset[field].value) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + + outputparent = outputfile["00070003"] + self.assertEqual(outputparent.VR, "SQ") + + for sequenceoccurrence in outputparent: + for sequence2value in sequenceoccurrence: + self.assertEqual(sequence2value.VR, "SQ") # 0007,0004 + level2value = sequence2value.value + self.assertIsNotNone(level2value) + + for level2dataset in level2value: + with self.assertRaises(KeyError): + _ = level2dataset[field].value + + def test_remove_nested_named_field(self): + print("Test REMOVE on a nested named field.") + field = "AccessionNumber" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe([{"action": "REMOVE", "field": field}]) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + self.assertIsNotNone(currentValue) + + currentparent = inputfile["RequestAttributesSequence"] + self.assertEqual(currentparent.VR, "SQ") + + for sequencevalue in currentparent: + currentvalue = sequencevalue[field].value + self.assertIsNotNone(currentvalue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + + outputparent = outputfile["RequestAttributesSequence"] + self.assertEqual(outputparent.VR, "SQ") + outputsqvalue = outputparent.value + + for sequencevalue in outputsqvalue: + with self.assertRaises(KeyError): + _ = sequencevalue[field].value + + +if __name__ == "__main__": + unittest.main() diff --git a/deid/tests/test_sequence_replace.py b/deid/tests/test_sequence_replace.py new file mode 100644 index 0000000..c7b336e --- /dev/null +++ b/deid/tests/test_sequence_replace.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python + +__author__ = "Vanessa Sochat" +__copyright__ = "Copyright 2016-2022, Vanessa Sochat" +__license__ = "MIT" + +import os +import shutil +import tempfile +import unittest + +from pydicom import read_file + +from deid.data import get_dataset +from deid.dicom import get_files, replace_identifiers +from deid.tests.common import create_recipe +from deid.utils import get_installdir + +global generate_uid + + +class TestSequenceReplace(unittest.TestCase): + def setUp(self): + self.pwd = get_installdir() + self.deid = os.path.abspath("%s/../examples/deid/deid.dicom" % self.pwd) + self.dataset = get_dataset("humans") + self.tmpdir = tempfile.mkdtemp() + print("\n######################START######################") + + def tearDown(self): + shutil.rmtree(self.tmpdir) + print("\n######################END########################") + + def test_replace_single_named_field(self): + print("Test REPLACE on a single-occurrence named field.") + field = "PatientName" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe( + [{"action": "REPLACE", "field": field, "value": "ReplacementValue"}] + ) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + self.assertIsNotNone(currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual("ReplacementValue", outputfile[field].value) + + def test_replace_single_tag_field(self): + print("Test REPLACE on a single-occurrence private field.") + field = "00110009" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe( + [{"action": "REPLACE", "field": field, "value": "ReplacementValue"}] + ) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + self.assertIsNotNone(currentValue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + self.assertEqual("ReplacementValue", outputfile[field].value) + + def test_replace_one_level_one_occurrence(self): + print("Test REPLACE one level one occurrence") + field = "00150002" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe( + [{"action": "REPLACE", "field": field, "value": "ReplacementValue"}] + ) + + inputfile = read_file(dicom_file) + currentparent = inputfile["00070001"] + self.assertEqual(currentparent.VR, "SQ") + + # We know this is a single-occurrence sequence - just target first occurrence + sequencevalue = currentparent.value[0] + currentvalue = sequencevalue[field].value + + self.assertIsNotNone(currentvalue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + + outputparent = outputfile["00070001"] + self.assertEqual(outputparent.VR, "SQ") + + outputsequence = outputparent.value[0] + self.assertEqual("ReplacementValue", outputsequence[field].value) + + def test_replace_one_level_multiple_occurrences(self): + print("Test REPLACE one level multiple occurrences") + field = "00150003" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe( + [{"action": "REPLACE", "field": field, "value": "ReplacementValue"}] + ) + + inputfile = read_file(dicom_file) + currentparent = inputfile["00070002"] + self.assertEqual(currentparent.VR, "SQ") + + for sequencevalue in currentparent: + currentvalue = sequencevalue[field].value + self.assertIsNotNone(currentvalue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + + outputparent = outputfile["00070002"] + self.assertEqual(outputparent.VR, "SQ") + + for sequencevalue in outputparent: + self.assertEqual("ReplacementValue", sequencevalue[field].value) + + def test_replace_multiple_levels_multiple_occurrences(self): + print("Test REPLACE multiple levels multiple occurrences") + field = "00150006" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe( + [{"action": "REPLACE", "field": field, "value": "ReplacementValue"}] + ) + + inputfile = read_file(dicom_file) + level1parent = inputfile["00070003"] + self.assertEqual(level1parent.VR, "SQ") + + for sequenceoccurrence in level1parent: + for sequence2value in sequenceoccurrence: + self.assertEqual(sequence2value.VR, "SQ") # 0007,0004 + level2value = sequence2value.value + self.assertIsNotNone(level2value) + + for level2dataset in level2value: + self.assertIsNotNone(level2dataset[field].value) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + + outputparent = outputfile["00070003"] + self.assertEqual(outputparent.VR, "SQ") + + for sequenceoccurrence in outputparent: + for sequence2value in sequenceoccurrence: + self.assertEqual(sequence2value.VR, "SQ") # 0007,0004 + level2value = sequence2value.value + self.assertIsNotNone(level2value) + + for level2dataset in level2value: + self.assertEqual("ReplacementValue", level2dataset[field].value) + + def test_replace_nested_named_field(self): + print("Test REPLACE on a nested named field.") + field = "AccessionNumber" + dicom_file = next(get_files(self.dataset, pattern="ctbrain2.dcm")) + recipe = create_recipe( + [{"action": "REPLACE", "field": field, "value": "ReplacementValue"}] + ) + + inputfile = read_file(dicom_file) + currentValue = inputfile[field].value + self.assertIsNotNone(currentValue) + + currentparent = inputfile["RequestAttributesSequence"] + self.assertEqual(currentparent.VR, "SQ") + + for sequencevalue in currentparent: + currentvalue = sequencevalue[field].value + self.assertIsNotNone(currentvalue) + + result = replace_identifiers( + dicom_files=dicom_file, + deid=recipe, + save=True, + remove_private=False, + strip_sequences=False, + ) + + outputfile = read_file(result[0]) + self.assertEqual(1, len(result)) + + outputparent = outputfile["RequestAttributesSequence"] + self.assertEqual(outputparent.VR, "SQ") + outputsqvalue = outputparent.value + + for sequencevalue in outputsqvalue: + self.assertEqual("ReplacementValue", sequencevalue[field].value) + + +if __name__ == "__main__": + unittest.main() diff --git a/deid/tests/test_utils_files.py b/deid/tests/test_utils_files.py index ad58e2c..081d727 100644 --- a/deid/tests/test_utils_files.py +++ b/deid/tests/test_utils_files.py @@ -33,7 +33,7 @@ def test_get_files(self): found = 0 for dicom_file in get_files(self.dataset): found += 1 - expected = 1 + expected = 2 self.assertEqual(found, expected) print("Case 2: Ask for files from empty folder") @@ -50,7 +50,7 @@ def test_get_files_as_list(self): dicom_files = list(get_files(self.dataset)) found = len(dicom_files) - expected = 1 + expected = 2 self.assertEqual(found, expected) print("Case 2: Ask for files from empty folder") diff --git a/deid/utils/actions.py b/deid/utils/actions.py index ca5b26d..dffca44 100644 --- a/deid/utils/actions.py +++ b/deid/utils/actions.py @@ -7,6 +7,7 @@ import dateutil.parser +from deid.dicom.fields import DicomField from deid.logger import bot @@ -26,7 +27,7 @@ def parse_value(dicom, value, item=None, field=None, funcs=None): item = dict() # Does the user want a custom value? - if re.search("[:]", value): + if re.search("(^var:)|(^func:)|(^deid_func:)", value): value_type, value_option = value.split(":", 1) if value_type.lower() == "var": @@ -66,10 +67,24 @@ def parse_value(dicom, value, item=None, field=None, funcs=None): # item is the lookup, value from the recipe, and field # The field is an entire dicom element object return item[value_option](dicom=dicom, value=value, field=field, item=item) - - bot.warning("%s is not a valid value type, skipping." % (value_type)) - return None - return value + else: + bot.warning(f"{value_type} is not a valid value type, skipping.") + return None + + # Determine if the value is for an existing field. If so, + # the value must be converted to conform to the appropriate Python type. + # Otherwise the field can remain as string and be auto-added as such. + existingField = False + if isinstance(field, str) and dicom is not None and field in dicom: + existingField = True + fieldName = dicom[field].name + fieldVR = dicom[field].VR + elif isinstance(field, DicomField): + existingField = True + fieldName = field.name + fieldVR = field.element.VR + + return convert_value(fieldName, fieldVR, value) if existingField else value def parse_keyvalue_pairs(pairs): @@ -141,3 +156,41 @@ def get_timestamp(item_date, item_time=None, jitter_days=None, format=None): timestamp = timestamp + timedelta(days=jitter_days) return timestamp.strftime(format) + + +def convert_value(field, VR, value): + """ + convert_value converts the value specified into the appropriate Python types + for writing by pydicom. + https://pydicom.github.io/pydicom/dev/guides/element_value_types.html + + If the value cannot be casted to the appropriate type, it is converted to None + and will be blanked by the operation. + """ + + if VR in ["FL", "FD"]: + try: + return float(value) + except (ValueError, TypeError): + bot.warning( + f"Value ({value}) is not a valid value for VR {VR} field: {field}. Field will be BLANKED." + ) + return None + elif VR in ["OB", "OD", "OF", "OL", "OV", "OW", "UN"]: + try: + return bytes(value, "utf-8") + except (ValueError, TypeError): + bot.warning( + f"Value ({value}) is not a valid value for VR {VR} field: {field}. Field will be BLANKED." + ) + return None + elif VR in ["SL", "SS", "SV", "UL", "US", "UV"]: + try: + return int(value) + except (ValueError, TypeError): + bot.warning( + f"Value ({value}) is not a valid value for VR {VR} field: {field}. Field will be BLANKED." + ) + return None + + return value diff --git a/deid/version.py b/deid/version.py index d4af9e0..c078092 100644 --- a/deid/version.py +++ b/deid/version.py @@ -2,7 +2,7 @@ __copyright__ = "Copyright 2016-2022, Vanessa Sochat" __license__ = "MIT" -__version__ = "0.3.1" +__version__ = "0.3.2" AUTHOR = "Vanessa Sochat" AUTHOR_EMAIL = "vsoch@users.noreply.github.com" NAME = "deid" diff --git a/docs/_docs/user-docs/recipe-headers.md b/docs/_docs/user-docs/recipe-headers.md index 01132ce..0cda4b0 100644 --- a/docs/_docs/user-docs/recipe-headers.md +++ b/docs/_docs/user-docs/recipe-headers.md @@ -251,6 +251,80 @@ The above would remove everything except for the pixel data, and a few fields that are relevant to its dimensions. It would add a field to indicate the patient's identity was removed. +The table below shows the full details of how multiple actions defined on the same +field will interact with each other. In general the second action encountered on a field will supercede the first, however there are a few cases in which the actions are combined or the first supercedes the second. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Second Action
ADDBLANKJITTERKEEPREPLACEREMOVE
First ActionADDADD #2BLANKADD + JITTER
Actions Combined
ADD
First Action Overrides Second
REPLACEREMOVE
BLANKADDBLANKBLANK
First Action Overrides Second
KEEPREPLACEREMOVE
JITTERADDBLANKJITTER #1 + JITTER #2
Actions Combined
KEEPREPLACEJITTER
First Action Overrides Second
KEEPADDKEEP
First Action Overrides Second
KEEP
First Action Overrides Second
KEEP
First Action Overrides Second
KEEP
First Action Overrides Second
KEEP
First Action Overrides Second
REPLACEADDBLANKREPLACE + JITTER
Actions Combined
KEEPREPLACE #2REPLACE
First Action Overrides Second
REMOVEADDREMOVE
First Action Overrides Second
JITTERKEEPREPLACEREMOVE
+ ##### Jitter For jitter, you can add a hard coded number, or a variable to specify it: