From 35e2b64849372e85eefbeb3717f1a734afaba9e9 Mon Sep 17 00:00:00 2001 From: Sara Sjunnebo Date: Wed, 4 Dec 2024 13:23:21 +0100 Subject: [PATCH 1/9] Create dds project --- taca/delivery/cli.py | 74 ++++++++- taca/delivery/deliver.py | 31 +++- taca/delivery/delivery_classes.py | 244 ++++++++++++++++++++++++++++++ 3 files changed, 339 insertions(+), 10 deletions(-) diff --git a/taca/delivery/cli.py b/taca/delivery/cli.py index 48f9e448..c7de218a 100644 --- a/taca/delivery/cli.py +++ b/taca/delivery/cli.py @@ -1,8 +1,13 @@ """CLI for the delivery subcommand.""" # furure todo: rename to "deliver" when fully implemented and taca-ngi-pipieline is deprecated +import logging + import click from taca.delivery import deliver +from taca.utils.config import load_yaml_config + +logger = logging.getLogger(__name__) @click.group() @@ -11,7 +16,7 @@ def delivery(): pass -# Stage data +### Stage data ### @delivery.command() @click.option( "-f", @@ -33,22 +38,79 @@ def stage(project, flowcells, samples): deliver.stage(project, flowcells=flowcells_to_stage, samples=samples_to_stage) -# Upload data +### Upload data ### @delivery.command() @click.option( - "-s", "--stage_dir", type=str, required=True, help="Staged directory to upload", ) +@click.option( + "--order_portal", + default=None, + envvar="ORDER_PORTAL", + required=True, + type=click.File("r"), + help="Path to order portal credantials to retrive project information", +) +@click.option( + "--statusdb_config", + default=None, + envvar="STATUS_DB_CONFIG", + required=True, + type=click.File("r"), + help="Path to statusdb config file", +) +@click.option( + "--pi_email", + default=None, + type=str, + help="PI email, to override PI email stored in order portal", +) +@click.option( + "--add_user", + multiple=True, + type=click.STRING, + help="Add additional user to DDS project. Multiple users can be given by calling this option multiple times", +) +@click.option( + "--project_description", + default=None, + type=click.STRING, + help="Override project description in order portal, e.g. if project not in order portal", +) +@click.option( + "--ignore_orderportal_members", + is_flag=True, + default=False, + help="Do not fetch member information from the order portal", +) @click.argument("project") -def upload(project, stage_dir): +def upload( + project, + stage_dir, + statusdb_config=None, + order_portal=None, + pi_email=None, + add_user=None, + project_description=None, + ignore_orderportal_members=False, +): """Upload a staged project to DDS.""" - deliver.upload_to_dds(project, stage_dir) + load_yaml_config(statusdb_config.name) + load_yaml_config(order_portal.name) + deliver.upload_to_dds( + project, + stage_dir, + pi_email=pi_email, + add_user=list(set(add_user)), + project_description=project_description, + ignore_orderportal_members=ignore_orderportal_members, + ) -# Release data +### Release data ### @delivery.command() @click.option( "-d", diff --git a/taca/delivery/deliver.py b/taca/delivery/deliver.py index ec6d74be..37d5959a 100644 --- a/taca/delivery/deliver.py +++ b/taca/delivery/deliver.py @@ -4,7 +4,7 @@ import os import sys -from taca.delivery.delivery_classes import get_staging_object +from taca.delivery.delivery_classes import get_staging_object, get_upload_object from taca.utils.config import CONFIG logger = logging.getLogger(__name__) @@ -42,11 +42,34 @@ def stage(project, flowcells, samples): # future todo: update statusdb with status "staged" (project, FC or sample level? Maybe new delivery DB?) -def upload_to_dds(project, dds_id): - "Upload staged data to DDS" +def upload_to_dds( + project, + stage_dir, + pi_email=None, + add_user=None, + project_description=None, + ignore_orderportal_members=False, +): + """Upload staged data to DDS""" + upload_object = get_upload_object( + project, + stage_dir, + pi_email, + add_user, + project_description, + ignore_orderportal_members, + ) + upload_object.create_dds_project() + upload_object.upload_data() + # Get information about project from statusdb + # Create a DDS project + # Upload all data in stage_dir to DDS project + # Future todo: Update statusdb with status "uploaded" and DDS project ID pass def release_dds_project(project, dds_id): - "Release DDS project to user" + """Release DDS project to user""" + # Release DDS project + # Update statusdb with status "delivered" pass diff --git a/taca/delivery/delivery_classes.py b/taca/delivery/delivery_classes.py index 3b8ba771..91e99c04 100644 --- a/taca/delivery/delivery_classes.py +++ b/taca/delivery/delivery_classes.py @@ -1,12 +1,18 @@ """Delivery classes for TACA.""" import glob +import json import logging import os +import re +import subprocess from datetime import datetime +import requests + from taca.utils.config import CONFIG from taca.utils.filesystem import do_symlink +from taca.utils.statusdb import StatusdbSession logger = logging.getLogger(__name__) @@ -21,6 +27,26 @@ def get_staging_object(project, project_dir, flowcells, samples): return StageAnalysis(project, project_dir, flowcells, samples) +def get_upload_object( + project, + stage_dir, + pi_email=None, + add_user=None, + project_description=None, + ignore_orderportal_members=False, +): + """Instantiate upload object.""" + # Future todo: determine data type and instantiate appropriate object + return UploadNanopore( + project, + stage_dir, + pi_email, + add_user, + project_description, + ignore_orderportal_members, + ) + + class Stage: """Defines a generic staging object.""" @@ -111,3 +137,221 @@ def stage_data(self): # Generate md5sums # Symlink all data and reports to DELIVERY/PID/DDSID pass + + +class Upload: + """Defines a generic upload object.""" + + def __init__( + self, + project, + stage_dir, + pi_email=None, + add_user=None, + project_description=None, + ignore_orderportal_members=False, + ): + self.project_id = project + self.stage_dir = stage_dir + self.config_statusdb = CONFIG.get("statusdb", None) + self.orderportal = CONFIG.get("order_portal", None) + self.status_db_connection = StatusdbSession(self.config_statusdb) + self.order_details = self.get_order_details() + self.pi_email = self.get_pi_email(pi_email) + self.other_member_details = self.get_other_member_details( + add_user, ignore_orderportal_members + ) + self.project_description = self.get_project_description(project_description) + + def get_order_details(self): + """Fetch order details from order portal""" + projects_db = self.status_db_connection.connection["projects"] + view = projects_db.view("order_portal/ProjectID_to_PortalID") + rows = view[self.project_id].rows + if len(rows) < 1: + raise AssertionError(f"Project {self.project_id} not found in StatusDB") + if len(rows) > 1: + raise AssertionError( + f"Project {self.project_id} has more than one entry in StatusDB orderportal_db" + ) + portal_id = rows[0].value + # Get project info from order portal API + get_project_url = "{}/v1/order/{}".format( + self.orderportal.get("orderportal_api_url"), portal_id + ) + headers = { + "X-OrderPortal-API-key": self.orderportal.get("orderportal_api_token") + } + response = requests.get(get_project_url, headers=headers) + if response.status_code != 200: + raise AssertionError( + "Status code returned when trying to get " + "project info from the order portal: " + f"{portal_id} was not 200. Response was: {response.content}" + ) + return json.loads(response.content) + + def get_pi_email(self, given_pi_email): + """Determine the PI email.""" + if given_pi_email: + logger.warning( + f"PI email for project {self.project_id} specified by user: {given_pi_email}" + ) + return given_pi_email + else: + found_pi_email = self.order_details["fields"]["project_pi_email"] + logger.info( + f"PI email for project {self.project_id} found: {found_pi_email}" + ) + return found_pi_email + + def get_other_member_details( + self, other_member_emails=[], ignore_orderportal_members=False + ): + """Set other contact details if available. This is not mandatory so + the method will not raise error if it could not find any contact + """ + other_member_details = [] + if not ignore_orderportal_members: + logger.info("Fetching additional members from order portal.") + try: + owner_email = self.order_details.get("owner", {}).get("email") + if ( + owner_email + and owner_email != self.pi_email + and owner_email not in other_member_details + ): + other_member_details.append(owner_email) + bioinfo_email = self.order_details.get("fields", {}).get( + "project_bx_email" + ) + if ( + bioinfo_email + and bioinfo_email != self.pi_email + and bioinfo_email not in other_member_details + ): + other_member_details.append(bioinfo_email) + lab_email = self.order_details.get("fields", {}).get( + "project_lab_email" + ) + if ( + lab_email + and lab_email != self.pi_email + and lab_email not in other_member_details + ): + other_member_details.append(lab_email) + except (AssertionError, ValueError): + pass # nothing to worry, just move on + if other_member_emails: + logger.info( + "Other appropriate contacts were found, they will be added " + f"to DDS delivery project: {', '.join(other_member_emails)}" + ) + for email in other_member_emails: + if email not in other_member_details: + other_member_details.append(email) + return other_member_details + + def get_project_description(self, given_desc=None): + """Set project description, either given or from order portal""" + if given_desc: + logger.warning( + f"Project description for project {self.project_id} specified by user: {given_desc}" + ) + return given_desc + else: + project_name = self.order_details.get("fields", {}).get("project_ngi_name") + created_desc = f"{project_name} ({datetime.now().strftime('%Y-%m-%d')})" + logger.info( + f"Project description for project {self.project_id}: {created_desc}" + ) + return created_desc + + def create_dds_project(self): + """Create a DDS delivery project and return the ID.""" + dds_command = [ + "dds", + "--no-prompt", + "project", + "create", + "--title", + self.project_id, + "--description", + self.project_description, + "--principal-investigator", + self.pi_email, + "--owner", + self.pi_email, + ] + if self.other_member_details: + for member in self.other_member_details: + dds_command.append(f"--researcher {member}") + dds_project_id = "" + try: + output = "" + for line in self._execute(dds_command): + output += line + print(line, end="") + except subprocess.CalledProcessError as e: + logger.exception( + "An error occurred while setting up the DDS delivery project." + ) + raise e + project_pattern = re.compile("ngisthlm\d{5}") + found_project = re.search(project_pattern, output) + if found_project: + dds_project_id = found_project.group() + return dds_project_id + else: + raise AssertionError(f"DDS project NOT set up for {self.project_id}") + + def _execute(self, cmd): + """Helper function to both capture and print subprocess output. + Adapted from https://stackoverflow.com/a/4417735 + """ + popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True) + yield from iter(popen.stdout.readline, "") + popen.stdout.close() + return_code = popen.wait() + if return_code: + raise subprocess.CalledProcessError(return_code, cmd) + + def upload_data(self): + """Upload staged data to DDS.""" + pass + + +class UploadNanopore(Upload): + """Defines an object for uploading Nanopore data.""" + + def __init__( + self, + project, + stage_dir, + pi_email=None, + add_user=None, + project_description=None, + ignore_orderportal_members=False, + ): + super().__init__( + project, + stage_dir, + pi_email=None, + add_user=None, + project_description=None, + ignore_orderportal_members=False, + ) + + +class UploadIllumina(Upload): + """Defines an object for uploading Illumina data.""" + + def __init__(self, project, stage_dir): + super().__init__(project, stage_dir) + + +class UploadElement(Upload): + """Defines an object for uploading Element data.""" + + def __init__(self, project, stage_dir): + super().__init__(project, stage_dir) From d58d1c45dccb68c64cb7c072670131d24b49978f Mon Sep 17 00:00:00 2001 From: Sara Sjunnebo Date: Wed, 4 Dec 2024 14:24:51 +0100 Subject: [PATCH 2/9] Upload data to DDS --- taca/delivery/deliver.py | 18 +++++++++------ taca/delivery/delivery_classes.py | 38 +++++++++++++++++++++++++++---- 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/taca/delivery/deliver.py b/taca/delivery/deliver.py index 37d5959a..5cecd840 100644 --- a/taca/delivery/deliver.py +++ b/taca/delivery/deliver.py @@ -59,13 +59,17 @@ def upload_to_dds( project_description, ignore_orderportal_members, ) - upload_object.create_dds_project() - upload_object.upload_data() - # Get information about project from statusdb - # Create a DDS project - # Upload all data in stage_dir to DDS project - # Future todo: Update statusdb with status "uploaded" and DDS project ID - pass + dds_project_id = upload_object.create_dds_project() + delivery_status = upload_object.upload_data(dds_project_id) + if delivery_status: + logger.info( + f"Successfully uploaded {stage_dir} to DDS project {dds_project_id}" + ) + # Future todo: Update statusdb with status "uploaded" and DDS project ID + else: + logger.error( + f"Something went wrong when uploading data to {dds_project_id} for project {project}." + ) def release_dds_project(project, dds_id): diff --git a/taca/delivery/delivery_classes.py b/taca/delivery/delivery_classes.py index ff2195db..c50aecb4 100644 --- a/taca/delivery/delivery_classes.py +++ b/taca/delivery/delivery_classes.py @@ -305,6 +305,40 @@ def create_dds_project(self): else: raise AssertionError(f"DDS project NOT set up for {self.project_id}") + def upload_data(self, delivery_id): + """Upload staged data with DDS""" + log_dir = os.path.join( + os.path.dirname(CONFIG.get("log").get("file")), "DDS_logs" + ) + project_log_dir = os.path.join(log_dir, self.project_id) + cmd = [ + "dds", + "--no-prompt", + "data", + "put", + "--mount-dir", + project_log_dir, + "--project", + delivery_id, + "--source", + self.stage_dir, + ] + try: + output = "" + for line in self._execute(cmd): + output += line + print(line, end="") + except subprocess.CalledProcessError as e: + logger.exception( + f"DDS upload failed while uploading {self.stage_dir} to {delivery_id}" + ) + raise e + if "Upload completed!" in output: + delivery_status = "uploaded" + else: + delivery_status = None + return delivery_status + def _execute(self, cmd): """Helper function to both capture and print subprocess output. Adapted from https://stackoverflow.com/a/4417735 @@ -316,10 +350,6 @@ def _execute(self, cmd): if return_code: raise subprocess.CalledProcessError(return_code, cmd) - def upload_data(self): - """Upload staged data to DDS.""" - pass - class UploadNanopore(Upload): """Defines an object for uploading Nanopore data.""" From ec3fbb8e5c9cdb57e159bd6bfb524b8a91841c5a Mon Sep 17 00:00:00 2001 From: Sara Sjunnebo Date: Wed, 4 Dec 2024 14:26:40 +0100 Subject: [PATCH 3/9] versioning --- VERSIONLOG.md | 2 ++ taca/__init__.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/VERSIONLOG.md b/VERSIONLOG.md index a35e42e4..6d9aca7a 100644 --- a/VERSIONLOG.md +++ b/VERSIONLOG.md @@ -1,5 +1,7 @@ # TACA Version Log +## 20241204.2 +Add support for uploading ONT data to DDS ## 20241204.1 Add support for staging ONT data on Miarka diff --git a/taca/__init__.py b/taca/__init__.py index 24242ae7..473eb432 100644 --- a/taca/__init__.py +++ b/taca/__init__.py @@ -1,3 +1,3 @@ """Main TACA module""" -__version__ = "1.3.0" +__version__ = "1.4.0" From 4cdaf2464e6c1345c7cd8dd99ce37f0929f7f8f4 Mon Sep 17 00:00:00 2001 From: Sara Sjunnebo Date: Wed, 4 Dec 2024 14:29:36 +0100 Subject: [PATCH 4/9] Add requirement --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 36b52625..c4c68a0f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,5 @@ flowcell_parser @ git+https://github.com/SciLifeLab/flowcell_parser pandas python_crontab python_dateutil +requests setuptools From a4eb37324483cfdbcacc2d7ed421cae624b8e883 Mon Sep 17 00:00:00 2001 From: Sara Sjunnebo Date: Fri, 6 Dec 2024 09:22:45 +0100 Subject: [PATCH 5/9] Bugfix --- taca/delivery/delivery_classes.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/taca/delivery/delivery_classes.py b/taca/delivery/delivery_classes.py index c50aecb4..303c1667 100644 --- a/taca/delivery/delivery_classes.py +++ b/taca/delivery/delivery_classes.py @@ -366,10 +366,10 @@ def __init__( super().__init__( project, stage_dir, - pi_email=None, - add_user=None, - project_description=None, - ignore_orderportal_members=False, + pi_email, + add_user, + project_description, + ignore_orderportal_members, ) From 968f7895fce917f4979724902b96867b27e65489 Mon Sep 17 00:00:00 2001 From: Sara Sjunnebo Date: Tue, 10 Dec 2024 10:57:02 +0100 Subject: [PATCH 6/9] spelling Co-authored-by: Anandashankar Anil --- taca/delivery/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taca/delivery/cli.py b/taca/delivery/cli.py index c7de218a..2c1960a1 100644 --- a/taca/delivery/cli.py +++ b/taca/delivery/cli.py @@ -52,7 +52,7 @@ def stage(project, flowcells, samples): envvar="ORDER_PORTAL", required=True, type=click.File("r"), - help="Path to order portal credantials to retrive project information", + help="Path to order portal credentials to retrieve project information", ) @click.option( "--statusdb_config", From 02aee5ebcf18b90849b94b0d2beb2dccdc0bb1db Mon Sep 17 00:00:00 2001 From: Sara Sjunnebo Date: Tue, 10 Dec 2024 11:10:18 +0100 Subject: [PATCH 7/9] Make fetching additional contacts more robust --- taca/delivery/delivery_classes.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/taca/delivery/delivery_classes.py b/taca/delivery/delivery_classes.py index 303c1667..e163f27e 100644 --- a/taca/delivery/delivery_classes.py +++ b/taca/delivery/delivery_classes.py @@ -214,38 +214,38 @@ def get_other_member_details( other_member_details = [] if not ignore_orderportal_members: logger.info("Fetching additional members from order portal.") - try: - owner_email = self.order_details.get("owner", {}).get("email") + if self.order_details.get("owner"): + owner_email = self.order_details.get("owner").get("email") if ( owner_email and owner_email != self.pi_email and owner_email not in other_member_details ): other_member_details.append(owner_email) - bioinfo_email = self.order_details.get("fields", {}).get( - "project_bx_email" - ) + if self.order_details.get("fields"): + bioinfo_email = self.order_details.get("fields").get("project_bx_email") if ( bioinfo_email and bioinfo_email != self.pi_email and bioinfo_email not in other_member_details ): other_member_details.append(bioinfo_email) - lab_email = self.order_details.get("fields", {}).get( - "project_lab_email" - ) + lab_email = self.order_details.get("fields").get("project_lab_email") if ( lab_email and lab_email != self.pi_email and lab_email not in other_member_details ): other_member_details.append(lab_email) - except (AssertionError, ValueError): - pass # nothing to worry, just move on + if other_member_details: + logger.info( + "The following additional contacts were found and will be added " + f"to the DDS delivery project: {', '.join(other_member_details)}" + ) if other_member_emails: logger.info( - "Other appropriate contacts were found, they will be added " - f"to DDS delivery project: {', '.join(other_member_emails)}" + "The following additional contacts were provided by the operator, " + f"they will be added to the DDS delivery project: {', '.join(other_member_emails)}" ) for email in other_member_emails: if email not in other_member_details: From d38695aec4dd6f46542fdc44b2c9af4a34e9fe5a Mon Sep 17 00:00:00 2001 From: Sara Sjunnebo Date: Tue, 10 Dec 2024 12:18:59 +0100 Subject: [PATCH 8/9] Default if field is missing --- taca/delivery/delivery_classes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/taca/delivery/delivery_classes.py b/taca/delivery/delivery_classes.py index e163f27e..3f78f124 100644 --- a/taca/delivery/delivery_classes.py +++ b/taca/delivery/delivery_classes.py @@ -214,7 +214,7 @@ def get_other_member_details( other_member_details = [] if not ignore_orderportal_members: logger.info("Fetching additional members from order portal.") - if self.order_details.get("owner"): + if self.order_details.get("owner", {}): owner_email = self.order_details.get("owner").get("email") if ( owner_email @@ -222,7 +222,7 @@ def get_other_member_details( and owner_email not in other_member_details ): other_member_details.append(owner_email) - if self.order_details.get("fields"): + if self.order_details.get("fields", {}): bioinfo_email = self.order_details.get("fields").get("project_bx_email") if ( bioinfo_email From 3bbb5d920162e7fe1e1547df279bd2426cd233c7 Mon Sep 17 00:00:00 2001 From: Sara Sjunnebo Date: Tue, 10 Dec 2024 12:23:39 +0100 Subject: [PATCH 9/9] Defaults for emails from orderportal --- taca/delivery/delivery_classes.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/taca/delivery/delivery_classes.py b/taca/delivery/delivery_classes.py index 3f78f124..ca596c45 100644 --- a/taca/delivery/delivery_classes.py +++ b/taca/delivery/delivery_classes.py @@ -215,7 +215,7 @@ def get_other_member_details( if not ignore_orderportal_members: logger.info("Fetching additional members from order portal.") if self.order_details.get("owner", {}): - owner_email = self.order_details.get("owner").get("email") + owner_email = self.order_details.get("owner").get("email", {}) if ( owner_email and owner_email != self.pi_email @@ -223,14 +223,18 @@ def get_other_member_details( ): other_member_details.append(owner_email) if self.order_details.get("fields", {}): - bioinfo_email = self.order_details.get("fields").get("project_bx_email") + bioinfo_email = self.order_details.get("fields").get( + "project_bx_email", {} + ) if ( bioinfo_email and bioinfo_email != self.pi_email and bioinfo_email not in other_member_details ): other_member_details.append(bioinfo_email) - lab_email = self.order_details.get("fields").get("project_lab_email") + lab_email = self.order_details.get("fields").get( + "project_lab_email", {} + ) if ( lab_email and lab_email != self.pi_email