Skip to content

Commit

Permalink
Implement stages and filtering in IDC split script.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Jul 1, 2023
1 parent b0d0a0b commit 22b7bbb
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 23 deletions.
80 changes: 63 additions & 17 deletions src/ephemeris/_idc_split_data_manager_genomes.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,27 @@
log = logging.getLogger(__name__)


class Filters:
    """Optional restrictions on which data manager runs are generated.

    Every attribute defaults to ``None``, which leaves that filter disabled.
    Each ``filter_out_*`` method returns ``True`` when the candidate it is
    given should be skipped.
    """

    # Stage to keep. The genome walk in this module checks stage 0 for the
    # fetch step and stage 1 for the indexer runs.
    stage: Optional[int] = None
    # Identifier of the only data manager to keep.
    data_manager: Optional[str] = None
    # Identifier of the only genome build to keep.
    build_id: Optional[str] = None

    def filter_out_data_manager(self, data_manager: str) -> bool:
        """Return True if a data manager filter is set and does not match."""
        return bool(self.data_manager and data_manager != self.data_manager)

    def filter_out_build_id(self, build_id: str) -> bool:
        """Return True if a build id filter is set and does not match."""
        return bool(self.build_id and build_id != self.build_id)

    def filter_out_stage(self, stage: int) -> bool:
        """Return True if a stage filter is set and does not match.

        Raises:
            ValueError: if the configured stage cannot be read as an integer.
        """
        if self.stage is None:
            return False
        # The configured stage may arrive as a string (for example from a
        # command-line option declared without ``type=int``). Comparing "0"
        # with 0 would always report a mismatch and filter out every run, so
        # normalise to int before comparing.
        return int(self.stage) != stage


class SplitOptions:
    """Settings consumed by the genome splitting routines in this module.

    Instances are created without arguments and populated by attribute
    assignment.
    """

    # Path of the merged genomes YAML file that is read.
    merged_genomes_path: str
    # Directory under which the per-run task files are written.
    split_genomes_path: str
    # Path handed to read_data_managers_configuration.
    data_managers_path: str
    # Callable invoked as is_build_complete(build_id, indexer).
    is_build_complete: IsBuildComplete
    # Filters restricting which runs are generated.
    filters: Filters

    def __init__(self) -> None:
        # Give each instance its own Filters object. A class-level
        # ``Filters()`` default would be shared by every SplitOptions, so
        # setting a filter through one instance would leak into the others.
        self.filters = Filters()


def tool_id_for(indexer: str, data_managers: Dict[str, DataManager]) -> str:
Expand Down Expand Up @@ -94,34 +110,31 @@ def write_run_data_manager_to_file(run_data_manager: RunDataManager, path: str):
yaml.safe_dump(run_data_managers.dict(exclude_unset=True), of)


def split_genomes(split_options: SplitOptions) -> None:

def write_task_file(run_data_manager: RunDataManager, build_id: str, indexer: str):
split_genomes_path = split_options.split_genomes_path
if not os.path.exists(split_options.split_genomes_path):
safe_makedirs(split_genomes_path)

task_file_dir = os.path.join(split_genomes_path, build_id, indexer)
task_file = os.path.join(task_file_dir, TASK_FILE_NAME)
write_run_data_manager_to_file(run_data_manager, task_file)

def walk_over_incomplete_runs(split_options: SplitOptions):
data_managers = read_data_managers_configuration(split_options.data_managers_path)
with open(split_options.merged_genomes_path) as f:
genomes_all = yaml.safe_load(f)
genomes = genomes_all["genomes"]
for genome in genomes:
build_id = genome["id"]
if split_options.filters.filter_out_build_id(build_id):
continue

fetch_indexer = "data_manager_fetch_genome_dbkeys_all_fasta"
if not split_options.is_build_complete(build_id, fetch_indexer):
do_fetch = not split_options.filters.filter_out_data_manager(fetch_indexer)
source = genome.get("source")
if source is None:
do_fetch = False
if do_fetch and split_options.filters.filter_out_stage(0):
do_fetch = False

