Skip to content

Commit

Permalink
another portion of refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
MehmedGIT committed Oct 31, 2024
1 parent 26cd9fc commit 23e256b
Show file tree
Hide file tree
Showing 19 changed files with 144 additions and 305 deletions.
2 changes: 1 addition & 1 deletion src/utils/operandi_utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
OLA_HD_USER = "admin"
OLA_HD_PASSWORD = "JW24G.xR"

OPERANDI_VERSION = get_distribution("operandi_utils").version
OPERANDI_VERSION = '2.17.0'


class AccountType(str, Enum):
Expand Down
6 changes: 3 additions & 3 deletions src/utils/operandi_utils/oton/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
__all__ = ["cli", "OTONConverter", "NextflowBlockProcess", "NextflowBlockWorkflow", "NextflowFileExecutable"]

from .cli import cli
from .converter import OTONConverter
from .oton_converter import OTONConverter
from .nf_block_process import NextflowBlockProcess
from .nf_block_workflow import NextflowBlockWorkflow
from .nf_file_executable import NextflowFileExecutable
from .parser import OCRDParser, ProcessorCallArguments
from .validator import OCRDValidator
from .ocrd_parser import OCRDParser, ProcessorCallArguments
from .ocrd_validator import OCRDValidator
4 changes: 2 additions & 2 deletions src/utils/operandi_utils/oton/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import click
from operandi_utils.oton.converter import OTONConverter
from operandi_utils.oton.validator import OCRDValidator
from operandi_utils.oton.oton_converter import OTONConverter
from operandi_utils.oton.ocrd_validator import OCRDValidator


@click.group()
Expand Down
3 changes: 3 additions & 0 deletions src/utils/operandi_utils/oton/constants.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from json import load
from os import environ
from pkg_resources import resource_filename
from operandi_utils.constants import OPERANDI_VERSION


__all__ = [
Expand Down Expand Up @@ -55,3 +56,5 @@
PH_DIR_OUT: str = f'${BS[0]}{DIR_OUT}{BS[1]}'
PH_METS_FILE: str = f'${BS[0]}{METS_FILE}{BS[1]}'
SPACES = ' '

WORKFLOW_COMMENT = f"// This workflow was automatically generated by the v{OPERANDI_VERSION} operandi_utils.oton module"
40 changes: 14 additions & 26 deletions src/utils/operandi_utils/oton/nf_block_process.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,22 @@
import logging
from operandi_utils.oton.validator import ProcessorCallArguments
from operandi_utils.oton.constants import (
OTON_LOG_LEVEL, PH_DIR_IN, PH_DIR_OUT, PH_METS_FILE, PH_ENV_WRAPPER, SPACES)
from logging import getLevelName, getLogger
from operandi_utils.oton.ocrd_validator import ProcessorCallArguments
from operandi_utils.oton.constants import OTON_LOG_LEVEL, PH_ENV_WRAPPER, SPACES


class NextflowBlockProcess:
def __init__(self, processor_call_arguments: ProcessorCallArguments, index_pos: int, env_wrapper: bool = False):
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.getLevelName(OTON_LOG_LEVEL))
self.logger = getLogger(__name__)
self.logger.setLevel(getLevelName(OTON_LOG_LEVEL))

self.processor_call_arguments: ProcessorCallArguments = processor_call_arguments
self.env_wrapper: bool = env_wrapper
self.nf_process_name: str = processor_call_arguments.executable.replace('-', '_') + "_" + str(index_pos)
self.directives = {}
self.input_params = {}
self.output_params = {}

self.env_wrapper = env_wrapper
self.nf_process_name = processor_call_arguments.executable.replace('-', '_') + "_" + str(index_pos)
self.repr_in_workflow = [
self.nf_process_name,
f'"{processor_call_arguments.input_file_grps}"',
f'"{processor_call_arguments.output_file_grps}"'
]

processor_call_arguments.input_file_grps = PH_DIR_IN
processor_call_arguments.output_file_grps = PH_DIR_OUT
processor_call_arguments.mets_file_path = PH_METS_FILE

self.ocrd_command_bash = f'{processor_call_arguments}'
self.ocrd_command_bash = processor_call_arguments.dump_bash_form()
self.ocrd_command_bash_placeholders = processor_call_arguments.dump_bash_form_with_placeholders()

def add_directive(self, directive: str, value: str):
if directive in self.directives:
Expand All @@ -51,38 +42,35 @@ def dump_directives(self) -> str:

