diff --git a/augur/curate/__init__.py b/augur/curate/__init__.py index 257ce2ff5..bef977618 100644 --- a/augur/curate/__init__.py +++ b/augur/curate/__init__.py @@ -12,13 +12,14 @@ from augur.io.metadata import DEFAULT_DELIMITERS, InvalidDelimiter, read_table_to_dict, read_metadata_with_sequences, write_records_to_tsv from augur.io.sequences import write_records_to_fasta from augur.types import DataErrorMethod -from . import normalize_strings, passthru +from . import normalize_strings, passthru, titlecase SUBCOMMAND_ATTRIBUTE = '_curate_subcommand' SUBCOMMANDS = [ passthru, normalize_strings, + titlecase, ] diff --git a/augur/curate/titlecase.py b/augur/curate/titlecase.py new file mode 100755 index 000000000..347019f42 --- /dev/null +++ b/augur/curate/titlecase.py @@ -0,0 +1,124 @@ +""" +Applies titlecase to string fields in a metadata record +""" +import argparse + +import re +from typing import Optional, Set, Union + +from augur.errors import AugurError +from augur.io.print import print_err +from augur.types import DataErrorMethod + +def register_parser(parent_subparsers): + parser = parent_subparsers.add_parser("titlecase", + parents = [parent_subparsers.shared_parser], + help = __doc__) + + required = parser.add_argument_group(title="REQUIRED") + required.add_argument("--titlecase-fields", nargs="*", + help="List of fields to convert to titlecase.", required=True) + + optional = parser.add_argument_group(title="OPTIONAL") + optional.add_argument("--articles", nargs="*", + help="List of articles that should not be converted to titlecase.") + optional.add_argument("--abbreviations", nargs="*", + help="List of abbreviations that should not be converted to titlecase, keeps uppercase.") + + optional.add_argument("--failure-reporting", + type=DataErrorMethod, + choices=[ method for method in DataErrorMethod ], + default=DataErrorMethod.ERROR_FIRST, + help="How should failed titlecase formatting be reported.") + return parser + + +def titlecase(text: Union[str, None], articles: Set[str] = {}, abbreviations: Set[str] = {}) -> Optional[str]: + """ + Originally from nextstrain/ncov-ingest + + Returns a title cased location name from the given location name + *tokens*. Ensures that no tokens contained in the *whitelist_tokens* are + converted to title case. + + >>> articles = {'a', 'and', 'of', 'the', 'le'} + >>> abbreviations = {'USA', 'DC'} + + >>> titlecase("the night OF THE LIVING DEAD", articles) + 'The Night of the Living Dead' + + >>> titlecase("BRAINE-LE-COMTE, FRANCE", articles) + 'Braine-le-Comte, France' + + >>> titlecase("auvergne-RHÔNE-alpes", articles) + 'Auvergne-Rhône-Alpes' + + >>> titlecase("washington DC, usa", articles, abbreviations) + 'Washington DC, USA' + """ + if not isinstance(text, str): + return None + + words = enumerate(re.split(r'\b', text)) + + def changecase(index, word): + casefold = word.casefold() + upper = word.upper() + + if upper in abbreviations: + return upper + elif casefold in articles and index != 1: + return word.lower() + else: + return word.title() + + return ''.join(changecase(i, w) for i, w in words) + + +def run(args, records): + failures = [] + failure_reporting = args.failure_reporting + + articles = set() + if args.articles: + articles = set(args.articles) + + abbreviations = set() + if args.abbreviations: + abbreviations = set(args.abbreviations) + + for index, record in enumerate(records): + record = record.copy() + record_id = index + + for field in args.titlecase_fields: + titlecased_string = titlecase(record.get(field, ""), articles, abbreviations) + + failure_message = f"Failed to titlecase {field} in record {record_id}" + if titlecased_string is None: + if failure_reporting is DataErrorMethod.ERROR_FIRST: + raise AugurError(failure_message) + + if failure_reporting is DataErrorMethod.WARN: + print_err(f"WARNING: {failure_message}") + + # Keep track of failures for final summary + failures.append((record_id, field, record.get(field, ""))) + else: + record[field] = titlecased_string + + yield record + + if failure_reporting is not DataErrorMethod.SILENT and failures: + failure_message = ( + "Unable to change to titlecase for the following (record, field string):\n" + \ + '\n'.join(map(repr, failures)) + ) + if failure_reporting is DataErrorMethod.ERROR_ALL: + raise AugurError(failure_message) + + elif failure_reporting is DataErrorMethod.WARN: + print_err(f"WARNING: {failure_message}") + + else: + raise ValueError(f"Encountered unhandled failure reporting method: {failure_reporting!r}") \ No newline at end of file diff --git a/tests/functional/curate/cram/titlecase.t b/tests/functional/curate/cram/titlecase.t new file mode 100644 index 000000000..bb4210e55 --- /dev/null +++ b/tests/functional/curate/cram/titlecase.t @@ -0,0 +1,18 @@ +Setup + + $ pushd "$TESTDIR" > /dev/null + $ export AUGUR="${AUGUR:-../../../../bin/augur}" + + +Create NDJSON file for testing titlecase with different forms + + $ cat >$TMP/records.ndjson <<~~ + > {"record": 1, "authors": "john smith", "author2": "Jane Doe"} + > ~~ + + +Test output with Unicode normalization form "NFKC". + + $ cat $TMP/records.ndjson \ + > | ${AUGUR} curate titlecase --titlecase-fields "authors" "author2" + {"record": 1, "authors": "John Smith", "author2": "Jane Doe"}