if do_fetch and not split_options.is_build_complete(build_id, fetch_indexer):
log.info(f"Fetching: {build_id}")
fetch_tool_id = tool_id_for(fetch_indexer, data_managers)
fetch_params = []
fetch_params.append({"dbkey_source|dbkey": genome["id"]})
source = genome.get("source")
if source is None:
continue
elif source == "ucsc":
if source == "ucsc":
fetch_params.append({"reference_source|reference_source_selector": "ucsc"})
fetch_params.append({"reference_source|requested_dbkey": genome["id"]})
fetch_params.append({"sequence_name": genome["description"]})
Expand All @@ -146,12 +159,18 @@ def write_task_file(run_data_manager: RunDataManager, build_id: str, indexer: st
# Not needed according to Marius
# data_table_reload=["all_fasta", "__dbkeys__"],
)
write_task_file(fetch_run_data_manager, build_id, fetch_indexer)
yield (build_id, fetch_indexer, fetch_run_data_manager)
else:
log.debug(f"Fetch is already completed: {build_id}")

indexers = genome.get("indexers", [])
for indexer in indexers:
if split_options.filters.filter_out_data_manager(indexer):
continue

if split_options.filters.filter_out_stage(1):
continue

if split_options.is_build_complete(build_id, indexer):
log.debug(f"Build is already completed: {build_id} {indexer}")
continue
Expand Down Expand Up @@ -179,7 +198,22 @@ def write_task_file(run_data_manager: RunDataManager, build_id: str, indexer: st
params=params,
items=[item],
)
write_task_file(run_data_manager, build_id, indexer)
yield (build_id, indexer, run_data_manager)


def split_genomes(split_options: SplitOptions) -> None:
    """Write a task file for every run yielded by walk_over_incomplete_runs.

    Each task file is placed at
    ``<split_genomes_path>/<build_id>/<indexer>/<TASK_FILE_NAME>``. The root
    ``split_genomes_path`` directory is created first if it does not exist.
    """
    for build_id, indexer, run_data_manager in walk_over_incomplete_runs(split_options):
        output_root = split_options.split_genomes_path
        if not os.path.exists(output_root):
            safe_makedirs(output_root)

        task_file = os.path.join(output_root, build_id, indexer, TASK_FILE_NAME)
        write_run_data_manager_to_file(run_data_manager, task_file)


class GalaxyHistoryIsBuildComplete:
Expand All @@ -199,6 +233,12 @@ def _parser():
parser.add_argument('--merged-genomes-path', '-m', default="genomes.yml")
parser.add_argument('--split-genomes-path', '-s', default="data_manager_tasks")
parser.add_argument('--data-managers-path', default="data_managers.yml")

# filters
parser.add_argument('--filter-stage', default=None)
parser.add_argument('--filter-data-manager', default=None)
parser.add_argument('--filter-build-id', default=None)

return parser


Expand All @@ -225,6 +265,12 @@ def main():
split_options.split_genomes_path = args.split_genomes_path
split_options.is_build_complete = is_build_complete

filters = Filters()
filters.build_id = args.filter_build_id
filters.data_manager = args.filter_data_manager
filters.stage = args.filter_stage
split_options.filters = filters

split_genomes(split_options)


Expand Down
80 changes: 74 additions & 6 deletions tests/test_split_genomes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import yaml