def dump_parameters_input(self) -> str:
dump = ''
dump += f'{SPACES}input:\n'
for key, value in self.input_params.items():
dump += f'{SPACES}{SPACES}{value} {key}\n'
dump += '\n'
return dump

def dump_parameters_output(self) -> str:
dump = ''
dump += f'{SPACES}output:\n'
for key, value in self.output_params.items():
dump += f'{SPACES}{SPACES}{value} {key}\n'
dump += '\n'
return dump

def dump_script(self) -> str:
dump = ''
dump += f'{SPACES}script:\n'
dump += f'{SPACES}{SPACES}"""\n'
dump += f'{SPACES}{SPACES}'
if self.env_wrapper:
dump += f'{PH_ENV_WRAPPER} '
dump += f'{self.ocrd_command_bash}\n'
dump += f'{self.ocrd_command_bash_placeholders}\n'
dump += f'{SPACES}{SPACES}"""\n'
return dump

def file_representation(self):
representation = f'process {self.nf_process_name}'
representation += ' {\n'
representation += self.dump_directives()
representation += self.dump_parameters_input()
representation += self.dump_parameters_output()
representation += self.dump_script()
representation += f'{SPACES}input:\n{self.dump_parameters_input()}'
representation += f'{SPACES}output:\n{self.dump_parameters_output()}'
representation += f'{SPACES}script:\n{self.dump_script()}'
representation += '}\n'
self.logger.debug(f"\n{representation}")
return representation
30 changes: 18 additions & 12 deletions src/utils/operandi_utils/oton/nf_block_workflow.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
import logging
from logging import getLevelName, getLogger
from typing import List
from operandi_utils.oton.constants import (
OTON_LOG_LEVEL, PARAMS_KEY_METS_PATH, PARAMS_KEY_INPUT_FILE_GRP, SPACES)
from operandi_utils.oton.constants import OTON_LOG_LEVEL, PARAMS_KEY_METS_PATH, PARAMS_KEY_INPUT_FILE_GRP, SPACES
from operandi_utils.oton.nf_block_process import NextflowBlockProcess


class NextflowBlockWorkflow:
def __init__(self, workflow_name: str, nf_processes: List[str]):
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.getLevelName(OTON_LOG_LEVEL))
def __init__(self, workflow_name: str, nf_processes: List[NextflowBlockProcess]):
self.logger = getLogger(__name__)
self.logger.setLevel(getLevelName(OTON_LOG_LEVEL))

self.workflow_name = workflow_name
self.nf_processes: List[str] = nf_processes
self.nf_blocks_process: List[NextflowBlockProcess] = nf_processes

def file_representation(self):
representation = 'workflow {\n'
representation += f'{SPACES}{self.workflow_name}:\n'

self.logger.info(self.nf_blocks_process)

previous_nfp = None
for nfp in self.nf_processes:
nfp_0, nfp_1, nfp_2 = nfp[0], nfp[1], nfp[2]
for nfp in self.nf_blocks_process:
nfp_1 = nfp.processor_call_arguments.input_file_grps
nfp_2 = nfp.processor_call_arguments.output_file_grps
representation += f'{SPACES}{SPACES}{nfp.nf_process_name}('
if previous_nfp is None:
representation += f'{SPACES}{SPACES}{nfp_0}({PARAMS_KEY_METS_PATH}, {PARAMS_KEY_INPUT_FILE_GRP}, {nfp_2})\n'
representation += f'{PARAMS_KEY_METS_PATH}, {PARAMS_KEY_INPUT_FILE_GRP}, "{nfp_2}")'
else:
representation += f'{SPACES}{SPACES}{nfp_0}({previous_nfp}.out, {nfp_1}, {nfp_2})\n'
previous_nfp = nfp_0
representation += f'{previous_nfp}.out, "{nfp_1}", "{nfp_2}")'
representation += f'\n'
previous_nfp = nfp.nf_process_name

representation += '}'

Expand Down
60 changes: 25 additions & 35 deletions src/utils/operandi_utils/oton/nf_file_executable.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
import logging
from logging import getLevelName, getLogger
from typing import List, Tuple

