Skip to content

Commit

Permalink
Merge pull request #13 from kedhammar/ssjunnebo_aviti
Browse files Browse the repository at this point in the history
Troubleshooting and fixing manifest handling
  • Loading branch information
ssjunnebo authored Oct 7, 2024
2 parents 1e168a2 + b57306d commit dda10c6
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 94 deletions.
21 changes: 9 additions & 12 deletions taca/analysis/analysis_element.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,18 @@ def _process(run):
#### Demultiplexing status ####
demultiplexing_status = run.get_demultiplexing_status()
if demultiplexing_status == "not started":
if run.manifest_exists():
lims_zip_path = run.find_lims_zip()
if lims_zip_path is not None:
os.mkdir(run.demux_dir)
run.copy_manifests()
run_manifests = glob.glob(
os.path.join(run.run_dir, "RunManifest_*.csv")
run.copy_manifests(lims_zip_path)
demux_manifests = run.make_demux_manifests(
manifest_to_split=run.lims_manifest
)
sub_demux_count = 0
for run_manifest in run_manifests.sort():
for demux_manifest in demux_manifests.sort():
demux_dir = f"Demultiplexing_{sub_demux_count}"
os.mkdir(demux_dir)
run.start_demux(run_manifest, demux_dir)
run.start_demux(demux_manifest, demux_dir)
sub_demux_count += 1
run.status = "demultiplexing"
if run.status_changed:
Expand All @@ -83,9 +84,7 @@ def _process(run):
f"Unknown demultiplexing status {demultiplexing_status} of run {run}. Please investigate."
)
email_subject = f"Issues processing {run}"
email_message = (
f"Unknown demultiplexing status {demultiplexing_status} of run {run}. Please investigate."
)
email_message = f"Unknown demultiplexing status {demultiplexing_status} of run {run}. Please investigate."
send_mail(email_subject, email_message, CONFIG["mail"]["recipients"])
return

