Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest/transform: add apply-geolocation-rules #41

Merged
merged 1 commit into from
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 226 additions & 0 deletions ingest/bin/apply-geolocation-rules
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
#!/usr/bin/env python3
"""
Applies user curated geolocation rules to the geolocation fields in the NDJSON
records from stdin. The modified records are output to stdout. This does not do
any additional transformations on top of the user curations.
"""
import argparse
import json
from collections import defaultdict
from sys import exit, stderr, stdin, stdout


class CyclicGeolocationRulesError(Exception):
    """
    Signals that too many geolocation rules were applied to a single entry,
    which suggests that the curated rules form a cycle.
    """


def load_geolocation_rules(geolocation_rules_file):
    """
    Loads the geolocation rules from the provided *geolocation_rules_file*.

    Each rule line is expected to look like
    'region/country/division/location<tab>region/country/division/location'.
    Blank lines and lines starting with '#' are ignored, and anything after a
    '#' in the annotated geolocation is dropped as a trailing comment.
    Lines that cannot be decoded are skipped with a warning on stderr.
    If the same raw geolocation appears more than once, the last rule wins.

    Returns the rules as a dict:
    {
        regions: {
            countries: {
                divisions: {
                    locations: corrected_geolocations_tuple
                }
            }
        }
    }
    """
    geolocation_rules = defaultdict(lambda: defaultdict(lambda: defaultdict(dict)))
    # Rules contain non-ASCII place names, so do not depend on the platform
    # default encoding when reading them.
    with open(geolocation_rules_file, 'r', encoding='utf-8') as rules_fh:
        for line in rules_fh:
            # ignore blank lines and comments
            # (indexing the first character of an empty line would raise IndexError)
            stripped_line = line.strip()
            if not stripped_line or stripped_line.startswith('#'):
                continue

            row = line.strip('\n').split('\t')
            # Skip lines that cannot be split into raw and annotated geolocations
            if len(row) != 2:
                print(
                    f"WARNING: Could not decode geolocation rule {line!r}.",
                    "Please make sure rules are formatted as",
                    "'region/country/division/location<tab>region/country/division/location'.",
                    file=stderr)
                continue

            # remove trailing comments
            row[-1] = row[-1].partition('#')[0].rstrip()
            raw = tuple(row[0].split('/'))
            annot = tuple(row[1].split('/'))

            # Skip lines where raw or annotated geolocations cannot be split into 4 fields
            if len(raw) != 4:
                print(
                    f"WARNING: Could not decode the raw geolocation {row[0]!r}.",
                    "Please make sure it is formatted as 'region/country/division/location'.",
                    file=stderr
                )
                continue

            if len(annot) != 4:
                print(
                    f"WARNING: Could not decode the annotated geolocation {row[1]!r}.",
                    "Please make sure it is formatted as 'region/country/division/location'.",
                    file=stderr
                )
                continue

            geolocation_rules[raw[0]][raw[1]][raw[2]][raw[3]] = annot

    return geolocation_rules


def get_annotated_geolocation(geolocation_rules, raw_geolocation, rule_traversal = None):
    """
    Gets the annotated geolocation for the *raw_geolocation* in the provided
    *geolocation_rules*.

    Recursively traverses the *geolocation_rules* until we get the annotated
    geolocation, which must be a Tuple. Returns `None` if there are no
    applicable rules for the provided *raw_geolocation*.

    Rules are applied in the order of region, country, division, location.
    First checks the provided raw values for geolocation fields, then if there
    are no matches, tries to use general rules marked with '*'.

    *rule_traversal* is the list of field values used so far to walk the
    nested rules. Callers normally leave it as `None`; it is filled in (and
    mutated) by the recursive calls.

    Worked example with the rules

        rule #   raw        annotated
        1        a/b/c/d    1/2/3/4
        2        a/*/*/*    1/*/*/*

    and the raw geolocation (a, b, c, e), the traversal goes through:

        [a] -> [a, b] -> [a, b, c]   still following rule 1
        [a, b, c, e]                 no rule, so fall back to '*'
        [a, b, c, *]                 no rule, back up one level
        [a, b, *]                    no rule, back up one level
        [a, *] -> [a, *, c]          no rule for 'c' under a/*
        [a, *, *] -> [a, *, *, e]    no rule for 'e' under a/*/*
        [a, *, *, *]                 matches rule 2

    Unlike only ever widening the trailing fields to '*', this backtracking
    also retries the raw values of lower levels after a higher level has
    been switched to '*', so rules such as '*/b/c/d' or 'a/*/c/*' are found.
    """
    # Always instantiate the rule traversal as an empty list if not provided,
    # e.g. the first call of this recursive function
    if rule_traversal is None:
        rule_traversal = []

    current_rules = geolocation_rules
    # Traverse the geolocation rules using the rule_traversal values
    for field_value in rule_traversal:
        current_rules = current_rules.get(field_value)
        # If we hit `None`, then we know there are no matching rules, so stop the rule traversal
        if current_rules is None:
            break

    # We've found the tuple of the annotated geolocation
    if isinstance(current_rules, tuple):
        return current_rules

    # We've reached the next level of geolocation rules,
    # so try to traverse the rules with the next target in raw_geolocation
    if isinstance(current_rules, dict):
        next_traversal_target = raw_geolocation[len(rule_traversal)]
        rule_traversal.append(next_traversal_target)
        return get_annotated_geolocation(geolocation_rules, raw_geolocation, rule_traversal)

    # We did not find any matching rule for the last traversal target.
    # If we've used all general rules and we still haven't found a match,
    # then there are no applicable rules for this geolocation
    if all(value == '*' for value in rule_traversal):
        return None

    # If the failed traversal already ends with general rules, drop all of the
    # trailing '*' so that the traversal ends with the last raw value:
    # [A, *, B, *] => [A, *, B]
    # [A, B, *, *] => [A, B]
    # [A, *, *, *] => [A]
    # The check above guarantees at least one non-'*' value remains.
    while rule_traversal[-1] == '*':
        rule_traversal.pop()

    # Replace that last raw value with '*' in hopes that by moving to a
    # general rule on this level, we can find a matching rule. The levels
    # below it are then retried with their raw values first.
    rule_traversal[-1] = '*'

    return get_annotated_geolocation(geolocation_rules, raw_geolocation, rule_traversal)