from operandi_utils.oton.validator import ProcessorCallArguments
from operandi_utils.oton.ocrd_validator import ProcessorCallArguments
from operandi_utils.oton.constants import (
DIR_IN, DIR_OUT, METS_FILE,
OTON_LOG_LEVEL,
PARAMS_KEY_INPUT_FILE_GRP,
REPR_ENV_WRAPPER,
REPR_INPUT_FILE_GRP,
REPR_METS_PATH,
REPR_WORKSPACE_DIR
REPR_WORKSPACE_DIR,
WORKFLOW_COMMENT
)
from operandi_utils.oton.nf_block_process import NextflowBlockProcess
from operandi_utils.oton.nf_block_workflow import NextflowBlockWorkflow


class NextflowFileExecutable:
def __init__(self):
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.getLevelName(OTON_LOG_LEVEL))
self.logger = getLogger(__name__)
self.logger.setLevel(getLevelName(OTON_LOG_LEVEL))

self.nf_lines_parameters = []
self.nf_lines_parameters: List[str] = []
self.nf_blocks_process: List[NextflowBlockProcess] = []
self.nf_blocks_workflow: List[NextflowBlockWorkflow] = []

Expand Down Expand Up @@ -48,65 +50,53 @@ def build_parameters_docker(self):
def build_parameters_apptainer(self):
raise NotImplemented("This feature is not implemented yet!")

def build_nextflow_processes_local(self, ocrd_processor: List[ProcessorCallArguments]) -> Tuple[List[str], str]:
nf_processes = []
first_file_grps = "DEFAULT"
def build_nextflow_processes_local(self, ocrd_processor: List[ProcessorCallArguments]):
index = 0
for processor in ocrd_processor:
nf_process_block = NextflowBlockProcess(processor, index, False)
nf_process_block = NextflowBlockProcess(processor, index, env_wrapper=False)
nf_process_block.add_directive(directive='maxForks', value='1')
nf_process_block.add_parameter_input(parameter=METS_FILE, parameter_type='path')
nf_process_block.add_parameter_input(parameter=DIR_IN, parameter_type='val')
nf_process_block.add_parameter_input(parameter=DIR_OUT, parameter_type='val')
nf_process_block.add_parameter_output(parameter=METS_FILE, parameter_type='path')
self.nf_blocks_process.append(nf_process_block)

# Take the input_file_grp of the first processor and change the value of the
# REPR_INPUT_FILE_GRP to set the desired file group as default
if index == 0:
first_file_grps = nf_process_block.repr_in_workflow[1].strip('"')

# This list is used when building the workflow
nf_processes.append(nf_process_block.repr_in_workflow)
self.logger.info(f"Successfully created Nextflow Process: {nf_process_block.nf_process_name}")
index += 1
return nf_processes, first_file_grps

def build_nextflow_processes_docker(self, ocrd_processor: List[ProcessorCallArguments]) -> Tuple[List[str], str]:
nf_processes = []
first_file_grps = "DEFAULT"
def build_nextflow_processes_docker(self, ocrd_processor: List[ProcessorCallArguments]):
index = 0
for processor in ocrd_processor:
nf_process_block = NextflowBlockProcess(processor, index, True)
nf_process_block = NextflowBlockProcess(processor, index, env_wrapper=True)
nf_process_block.add_directive(directive='maxForks', value='1')
nf_process_block.add_parameter_input(parameter=METS_FILE, parameter_type='path')
nf_process_block.add_parameter_input(parameter=DIR_IN, parameter_type='val')
nf_process_block.add_parameter_input(parameter=DIR_OUT, parameter_type='val')
nf_process_block.add_parameter_output(parameter=METS_FILE, parameter_type='path')
self.nf_blocks_process.append(nf_process_block)

# Take the input_file_grp of the first processor and change the value of the
# REPR_INPUT_FILE_GRP to set the desired file group as default
if index == 0:
first_file_grps = nf_process_block.repr_in_workflow[1].strip('"')

# This list is used when building the workflow
nf_processes.append(nf_process_block.repr_in_workflow)
self.logger.info(f"Successfully created Nextflow Process: {nf_process_block.nf_process_name}")
index += 1

return nf_processes, first_file_grps

def build_nextflow_processes_apptainer(self, ocrd_processor: List[ProcessorCallArguments]) -> Tuple[List[str], str]:
raise NotImplemented("This feature is not implemented yet!")

def build_main_workflow(self, nf_processes: List[str]):
nf_workflow_block = NextflowBlockWorkflow("main", nf_processes)
def __assign_first_file_grps_param(self):
first_file_grps = self.nf_blocks_process[0].processor_call_arguments.input_file_grps
index = 0
for parameter in self.nf_lines_parameters:
if PARAMS_KEY_INPUT_FILE_GRP in parameter:
self.nf_lines_parameters[index] = parameter.replace("null", first_file_grps)
break
index += 1

