Merge pull request #387 from seattleflu/ingest-seq-accession-ids

Ingest sequencing accession IDs
davereinhart authored Oct 7, 2024
2 parents e61ee77 + e306ec1 commit 932e855
Showing 2 changed files with 225 additions and 5 deletions.
107 changes: 107 additions & 0 deletions lib/seattleflu/id3c/cli/command/clinical.py
@@ -56,6 +56,113 @@
def clinical():
    pass

# Parse sequencing accessions subcommand
@clinical.command("parse-sequencing")
@click.argument("accession_ids_filename", metavar = "<Sequencing accession IDs filename>")
@click.argument("record_type", metavar="<record type>", type=click.Choice(['hcov19', 'rsv-a', 'rsv-b', 'flu-a', 'flu-b']))
@click.argument("segment_accession_ids_filename", nargs=-1, metavar = "<Segment accession IDs filename>")
@click.option("-o", "--output", metavar="<output filename>",
    help="The filename for the output of missing barcodes")
def parse_sequencing_accessions(accession_ids_filename, record_type, segment_accession_ids_filename, output):
    """
    Process a sequencing accession IDs file.

    Given a <Sequencing accession IDs filename> of a TSV or Excel file, selects
    specific columns of interest and reformats the queried data into a stream
    of JSON documents suitable for the "upload" sibling command.

    A <Segment accession IDs filename> file is required for flu sequences.

    <output filename> is the desired filepath of the output CSV of problematic
    barcodes encountered while parsing. If not provided, the problematic
    barcodes print to the log.

    All records parsed are output to stdout as newline-delimited JSON
    records. You will likely want to redirect stdout to a file.
    """
    # pd.read_excel does not accept a `sep` keyword, so bind it to the TSV
    # reader only.
    if accession_ids_filename.endswith('.tsv'):
        read = partial(pd.read_csv, sep='\t')
    else:
        read = pd.read_excel

    read_accessions = partial(
        read,
        na_values = ['NA', '', 'Unknown', 'NULL']
    )

    clinical_records = (
        read_accessions(accession_ids_filename)
        .pipe(trim_whitespace)
        .pipe(add_provenance, accession_ids_filename))

    # only keep submitted records
    clinical_records = clinical_records[clinical_records.status == 'submitted']

    column_map = {
        'sequence_identifier': 'sequence_identifier',
        'sfs_sample_barcode': 'barcode',
        'strain_name': 'strain_name',
        'nwgc_id': 'nwgc_id',
        'gisaid_accession': 'gisaid_accession',
        'genbank_accession': 'genbank_accession',
        'pathogen': 'pathogen',
        '_provenance': '_provenance'
    }

    if record_type in ['flu-a', 'flu-b']:
        assert segment_accession_ids_filename and len(segment_accession_ids_filename) == 1, \
            'Error: Missing required segment accession IDs file.'
        segment_accession_ids_filename = segment_accession_ids_filename[0]

        clinical_records = clinical_records[clinical_records['pathogen'] == record_type]
        column_map['subtype'] = 'subtype'
        column_map['segment'] = 'segment'

        if segment_accession_ids_filename.endswith('.tsv'):
            read = partial(pd.read_csv, sep='\t')
        else:
            read = pd.read_excel

        # Excluding "NA" from n/a values because it is used as the neuraminidase segment code.
        read_segment_accessions = partial(
            read,
            na_values = ['', 'Unknown', 'NULL'],
            keep_default_na = False
        )

        clinical_records_segments = (
            read_segment_accessions(segment_accession_ids_filename)
            .pipe(trim_whitespace)
            .pipe(add_provenance, segment_accession_ids_filename))

        clinical_records_segments = clinical_records_segments[['strain_name', 'genbank_accession', 'sequence_id', 'segment', '_provenance']]

        # Drop overlapping columns prior to merging
        clinical_records.drop(columns=['genbank_accession', '_provenance'], inplace=True)
        clinical_records = clinical_records.merge(clinical_records_segments, on='strain_name')

    if record_type in ['rsv-a', 'rsv-b']:
        assert 'subtype' not in clinical_records.columns, 'Error: unexpected column `subtype` in sequence records.'
        clinical_records = clinical_records[clinical_records['pathogen'] == record_type]
    elif record_type == 'hcov19':
        assert 'pathogen' not in clinical_records.columns, 'Error: unexpected column `pathogen` in sequence records.'
        clinical_records['pathogen'] = 'hcov19'

    if clinical_records.empty:
        dump_ndjson(clinical_records)
    else:
        clinical_records['sequence_identifier'] = clinical_records.apply(
            lambda row: generate_hash(row['strain_name']) + '-' + row['pathogen'].upper().replace('-', ''), axis=1
        )

        clinical_records = clinical_records[
            clinical_records['sfs_sample_barcode'].notnull() & (clinical_records.status == 'submitted')
        ].rename(columns=column_map)

        # Flu data is by segment, so skipping barcode uniqueness check
        if record_type not in ['flu-a', 'flu-b']:
            barcode_quality_control(clinical_records, output)

        # Drop columns we're not tracking
        clinical_records = clinical_records[column_map.values()]

        dump_ndjson(clinical_records)
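
A sketch of the shape of one emitted record, using the renamed columns from column_map; the field values below are invented for illustration and _provenance's exact shape depends on add_provenance:

    # Hypothetical example of a single NDJSON record for an rsv-a run;
    # values are illustrative only, not real data.
    example_record = {
        "sequence_identifier": "d41d8cd98f00b204-RSVA",
        "barcode": "aaaaaaaa",
        "strain_name": "USA/WA-S999/2024",
        "nwgc_id": "123456",
        "gisaid_accession": None,
        "genbank_accession": "OQ000001",
        "pathogen": "rsv-a",
        "_provenance": {"filename": "accession-ids.tsv"},
    }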


# UW Clinical subcommand
@clinical.command("parse-uw")
@click.argument("uw_filename", metavar = "<UW Clinical Data filename>")
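A minimal sketch of exercising the new subcommand, assuming the clinical click group is importable at the module path implied by the file above; the input file names are hypothetical:

    # Hedged sketch: drive the subcommand with click's test runner.
    from click.testing import CliRunner

    from seattleflu.id3c.cli.command.clinical import clinical

    runner = CliRunner()

    # Flu runs require the positional segment accession IDs file.
    result = runner.invoke(clinical, [
        "parse-sequencing",
        "accession-ids.tsv",          # <Sequencing accession IDs filename>
        "flu-a",                      # <record type>
        "segment-accession-ids.tsv",  # <Segment accession IDs filename>
        "--output", "problem-barcodes.csv",
    ])

    # Parsed records stream to stdout as newline-delimited JSON.
    print(result.output)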
123 changes: 118 additions & 5 deletions lib/seattleflu/id3c/cli/command/etl/clinical.py
@@ -11,7 +11,7 @@
from id3c.db.session import DatabaseSession
from id3c.db.datatypes import Json
from id3c.cli.command.etl.redcap_det import insert_fhir_bundle

from id3c.db.types import MinimalSampleRecord, GenomeRecord, OrganismRecord
from id3c.cli.command.etl import (
    etl,

@@ -39,6 +39,7 @@
from . import race, ethnicity
from .fhir import *
from .clinical_retrospectives import *
from id3c.cli.command.etl.consensus_genome import find_organism
from .redcap_map import map_symptom


@@ -104,17 +105,53 @@ def etl_clinical(*, db: DatabaseSession):
        # Most of the time we expect to see existing sites so a
        # select-first approach makes the most sense to avoid useless
        # updates.
-       site = find_or_create_site(db,
-           identifier = site_identifier(record.document["site"]),
-           details = {"type": "retrospective"})
        if record.document.get("site"):
            site = find_or_create_site(db,
                identifier = site_identifier(record.document["site"]),
                details = {"type": "retrospective"})
        else:
            site = None

        # Sequencing accession IDs are being loaded into the clinical receiving table, and will
        # be processed differently from other records, populating only the warehouse.consensus_genome
        # and warehouse.genomic_sequence tables with the relevant data.
        if record.document.get('genbank_accession') or record.document.get('gisaid_accession'):
            if record.document['pathogen'] == 'flu-a':
                record.document['organism'] = record.document['pathogen'] + '::' + record.document['subtype']
            else:
                record.document['organism'] = record.document['pathogen']

            # Find the matching organism within the warehouse for the reference organism
            organism_name_map = {
                'rsv-a': 'RSV.A',
                'rsv-b': 'RSV.B',
                'hcov19': 'Human_coronavirus.2019',
                'flu-a::h1n1': 'Influenza.A.H1N1',
                'flu-a::h3n2': 'Influenza.A.H3N2',
                'flu-b': 'Influenza.B'
            }
            organism = find_organism(db, organism_name_map[record.document['organism']])

            assert organism, f"No organism found with name «{record.document['organism']}»"

            # Most of the time we expect to see new sequences, so an
            # insert-first approach makes the most sense to avoid useless
            # queries.
            genome = upsert_genome(db,
                sample = sample,
                organism = organism)

            genomic_sequence = upsert_genomic_sequence(db,
                genome = genome,
                details = record.document)

        # PHSKC and KP2023 will be handled differently from other clinical records, converted
        # to FHIR format and inserted into receiving.fhir table to be processed
        # by the FHIR ETL. When time allows, SCH and KP should follow suit.
        # Since KP2023 and KP samples both have KaiserPermanente as their site in id3c,
        # use the ndjson document's site to distinguish KP vs KP2023 samples
-       if site.identifier == 'RetrospectivePHSKC' or record.document["site"].upper() == 'KP2023':
        elif site and (site.identifier == 'RetrospectivePHSKC' or record.document["site"].upper() == 'KP2023'):
            fhir_bundle = generate_fhir_bundle(db, record.document, site.identifier)
            insert_fhir_bundle(db, fhir_bundle)

@@ -159,6 +196,82 @@ def etl_clinical(*, db: DatabaseSession):
LOG.info(f"Finished processing clinical record {record.id}")


def upsert_genome(db: DatabaseSession, sample: MinimalSampleRecord, organism: OrganismRecord) -> GenomeRecord:
    """
    Upsert a consensus genome for the given *sample* and *organism*.
    """
    LOG.debug(f"""
    Upserting genome with sample_id {sample.id},
    organism {organism.id} «{organism.lineage}»""")

    data = {
        "sample_id": sample.id,
        "organism_id": organism.id
    }

    genome: GenomeRecord = db.fetch_row("""
        insert into warehouse.consensus_genome (sample_id, organism_id)
        values (%(sample_id)s, %(organism_id)s)
        on conflict (sample_id, organism_id) where sequence_read_set_id is null do update
            set sample_id = excluded.sample_id,
                organism_id = excluded.organism_id
        returning consensus_genome_id as id, sample_id, organism_id
        """, data)

    assert genome.id, "Upsert affected no rows!"

    LOG.info(f"""
    Upserted genome {genome.id} with sample ID «{genome.sample_id}»
    and organism ID «{genome.organism_id}»
    """)

    return genome

def upsert_genomic_sequence(db: DatabaseSession, genome: GenomeRecord, details: dict) -> Any:
    """
    Upsert genomic sequence given a *genome* record and *details*.
    """
    # Append the segment only when one is present, so non-segmented records
    # don't end up with a trailing '-'.
    segment = details.get('segment', '')
    sequence_identifier = details['sequence_identifier'] + ('-' + segment if segment else '')

    LOG.info(f"Upserting genomic sequence «{sequence_identifier}»")

    data = {
        "identifier": sequence_identifier,
        "segment": segment,
        "seq": "",
        "genome_id": genome.id,
        "additional_details": Json({
            k: v for k, v in details.items() if k in [
                'nwgc_id',
                'strain_name',
                'genbank_accession',
                'gisaid_accession',
                '_provenance'
            ]
        })
    }

    genomic_sequence = db.fetch_row("""
        insert into warehouse.genomic_sequence (identifier, segment, seq, consensus_genome_id, details)
        values (%(identifier)s, %(segment)s, %(seq)s, %(genome_id)s, %(additional_details)s)
        on conflict (identifier) do update
            set seq = excluded.seq,
                segment = excluded.segment,
                details = excluded.details
        returning genomic_sequence_id as id, identifier, segment, seq, consensus_genome_id
        """, data)

    #assert genomic_sequence.consensus_genome_id == genome.id, \
    #    "Provided sequence identifier was not unique, matched a sequence linked to another consensus genome!"
    assert genomic_sequence.id, "Upsert affected no rows!"

    LOG.info(f"Upserted genomic sequence «{genomic_sequence.id}»")

    return genomic_sequence

def create_encounter(db: DatabaseSession,
                     record: dict,
                     patient_reference: dict,
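
An illustrative walk-through of how the sequence identifier is composed across the two changed files; the hash is a stand-in, since generate_hash's real output depends on its implementation:

    # parse-sequencing (clinical.py): hash of the strain name + pathogen code
    hashed = 'd41d8cd98f00b204'    # stand-in for generate_hash(strain_name)
    sequence_identifier = hashed + '-' + 'flu-a'.upper().replace('-', '')
    assert sequence_identifier == 'd41d8cd98f00b204-FLUA'

    # upsert_genomic_sequence (etl/clinical.py): append the segment when present
    segment = 'HA'
    identifier = sequence_identifier + ('-' + segment if segment else '')
    assert identifier == 'd41d8cd98f00b204-FLUA-HA'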