def transform_geolocations(geolocation_rules, geolocation):
    """
    Transform the provided *geolocation* by looking it up in the provided
    *geolocation_rules*.

    This will use all rules that apply to the geolocation and rules will
    be applied in the order of region, country, division, location.

    Returns the original geolocation if no geolocation rules apply,
    otherwise a new list with the transformed values.

    Raises a `CyclicGeolocationRulesError` if more than 1000 rules have
    been applied to the raw geolocation.
    """
    transformed_values = geolocation
    rules_applied = 0
    continue_to_apply = True

    while continue_to_apply:
        annotated_values = get_annotated_geolocation(geolocation_rules, transformed_values)

        # Stop applying rules if no annotated values were found
        if annotated_values is None:
            continue_to_apply = False
        else:
            rules_applied += 1

            if rules_applied > 1000:
                # This must be an f-string, otherwise the placeholder is
                # printed literally instead of the offending entry.
                raise CyclicGeolocationRulesError(
                    f"ERROR: More than 1000 geolocation rules applied on the same entry {geolocation!r}."
                )

            # Create a new list of values for comparison to previous values
            new_values = list(transformed_values)
            for index, value in enumerate(annotated_values):
                # Keep original value if annotated value is '*'
                if value != '*':
                    new_values[index] = value

            # Stop applying rules if this rule did not change the values,
            # since this means we've reached rules with '*' that no longer change values
            if new_values == transformed_values:
                continue_to_apply = False

            transformed_values = new_values

    return transformed_values


# Script entry point: read NDJSON records from stdin, apply the curated
# geolocation rules to the four geolocation fields of every record, and write
# the modified records as NDJSON to stdout.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument("--region-field", default="region",
        help="Field that contains regions in NDJSON records.")
    parser.add_argument("--country-field", default="country",
        help="Field that contains countries in NDJSON records.")
    parser.add_argument("--division-field", default="division",
        help="Field that contains divisions in NDJSON records.")
    parser.add_argument("--location-field", default="location",
        help="Field that contains location in NDJSON records.")
    # Every sentence of the help text ends with a space so that the
    # concatenated pieces do not run together in the rendered --help output.
    parser.add_argument("--geolocation-rules", metavar="TSV", required=True,
        help="TSV file of geolocation rules with the format: " +
             "'<raw_geolocation><tab><annotated_geolocation>' where the raw and annotated geolocations " +
             "are formatted as '<region>/<country>/<division>/<location>'. " +
             "If creating a general rule, then the raw field value can be substituted with '*'. " +
             "Lines starting with '#' will be ignored as comments. " +
             "Trailing '#' will be ignored as comments.")

    args = parser.parse_args()

    # Order matters: it must match the region/country/division/location
    # order used by the geolocation rules.
    location_fields = [args.region_field, args.country_field, args.division_field, args.location_field]

    geolocation_rules = load_geolocation_rules(args.geolocation_rules)

    for record in stdin:
        record = json.loads(record)

        try:
            annotated_values = transform_geolocations(geolocation_rules, [record[field] for field in location_fields])
        except CyclicGeolocationRulesError as e:
            print(e, file=stderr)
            exit(1)

        for index, field in enumerate(location_fields):
            record[field] = annotated_values[index]

        # Compact one-record-per-line output; separators is given as the
        # documented (item_separator, key_separator) tuple.
        json.dump(record, stdout, allow_nan=False, indent=None, separators=(',', ':'))
        print()
