Harmonize ingest with pathogen repo guide #35
j23414 authored Mar 19, 2024
2 parents 8152220 + 2ffef24 commit abb8984
Showing 13 changed files with 145 additions and 196 deletions.
19 changes: 11 additions & 8 deletions ingest/README.md
@@ -44,7 +44,12 @@ A pair of files for each dengue serotype (denv1 - denv4)
Run the complete ingest pipeline and upload results to AWS S3 with

```sh
-nextstrain build ingest --configfiles config/config.yaml config/optional.yaml
+nextstrain build \
+    --env AWS_ACCESS_KEY_ID \
+    --env AWS_SECRET_ACCESS_KEY \
+    ingest \
+    upload_all \
+    --configfile build-configs/nextstrain-automation/config.yaml
```

### Adding new sequences not from GenBank
@@ -70,27 +75,25 @@ Do the following to include sequences from static FASTA files.
!ingest/data/{file-name}.ndjson
```

-3. Add the `file-name` (without the `.ndjson` extension) as a source to `ingest/config/config.yaml`. This will tell the ingest pipeline to concatenate the records to the GenBank sequences and run them through the same transform pipeline.
+3. Add the `file-name` (without the `.ndjson` extension) as a source to `ingest/defaults/config.yaml`. This will tell the ingest pipeline to concatenate the records to the GenBank sequences and run them through the same transform pipeline.
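To illustrate step 3 with a concrete (hypothetical) name: if the static file lives at `ingest/data/my-sequences.ndjson`, the config entry might look like the sketch below. The `sources` key and the `genbank` entry are assumptions about this config's schema, not part of the commit.

```yaml
# Hypothetical sketch of the sources list in ingest/defaults/config.yaml.
# "my-sequences" is a placeholder matching ingest/data/my-sequences.ndjson.
sources:
  - genbank
  - my-sequences
```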

## Configuration

-Configuration takes place in `config/config.yaml` by default.
-Optional configs for uploading files and Slack notifications are in `config/optional.yaml`.
+Configuration takes place in `defaults/config.yaml` by default.
+Optional configs for uploading files are in `build-configs/nextstrain-automation/config.yaml`.

### Environment Variables

-The complete ingest pipeline with AWS S3 uploads and Slack notifications uses the following environment variables:
+The complete ingest pipeline with AWS S3 uploads uses the following environment variables:

#### Required

- `AWS_ACCESS_KEY_ID`
- `AWS_SECRET_ACCESS_KEY`
-- `SLACK_TOKEN`
-- `SLACK_CHANNELS`

#### Optional

-These are optional environment variables used in our automated pipeline for providing detailed Slack notifications.
+These are optional environment variables used in our automated pipeline.

- `GITHUB_RUN_ID` - provided via [`github.run_id` in a GitHub Action workflow](https://docs.github.com/en/actions/learn-github-actions/contexts#github-context)
- `AWS_BATCH_JOB_ID` - provided via [AWS Batch Job environment variables](https://docs.aws.amazon.com/batch/latest/userguide/job_env_vars.html)
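As a worked example (an editor's sketch, not part of the commit), a local run that supplies the two required variables could export placeholder values before invoking the command shown in the README above:

```sh
# Sketch: export placeholder AWS credentials, then run the full
# ingest-and-upload pipeline. Never hard-code real credential values.
export AWS_ACCESS_KEY_ID="<your-access-key-id>"
export AWS_SECRET_ACCESS_KEY="<your-secret-access-key>"

nextstrain build \
    --env AWS_ACCESS_KEY_ID \
    --env AWS_SECRET_ACCESS_KEY \
    ingest \
    upload_all \
    --configfile build-configs/nextstrain-automation/config.yaml
```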
62 changes: 10 additions & 52 deletions ingest/Snakefile
@@ -4,65 +4,23 @@ min_version(
"7.7.0"
) # Snakemake 7.7.0 introduced `retries` directive used in fetch-sequences

-if not config:
-
-    configfile: "config/config.yaml"
-
-
-send_slack_notifications = config.get("send_slack_notifications", False)
+configfile: "defaults/config.yaml"

serotypes = ['all', 'denv1', 'denv2', 'denv3', 'denv4']

-def _get_all_targets(wildcards):
-    # Default targets are the metadata TSV and sequences FASTA files
-    all_targets = expand(["results/sequences_{serotype}.fasta", "results/metadata_{serotype}.tsv"], serotype=serotypes)
-
-    # Add additional targets based on upload config
-    upload_config = config.get("upload", {})
-
-    for target, params in upload_config.items():
-        files_to_upload = params.get("files_to_upload", {})
-
-        if not params.get("dst"):
-            print(
-                f"Skipping file upload for {target!r} because the destination was not defined."
-            )
-        else:
-            all_targets.extend(
-                expand(
-                    [f"data/upload/{target}/{{remote_file_name}}.done"],
-                    zip,
-                    remote_file_name=files_to_upload.keys(),
-                )
-            )
-
-    # Add additional targets for Nextstrain's internal Slack notifications
-    if send_slack_notifications:
-        all_targets.extend(
-            [
-                "data/notify/genbank-record-change.done",
-                "data/notify/metadata-diff.done",
-            ]
-        )
-
-    if config.get("trigger_rebuild", False):
-        all_targets.append("data/trigger/rebuild.done")
-
-    return all_targets
-
-
rule all:
    input:
-        _get_all_targets,
+        expand(["results/sequences_{serotype}.fasta", "results/metadata_{serotype}.tsv"], serotype=serotypes)

include: "workflow/snakemake_rules/fetch_sequences.smk"
include: "workflow/snakemake_rules/transform.smk"
include: "workflow/snakemake_rules/split_serotypes.smk"
include: "workflow/snakemake_rules/nextclade.smk"

include: "rules/fetch_from_ncbi.smk"
include: "rules/curate.smk"
include: "rules/split_serotypes.smk"
include: "rules/nextclade.smk"

if config.get("upload", False):
# Include custom rules defined in the config.
if "custom_rules" in config:
for rule_file in config["custom_rules"]:

include: "workflow/snakemake_rules/upload.smk"
include: rule_file
29 changes: 29 additions & 0 deletions ingest/build-configs/nextstrain-automation/config.yaml
@@ -0,0 +1,29 @@
# This configuration file should contain all required configuration parameters
# for the ingest workflow to run with additional Nextstrain automation rules.

# Custom rules to run as part of the Nextstrain automated workflow
# The paths should be relative to the ingest directory.
custom_rules:
- build-configs/nextstrain-automation/upload.smk

# Nextstrain CloudFront domain to ensure that we invalidate CloudFront after the S3 uploads
# This is required as long as we are using the AWS CLI for uploads
cloudfront_domain: "data.nextstrain.org"

# Nextstrain AWS S3 Bucket with pathogen prefix
s3_dst: "s3://nextstrain-data/files/workflows/dengue"

# Mapping of files to upload
files_to_upload:
  genbank.ndjson.xz: data/genbank.ndjson
  all_sequences.ndjson.xz: data/sequences.ndjson
  metadata_all.tsv.zst: results/metadata_all.tsv
  sequences_all.fasta.zst: results/sequences_all.fasta
  metadata_denv1.tsv.zst: results/metadata_denv1.tsv
  sequences_denv1.fasta.zst: results/sequences_denv1.fasta
  metadata_denv2.tsv.zst: results/metadata_denv2.tsv
  sequences_denv2.fasta.zst: results/sequences_denv2.fasta
  metadata_denv3.tsv.zst: results/metadata_denv3.tsv
  sequences_denv3.fasta.zst: results/sequences_denv3.fasta
  metadata_denv4.tsv.zst: results/metadata_denv4.tsv
  sequences_denv4.fasta.zst: results/sequences_denv4.fasta
42 changes: 42 additions & 0 deletions ingest/build-configs/nextstrain-automation/upload.smk
@@ -0,0 +1,42 @@
"""
This part of the workflow handles uploading files to AWS S3.
Files to upload must be defined in the `files_to_upload` config param, where
the keys are the remote files and the values are the local filepaths
relative to the ingest directory.
Produces a single file for each uploaded file:
"results/upload/{remote_file}.upload"
The rule `upload_all` can be used as a target to upload all files.
"""
import os


rule upload_to_s3:
    input:
        file_to_upload=lambda wildcards: config["files_to_upload"][wildcards.remote_file],
    output:
        "results/upload/{remote_file}.upload",
    params:
quiet="" if send_notifications else "--quiet",
        s3_dst=config["s3_dst"],
        cloudfront_domain=config["cloudfront_domain"],
    shell:
        """
        ./vendored/upload-to-s3 \
            {params.quiet} \
            {input.file_to_upload:q} \
            {params.s3_dst:q}/{wildcards.remote_file:q} \
            {params.cloudfront_domain} 2>&1 | tee {output}
        """


rule upload_all:
    input:
        uploads=[
            f"results/upload/{remote_file}.upload"
            for remote_file in config["files_to_upload"].keys()
        ],
    output:
        touch("results/upload_all.done")
25 changes: 0 additions & 25 deletions ingest/config/optional.yaml

This file was deleted.

File renamed without changes.
41 changes: 20 additions & 21 deletions ingest/config/config.yaml → ingest/defaults/config.yaml
@@ -23,28 +23,27 @@ ncbi_datasets_fields:
- submitter-names
- submitter-affiliation

-# Params for the transform rule
-transform:
+# Params for the curate rule
+curate:
  # NCBI Fields to rename to Nextstrain field names.
  # This is the first step in the pipeline, so any references to field names
  # in the configs below should use the new field names
-  field_map: [
-    'accession=genbank_accession',
-    'accession-rev=genbank_accession_rev',
-    'isolate-lineage=strain',
-    'sourcedb=database', # necessary for applying geo location rules
-    'geo-region=region',
-    'geo-location=location',
-    'host-name=host',
-    'isolate-collection-date=date',
-    'release-date=release_date',
-    'update-date=update_date',
-    'virus-tax-id=virus_tax_id',
-    'virus-name=virus_name',
-    'sra-accs=sra_accessions',
-    'submitter-names=authors',
-    'submitter-affiliation=institution',
-  ]
+  field_map:
+    accession: genbank_accession
+    accession-rev: genbank_accession_rev
+    isolate-lineage: strain
+    sourcedb: database
+    geo-region: region
+    geo-location: location
+    host-name: host
+    isolate-collection-date: date
+    release-date: release_date
+    update-date: update_date
+    virus-tax-id: virus_tax_id
+    virus-name: virus_name
+    sra-accs: sra_accessions
+    submitter-names: authors
+    submitter-affiliation: institution
  # Standardized strain name regex
  # Currently accepts any characters because we do not have a clear standard for strain names
  strain_regex: '^.+$'
@@ -77,9 +76,9 @@ transform:
  geolocation_rules_url: 'https://raw.githubusercontent.com/nextstrain/ncov-ingest/master/source-data/gisaid_geoLocationRules.tsv'
  # Local geolocation rules that are only applicable to dengue data
  # Local rules can overwrite the general geolocation rules provided above
-  local_geolocation_rules: 'source-data/geolocation-rules.tsv'
+  local_geolocation_rules: 'defaults/geolocation-rules.tsv'
  # User annotations file
-  annotations: 'source-data/annotations.tsv'
+  annotations: 'defaults/annotations.tsv'
  # ID field used to merge annotations
  annotations_id: 'genbank_accession'
  # Field to use as the sequence ID in the FASTA file
File renamed without changes.
@@ -1,5 +1,5 @@
"""
-This part of the workflow handles transforming the data into standardized
+This part of the workflow handles curating the data into standardized
formats and expects input file
sequences_ndjson = "data/sequences.ndjson"
@@ -9,15 +9,15 @@ This will produce output files as
metadata = "results/metadata_all.tsv"
sequences = "results/sequences_all.fasta"
-Parameters are expected to be defined in `config.transform`.
+Parameters are expected to be defined in `config.curate`.
"""


rule fetch_general_geolocation_rules:
    output:
        general_geolocation_rules="data/general-geolocation-rules.tsv",
    params:
-        geolocation_rules_url=config["transform"]["geolocation_rules_url"],
+        geolocation_rules_url=config["curate"]["geolocation_rules_url"],
    shell:
        """
        curl {params.geolocation_rules_url} > {output.general_geolocation_rules}
@@ -27,7 +27,7 @@ rule fetch_general_geolocation_rules:
rule concat_geolocation_rules:
    input:
        general_geolocation_rules="data/general-geolocation-rules.tsv",
-        local_geolocation_rules=config["transform"]["local_geolocation_rules"],
+        local_geolocation_rules=config["curate"]["local_geolocation_rules"],
    output:
        all_geolocation_rules="data/all-geolocation-rules.tsv",
    shell:
@@ -36,32 +36,39 @@ rule concat_geolocation_rules:
"""


-rule transform:
+def format_field_map(field_map: dict[str, str]) -> str:
+    """
+    Format dict to `"key1"="value1" "key2"="value2"...` for use in shell commands.
+    """
+    return " ".join([f'"{key}"="{value}"' for key, value in field_map.items()])


+rule curate:
    input:
        sequences_ndjson="data/sequences.ndjson",
        all_geolocation_rules="data/all-geolocation-rules.tsv",
-        annotations=config["transform"]["annotations"],
+        annotations=config["curate"]["annotations"],
    output:
        metadata="data/metadata_all.tsv",
        sequences="results/sequences_all.fasta",
    log:
-        "logs/transform.txt",
+        "logs/curate.txt",
    params:
-        field_map=config["transform"]["field_map"],
-        strain_regex=config["transform"]["strain_regex"],
-        strain_backup_fields=config["transform"]["strain_backup_fields"],
-        date_fields=config["transform"]["date_fields"],
-        expected_date_formats=config["transform"]["expected_date_formats"],
-        articles=config["transform"]["titlecase"]["articles"],
-        abbreviations=config["transform"]["titlecase"]["abbreviations"],
-        titlecase_fields=config["transform"]["titlecase"]["fields"],
-        authors_field=config["transform"]["authors_field"],
-        authors_default_value=config["transform"]["authors_default_value"],
-        abbr_authors_field=config["transform"]["abbr_authors_field"],
-        annotations_id=config["transform"]["annotations_id"],
-        metadata_columns=config["transform"]["metadata_columns"],
-        id_field=config["transform"]["id_field"],
-        sequence_field=config["transform"]["sequence_field"],
+        field_map=format_field_map(config["curate"]["field_map"]),
+        strain_regex=config["curate"]["strain_regex"],
+        strain_backup_fields=config["curate"]["strain_backup_fields"],
+        date_fields=config["curate"]["date_fields"],
+        expected_date_formats=config["curate"]["expected_date_formats"],
+        articles=config["curate"]["titlecase"]["articles"],
+        abbreviations=config["curate"]["titlecase"]["abbreviations"],
+        titlecase_fields=config["curate"]["titlecase"]["fields"],
+        authors_field=config["curate"]["authors_field"],
+        authors_default_value=config["curate"]["authors_default_value"],
+        abbr_authors_field=config["curate"]["abbr_authors_field"],
+        annotations_id=config["curate"]["annotations_id"],
+        metadata_columns=config["curate"]["metadata_columns"],
+        id_field=config["curate"]["id_field"],
+        sequence_field=config["curate"]["sequence_field"],
    shell:
        """
        (cat {input.sequences_ndjson} \
File renamed without changes.
@@ -52,7 +52,7 @@ rule concat_nextclade_subtype_results:
    output:
        nextclade_subtypes="results/nextclade_subtypes.tsv",
    params:
-        id_field=config["transform"]["id_field"],
+        id_field=config["curate"]["id_field"],
        nextclade_field=config["nextclade"]["nextclade_field"],
    shell:
        """
@@ -75,7 +75,7 @@ rule append_nextclade_columns:
    output:
        metadata_all="results/metadata_all.tsv",
    params:
-        id_field=config["transform"]["id_field"],
+        id_field=config["curate"]["id_field"],
        nextclade_field=config["nextclade"]["nextclade_field"],
    shell:
        """
