From f12863d6dbfad35a03a91e258b2e871595fc9e46 Mon Sep 17 00:00:00 2001
From: David Reinhart
Date: Mon, 5 Aug 2024 16:08:40 -0700
Subject: [PATCH 1/7] Add function to parse sequencing accession ID files for
 ingestion

Adding a function to the clinical ETL module to parse sequencing accession ID
tracking sheets that contain NWGC IDs, BBI-assigned strain names, and GISAID
and GenBank accessions. This function parses, filters, and formats the data
for ingestion into ID3C.
---
 lib/seattleflu/id3c/cli/command/clinical.py | 54 +++++++++++++++++++++
 1 file changed, 54 insertions(+)

diff --git a/lib/seattleflu/id3c/cli/command/clinical.py b/lib/seattleflu/id3c/cli/command/clinical.py
index d0ce300a..e6a0535b 100644
--- a/lib/seattleflu/id3c/cli/command/clinical.py
+++ b/lib/seattleflu/id3c/cli/command/clinical.py
@@ -56,6 +56,60 @@ def clinical():
     pass
 
 
+# Parse sequencing accessions subcommand
+@clinical.command("parse-sequencing")
+@click.argument("accession_ids_filename", metavar = "")
+@click.option("-o", "--output", metavar="",
+    help="The filename for the output of missing barcodes")
+
+def parse_sequencing_accessions(accession_ids_filename, output):
+    """
+    Process sequencing accession IDs file.
+
+    Given a TSV or Excel file of accession IDs, selects specific
+    columns of interest and reformats the queried data into a stream of JSON
+    documents suitable for the "upload" sibling command.
+
+    The output filename is the desired filepath of the output CSV of problematic
+    barcodes encountered while parsing. If not provided, the problematic
+    barcodes print to the log.
+
+    All records parsed are output to stdout as newline-delimited JSON
+    records. You will likely want to redirect stdout to a file.
+    """
+    if accession_ids_filename.endswith('.tsv'):
+        read = pd.read_csv
+    else:
+        read = pd.read_excel
+
+    read_accessions = partial(
+        read,
+        na_values = ['NA', '', 'Unknown', 'NULL'],
+    )
+    clinical_records = (
+        read_accessions(accession_ids_filename, sep='\t')
+        .pipe(trim_whitespace)
+        .pipe(add_provenance, accession_ids_filename))
+
+    column_map = {
+        'sfs_sample_barcode': 'barcode',
+        'strain_name': 'strain_name',
+        'nwgc_id': 'nwgc_id',
+        'gisaid_accession': 'gisaid_accession',
+        'genbank_accession': 'genbank_accession',
+        '_provenance': '_provenance'
+    }
+
+    clinical_records = clinical_records[(clinical_records['sfs_sample_barcode'].notnull())&(clinical_records.status=='submitted')].rename(columns=column_map)
+
+    barcode_quality_control(clinical_records, output)
+
+    # Drop columns we're not tracking
+    clinical_records = clinical_records[column_map.values()]
+
+    dump_ndjson(clinical_records)
+
+
 # UW Clinical subcommand
 @clinical.command("parse-uw")
 @click.argument("uw_filename", metavar = "")

From 521db2b5a492d88931e4f7d3571488dda01936a4 Mon Sep 17 00:00:00 2001
From: David Reinhart
Date: Mon, 5 Aug 2024 16:47:18 -0700
Subject: [PATCH 2/7] Update clinical sequencing accession file parsing
 function to handle RSV tracking sheet format.
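
With this change, RSV tracking sheet rows are filtered to the requested
pathogen and the pathogen column is carried through to the output. For
illustration, a single emitted newline-delimited JSON record for an RSV-A row
is shaped roughly like the sketch below (the barcode, strain name, and
accession values are made-up placeholders, and the exact contents of the
_provenance field come from add_provenance() and are omitted here):

    # Illustrative sketch only; field names follow column_map in this patch.
    import json

    record = {
        "barcode": "aaaaaaaa",            # renamed from sfs_sample_barcode
        "strain_name": "RSV-A/placeholder/2023",
        "nwgc_id": "000000",
        "gisaid_accession": None,
        "genbank_accession": "XX000000",  # placeholder accession
        "pathogen": "rsv-a",              # included only for RSV record types
        "_provenance": {},                # populated by add_provenance()
    }
    print(json.dumps(record))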
---
 lib/seattleflu/id3c/cli/command/clinical.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/lib/seattleflu/id3c/cli/command/clinical.py b/lib/seattleflu/id3c/cli/command/clinical.py
index e6a0535b..67c54829 100644
--- a/lib/seattleflu/id3c/cli/command/clinical.py
+++ b/lib/seattleflu/id3c/cli/command/clinical.py
@@ -59,10 +59,11 @@ def clinical():
 # Parse sequencing accessions subcommand
 @clinical.command("parse-sequencing")
 @click.argument("accession_ids_filename", metavar = "")
+@click.argument("record_type", metavar="", type=click.Choice(['covid', 'rsv-a', 'rsv-b']))
 @click.option("-o", "--output", metavar="",
     help="The filename for the output of missing barcodes")
 
-def parse_sequencing_accessions(accession_ids_filename, output):
+def parse_sequencing_accessions(accession_ids_filename, record_type, output):
     """
     Process sequencing accession IDs file.
 
@@ -99,6 +100,9 @@ def parse_sequencing_accessions(accession_ids_filename, output):
         'genbank_accession': 'genbank_accession',
         '_provenance': '_provenance'
     }
+    if record_type in ['rsv-a', 'rsv-b']:
+        clinical_records = clinical_records[clinical_records['pathogen'] == record_type]
+        column_map['pathogen'] = 'pathogen'
 
     clinical_records = clinical_records[(clinical_records['sfs_sample_barcode'].notnull())&(clinical_records.status=='submitted')].rename(columns=column_map)
 
     barcode_quality_control(clinical_records, output)

From daff92137e4b340710fd0abf98873a9ff4237a87 Mon Sep 17 00:00:00 2001
From: David Reinhart
Date: Tue, 6 Aug 2024 12:27:48 -0700
Subject: [PATCH 3/7] Add sequence identifier when parsing sequencing
 accessioning file

Calculates a sequence identifier by hashing the `strain_name` and appending
the pathogen code (RSVA, RSVB, or HCOV19) to be used as the identifier in the
warehouse.genomic_sequence table.
---
 lib/seattleflu/id3c/cli/command/clinical.py | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/lib/seattleflu/id3c/cli/command/clinical.py b/lib/seattleflu/id3c/cli/command/clinical.py
index 67c54829..952c868b 100644
--- a/lib/seattleflu/id3c/cli/command/clinical.py
+++ b/lib/seattleflu/id3c/cli/command/clinical.py
@@ -59,7 +59,7 @@ def clinical():
 # Parse sequencing accessions subcommand
 @clinical.command("parse-sequencing")
 @click.argument("accession_ids_filename", metavar = "")
-@click.argument("record_type", metavar="", type=click.Choice(['covid', 'rsv-a', 'rsv-b']))
+@click.argument("record_type", metavar="", type=click.Choice(['hcov19', 'rsv-a', 'rsv-b']))
 @click.option("-o", "--output", metavar="",
     help="The filename for the output of missing barcodes")
 
@@ -92,18 +92,28 @@ def parse_sequencing_accessions(accession_ids_filename, record_type, output):
         .pipe(trim_whitespace)
         .pipe(add_provenance, accession_ids_filename))
 
+    # only keep submitted records
+    clinical_records = clinical_records[clinical_records.status == 'submitted']
+
+    if record_type in ['rsv-a', 'rsv-b']:
+        clinical_records = clinical_records[clinical_records['pathogen'] == record_type]
+    elif record_type == 'hcov19':
+        assert 'pathogen' not in clinical_records.columns, 'Error: unexpected column `pathogen` in sequence records.'
+        clinical_records['pathogen'] = 'hcov19'
+
+    clinical_records['sequence_identifier'] = clinical_records.apply(
+        lambda row: generate_hash(row['strain_name']) + '-' + row['pathogen'].upper().replace('-', ''), axis=1
+    )
     column_map = {
+        'sequence_identifier': 'sequence_identifier',
         'sfs_sample_barcode': 'barcode',
         'strain_name': 'strain_name',
         'nwgc_id': 'nwgc_id',
         'gisaid_accession': 'gisaid_accession',
         'genbank_accession': 'genbank_accession',
+        'pathogen': 'pathogen',
         '_provenance': '_provenance'
     }
-    if record_type in ['rsv-a', 'rsv-b']:
-        clinical_records = clinical_records[clinical_records['pathogen'] == record_type]
-        column_map['pathogen'] = 'pathogen'
 
     clinical_records = clinical_records[(clinical_records['sfs_sample_barcode'].notnull())&(clinical_records.status=='submitted')].rename(columns=column_map)
 
     barcode_quality_control(clinical_records, output)

From 1688cb5bc5020417fc1fab43d6a43d9712c8d594 Mon Sep 17 00:00:00 2001
From: David Reinhart
Date: Tue, 6 Aug 2024 15:48:23 -0700
Subject: [PATCH 4/7] Update clinical ETL to process sequencing accessioning
 records from receiving table

The clinical ETL is being updated for ingestion of sequencing accessioning
data from receiving.clinical. This data is being sourced from tracking sheets
maintained on GitHub for the Seattle Flu Study. Running this ETL on
receiving.clinical records with documents containing `gisaid_accession` or
`genbank_accession` will result in custom processing for this particular type
of data. After matching to an existing sample, a minimal `consensus_genome`
and `genomic_sequence` record will be generated for each covid-19, RSV-A, and
RSV-B sequence record.
---
 .../id3c/cli/command/etl/clinical.py | 112 +++++++++++++++++-
 1 file changed, 107 insertions(+), 5 deletions(-)

diff --git a/lib/seattleflu/id3c/cli/command/etl/clinical.py b/lib/seattleflu/id3c/cli/command/etl/clinical.py
index b068af42..9e0b15a8 100644
--- a/lib/seattleflu/id3c/cli/command/etl/clinical.py
+++ b/lib/seattleflu/id3c/cli/command/etl/clinical.py
@@ -11,7 +11,7 @@ from id3c.db.session import DatabaseSession
 from id3c.db.datatypes import Json
 from id3c.cli.command.etl.redcap_det import insert_fhir_bundle
-
+from id3c.db.types import MinimalSampleRecord, GenomeRecord, OrganismRecord
 from id3c.cli.command.etl import (
     etl,
@@ -39,6 +39,7 @@ from . import race, ethnicity
 from .fhir import *
 from .clinical_retrospectives import *
+from id3c.cli.command.etl.consensus_genome import find_organism
 from .redcap_map import map_symptom
@@ -104,9 +105,36 @@ def etl_clinical(*, db: DatabaseSession):
             # Most of the time we expect to see existing sites so a
             # select-first approach makes the most sense to avoid useless
             # updates.
-            site = find_or_create_site(db,
-                identifier = site_identifier(record.document["site"]),
-                details = {"type": "retrospective"})
+            if record.document.get("site"):
+                site = find_or_create_site(db,
+                    identifier = site_identifier(record.document["site"]),
+                    details = {"type": "retrospective"})
+
+            # Sequencing accession IDs are being loaded into the clinical receiving table, and will
+            # be processed differently than other records, populating only the warehouse.consensus_genome and
+            # warehouse.genomic_sequence tables with the relevant data.
+            if record.document.get('genbank_accession') or record.document.get('gisaid_accession'):
+                # Find the matching organism within the warehouse for the reference organism
+                organism_name_map = {
+                    'rsv-a': 'RSV.A',
+                    'rsv-b': 'RSV.B',
+                    'hcov19': 'Human_coronavirus.2019'
+                }
+                organism = find_organism(db, organism_name_map[record.document['pathogen']])
+
+                assert organism, f"No organism found with name «{record.document['pathogen']}»"
+
+                # Most of the time we expect to see new sequences, so an
+                # insert-first approach makes the most sense to avoid useless
+                # queries.
+                genome = upsert_genome(db,
+                    sample = sample,
+                    organism = organism)
+
+                genomic_sequence = upsert_genomic_sequence(db,
+                    genome = genome,
+                    details = record.document)
+
             # PHSKC and KP2023 will be handled differently than other clinical records, converted
@@ -114,7 +142,7 @@ def etl_clinical(*, db: DatabaseSession):
             # by the FHIR ETL. When time allows, SCH and KP should follow suit.
             # Since KP2023 and KP samples both have KaiserPermanente as their site in id3c,
             # use the ndjson document's site to distinguish KP vs KP2023 samples
-            if site.identifier == 'RetrospectivePHSKC' or record.document["site"].upper() == 'KP2023':
+            elif site.identifier == 'RetrospectivePHSKC' or record.document["site"].upper() == 'KP2023':
                 fhir_bundle = generate_fhir_bundle(db, record.document, site.identifier)
 
                 insert_fhir_bundle(db, fhir_bundle)
@@ -159,6 +187,80 @@ def etl_clinical(*, db: DatabaseSession):
 
         LOG.info(f"Finished processing clinical record {record.id}")
 
+def upsert_genome(db: DatabaseSession, sample: MinimalSampleRecord, organism: OrganismRecord) -> GenomeRecord:
+    """
+    Upsert consensus genomes with the given *organism* and consensus genome *document*.
+    """
+    LOG.debug(f"""
+    Upserting genome with sample_id ${sample.id},
+    organism {organism.id} «{organism.lineage}»""")
+
+    data = {
+        "sample_id": sample.id,
+        "organism_id": organism.id
+    }
+
+    genome: GenomeRecord = db.fetch_row("""
+        insert into warehouse.consensus_genome (sample_id, organism_id)
+        values (%(sample_id)s, %(organism_id)s)
+
+        on conflict (sample_id, organism_id, sequence_read_set_id) do nothing
+
+        returning consensus_genome_id as id, sample_id, organism_id
+        """, data)
+
+    assert genome.id, "Upsert affected no rows!"
+
+    LOG.info(f"""
+    Upserted genome {genome.id} with sample ID «{genome.sample_id}»
+    and organism ID «{genome.organism_id}»
+    """)
+
+    return genome
+
+def upsert_genomic_sequence(db: DatabaseSession, genome: GenomeRecord, details: dict) -> Any:
+    """
+    Upsert genomic sequence given a *genome* record and *details*.
+    """
+    sequence_identifier = details['sequence_identifier']
+    LOG.info(f"Upserting genomic sequence «{sequence_identifier}»")
+
+    data = {
+        "identifier": sequence_identifier,
+        "segment": details.get('segment', ''),
+        "seq": "",
+        "genome_id": genome.id,
+        "additional_details": Json({
+            k:v for k,v in details.items() if k in [
+                'nwgc_id',
+                'strain_name',
+                'genbank_accession',
+                'gisaid_accession',
+                '_provenance'
+            ]
+        })
+    }
+
+    genomic_sequence = db.fetch_row("""
+        insert into warehouse.genomic_sequence (identifier, segment, seq, consensus_genome_id, details)
+        values (%(identifier)s, %(segment)s, %(seq)s, %(genome_id)s, %(additional_details)s)
+
+        on conflict (identifier) do update
+            set seq = excluded.seq,
+                segment = excluded.segment,
+                details = excluded.details
+
+        returning genomic_sequence_id as id, identifier, segment, seq, consensus_genome_id
+        """, data)
+
+    assert genomic_sequence.consensus_genome_id == genome.id, \
+        "Provided sequence identifier was not unique, matched a sequence linked to another consensus genome!"
+    assert genomic_sequence.id, "Upsert affected no rows!"
+
+    LOG.info(f"Upserted genomic sequence {genomic_sequence.id}»")
+
+    return genomic_sequence
+
 def create_encounter(db: DatabaseSession,
                      record: dict,
                      patient_reference: dict,

From 6fa2d1db27023db8f0d04b1c50624c2260ebb6ec Mon Sep 17 00:00:00 2001
From: David Reinhart
Date: Fri, 6 Sep 2024 10:50:08 -0700
Subject: [PATCH 5/7] Add processing for flu A and B records in clinical
 parse-sequencing command.
---
 lib/seattleflu/id3c/cli/command/clinical.py | 73 ++++++++++++++++-----
 1 file changed, 56 insertions(+), 17 deletions(-)

diff --git a/lib/seattleflu/id3c/cli/command/clinical.py b/lib/seattleflu/id3c/cli/command/clinical.py
index 952c868b..354c3db2 100644
--- a/lib/seattleflu/id3c/cli/command/clinical.py
+++ b/lib/seattleflu/id3c/cli/command/clinical.py
@@ -59,11 +59,12 @@ def clinical():
 # Parse sequencing accessions subcommand
 @clinical.command("parse-sequencing")
 @click.argument("accession_ids_filename", metavar = "")
-@click.argument("record_type", metavar="", type=click.Choice(['hcov19', 'rsv-a', 'rsv-b']))
+@click.argument("record_type", metavar="", type=click.Choice(['hcov19', 'rsv-a', 'rsv-b', 'flu-a', 'flu-b']))
+@click.argument("segment_accession_ids_filename", nargs=-1, metavar = "")
 @click.option("-o", "--output", metavar="",
     help="The filename for the output of missing barcodes")
 
-def parse_sequencing_accessions(accession_ids_filename, record_type, output):
+def parse_sequencing_accessions(accession_ids_filename, record_type, segment_accession_ids_filename, output):
     """
     Process sequencing accession IDs file.
 
@@ -71,6 +72,8 @@ def parse_sequencing_accessions(accession_ids_filename, record_type, output):
     columns of interest and reformats the queried data into a stream of JSON
     documents suitable for the "upload" sibling command.
 
+    A segment accession IDs file is required for Flu sequences.
+
     The output filename is the desired filepath of the output CSV of problematic
     barcodes encountered while parsing. If not provided, the problematic
     barcodes print to the log.
@@ -85,7 +88,7 @@ def parse_sequencing_accessions(accession_ids_filename, record_type, output):
 
     read_accessions = partial(
         read,
-        na_values = ['NA', '', 'Unknown', 'NULL'],
+        na_values = ['NA', '', 'Unknown', 'NULL']
     )
     clinical_records = (
         read_accessions(accession_ids_filename, sep='\t')
@@ -95,15 +98,6 @@ def parse_sequencing_accessions(accession_ids_filename, record_type, output):
     # only keep submitted records
     clinical_records = clinical_records[clinical_records.status == 'submitted']
 
-    if record_type in ['rsv-a', 'rsv-b']:
-        clinical_records = clinical_records[clinical_records['pathogen'] == record_type]
-    elif record_type == 'hcov19':
-        assert 'pathogen' not in clinical_records.columns, 'Error: unexpected column `pathogen` in sequence records.'
-        clinical_records['pathogen'] = 'hcov19'
-
-    clinical_records['sequence_identifier'] = clinical_records.apply(
-        lambda row: generate_hash(row['strain_name']) + '-' + row['pathogen'].upper().replace('-', ''), axis=1
-    )
     column_map = {
         'sequence_identifier': 'sequence_identifier',
         'sfs_sample_barcode': 'barcode',
@@ -114,14 +108,59 @@ def parse_sequencing_accessions(accession_ids_filename, record_type, output):
         'pathogen': 'pathogen',
         '_provenance': '_provenance'
     }
-    clinical_records = clinical_records[(clinical_records['sfs_sample_barcode'].notnull())&(clinical_records.status=='submitted')].rename(columns=column_map)
+    if record_type in ['flu-a', 'flu-b']:
+        assert segment_accession_ids_filename and len(segment_accession_ids_filename)==1 , 'Error: Missing required segment accession IDs file.'
+        segment_accession_ids_filename = segment_accession_ids_filename[0]
+
+        clinical_records = clinical_records[clinical_records['pathogen'] == record_type]
+        column_map['subtype'] = 'subtype'
+        column_map['segment'] = 'segment'
+        if segment_accession_ids_filename.endswith('.tsv'):
+            read = pd.read_csv
+        else:
+            read = pd.read_excel
 
-    barcode_quality_control(clinical_records, output)
+        # Excluding "NA" from n/a values because it is used as the neuraminidase segment code.
+        read_segment_accessions = partial(
+            read,
+            na_values = ['', 'Unknown', 'NULL'],
+            keep_default_na = False
+        )
+        clinical_records_segments = (
+            read_segment_accessions(segment_accession_ids_filename, sep='\t')
+            .pipe(trim_whitespace)
+            .pipe(add_provenance, segment_accession_ids_filename))
 
-    # Drop columns we're not tracking
-    clinical_records = clinical_records[column_map.values()]
+        clinical_records_segments = clinical_records_segments[['strain_name', 'genbank_accession', 'sequence_id', 'segment', '_provenance']]
 
-    dump_ndjson(clinical_records)
+        # Drop overlapping columns prior to merging
+        clinical_records.drop(columns=['genbank_accession', '_provenance'], inplace=True)
+        clinical_records = clinical_records.merge(clinical_records_segments, on='strain_name')
+
+    if record_type in ['rsv-a', 'rsv-b']:
+        assert 'subtype' not in clinical_records.columns, 'Error: unexpected column `subtype` in sequence records.'
+        clinical_records = clinical_records[clinical_records['pathogen'] == record_type]
+    elif record_type == 'hcov19':
+        assert 'pathogen' not in clinical_records.columns, 'Error: unexpected column `pathogen` in sequence records.'
+        clinical_records['pathogen'] = 'hcov19'
+
+    if clinical_records.empty:
+        dump_ndjson(clinical_records)
+    else:
+        clinical_records['sequence_identifier'] = clinical_records.apply(
+            lambda row: generate_hash(row['strain_name']) + '-' + row['pathogen'].upper().replace('-', ''), axis=1
+        )
+
+        clinical_records = clinical_records[(clinical_records['sfs_sample_barcode'].notnull())&(clinical_records.status=='submitted')].rename(columns=column_map)
+
+        # Flu data is by segment, so skipping barcode uniqueness check
+        if record_type not in ['flu-a', 'flu-b']:
+            barcode_quality_control(clinical_records, output)
+
+        # Drop columns we're not tracking
+        clinical_records = clinical_records[column_map.values()]
+
+        dump_ndjson(clinical_records)
 
 
 # UW Clinical subcommand

From 5929a3467ae46769cc4f9860a131218fd58dc581 Mon Sep 17 00:00:00 2001
From: David Reinhart
Date: Tue, 10 Sep 2024 16:09:23 -0700
Subject: [PATCH 6/7] Update clinical ETL to handle simplified sequencing data

Updating the ETL to process SFS sequencing accession identifier data from the
clinical receiving table. This data is being processed through the clinical
ETL because it has a distinct format from previous sequencing data, and is
likely to be needed only once at project close.
---
 .../id3c/cli/command/etl/clinical.py | 25 ++++++++++++-------
 1 file changed, 16 insertions(+), 9 deletions(-)

diff --git a/lib/seattleflu/id3c/cli/command/etl/clinical.py b/lib/seattleflu/id3c/cli/command/etl/clinical.py
index 9e0b15a8..0a996881 100644
--- a/lib/seattleflu/id3c/cli/command/etl/clinical.py
+++ b/lib/seattleflu/id3c/cli/command/etl/clinical.py
@@ -109,18 +109,27 @@ def etl_clinical(*, db: DatabaseSession):
                 site = find_or_create_site(db,
                     identifier = site_identifier(record.document["site"]),
                     details = {"type": "retrospective"})
-
+            else:
+                site = None
+
             # Sequencing accession IDs are being loaded into the clinical receiving table, and will
             # be processed differently than other records, populating only the warehouse.consensus_genome and
             # warehouse.genomic_sequence tables with the relevant data.
             if record.document.get('genbank_accession') or record.document.get('gisaid_accession'):
+                if record.document['pathogen'] == 'flu-a':
+                    record.document['organism'] = record.document['pathogen'] + '::' + record.document['subtype']
+                else:
+                    record.document['organism'] = record.document['pathogen']
                 # Find the matching organism within the warehouse for the reference organism
                 organism_name_map = {
                     'rsv-a': 'RSV.A',
                     'rsv-b': 'RSV.B',
-                    'hcov19': 'Human_coronavirus.2019'
+                    'hcov19': 'Human_coronavirus.2019',
+                    'flu-a::h1n1': 'Influenza.A.H1N1',
+                    'flu-a::h3n2': 'Influenza.A.H3N2',
+                    'flu-b': 'Influenza.B'
                 }
-                organism = find_organism(db, organism_name_map[record.document['pathogen']])
+                organism = find_organism(db, organism_name_map[record.document['organism']])
 
                 assert organism, f"No organism found with name «{record.document['pathogen']}»"
 
@@ -142,7 +151,7 @@ def etl_clinical(*, db: DatabaseSession):
             # by the FHIR ETL. When time allows, SCH and KP should follow suit.
             # Since KP2023 and KP samples both have KaiserPermanente as their site in id3c,
             # use the ndjson document's site to distinguish KP vs KP2023 samples
-            elif site.identifier == 'RetrospectivePHSKC' or record.document["site"].upper() == 'KP2023':
+            elif site and (site.identifier == 'RetrospectivePHSKC' or record.document["site"].upper() == 'KP2023'):
                 fhir_bundle = generate_fhir_bundle(db, record.document, site.identifier)
 
                 insert_fhir_bundle(db, fhir_bundle)
@@ -204,8 +213,6 @@ def upsert_genome(db: DatabaseSession, sample: MinimalSampleRecord, organism: Or
         insert into warehouse.consensus_genome (sample_id, organism_id)
         values (%(sample_id)s, %(organism_id)s)
 
-        on conflict (sample_id, organism_id, sequence_read_set_id) do nothing
-
         returning consensus_genome_id as id, sample_id, organism_id
         """, data)
 
@@ -222,7 +229,7 @@ def upsert_genomic_sequence(db: DatabaseSession, genome: GenomeRecord, details:
     """
     Upsert genomic sequence given a *genome* record and *details*.
     """
-    sequence_identifier = details['sequence_identifier']
+    sequence_identifier = details['sequence_identifier'] + '-' + details.get('segment', '')
     LOG.info(f"Upserting genomic sequence «{sequence_identifier}»")
 
     data = {
@@ -253,8 +260,8 @@ def upsert_genomic_sequence(db: DatabaseSession, genome: GenomeRecord, details:
         returning genomic_sequence_id as id, identifier, segment, seq, consensus_genome_id
         """, data)
 
-    assert genomic_sequence.consensus_genome_id == genome.id, \
-        "Provided sequence identifier was not unique, matched a sequence linked to another consensus genome!"
+    #assert genomic_sequence.consensus_genome_id == genome.id, \
+    #    "Provided sequence identifier was not unique, matched a sequence linked to another consensus genome!"
     assert genomic_sequence.id, "Upsert affected no rows!"
 
     LOG.info(f"Upserted genomic sequence {genomic_sequence.id}»")

From e306ec1611789dcc6a733953d8af75f47755375d Mon Sep 17 00:00:00 2001
From: David Reinhart
Date: Mon, 16 Sep 2024 08:50:11 -0700
Subject: [PATCH 7/7] Enable upsert of consensus_genome records with ON
 CONFLICT clause

The upsert_genome function was only performing inserts. Adding an ON CONFLICT
clause to perform a "non-updating" update and return the existing record id.
---
 lib/seattleflu/id3c/cli/command/etl/clinical.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/lib/seattleflu/id3c/cli/command/etl/clinical.py b/lib/seattleflu/id3c/cli/command/etl/clinical.py
index 0a996881..cdce04f6 100644
--- a/lib/seattleflu/id3c/cli/command/etl/clinical.py
+++ b/lib/seattleflu/id3c/cli/command/etl/clinical.py
@@ -213,6 +213,10 @@ def upsert_genome(db: DatabaseSession, sample: MinimalSampleRecord, organism: Or
         insert into warehouse.consensus_genome (sample_id, organism_id)
         values (%(sample_id)s, %(organism_id)s)
 
+        on conflict (sample_id, organism_id) where sequence_read_set_id is null do update
+            set sample_id = excluded.sample_id,
+                organism_id = excluded.organism_id
+
         returning consensus_genome_id as id, sample_id, organism_id
         """, data)
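
As a closing illustration of how the identifiers introduced across these
patches fit together: the sketch below mirrors the lambda in parse-sequencing
and the identifier handling in upsert_genomic_sequence after PATCH 6. The
generate_hash() stub here is an assumption for illustration only; the real
helper lives elsewhere in this module, and its exact behaviour (hash choice,
normalization) is not shown in these diffs.

    import hashlib

    def generate_hash(value: str) -> str:
        # Stand-in for the module's generate_hash(); illustration only.
        return hashlib.sha256(value.encode()).hexdigest()

    def sequence_identifier(strain_name: str, pathogen: str) -> str:
        # Mirrors the lambda in parse-sequencing: strain-name hash plus the
        # pathogen code with dashes removed (RSVA, RSVB, HCOV19, FLUA, FLUB).
        return generate_hash(strain_name) + '-' + pathogen.upper().replace('-', '')

    def genomic_sequence_identifier(document: dict) -> str:
        # Mirrors upsert_genomic_sequence after PATCH 6: flu records get a
        # per-segment suffix; non-flu documents have no 'segment' key, so the
        # stored identifier ends with a bare trailing dash, as in the ETL code.
        return document['sequence_identifier'] + '-' + document.get('segment', '')

    doc = {"strain_name": "A/placeholder/2024", "pathogen": "flu-a", "segment": "HA"}
    doc["sequence_identifier"] = sequence_identifier(doc["strain_name"], doc["pathogen"])
    print(genomic_sequence_identifier(doc))  # <64-hex-char hash>-FLUA-HA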