def build_main_workflow(self):
self.__assign_first_file_grps_param()
nf_workflow_block = NextflowBlockWorkflow(workflow_name="main", nf_processes=self.nf_blocks_process)
self.nf_blocks_workflow.append(nf_workflow_block)

def produce_nextflow_file(self, output_path: str):
# Write Nextflow line tokens to an output file
with open(output_path, mode='w', encoding='utf-8') as nextflow_file:
nextflow_file.write(f"{WORKFLOW_COMMENT}\n")
for nextflow_line in self.nf_lines_parameters:
nextflow_file.write(f'{nextflow_line}\n')
for block in self.nf_blocks_process:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,56 +1,16 @@
import json
import logging
from logging import getLevelName, getLogger
from shlex import split as shlex_split
from typing import List, Optional, Tuple
from typing import List, Tuple

from ocrd_utils import parse_json_string_or_file, set_json_key_value_overrides
from operandi_utils.oton.constants import OCRD_ALL_JSON, OTON_LOG_LEVEL


# This class is based on ocrd.task_sequence.ProcessorTask
class ProcessorCallArguments:
def __init__(
self,
executable: str,
input_file_grps: Optional[str] = None,
output_file_grps: Optional[str] = None,
parameters: Optional[dict] = None,
mets_file_path: str = "./mets.xml"
):
if not executable:
raise ValueError(f"Missing executable name")
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.getLevelName(OTON_LOG_LEVEL))

self.executable = f'ocrd-{executable}'
self.mets_file_path = mets_file_path
self.input_file_grps = input_file_grps
self.output_file_grps = output_file_grps
self.parameters = parameters if parameters else {}
self.ocrd_tool_json = OCRD_ALL_JSON.get(self.executable, None)

def __str__(self):
str_repr = f"{self.executable} -m {self.mets_file_path} -I {self.input_file_grps} -O {self.output_file_grps}"
if self.parameters:
str_repr += f" -p '{json.dumps(self.parameters)}'"
return str_repr

def self_validate(self):
if not self.ocrd_tool_json:
self.logger.error(f"Ocrd tool JSON of '{self.executable}' not found!")
raise ValueError(f"Ocrd tool JSON of '{self.executable}' not found!")
if not self.input_file_grps:
self.logger.error(f"Processor '{self.executable}' requires 'input_file_grp' but none was provided.")
raise ValueError(f"Processor '{self.executable}' requires 'input_file_grp' but none was provided.")
if 'output_file_grp' in self.ocrd_tool_json and not self.output_file_grps:
self.logger.error(f"Processor '{self.executable}' requires 'output_file_grp' but none was provided.")
raise ValueError(f"Processor '{self.executable}' requires 'output_file_grp' but none was provided.")
from operandi_utils.oton.constants import OTON_LOG_LEVEL
from operandi_utils.oton.process_call_arguments import ProcessorCallArguments


class OCRDParser:
def __init__(self):
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.getLevelName(OTON_LOG_LEVEL))
self.logger = getLogger(__name__)
self.logger.setLevel(getLevelName(OTON_LOG_LEVEL))

def parse_arguments(self, processor_arguments) -> ProcessorCallArguments:
tokens = shlex_split(processor_arguments)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from ocrd_validators import ParameterValidator
from operandi_utils.oton.constants import OTON_LOG_LEVEL
from operandi_utils.oton.parser import OCRDParser, ProcessorCallArguments
from operandi_utils.oton.ocrd_parser import OCRDParser, ProcessorCallArguments


class OCRDValidator:
Expand All @@ -19,7 +19,7 @@ def validate(self, input_file: str) -> List[ProcessorCallArguments]:
ocrd_process_command, processor_tasks = self.ocrd_parser.read_from_file(input_file)
self.validate_ocrd_process_command(ocrd_process_command)

list_processor_call_arguments = []
list_processor_call_arguments: List[ProcessorCallArguments] = []
for processor_arguments in processor_tasks:
processor_call_arguments: ProcessorCallArguments = self.ocrd_parser.parse_arguments(processor_arguments)
list_processor_call_arguments.append(processor_call_arguments)
Expand Down
Loading

0 comments on commit 23e256b

Please sign in to comment.