from ephemeris._idc_split_data_manager_genomes import (
Filters,
GalaxyHistoryIsBuildComplete,
RunDataManagers,
split_genomes,
Expand Down Expand Up @@ -62,19 +63,22 @@ def read_and_validate_run_data_manager_yaml(path):
return RunDataManagers(**yaml.safe_load(f))


def test_split_genomes(tmp_path: Path):
setup_mock_idc_dir(tmp_path)

split_path = tmp_path / "split"

def split_options_for(tmp_path: Path) -> SplitOptions:
history_names = ["idc-hg19_rCRS_pUC18_phiX174-data_manager_star_index_builder"]
is_build_complete = GalaxyHistoryIsBuildComplete(history_names)

split_options = SplitOptions()
split_options.merged_genomes_path = tmp_path / "genomes.yml"
split_options.split_genomes_path = str(split_path)
split_options.split_genomes_path = str(tmp_path / "split")
split_options.data_managers_path = tmp_path / "data_managers.yml"
split_options.is_build_complete = is_build_complete
return split_options


def test_split_genomes(tmp_path: Path):
setup_mock_idc_dir(tmp_path)
split_path = tmp_path / "split"
split_options = split_options_for(tmp_path)
split_genomes(split_options)
new_task = split_path / "hg19_rCRS_pUC18_phiX174" / "data_manager_twobit_builder"
complete_task = split_path / "hg19_rCRS_pUC18_phiX174" / "data_manager_star_index_builder"
Expand All @@ -89,3 +93,67 @@ def test_split_genomes(tmp_path: Path):
assert data_manager.id == "toolshed.g2.bx.psu.edu/repos/devteam/data_manager_twobit_builder/twobit_builder_data_manager/0.0.2"
assert data_manager.items[0]["id"] == "hg19_rCRS_pUC18_phiX174"
assert data_manager.items[0]["dbkey"] == "hg19_rCRS_pUC18_phiX174"


def test_split_genomes_filter_on_data_manager(tmp_path: Path):
    """The twobit task appears only once the filter selects the twobit builder."""
    setup_mock_idc_dir(tmp_path)
    options = split_options_for(tmp_path)
    options.filters = Filters()
    twobit_task = tmp_path / "split" / "hg19_rCRS_pUC18_phiX174" / "data_manager_twobit_builder"

    # Filtering on a different data manager must not produce the twobit task.
    options.filters.data_manager = "data_manager_star_index_builder"
    split_genomes(options)
    assert not twobit_task.exists()

    # Switching the filter to the twobit builder produces it.
    options.filters.data_manager = "data_manager_twobit_builder"
    split_genomes(options)
    assert twobit_task.exists()


def test_split_genomes_filter_on_build_id(tmp_path: Path):
    """With the build id filter set to rn6, only rn6 tasks are written."""
    setup_mock_idc_dir(tmp_path)
    options = split_options_for(tmp_path)
    options.filters = Filters()
    options.filters.build_id = "rn6"
    split_root = tmp_path / "split"

    split_genomes(options)

    other_build_task = split_root / "hg19_rCRS_pUC18_phiX174" / "data_manager_twobit_builder"
    selected_build_task = split_root / "rn6" / "data_manager_twobit_builder"
    assert not other_build_task.exists()
    assert selected_build_task.exists()


def test_split_genomes_filter_on_stage_0(tmp_path: Path):
    """With stage 0 selected, the fetch task is written and the twobit task is not."""
    setup_mock_idc_dir(tmp_path)
    options = split_options_for(tmp_path)
    options.filters = Filters()
    options.filters.stage = 0
    build_dir = tmp_path / "split" / "hg19_rCRS_pUC18_phiX174"

    split_genomes(options)

    indexer_task = build_dir / "data_manager_twobit_builder"
    fetch_task = build_dir / "data_manager_fetch_genome_dbkeys_all_fasta"
    assert not indexer_task.exists()
    assert fetch_task.exists()

def test_split_genomes_filter_on_stage_1(tmp_path: Path):
    """With stage 1 selected, the twobit task is written and the fetch task is not."""
    setup_mock_idc_dir(tmp_path)
    options = split_options_for(tmp_path)
    options.filters = Filters()
    options.filters.stage = 1
    build_dir = tmp_path / "split" / "hg19_rCRS_pUC18_phiX174"

    split_genomes(options)

    fetch_task = build_dir / "data_manager_fetch_genome_dbkeys_all_fasta"
    indexer_task = build_dir / "data_manager_twobit_builder"
    assert not fetch_task.exists()
    assert indexer_task.exists()

0 comments on commit 22b7bbb

Please sign in to comment.