Expand Down Expand Up @@ -146,9 +145,7 @@ def _process(run):
run = Aviti_Run(given_run, CONFIG)
_process(run)
else:
data_dirs = CONFIG.get("element_analysis").get(
"data_dirs"
)
data_dirs = CONFIG.get("element_analysis").get("data_dirs")
for data_dir in data_dirs:
# Run folder looks like DATE_*_*, the last section is the FC side (A/B) and name
runs = glob.glob(os.path.join(data_dir, "[1-9]*_*_*"))
Expand Down
108 changes: 50 additions & 58 deletions taca/element/Element_Runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ def get_mask(
else:
mask += f"{current_group}{current_group_len}"
mask += f"N{diff}"
else:
mask += f"{current_group}{current_group_len}"

# Parse mask string to check that it matches the number of cycles used
assert (
Expand Down Expand Up @@ -191,7 +193,7 @@ def parse_run_parameters(self) -> None:
self.run_name = run_parameters.get("RunName")

self.run_id = run_parameters.get(
"runID"
"RunID"
) # Unique hash that we don't really use
self.side = run_parameters.get("Side") # SideA or SideB
self.side_letter = self.side[
Expand Down Expand Up @@ -331,27 +333,19 @@ def update_statusdb(self):
doc_obj = self.to_doc_obj()
self.db.upload_to_statusdb(doc_obj)

def manifest_exists(self):
zip_src_path = self.find_manifest_zip()
return os.path.isfile(zip_src_path)

def get_lims_step_id(self) -> str | None:
"""If the run was started using a LIMS-generated manifest,
the ID of the LIMS step can be extracted from it.
"""

# TODO: test me
with open(self.run_manifest_file_from_instrument) as json_file:
manifest_json = json.load(json_file)

lims_step_id = manifest_json.get("RunValues").get("lims_step_id")

assert self.manifest_exists(), "Run manifest not found"
with open(self.run_manifest_file_from_instrument) as csv_file:
manifest_lines = csv_file.readlines()
for line in manifest_lines:
if "lims_step_id" in line:
lims_step_id = line.split(",")[1]
return lims_step_id
return None
return lims_step_id

def find_manifest_zip(self):
def find_lims_zip(self) -> str | None:
# Specify dir in which LIMS drop the manifest zip files
dir_to_search = os.path.join(
self.CONFIG.get("element_analysis")
Expand All @@ -362,7 +356,8 @@ def find_manifest_zip(self):
)

# Use LIMS step ID if available, else flowcell ID, to make a query pattern
if self.lims_step_id:
self.lims_step_id = self.get_lims_step_id()
if self.lims_step_id is not None:
logging.info(
f"Using LIMS step ID '{self.lims_step_id}' to find LIMS run manifests."
)
Expand All @@ -379,44 +374,38 @@ def find_manifest_zip(self):
logger.warning(
f"No manifest found for run '{self.run_dir}' with pattern '{glob_pattern}'."
)
return False # TODO: determine whether to raise an error here instead
return None
elif len(glob_results) > 1:
logger.warning(
f"Multiple manifests found for run '{self.run_dir}' with pattern '{glob_pattern}', using latest one."
)
glob_results.sort()
zip_src_path = glob_results[-1]
lims_zip_src_path = glob_results[-1]
else:
zip_src_path = glob_results[0]
return zip_src_path
lims_zip_src_path = glob_results[0]
return lims_zip_src_path

def copy_manifests(self) -> bool:
def copy_manifests(self, zip_src_path):
"""Fetch the LIMS-generated run manifests from ngi-nas-ns and unzip them into a run subdir."""
zip_src_path = self.find_manifest_zip()
# Make a run subdir named after the zip file and extract manifests there
zip_name = os.path.basename(zip_src_path)
zip_dst_path = os.path.join(self.run_dir, zip_name)
os.mkdir(zip_dst_path)

# Extract the contents of the zip file into the destination directory
unzipped_manifests = []
with zipfile.ZipFile(zip_src_path, "r") as zip_ref:
zip_ref.extractall(zip_dst_path)

# Set the paths of the different manifests as attributes
manifests = os.listdir(zip_dst_path)
self.lims_full_manifest = [
m for m in manifests if re.match(r".*_untrimmed\.csv$", m)
for member in zip_ref.namelist():
# Extract each file individually into the destination directory
filename = os.path.basename(member)
if filename: # Skip directories
source = zip_ref.open(member)
target = open(os.path.join(self.run_dir, filename), "wb")
unzipped_manifests.append(target.name)
with source, target:
target.write(source.read())

# Pick out the manifest to use
self.lims_manifest = [
m for m in unzipped_manifests if re.match(r".*_untrimmed\.csv$", m)
][0]
self.lims_start_manifest = [
m for m in manifests if re.match(r".*_trimmed\.csv$", m)
][0]
self.lims_empty_manifest = [
m for m in manifests if re.match(r".*_empty\.csv$", m)
][0]
self.lims_demux_manifests = [
m for m in manifests if re.match(r".*_\d+\.csv$", m)
]

return True

def make_demux_manifests(
self, manifest_to_split: os.PathLike, outdir: os.PathLike | None = None
Expand Down Expand Up @@ -453,10 +442,10 @@ def make_demux_manifests(
df_samples = df[df["Project"] != "Control"].copy()
df_controls = df[df["Project"] == "Control"].copy()

# Bool indicating whether UMI is present
# Add bool indicating whether UMI is present
df_samples["has_umi"] = df_samples["Index2"].str.contains("N")

# Add cols denoting idx and umi masks
# Add masks for indices and UMIs
df_samples["I1Mask"] = df_samples["Index1"].apply(
lambda seq: get_mask(
seq=seq,
Expand All @@ -482,7 +471,7 @@ def make_demux_manifests(
)
)

# Re-make idx col without Ns
# Re-make Index2 column without any Ns
df_samples["Index2_umi"] = df_samples["Index2"]
df_samples.loc[:, "Index2"] = df_samples["Index2"].apply(
lambda x: x.replace("N", "")
Expand All @@ -493,15 +482,21 @@ def make_demux_manifests(
outdir = self.run_dir

# Break down into groups by non-consolable properties
grouped_df = df_samples.groupby(
["I1Mask", "I2Mask", "UmiMask", "Lane", "Recipe"]
)
grouped_df = df_samples.groupby(["I1Mask", "I2Mask", "UmiMask", "Recipe"])

# Sanity check
if sum([len(group) for _, group in grouped_df]) < len(df_samples):
msg = "Some samples were not included in any submanifest."
logging.error(msg)
raise AssertionError(msg)
elif sum([len(group) for _, group in grouped_df]) > len(df_samples):
logging.warning("Some samples were included in multiple submanifests.")

# Iterate over groups to build composite manifests
manifest_root_name = f"{self.NGI_run_id}_demux"
manifests = []
n = 0
for (I1Mask, I2Mask, UmiMask, lane, recipe), group in grouped_df:
for (I1Mask, I2Mask, UmiMask, recipe), group in grouped_df:
file_name = f"{manifest_root_name}_{n}.csv"

runValues_section = "\n".join(
Expand All @@ -510,7 +505,7 @@ def make_demux_manifests(
"KeyName, Value",
f'manifest_file, "{file_name}"',
f"manifest_group, {n+1}/{len(grouped_df)}",
f"grouped_by, I1Mask:'{I1Mask}' I2Mask:'{I2Mask}' UmiMask:'{UmiMask}' lane:{lane} recipe:'{recipe}'",
f"grouped_by, I1Mask:'{I1Mask}' I2Mask:'{I2Mask}' UmiMask:'{UmiMask}' recipe:'{recipe}'",
]
)

Expand Down Expand Up @@ -538,8 +533,9 @@ def make_demux_manifests(
)

# Add PhiX stratified by index length
# Subset controls by lane
group_controls = df_controls[df_controls["Lane"] == lane].copy()
group_controls = df_controls[
df_controls["Lane"].isin(group["Lane"].unique())
].copy()

# Trim PhiX indexes to match group
i1_len = group["Index1"].apply(len).max()
Expand Down Expand Up @@ -1089,9 +1085,7 @@ def sync_metadata(self):
os.path.join(self.run_dir, "Demultiplexing", "UnassignedSequences.csv"),
self.run_parameters_file,
]
metadata_archive = self.CONFIG.get("element_analysis").get(
"metadata_location"
)
metadata_archive = self.CONFIG.get("element_analysis").get("metadata_location")
dest = os.path.join(metadata_archive, self.NGI_run_id)
os.makedirs(dest)
for f in files_to_copy:
Expand All @@ -1102,9 +1096,7 @@ def make_transfer_indicator(self):
Path(transfer_indicator).touch()

def transfer(self):
transfer_details = self.CONFIG.get("element_analysis").get(
"transfer_details"
)
transfer_details = self.CONFIG.get("element_analysis").get("transfer_details")
command = (
"rsync"
+ " -rLav"
Expand Down
24 changes: 0 additions & 24 deletions tests/element/test_Element_Runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,30 +158,6 @@ def test_get_demultiplexing_status(

assert run.get_demultiplexing_status() == p["expected"]

@pytest.mark.skip(reason="Not implemented yet")
@pytest.mark.parametrize(
"p",
[
{"run_finished": True, "expected": True},
{"run_finished": False, "expected": False},
],
ids=["exists", "does not exist"],
)
def test_manifest_exists(
self, mock_db: mock.Mock, create_dirs: pytest.fixture, p: pytest.fixture
):
tmp: tempfile.TemporaryDirectory = create_dirs

run = to_test.Run(
create_element_run_dir(
tmp,
run_finished=p["run_finished"],
),
get_config(tmp),
)

assert run.manifest_exists() == p["expected"]

@pytest.mark.skip(reason="Not implemented yet")
def test_generate_demux_command(self, mock_db):
pass
Expand Down

0 comments on commit dda10c6

Please sign in to comment.