5 changes: 5 additions & 0 deletions ingest/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ transform:
authors_default_value: '?'
# Field name for the generated abbreviated authors
abbr_authors_field: 'abbr_authors'
# General geolocation rules to apply to geolocation fields
geolocation_rules_url: 'https://raw.githubusercontent.com/nextstrain/ncov-ingest/master/source-data/gisaid_geoLocationRules.tsv'
# Local geolocation rules that are only applicable to monkeypox data
# Local rules can overwrite the general geolocation rules provided above
local_geolocation_rules: './source-data/geolocation-rules.tsv'
# User annotations file
annotations: './source-data/annotations.tsv'
# ID field used to merge annotations
Expand Down
14 changes: 14 additions & 0 deletions ingest/source-data/geolocation-rules.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Africa/Cote d'Ivoire/*/* Africa/Côte d'Ivoire/*/*
Africa/Cote d'Ivoire/Tai National Park/* Africa/Côte d'Ivoire/Bas-Sassandra/Tai National Park
Africa/Democratic Republic of Congo/Province Bandundu/* Africa/Democratic Republic of Congo/Bandundu/*
Africa/Democratic Republic of Congo/Province Equateur/* Africa/Democratic Republic of Congo/Équateur/*
Africa/Democratic Republic of Congo/Province Kasai Occidental/* Africa/Democratic Republic of Congo/Kasaï-Occidental/*
Africa/Democratic Republic of Congo/Province Kasai Oriental/* Africa/Democratic Republic of Congo/Kasaï-Oriental/*
Africa/Democratic Republic of Congo/Province P. Oriental/* Africa/Democratic Republic of Congo/Orientale/*
Africa/Democratic Republic of Congo/Yangdongi/	Africa/Democratic Republic of Congo/Mongala/Yangdongi
Africa/Democratic Republic of Congo/Zaire/*	Africa/Democratic Republic of Congo//
Europe/France/Paris/* Europe/France/Ile de France/Paris FR
Europe/Italy/Fvg/Gorizia Europe/Italy/Friuli Venezia Giulia/Gorizia
# Unclear which location is the real location
Europe/Netherlands/Utrecht/Rotterdam Europe/Netherlands//
North America/USA/Washington/Dc North America/USA/Washington DC/
26 changes: 25 additions & 1 deletion ingest/workflow/snakemake_rules/transform.smk
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,31 @@ This will produce output files as
Parameters are expected to be defined in `config.transform`.
"""

# Download the general geolocation rules from the URL configured in
# `config.transform.geolocation_rules_url`.
rule fetch_general_geolocation_rules:
    output:
        general_geolocation_rules = "data/general-geolocation-rules.tsv"
    params:
        geolocation_rules_url = config['transform']['geolocation_rules_url']
    shell:
        # --fail makes curl exit non-zero on HTTP errors instead of saving the
        # server's error page as if it were the rules file; --location follows
        # redirects; --silent --show-error hides the progress meter but keeps
        # error messages.
        """
        curl --fail --silent --show-error --location {params.geolocation_rules_url} > {output.general_geolocation_rules}
        """

# Concatenate the general and the local geolocation rules into one file.
# The local rules come last in the concatenated file.
rule concat_geolocation_rules:
    input:
        general_geolocation_rules = "data/general-geolocation-rules.tsv",
        local_geolocation_rules = config['transform']['local_geolocation_rules']
    output:
        all_geolocation_rules = "data/all-geolocation-rules.tsv"
    shell:
        # Overwrite (>) rather than append (>>) so the output never
        # accumulates rules from a pre-existing file.
        """
        cat {input.general_geolocation_rules} {input.local_geolocation_rules} > {output.all_geolocation_rules}
        """

rule transform:
input:
sequences_ndjson = "data/sequences.ndjson"
sequences_ndjson = "data/sequences.ndjson",
all_geolocation_rules = "data/all-geolocation-rules.tsv"
output:
metadata = "data/metadata.tsv",
sequences = "data/sequences.fasta"
Expand Down Expand Up @@ -58,6 +80,8 @@ rule transform:
--authors-field {params.authors_field} \
--default-value {params.authors_default_value} \
--abbr-authors-field {params.abbr_authors_field} \
| ./bin/apply-geolocation-rules \
--geolocation-rules {input.all_geolocation_rules} \
| ./bin/merge-user-metadata \
--annotations {params.annotations} \
--id-field {params.annotations_id} \
Expand Down