From 0cc91e918155cde3fe93dfbfd8249e9c8b0d2b9f Mon Sep 17 00:00:00 2001 From: Martin Traverse Date: Thu, 31 Oct 2024 08:09:34 +0000 Subject: [PATCH] Feature / Runtime child jobs, parallel and sequential job groups (#465) * Suggested metadata for handling sequential and parallel job groups * Make graph builder hold stateful information on job resources, namespace etc * Use EJobValidation for errors in the graph builder * Make the graph builder try to continue after it hits an error * Add graph definitions, functions and builder logic to express child jobs * Engine updates to handle processing for child jobs * Update dev mode translator to handle job recursion (resources are stateful, job def is stateless) * Update code that depends on the new dev mode translator * Create an example import -> process -> export job group * Fix one warning in the graph module * Add some extra paths to git ignore for example data * Add the import -> process -> export group job to the CI job for example models * Minor fixes and tidy-ups * Remove an unneeded code change --- .gitignore | 2 + examples/models/python/config/job_group.yaml | 49 ++ .../models/python/src/tutorial/data_import.py | 30 ++ .../models/python/src/tutorial/job_group.py | 17 + .../src/main/proto/tracdap/metadata/job.proto | 45 ++ .../python/src/tracdap/rt/_exec/dev_mode.py | 369 ++++++++----- .../python/src/tracdap/rt/_exec/engine.py | 276 +++++++--- .../python/src/tracdap/rt/_exec/functions.py | 15 +- .../python/src/tracdap/rt/_exec/graph.py | 9 + .../src/tracdap/rt/_exec/graph_builder.py | 493 +++++++++++------- .../python/src/tracdap/rt/_exec/runtime.py | 6 +- .../test/tracdap_examples/test_tutorial.py | 7 + .../tracdap_test/rt/jobs/test_core_jobs.py | 8 +- 13 files changed, 922 insertions(+), 404 deletions(-) create mode 100644 examples/models/python/config/job_group.yaml create mode 100644 examples/models/python/src/tutorial/job_group.py diff --git a/.gitignore b/.gitignore index d466e3ae4..8a501945e 100644 --- a/.gitignore +++ b/.gitignore @@ -37,6 +37,8 @@ /examples/models/python/data/outputs/** /examples/models/python/data/data/** +/examples/models/python/data/primary/** +/examples/models/python/data/generated/** /examples/models/python/data/exports/** !/examples/models/python/data/exports/.keep /examples/apps/javascript/node_modules/** diff --git a/examples/models/python/config/job_group.yaml b/examples/models/python/config/job_group.yaml new file mode 100644 index 000000000..c02b258ef --- /dev/null +++ b/examples/models/python/config/job_group.yaml @@ -0,0 +1,49 @@ + + +job: + jobGroup: + sequential: + jobs: + + - importData: + + model: tutorial.data_import.SimpleDataImport + + parameters: + storage_key: staging_data + source_file: sample_data.parquet + + outputs: + customer_loans: primary/data_import/customer_loans.csv + + storageAccess: + - staging_data + + - runModel: + + model: tutorial.using_data.UsingDataModel + + parameters: + eur_usd_rate: 1.2071 + default_weighting: 1.5 + filter_defaults: false + + inputs: + customer_loans: primary/data_import/customer_loans.csv + + outputs: + profit_by_region: generated/using_data/profit_by_region.csv + + - exportData: + + model: tutorial.data_export.DataExportExample + + parameters: + storage_key: exported_data + export_comment: "Exporting some example data" + + inputs: + profit_by_region: generated/using_data/profit_by_region.csv + + storageAccess: + - exported_data diff --git a/examples/models/python/src/tutorial/data_import.py b/examples/models/python/src/tutorial/data_import.py index 91c6ed9a1..101bc04b7 100644 --- a/examples/models/python/src/tutorial/data_import.py +++ b/examples/models/python/src/tutorial/data_import.py @@ -18,6 +18,8 @@ import pandas as pd import pytz +import tutorial.schemas as schemas + class BulkDataImport(trac.TracDataImport): @@ -149,6 +151,34 @@ def run_model(self, ctx: trac.TracDataContext): ctx.log().warning(f"Requested table [{table_name}] not found in storage [{storage_key}]") + +class SimpleDataImport(trac.TracDataImport): + + def define_parameters(self) -> tp.Dict[str, trac.ModelParameter]: + + return trac.define_parameters( + trac.P("storage_key", trac.STRING, "TRAC external storage key"), + trac.P("source_file", trac.STRING, "Path of the source data file in external storage")) + + def define_outputs(self) -> tp.Dict[str, trac.ModelOutputSchema]: + + customer_loans = trac.load_schema(schemas, "customer_loans.csv") + + return {"customer_loans": trac.ModelOutputSchema(customer_loans)} + + def run_model(self, ctx: trac.TracDataContext): + + storage_key = ctx.get_parameter("storage_key") + storage = ctx.get_file_storage(storage_key) + + storage_file = ctx.get_parameter("source_file") + + with storage.read_byte_stream(storage_file) as file_stream: + + dataset = pd.read_parquet(file_stream) + ctx.put_pandas_table("customer_loans", dataset) + + if __name__ == "__main__": import tracdap.rt.launch as launch launch.launch_model(BulkDataImport, "config/data_import.yaml", "config/sys_config.yaml") diff --git a/examples/models/python/src/tutorial/job_group.py b/examples/models/python/src/tutorial/job_group.py new file mode 100644 index 000000000..8d5f9b5b7 --- /dev/null +++ b/examples/models/python/src/tutorial/job_group.py @@ -0,0 +1,17 @@ +# Copyright 2024 Accenture Global Solutions Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import tracdap.rt.launch as launch + +launch.launch_job("config/job_group.yaml", "config/sys_config.yaml", dev_mode=True) diff --git a/tracdap-api/tracdap-metadata/src/main/proto/tracdap/metadata/job.proto b/tracdap-api/tracdap-metadata/src/main/proto/tracdap/metadata/job.proto index 0c1f5ab44..9f7761b4f 100644 --- a/tracdap-api/tracdap-metadata/src/main/proto/tracdap/metadata/job.proto +++ b/tracdap-api/tracdap-metadata/src/main/proto/tracdap/metadata/job.proto @@ -45,6 +45,9 @@ enum JobType { /// Export data to external locations EXPORT_DATA = 5; + + /// A job built from a collection of other jobs + JOB_GROUP = 6; } /** @@ -98,6 +101,7 @@ message JobDefinition { ImportModelJob importModel = 4; ImportDataJob importData = 5; ExportDataJob exportData = 6; + JobGroup jobGroup = 7; } } @@ -187,3 +191,44 @@ message ExportDataJob { repeated TagUpdate outputAttrs = 8; } + + +/** + * Specify the group type for a JOB_GROUP job + */ +enum JobGroupType { + JOB_GROUP_TYPE_NOT_SET = 0; + SEQUENTIAL_JOB_GROUP = 1; + PARALLEL_JOB_GROUP = 2; +} + +/** + * Specification for a JOB_GROUP job, which runs a collection of other jobs + */ +message JobGroup { + + JobGroupType jobGroupType = 1; + + oneof jobGroupDetails { + SequentialJobGroup sequential = 2; + ParallelJobGroup parallel = 3; + } +} + +/** + * A job group where each job runs in sequence + */ +message SequentialJobGroup { + + repeated JobDefinition jobs = 1; +} + +/** + * A job group where all jobs runs in parallel + */ +message ParallelJobGroup { + + repeated JobDefinition jobs = 1; +} + + diff --git a/tracdap-runtime/python/src/tracdap/rt/_exec/dev_mode.py b/tracdap-runtime/python/src/tracdap/rt/_exec/dev_mode.py index a6d59ea8a..94a8cb8d9 100644 --- a/tracdap-runtime/python/src/tracdap/rt/_exec/dev_mode.py +++ b/tracdap-runtime/python/src/tracdap/rt/_exec/dev_mode.py @@ -34,7 +34,15 @@ re.compile(r"job\.\w+\.outputs\.\w+"), re.compile(r"job\.\w+\.models\.\w+"), re.compile(r"job\.\w+\.model"), - re.compile(r"job\.\w+\.flow")] + re.compile(r"job\.\w+\.flow"), + + re.compile(r".*\.jobs\.\d+\.\w+\.parameters\.\w+"), + re.compile(r".*\.jobs\.\d+\.\w+\.inputs\.\w+"), + re.compile(r".*\.jobs\.\d+\.\w+\.outputs\.\w+"), + re.compile(r".*\.jobs\.\d+\.\w+\.models\.\w+"), + re.compile(r".*\.jobs\.\d+\.\w+\.model"), + re.compile(r".*\.jobs\.\d+\.\w+\.flow") +] DEV_MODE_SYS_CONFIG = [] @@ -58,38 +66,6 @@ def translate_sys_config(cls, sys_config: _cfg.RuntimeConfig, config_mgr: _cfg_p return sys_config - @classmethod - def translate_job_config( - cls, - sys_config: _cfg.RuntimeConfig, - job_config: _cfg.JobConfig, - scratch_dir: pathlib.Path, - config_mgr: _cfg_p.ConfigManager, - model_class: tp.Optional[_api.TracModel.__class__]) \ - -> _cfg.JobConfig: - - cls._log.info(f"Applying dev mode config translation to job config") - - # Protobuf semantics for a blank jobId should be an object, but objectId will be an empty string - if not job_config.jobId or not job_config.jobId.objectId: - job_config = cls._process_job_id(job_config) - - if job_config.job.jobType is None or job_config.job.jobType == _meta.JobType.JOB_TYPE_NOT_SET: - job_config = cls._process_job_type(job_config) - - # Load and populate any models provided as a Python class or class name - job_config = cls._process_models(sys_config, job_config, scratch_dir, model_class) - - # Fow flows, load external flow definitions then perform auto-wiring and type inference - if job_config.job.jobType == _meta.JobType.RUN_FLOW: - job_config = cls._process_flow_definition(job_config, config_mgr) - - # Apply processing to the parameters, inputs and outputs - job_config = cls._process_parameters(job_config) - job_config = cls._process_inputs_and_outputs(sys_config, job_config) - - return job_config - @classmethod def _add_integrated_repo(cls, sys_config: _cfg.RuntimeConfig) -> _cfg.RuntimeConfig: @@ -159,6 +135,86 @@ def _resolve_storage_location(cls, bucket_key, bucket_config, config_mgr: _cfg_p cls._log.error(msg) raise _ex.EConfigParse(msg) + + def __init__(self, sys_config: _cfg.RuntimeConfig, config_mgr: _cfg_p.ConfigManager, scratch_dir: pathlib.Path): + self._sys_config = sys_config + self._config_mgr = config_mgr + self._scratch_dir = scratch_dir + self._model_loader: tp.Optional[_models.ModelLoader] = None + + def translate_job_config( + self, job_config: _cfg.JobConfig, + model_class: tp.Optional[_api.TracModel.__class__] = None) \ + -> _cfg.JobConfig: + + try: + self._log.info(f"Applying dev mode config translation to job config") + + self._model_loader = _models.ModelLoader(self._sys_config, self._scratch_dir) + self._model_loader.create_scope("DEV_MODE_TRANSLATION") + + job_config = copy.deepcopy(job_config) + job_def = job_config.job + + # Protobuf semantics for a blank jobId should be an object, but objectId will be an empty string + if not job_config.jobId or not job_config.jobId.objectId: + job_config = self._process_job_id(job_config) + + job_config, job_def = self.translate_job_def(job_config, job_def, model_class) + job_config.job = job_def + + return job_config + + finally: + self._model_loader.destroy_scope("DEV_MODE_TRANSLATION") + self._model_loader = None + + def translate_job_def( + self, job_config: _cfg.JobConfig, job_def: _meta.JobDefinition, + model_class: tp.Optional[_api.TracModel.__class__] = None) \ + -> tp.Tuple[_cfg.JobConfig, _meta.JobDefinition]: + + if job_def.jobType is None or job_def.jobType == _meta.JobType.JOB_TYPE_NOT_SET: + job_def = self._process_job_type(job_def) + + # Load and populate any models provided as a Python class or class name + job_config, job_def = self._process_models(job_config, job_def, model_class) + + # Fow flows, load external flow definitions then perform auto-wiring and type inference + if job_def.jobType == _meta.JobType.RUN_FLOW: + job_config, job_def = self._process_flow_definition(job_config, job_def) + + if job_def.jobType == _meta.JobType.JOB_GROUP: + job_config, job_def = self.translate_job_group(job_config, job_def) + + # Apply processing to the parameters, inputs and outputs + job_config, job_def = self._process_parameters(job_config, job_def) + job_config, job_def = self._process_inputs_and_outputs(job_config, job_def) + + return job_config, job_def + + def translate_job_group( + self, job_config: _cfg.JobConfig, job_def: _meta.JobDefinition) \ + -> tp.Tuple[_cfg.JobConfig, _meta.JobDefinition]: + + job_group = job_def.jobGroup + + if job_group.jobGroupType is None or job_group.jobGroupType == _meta.JobGroupType.JOB_GROUP_TYPE_NOT_SET: + job_group = self._process_job_group_type(job_group) + + group_details = self._get_job_group_detail(job_group) + + if hasattr(group_details, "jobs"): + child_jobs = [] + for child_def in group_details.jobs: + job_config, child_def = self.translate_job_def(job_config, child_def) + child_jobs.append(child_def) + group_details.jobs = child_jobs + + job_def.jobGroup = job_group + + return job_config, job_def + @classmethod def _add_job_resource( cls, job_config: _cfg.JobConfig, @@ -183,125 +239,153 @@ def _process_job_id(cls, job_config: _cfg.JobConfig): return translated_config @classmethod - def _process_job_type(cls, job_config: _cfg.JobConfig): + def _process_job_type(cls, job_def: _meta.JobDefinition): - if job_config.job.runModel is not None: + if job_def.runModel is not None: job_type = _meta.JobType.RUN_MODEL - elif job_config.job.runFlow is not None: + elif job_def.runFlow is not None: job_type = _meta.JobType.RUN_FLOW - elif job_config.job.importModel is not None: + elif job_def.importModel is not None: job_type = _meta.JobType.IMPORT_MODEL - elif job_config.job.importData is not None: + elif job_def.importData is not None: job_type = _meta.JobType.IMPORT_DATA - elif job_config.job.exportData is not None: + elif job_def.exportData is not None: job_type = _meta.JobType.EXPORT_DATA + elif job_def.jobGroup is not None: + job_type = _meta.JobType.JOB_GROUP + else: cls._log.error("Could not infer job type") raise _ex.EConfigParse("Could not infer job type") cls._log.info(f"Inferred job type = [{job_type.name}]") - job_def = copy.copy(job_config.job) + job_def = copy.copy(job_def) job_def.jobType = job_type - job_config = copy.copy(job_config) - job_config.job = job_def + return job_def - return job_config + @classmethod + def _process_job_group_type(cls, job_group: _meta.JobGroup) -> _meta.JobGroup: + + if job_group.sequential is not None: + job_group_type = _meta.JobGroupType.SEQUENTIAL_JOB_GROUP + + elif job_group.parallel is not None: + job_group_type = _meta.JobGroupType.PARALLEL_JOB_GROUP + + else: + cls._log.error("Could not infer job group type") + raise _ex.EConfigParse("Could not infer job group type") + + cls._log.info(f"Inferred job group type = [{job_group_type.name}]") + + job_group = copy.copy(job_group) + job_group.jobGroupType = job_group_type + + return job_group @classmethod - def _get_job_detail(cls, job_config: _cfg.JobConfig): + def _get_job_detail(cls, job_def: _meta.JobDefinition): + + if job_def.jobType == _meta.JobType.RUN_MODEL: + return job_def.runModel - if job_config.job.jobType == _meta.JobType.RUN_MODEL: - return job_config.job.runModel + if job_def.jobType == _meta.JobType.RUN_FLOW: + return job_def.runFlow - if job_config.job.jobType == _meta.JobType.RUN_FLOW: - return job_config.job.runFlow + if job_def.jobType == _meta.JobType.IMPORT_MODEL: + return job_def.importModel - if job_config.job.jobType == _meta.JobType.IMPORT_MODEL: - return job_config.job.importModel + if job_def.jobType == _meta.JobType.IMPORT_DATA: + return job_def.importData - if job_config.job.jobType == _meta.JobType.IMPORT_DATA: - return job_config.job.importData + if job_def.jobType == _meta.JobType.EXPORT_DATA: + return job_def.exportData - if job_config.job.jobType == _meta.JobType.EXPORT_DATA: - return job_config.job.exportData + if job_def.jobType == _meta.JobType.JOB_GROUP: + return job_def.jobGroup - raise _ex.EConfigParse(f"Could not get job details for job type [{job_config.job.jobType}]") + raise _ex.EConfigParse(f"Could not get job details for job type [{job_def.jobType}]") @classmethod + def _get_job_group_detail(cls, job_group: _meta.JobGroup): + + if job_group.jobGroupType == _meta.JobGroupType.SEQUENTIAL_JOB_GROUP: + return job_group.sequential + + if job_group.jobGroupType == _meta.JobGroupType.PARALLEL_JOB_GROUP: + return job_group.parallel + + raise _ex.EConfigParse(f"Could not get job group details for group type [{job_group.jobGroupType}]") + def _process_models( - cls, - sys_config: _cfg.RuntimeConfig, - job_config: _cfg.JobConfig, - scratch_dir: pathlib.Path, + self, job_config: _cfg.JobConfig, job_def: _meta.JobDefinition, model_class: tp.Optional[_api.TracModel.__class__]) \ - -> _cfg.JobConfig: - - model_loader = _models.ModelLoader(sys_config, scratch_dir) - model_loader.create_scope("DEV_MODE_TRANSLATION") + -> tp.Tuple[_cfg.JobConfig, _meta.JobDefinition]: # This processing works on the assumption that job details follow a convention for addressing models # Jobs requiring a single model have a field called "model" # Jobs requiring multiple models have a field called "models@, which is a dict - job_detail = cls._get_job_detail(job_config) + job_detail = self._get_job_detail(job_def) # If a model class is supplied in code, use that to generate the model def if model_class is not None: # Passing a model class via launch_model() is only supported for job types with a single model if not hasattr(job_detail, "model"): - raise _ex.EJobValidation(f"Job type [{job_config.job.jobType}] cannot be launched using launch_model()") + raise _ex.EJobValidation(f"Job type [{job_def.jobType}] cannot be launched using launch_model()") - model_id, model_obj = cls._generate_model_for_class(model_loader, model_class) + model_id, model_obj = self._generate_model_for_class(model_class) job_detail.model = _util.selector_for(model_id) - job_config = cls._add_job_resource(job_config, model_id, model_obj) + job_config = self._add_job_resource(job_config, model_id, model_obj) # Otherwise look for models specified as a single string, and take that as the entry point else: # Jobs with a single model if hasattr(job_detail, "model") and isinstance(job_detail.model, str): - model_id, model_obj = cls._generate_model_for_entry_point(model_loader, job_detail.model) # noqa + model_id, model_obj = self._generate_model_for_entry_point(job_detail.model) # noqa job_detail.model = _util.selector_for(model_id) - job_config = cls._add_job_resource(job_config, model_id, model_obj) + job_config = self._add_job_resource(job_config, model_id, model_obj) - # Jobs with multiple modlels + elif hasattr(job_detail, "model") and isinstance(job_detail.model, _meta.TagSelector): + if job_detail.model.objectType == _meta.ObjectType.OBJECT_TYPE_NOT_SET: + error = f"Missing required property [model] for job type [{job_def.jobType.name}]" + self._log.error(error) + raise _ex.EJobValidation(error) + + # Jobs with multiple models elif hasattr(job_detail, "models") and isinstance(job_detail.models, dict): for model_key, model_detail in job_detail.models.items(): if isinstance(model_detail, str): - model_id, model_obj = cls._generate_model_for_entry_point(model_loader, model_detail) + model_id, model_obj = self._generate_model_for_entry_point(model_detail) job_detail.models[model_key] = _util.selector_for(model_id) - job_config = cls._add_job_resource(job_config, model_id, model_obj) - - model_loader.destroy_scope("DEV_MODE_TRANSLATION") + job_config = self._add_job_resource(job_config, model_id, model_obj) - return job_config + return job_config, job_def - @classmethod def _generate_model_for_class( - cls, model_loader: _models.ModelLoader, model_class: _api.TracModel.__class__) \ + self, model_class: _api.TracModel.__class__) \ -> (_meta.TagHeader, _meta.ObjectDefinition): model_entry_point = f"{model_class.__module__}.{model_class.__name__}" + return self._generate_model_for_entry_point(model_entry_point) - return cls._generate_model_for_entry_point(model_loader, model_entry_point) - - @classmethod def _generate_model_for_entry_point( - cls, model_loader: _models.ModelLoader, model_entry_point: str) \ + self, model_entry_point: str) \ -> (_meta.TagHeader, _meta.ObjectDefinition): model_id = _util.new_object_id(_meta.ObjectType.MODEL) model_key = _util.object_key(model_id) - cls._log.info(f"Generating model definition for [{model_entry_point}] with ID = [{model_key}]") + self._log.info(f"Generating model definition for [{model_entry_point}] with ID = [{model_key}]") skeleton_modeL_def = _meta.ModelDefinition( # noqa language="python", @@ -312,8 +396,8 @@ def _generate_model_for_entry_point( inputs={}, outputs={}) - model_class = model_loader.load_model_class("DEV_MODE_TRANSLATION", skeleton_modeL_def) - model_def = model_loader.scan_model(skeleton_modeL_def, model_class) + model_class = self._model_loader.load_model_class("DEV_MODE_TRANSLATION", skeleton_modeL_def) + model_def = self._model_loader.scan_model(skeleton_modeL_def, model_class) model_object = _meta.ObjectDefinition( objectType=_meta.ObjectType.MODEL, @@ -321,56 +405,57 @@ def _generate_model_for_entry_point( return model_id, model_object - @classmethod - def _process_flow_definition(cls, job_config: _cfg.JobConfig, config_mgr: _cfg_p.ConfigManager) -> _cfg.JobConfig: + def _process_flow_definition( + self, job_config: _cfg.JobConfig, job_def: _meta.JobDefinition) \ + -> tp.Tuple[_cfg.JobConfig, _meta.JobDefinition]: - flow_details = job_config.job.runFlow.flow + flow_details = job_def.runFlow.flow # Do not apply translation if flow is specified as an object ID / selector (assume full config is supplied) if isinstance(flow_details, _meta.TagHeader) or isinstance(flow_details, _meta.TagSelector): - return job_config + return job_config, job_def # Otherwise, flow is specified as the path to dev-mode flow definition if not isinstance(flow_details, str): err = f"Invalid config value for [job.runFlow.flow]: Expected path or tag selector, got [{flow_details}])" - cls._log.error(err) + self._log.error(err) raise _ex.EConfigParse(err) flow_id = _util.new_object_id(_meta.ObjectType.FLOW) flow_key = _util.object_key(flow_id) - cls._log.info(f"Generating flow definition from [{flow_details}] with ID = [{flow_key}]") + self._log.info(f"Generating flow definition from [{flow_details}] with ID = [{flow_key}]") - flow_def = config_mgr.load_config_object(flow_details, _meta.FlowDefinition) + flow_def = self._config_mgr.load_config_object(flow_details, _meta.FlowDefinition) # Validate models against the flow (this could move to _impl.validation and check prod jobs as well) - cls._check_models_for_flow(flow_def, job_config) + self._check_models_for_flow(flow_def, job_def, job_config) # Auto-wiring and inference only applied to externally loaded flows for now - flow_def = cls._autowire_flow(flow_def, job_config) - flow_def = cls._apply_type_inference(flow_def, job_config) + flow_def = self._autowire_flow(flow_def, job_def, job_config) + flow_def = self._apply_type_inference(flow_def, job_def, job_config) flow_obj = _meta.ObjectDefinition( objectType=_meta.ObjectType.FLOW, flow=flow_def) + job_def = copy.copy(job_def) + job_def.runFlow = copy.copy(job_def.runFlow) + job_def.runFlow.flow = _util.selector_for(flow_id) + job_config = copy.copy(job_config) - job_config.job = copy.copy(job_config.job) - job_config.job.runFlow = copy.copy(job_config.job.runFlow) job_config.resources = copy.copy(job_config.resources) + job_config = self._add_job_resource(job_config, flow_id, flow_obj) - job_config = cls._add_job_resource(job_config, flow_id, flow_obj) - job_config.job.runFlow.flow = _util.selector_for(flow_id) - - return job_config + return job_config, job_def @classmethod - def _check_models_for_flow(cls, flow: _meta.FlowDefinition, job_config: _cfg.JobConfig): + def _check_models_for_flow(cls, flow: _meta.FlowDefinition, job_def: _meta.JobDefinition, job_config: _cfg.JobConfig): model_nodes = dict(filter(lambda n: n[1].nodeType == _meta.FlowNodeType.MODEL_NODE, flow.nodes.items())) - missing_models = list(filter(lambda m: m not in job_config.job.runFlow.models, model_nodes.keys())) - extra_models = list(filter(lambda m: m not in model_nodes, job_config.job.runFlow.models.keys())) + missing_models = list(filter(lambda m: m not in job_def.runFlow.models, model_nodes.keys())) + extra_models = list(filter(lambda m: m not in model_nodes, job_def.runFlow.models.keys())) if any(missing_models): error = f"Missing models in job definition: {', '.join(missing_models)}" @@ -384,7 +469,7 @@ def _check_models_for_flow(cls, flow: _meta.FlowDefinition, job_config: _cfg.Job for model_name, model_node in model_nodes.items(): - model_selector = job_config.job.runFlow.models[model_name] + model_selector = job_def.runFlow.models[model_name] model_obj = _util.get_job_resource(model_selector, job_config) model_inputs = set(model_obj.model.inputs.keys()) @@ -396,9 +481,9 @@ def _check_models_for_flow(cls, flow: _meta.FlowDefinition, job_config: _cfg.Job raise _ex.EJobValidation(error) @classmethod - def _autowire_flow(cls, flow: _meta.FlowDefinition, job_config: _cfg.JobConfig): + def _autowire_flow(cls, flow: _meta.FlowDefinition, job_def: _meta.JobDefinition, job_config: _cfg.JobConfig): - job = job_config.job.runFlow + job = job_def.runFlow nodes = copy.copy(flow.nodes) edges: tp.Dict[str, _meta.FlowEdge] = dict() @@ -485,7 +570,10 @@ def add_edge(target: _meta.FlowSocket): return autowired_flow @classmethod - def _apply_type_inference(cls, flow: _meta.FlowDefinition, job_config: _cfg.JobConfig) -> _meta.FlowDefinition: + def _apply_type_inference( + cls, flow: _meta.FlowDefinition, + job_def: _meta.JobDefinition, job_config: _cfg.JobConfig) \ + -> _meta.FlowDefinition: updated_flow = copy.copy(flow) updated_flow.parameters = copy.copy(flow.parameters) @@ -506,17 +594,17 @@ def socket_key(socket): if node.nodeType == _meta.FlowNodeType.PARAMETER_NODE and node_name not in flow.parameters: targets = edges_by_source.get(node_name) or [] - model_parameter = cls._infer_parameter(node_name, targets, job_config) + model_parameter = cls._infer_parameter(node_name, targets, job_def, job_config) updated_flow.parameters[node_name] = model_parameter if node.nodeType == _meta.FlowNodeType.INPUT_NODE and node_name not in flow.inputs: targets = edges_by_source.get(node_name) or [] - model_input = cls._infer_input_schema(node_name, targets, job_config) + model_input = cls._infer_input_schema(node_name, targets, job_def, job_config) updated_flow.inputs[node_name] = model_input if node.nodeType == _meta.FlowNodeType.OUTPUT_NODE and node_name not in flow.outputs: sources = edges_by_target.get(node_name) or [] - model_output = cls._infer_output_schema(node_name, sources, job_config) + model_output = cls._infer_output_schema(node_name, sources, job_def, job_config) updated_flow.outputs[node_name] = model_output return updated_flow @@ -524,13 +612,14 @@ def socket_key(socket): @classmethod def _infer_parameter( cls, param_name: str, targets: tp.List[_meta.FlowSocket], - job_config: _cfg.JobConfig) -> _meta.ModelParameter: + job_def: _meta.JobDefinition, job_config: _cfg.JobConfig) \ + -> _meta.ModelParameter: model_params = [] for target in targets: - model_selector = job_config.job.runFlow.models.get(target.node) + model_selector = job_def.runFlow.models.get(target.node) model_obj = _util.get_job_resource(model_selector, job_config) model_param = model_obj.model.parameters.get(target.socket) model_params.append(model_param) @@ -560,13 +649,14 @@ def _infer_parameter( @classmethod def _infer_input_schema( cls, input_name: str, targets: tp.List[_meta.FlowSocket], - job_config: _cfg.JobConfig) -> _meta.ModelInputSchema: + job_def: _meta.JobDefinition, job_config: _cfg.JobConfig) \ + -> _meta.ModelInputSchema: model_inputs = [] for target in targets: - model_selector = job_config.job.runFlow.models.get(target.node) + model_selector = job_def.runFlow.models.get(target.node) model_obj = _util.get_job_resource(model_selector, job_config) model_input = model_obj.model.inputs.get(target.socket) model_inputs.append(model_input) @@ -594,13 +684,14 @@ def _infer_input_schema( @classmethod def _infer_output_schema( cls, output_name: str, sources: tp.List[_meta.FlowSocket], - job_config: _cfg.JobConfig) -> _meta.ModelOutputSchema: + job_def: _meta.JobDefinition, job_config: _cfg.JobConfig) \ + -> _meta.ModelOutputSchema: model_outputs = [] for source in sources: - model_selector = job_config.job.runFlow.models.get(source.node) + model_selector = job_def.runFlow.models.get(source.node) model_obj = _util.get_job_resource(model_selector, job_config) model_input = model_obj.model.inputs.get(source.socket) model_outputs.append(model_input) @@ -624,11 +715,13 @@ def _socket_key(cls, socket): return f"{socket.node}.{socket.socket}" if socket.socket else socket.node @classmethod - def _process_parameters(cls, job_config: _cfg.JobConfig) -> _cfg.JobConfig: + def _process_parameters( + cls, job_config: _cfg.JobConfig, job_def: _meta.JobDefinition) \ + -> tp.Tuple[_cfg.JobConfig, _meta.JobDefinition]: # This relies on convention for naming properties across similar job types - job_detail = cls._get_job_detail(job_config) + job_detail = cls._get_job_detail(job_def) if hasattr(job_detail, "model"): model_key = _util.object_key(job_detail.model) @@ -646,7 +739,7 @@ def _process_parameters(cls, job_config: _cfg.JobConfig) -> _cfg.JobConfig: job_detail.parameters = cls._process_parameters_dict(param_specs, raw_values) - return job_config + return job_config, job_def @classmethod def _process_parameters_dict( @@ -677,10 +770,11 @@ def _process_parameters_dict( return encoded_values - @classmethod - def _process_inputs_and_outputs(cls, sys_config: _cfg.RuntimeConfig, job_config: _cfg.JobConfig) -> _cfg.JobConfig: + def _process_inputs_and_outputs( + self, job_config: _cfg.JobConfig, job_def: _meta.JobDefinition) \ + -> tp.Tuple[_cfg.JobConfig, _meta.JobDefinition]: - job_detail = cls._get_job_detail(job_config) + job_detail = self._get_job_detail(job_def) if hasattr(job_detail, "model"): model_obj = _util.get_job_resource(job_detail.model, job_config) @@ -693,7 +787,7 @@ def _process_inputs_and_outputs(cls, sys_config: _cfg.RuntimeConfig, job_config: required_outputs = flow_obj.flow.outputs else: - return job_config + return job_config, job_def job_inputs = job_detail.inputs job_outputs = job_detail.outputs @@ -705,8 +799,8 @@ def _process_inputs_and_outputs(cls, sys_config: _cfg.RuntimeConfig, job_config: model_input = required_inputs[input_key] input_schema = model_input.schema if model_input and not model_input.dynamic else None - input_id = cls._process_input_or_output( - sys_config, input_key, input_value, job_resources, + input_id = self._process_input_or_output( + input_key, input_value, job_resources, new_unique_file=False, schema=input_schema) job_inputs[input_key] = _util.selector_for(input_id) @@ -717,17 +811,16 @@ def _process_inputs_and_outputs(cls, sys_config: _cfg.RuntimeConfig, job_config: model_output= required_outputs[output_key] output_schema = model_output.schema if model_output and not model_output.dynamic else None - output_id = cls._process_input_or_output( - sys_config, output_key, output_value, job_resources, + output_id = self._process_input_or_output( + output_key, output_value, job_resources, new_unique_file=True, schema=output_schema) job_outputs[output_key] = _util.selector_for(output_id) - return job_config + return job_config, job_def - @classmethod def _process_input_or_output( - cls, sys_config, data_key, data_value, + self, data_key, data_value, resources: tp.Dict[str, _meta.ObjectDefinition], new_unique_file=False, schema: tp.Optional[_meta.SchemaDefinition] = None) \ @@ -738,8 +831,8 @@ def _process_input_or_output( if isinstance(data_value, str): storage_path = data_value - storage_key = sys_config.storage.defaultBucket - storage_format = cls.infer_format(storage_path, sys_config.storage) + storage_key = self._sys_config.storage.defaultBucket + storage_format = self.infer_format(storage_path, self._sys_config.storage) snap_version = 1 elif isinstance(data_value, dict): @@ -749,14 +842,14 @@ def _process_input_or_output( if not storage_path: raise _ex.EConfigParse(f"Invalid configuration for input [{data_key}] (missing required value 'path'") - storage_key = data_value.get("storageKey") or sys_config.storage.defaultBucket - storage_format = data_value.get("format") or cls.infer_format(storage_path, sys_config.storage) + storage_key = data_value.get("storageKey") or self._sys_config.storage.defaultBucket + storage_format = data_value.get("format") or self.infer_format(storage_path, self._sys_config.storage) snap_version = 1 else: raise _ex.EConfigParse(f"Invalid configuration for input '{data_key}'") - cls._log.info(f"Generating data definition for [{data_key}] with ID = [{_util.object_key(data_id)}]") + self._log.info(f"Generating data definition for [{data_key}] with ID = [{_util.object_key(data_id)}]") # For unique outputs, increment the snap number to find a new unique snap # These are not incarnations, bc likely in dev mode model code and inputs are changing @@ -764,7 +857,7 @@ def _process_input_or_output( if new_unique_file: - x_storage_mgr = _storage.StorageManager(sys_config) + x_storage_mgr = _storage.StorageManager(self._sys_config) x_storage = x_storage_mgr.get_file_storage(storage_key) x_orig_path = pathlib.PurePath(storage_path) x_name = x_orig_path.name @@ -781,9 +874,9 @@ def _process_input_or_output( x_name = f"{x_orig_path.stem}-{snap_version}" storage_path = str(x_orig_path.parent.joinpath(x_name)) - cls._log.info(f"Output for [{data_key}] will be snap version {snap_version}") + self._log.info(f"Output for [{data_key}] will be snap version {snap_version}") - data_obj, storage_obj = cls._generate_input_definition( + data_obj, storage_obj = self._generate_input_definition( data_id, storage_id, storage_key, storage_path, storage_format, snap_index=snap_version, delta_index=1, incarnation_index=1, schema=schema) diff --git a/tracdap-runtime/python/src/tracdap/rt/_exec/engine.py b/tracdap-runtime/python/src/tracdap/rt/_exec/engine.py index 0624ad3e0..9f69122a0 100644 --- a/tracdap-runtime/python/src/tracdap/rt/_exec/engine.py +++ b/tracdap-runtime/python/src/tracdap/rt/_exec/engine.py @@ -39,8 +39,9 @@ class _EngineNode: """ node: _graph.Node - dependencies: tp.Dict[NodeId, _graph.DependencyType] function: tp.Optional[_func.NodeFunction] = None + + dependencies: tp.Dict[NodeId, _graph.DependencyType] = dc.field(default_factory=dict) complete: bool = False result: tp.Optional[tp.Any] = None error: tp.Optional[str] = None @@ -57,21 +58,35 @@ class _EngineContext: Represents the state of an execution graph being processed by the TRAC engine """ + engine_id: _actors.ActorId + job_key: str + root_id: NodeId + nodes: tp.Dict[NodeId, _EngineNode] pending_nodes: tp.Set[NodeId] = dc.field(default_factory=set) active_nodes: tp.Set[NodeId] = dc.field(default_factory=set) succeeded_nodes: tp.Set[NodeId] = dc.field(default_factory=set) failed_nodes: tp.Set[NodeId] = dc.field(default_factory=set) + def with_updates( + self, nodes, + pending_nodes, active_nodes, + succeeded_nodes, failed_nodes) -> "_EngineContext": + + return _EngineContext( + self.engine_id, self.job_key, self.root_id, nodes, + pending_nodes, active_nodes, succeeded_nodes, failed_nodes) + @dc.dataclass class _JobState: job_id: _meta.TagHeader - job_config: _cfg.JobConfig - actor_id: _actors.ActorId = None + monitors: tp.List[_actors.ActorId] = dc.field(default_factory=list) + + job_config: _cfg.JobConfig = None job_result: _cfg.JobResult = None job_error: Exception = None @@ -154,14 +169,35 @@ def submit_job( self._log.info(f"Job submitted: [{job_key}]") - job_processor = JobProcessor(job_key, job_config, result_spec,self._models, self._storage) + job_processor = JobProcessor(self._models, self._storage, job_key, job_config, result_spec, graph_spec=None) job_actor_id = self.actors().spawn(job_processor) - job_state = _JobState(job_config.jobId, job_config) + job_monitor_success = lambda ctx, key, result: self._notify_callback(key, result, None) + job_monitor_failure = lambda ctx, key, error: self._notify_callback(key, None, error) + job_monitor = JobMonitor(job_key, job_monitor_success, job_monitor_failure) + job_monitor_id = self.actors().spawn(job_monitor) + + job_state = _JobState(job_config.jobId) job_state.actor_id = job_actor_id + job_state.monitors.append(job_monitor_id) + job_state.job_config = job_config self._jobs[job_key] = job_state + @_actors.Message + def submit_child_job(self, child_id: _meta.TagHeader, child_graph: _graph.Graph, monitor_id: _actors.ActorId): + + child_key = _util.object_key(child_id) + + child_processor = JobProcessor(self._models, self._storage, child_key, None, None, graph_spec=child_graph) # noqa + child_actor_id = self.actors().spawn(child_processor) + + child_state = _JobState(child_id) + child_state.actor_id = child_actor_id + child_state.monitors.append(monitor_id) + + self._jobs[child_key] = child_state + @_actors.Message def get_job_list(self): @@ -184,11 +220,13 @@ def job_succeeded(self, job_key: str, job_result: _cfg.JobResult): self._log.info(f"Recording job as successful: {job_key}") - self._jobs[job_key].job_result = job_result - self._finalize_job(job_key) + job_state = self._jobs[job_key] + job_state.job_result = job_result + + for monitor_id in job_state.monitors: + self.actors().send(monitor_id, "job_succeeded", job_result) - if self._notify_callback is not None: - self._notify_callback(job_key, job_result, None) + self._finalize_job(job_key) @_actors.Message def job_failed(self, job_key: str, error: Exception): @@ -200,11 +238,13 @@ def job_failed(self, job_key: str, error: Exception): self._log.error(f"Recording job as failed: {job_key}") - self._jobs[job_key].job_error = error - self._finalize_job(job_key) + job_state = self._jobs[job_key] + job_state.job_error = error + + for monitor_id in job_state.monitors: + self.actors().send(monitor_id, "job_failed", error) - if self._notify_callback is not None: - self._notify_callback(job_key, None, error) + self._finalize_job(job_key) def _finalize_job(self, job_key: str): @@ -214,10 +254,17 @@ def _finalize_job(self, job_key: str): # For now each instance of the runtime only processes one job so no need to worry job_state = self._jobs.get(job_key) - job_actor_id = job_state.actor_id if job_state is not None else None - if job_actor_id is not None: - self.actors().stop(job_actor_id) + # Stop any monitors that were created directly by the engine + # (Other actors are responsible for stopping their own monitors) + while job_state.monitors: + monitor_id = job_state.monitors.pop() + monitor_parent = monitor_id[:monitor_id.rfind('/')] + if self.actors().id == monitor_parent: + self.actors().stop(monitor_id) + + if job_state.actor_id is not None: + self.actors().stop(job_state.actor_id ) job_state.actor_id = None def _get_job_info(self, job_key: str, details: bool = False) -> tp.Optional[_cfg.JobResult]: @@ -251,6 +298,35 @@ def _get_job_info(self, job_key: str, details: bool = False) -> tp.Optional[_cfg return job_result +class JobMonitor(_actors.Actor): + + def __init__( + self, job_key: str, + success_func: tp.Callable[[_actors.ActorContext, str, _cfg.JobResult], None], + failure_func: tp.Callable[[_actors.ActorContext, str, Exception], None]): + + super().__init__() + self._job_key = job_key + self._success_func = success_func + self._failure_func = failure_func + self._signal_sent = False + + @_actors.Message + def job_succeeded(self, job_result: _cfg.JobResult): + self._success_func(self.actors(), self._job_key, job_result) + self._signal_sent = True + + @_actors.Message + def job_failed(self, error: Exception): + self._failure_func(self.actors(), self._job_key, error) + self._signal_sent = True + + def on_stop(self): + if not self._signal_sent: + error = _ex.ETracInternal(f"No result was received for job [{self._job_key}]") + self._failure_func(self.actors(), self._job_key, error) + + class JobProcessor(_actors.Actor): """ @@ -259,26 +335,32 @@ class JobProcessor(_actors.Actor): """ def __init__( - self, job_key, job_config: _cfg.JobConfig, - result_spec: _graph.JobResultSpec, - models: _models.ModelLoader, - storage: _storage.StorageManager): + self, models: _models.ModelLoader, storage: _storage.StorageManager, + job_key: str, job_config: _cfg.JobConfig, result_spec: _graph.JobResultSpec, + graph_spec: tp.Optional[_graph.Graph]): super().__init__() self.job_key = job_key self.job_config = job_config self.result_spec = result_spec + self.graph_spec = graph_spec self._models = models self._storage = storage self._resolver = _func.FunctionResolver(models, storage) self._log = _util.logger_for_object(self) def on_start(self): + self._log.info(f"Starting job [{self.job_key}]") self._models.create_scope(self.job_key) - self.actors().spawn(GraphBuilder(self.job_config, self.result_spec, self._resolver)) + + if self.graph_spec is not None: + self.actors().send(self.actors().id, "build_graph_succeeded", self.graph_spec) + else: + self.actors().spawn(GraphBuilder(self.job_config, self.result_spec)) def on_stop(self): + self._log.info(f"Cleaning up job [{self.job_key}]") self._models.destroy_scope(self.job_key) @@ -303,9 +385,26 @@ def on_signal(self, signal: _actors.Signal) -> tp.Optional[bool]: return super().on_signal(signal) @_actors.Message - def job_graph(self, graph: _EngineContext, root_id: NodeId): - self.actors().spawn(GraphProcessor(graph, root_id, self._resolver)) - self.actors().stop(self.actors().sender) + def build_graph_succeeded(self, graph_spec: _graph.Graph): + + # Build a new engine context graph from the graph spec + engine_id = self.actors().parent + nodes = dict((node_id, _EngineNode(node)) for node_id, node in graph_spec.nodes.items()) + graph = _EngineContext(engine_id, self.job_key, graph_spec.root_id, nodes) + + # Add all the nodes as pending nodes to start + graph.pending_nodes.update(graph.nodes.keys()) + + self.actors().spawn(FunctionResolver(self._resolver, graph)) + if self.actors().sender != self.actors().id and self.actors().sender != self.actors().parent: + self.actors().stop(self.actors().sender) + + @_actors.Message + def resolve_functions_succeeded(self, graph: _EngineContext): + + self.actors().spawn(GraphProcessor(graph, self._resolver)) + if self.actors().sender != self.actors().id and self.actors().sender != self.actors().parent: + self.actors().stop(self.actors().sender) @_actors.Message def job_succeeded(self, job_result: _cfg.JobResult): @@ -323,44 +422,54 @@ def job_failed(self, error: Exception): class GraphBuilder(_actors.Actor): """ - GraphBuilder is a worker (actors.Worker) responsible for building the execution graph for a job - The logic for graph building is provided in graph_builder.py + GraphBuilder is a worker (actor) to wrap the GraphBuilder logic from graph_builder.py """ - def __init__( - self, job_config: _cfg.JobConfig, - result_spec: _graph.JobResultSpec, - resolver: _func.FunctionResolver): - + def __init__(self, job_config: _cfg.JobConfig, result_spec: _graph.JobResultSpec): super().__init__() self.job_config = job_config self.result_spec = result_spec - self.graph: tp.Optional[_EngineContext] = None - - self._resolver = resolver self._log = _util.logger_for_object(self) def on_start(self): + self.build_graph(self, self.job_config) + + @_actors.Message + def build_graph(self, job_config: _cfg.JobConfig): self._log.info("Building execution graph") # TODO: Get sys config, or find a way to pass storage settings - graph_data = _graph.GraphBuilder.build_job(self.job_config, self.result_spec) - graph_nodes = {node_id: _EngineNode(node, {}) for node_id, node in graph_data.nodes.items()} - graph = _EngineContext(graph_nodes, pending_nodes=set(graph_nodes.keys())) + graph_builder = _graph.GraphBuilder(job_config, self.result_spec) + graph_spec = graph_builder.build_job(job_config.job) - self._log.info("Resolving graph nodes to executable code") + self.actors().reply("build_graph_succeeded", graph_spec) - for node_id, node in graph.nodes.items(): - node.function = self._resolver.resolve_node(node.node) +class FunctionResolver(_actors.Actor): + + """ + GraphResolver is a worker (actors) to wrap the FunctionResolver logic in functions.py + """ + + def __init__(self, resolver: _func.FunctionResolver, graph: _EngineContext): + super().__init__() self.graph = graph - self.actors().send_parent("job_graph", self.graph, graph_data.root_id) + self._resolver = resolver + self._log = _util.logger_for_object(self) + + def on_start(self): + self.resolve_functions(self, self.graph) @_actors.Message - def get_execution_graph(self): + def resolve_functions(self, graph: _EngineContext): - self.actors().send(self.actors().sender, "job_graph", self.graph) + self._log.info("Resolving graph nodes to executable code") + + for node_id, node in graph.nodes.items(): + node.function = self._resolver.resolve_node(node.node) + + self.actors().reply("resolve_functions_succeeded", graph) class GraphProcessor(_actors.Actor): @@ -376,10 +485,10 @@ class GraphProcessor(_actors.Actor): Once all running nodes are stopped, an error is reported to the parent """ - def __init__(self, graph: _EngineContext, root_id: NodeId, resolver: _func.FunctionResolver): + def __init__(self, graph: _EngineContext, resolver: _func.FunctionResolver): super().__init__() self.graph = graph - self.root_id = root_id + self.root_id_ = graph.root_id self.processors: tp.Dict[NodeId, _actors.ActorId] = dict() self._resolver = resolver self._log = _util.logger_for_object(self) @@ -427,12 +536,14 @@ def process_graph(graph: _EngineContext) -> _EngineContext: # Model and data nodes map to different thread pools in the actors engine # There is scope for a much more sophisticated approach, with prioritized scheduling - if isinstance(node.node, _graph.RunModelNode) or isinstance(node.node, _graph.ImportModelNode): - processor = ModelNodeProcessor(processed_graph, node_id, node) + if isinstance(node.node, _graph.ChildJobNode): + processor = ChildJobNodeProcessor(processed_graph, node) + elif isinstance(node.node, _graph.RunModelNode) or isinstance(node.node, _graph.ImportModelNode): + processor = ModelNodeProcessor(processed_graph, node) elif isinstance(node.node, _graph.LoadDataNode) or isinstance(node.node, _graph.SaveDataNode): - processor = DataNodeProcessor(processed_graph, node_id, node) + processor = DataNodeProcessor(processed_graph, node) else: - processor = NodeProcessor(processed_graph, node_id, node) + processor = NodeProcessor(processed_graph, node) # New nodes can be launched with the updated graph # Anything that was pruned is not needed by the new node @@ -502,7 +613,7 @@ def update_graph( for node_id, node in new_nodes.items(): GraphLogger.log_node_add(node) node_func = self._resolver.resolve_node(node) - new_node = _EngineNode(node, {}, function=node_func) + new_node = _EngineNode(node, node_func) new_graph.nodes[node_id] = new_node new_graph.pending_nodes.add(node_id) @@ -625,9 +736,10 @@ def _update_results(self, updates: tp.Dict[NodeId, _EngineNode], context_pop: _g for node_id in list(filter(lambda n: n.namespace == context_pop, nodes)): nodes.pop(node_id) - graph = _EngineContext(nodes, pending_nodes, active_nodes, succeeded_nodes, failed_nodes) + self.graph = self.graph.with_updates( + nodes, pending_nodes, active_nodes, + succeeded_nodes, failed_nodes) - self.graph = graph self.check_job_status() def check_job_status(self, do_submit=True): @@ -657,7 +769,7 @@ def check_job_status(self, do_submit=True): self.actors().send_parent("job_failed", _ex.EModelExec("Job suffered multiple errors", errors)) else: - job_result = self.graph.nodes[self.root_id].result + job_result = self.graph.nodes[self.graph.root_id].result self.actors().send_parent("job_succeeded", job_result) @@ -669,11 +781,12 @@ class NodeProcessor(_actors.Actor): __NONE_TYPE = type(None) - def __init__(self, graph: _EngineContext, node_id: NodeId, node: _EngineNode): + def __init__(self, graph: _EngineContext, node: _EngineNode): super().__init__() self.graph = graph - self.node_id = node_id self.node = node + self.node_id = node.node.id + def on_start(self): @@ -782,14 +895,59 @@ def _check_result_type(self, result): class ModelNodeProcessor(NodeProcessor): - def __init__(self, graph: _EngineContext, node_id: NodeId, node: _EngineNode): - super().__init__(graph, node_id, node) + def __init__(self, graph: _EngineContext, node: _EngineNode): + super().__init__(graph, node) class DataNodeProcessor(NodeProcessor): - def __init__(self, graph: _EngineContext, node_id: NodeId, node: _EngineNode): - super().__init__(graph, node_id, node) + def __init__(self, graph: _EngineContext, node: _EngineNode): + super().__init__(graph, node) + + +class ChildJobNodeProcessor(NodeProcessor): + + def __init__(self, graph: _EngineContext, node: _EngineNode): + super().__init__(graph, node) + + @_actors.Message + def evaluate_node(self): + + NodeLogger.log_node_start(self.node) + + job_id = self.node.node.job_id # noqa + job_key = _util.object_key(job_id) + + node_id = self.actors().id + + def success_callback(ctx, _, result): + ctx.send(node_id, "child_job_succeeded", result) + + def failure_callback(ctx, _, error): + ctx.send(node_id, "child_job_failed", error) + + monitor = JobMonitor(job_key, success_callback, failure_callback) + monitor_id = self.actors().spawn(monitor) + + graph_spec: _graph.Graph = self.node.node.graph # noqa + + self.actors().send(self.graph.engine_id, "submit_child_job", job_id, graph_spec, monitor_id) + + @_actors.Message + def child_job_succeeded(self, job_result: _cfg.JobResult): + + self._check_result_type(job_result) + + NodeLogger.log_node_succeeded(self.node) + + self.actors().send_parent("node_succeeded", self.node_id, job_result) + + @_actors.Message + def child_job_failed(self, job_error: Exception): + + NodeLogger.log_node_failed(self.node, job_error) + + self.actors().send_parent("node_failed", self.node_id, job_error) class GraphLogger: diff --git a/tracdap-runtime/python/src/tracdap/rt/_exec/functions.py b/tracdap-runtime/python/src/tracdap/rt/_exec/functions.py index 5e1b3812d..7933a9719 100644 --- a/tracdap-runtime/python/src/tracdap/rt/_exec/functions.py +++ b/tracdap-runtime/python/src/tracdap/rt/_exec/functions.py @@ -699,7 +699,7 @@ def _execute(self, ctx: NodeContext) -> Bundle[_data.DataView]: output_section = _graph.GraphBuilder.build_runtime_outputs(dynamic_outputs, self.node.id.namespace) new_nodes.update(output_section.nodes) - ctx_id = NodeId.of("trac_build_result", self.node.id.namespace, result_type=None) + ctx_id = NodeId.of("trac_job_result", self.node.id.namespace, result_type=None) new_deps[ctx_id] = list(_graph.Dependency(nid, _graph.DependencyType.HARD) for nid in output_section.outputs) self.node_callback.send_graph_updates(new_nodes, new_deps) @@ -707,6 +707,18 @@ def _execute(self, ctx: NodeContext) -> Bundle[_data.DataView]: return results +class ChildJobFunction(NodeFunction[None]): + + def __init__(self, node: ChildJobNode): + super().__init__() + self.node = node + + def _execute(self, ctx: NodeContext): + # This node should never execute, the engine intercepts child job nodes and provides special handling + raise _ex.ETracInternal("Child job was not processed correctly (this is a bug)") + + + # ---------------------------------------------------------------------------------------------------------------------- # FUNCTION RESOLUTION # ---------------------------------------------------------------------------------------------------------------------- @@ -790,6 +802,7 @@ def resolve_run_model_node(self, node: RunModelNode) -> NodeFunction: DataResultNode: DataResultFunc, StaticValueNode: StaticValueFunc, RuntimeOutputsNode: RuntimeOutputsFunc, + ChildJobNode: ChildJobFunction, BundleItemNode: NoopFunc, NoopNode: NoopFunc, RunModelResultNode: NoopFunc diff --git a/tracdap-runtime/python/src/tracdap/rt/_exec/graph.py b/tracdap-runtime/python/src/tracdap/rt/_exec/graph.py index c73ae0913..0d008d895 100644 --- a/tracdap-runtime/python/src/tracdap/rt/_exec/graph.py +++ b/tracdap-runtime/python/src/tracdap/rt/_exec/graph.py @@ -414,3 +414,12 @@ class SaveJobResultNode(Node[None]): def _node_dependencies(self) -> tp.Dict[NodeId, DependencyType]: return {self.job_result_id: DependencyType.HARD} + + +@_node_type +class ChildJobNode(Node[cfg.JobResult]): + + job_id: meta.TagHeader + job_def: meta.JobDefinition + + graph: Graph diff --git a/tracdap-runtime/python/src/tracdap/rt/_exec/graph_builder.py b/tracdap-runtime/python/src/tracdap/rt/_exec/graph_builder.py index 8e1fdbc2e..b47322ee3 100644 --- a/tracdap-runtime/python/src/tracdap/rt/_exec/graph_builder.py +++ b/tracdap-runtime/python/src/tracdap/rt/_exec/graph_builder.py @@ -22,75 +22,111 @@ class GraphBuilder: - __JOB_BUILD_FUNC = tp.Callable[ - [config.JobConfig, JobResultSpec, NodeNamespace, NodeId], - GraphSection] + __JOB_DETAILS = tp.TypeVar( + "__JOB_DETAILS", + meta.RunModelJob, + meta.RunFlowJob, + meta.ImportModelJob, + meta.ImportDataJob, + meta.ExportDataJob) - @classmethod - def build_job( - cls, job_config: config.JobConfig, - result_spec: JobResultSpec) -> Graph: + __JOB_BUILD_FUNC = tp.Callable[[meta.JobDefinition, NodeId], GraphSection] - if job_config.job.jobType == meta.JobType.IMPORT_MODEL: - return cls.build_standard_job(job_config, result_spec, cls.build_import_model_job) + def __init__(self, job_config: config.JobConfig, result_spec: JobResultSpec): - if job_config.job.jobType == meta.JobType.RUN_MODEL: - return cls.build_standard_job(job_config, result_spec, cls.build_run_model_job) + self._job_config = job_config + self._result_spec = result_spec - if job_config.job.jobType == meta.JobType.RUN_FLOW: - return cls.build_standard_job(job_config, result_spec, cls.build_run_flow_job) + self._job_key = _util.object_key(job_config.jobId) + self._job_namespace = NodeNamespace(self._job_key) - if job_config.job.jobType in [meta.JobType.IMPORT_DATA, meta.JobType.EXPORT_DATA]: - return cls.build_standard_job(job_config, result_spec, cls.build_import_export_data_job) + self._errors = [] - raise _ex.EConfigParse(f"Job type [{job_config.job.jobType}] is not supported yet") + def _child_builder(self, job_id: meta.TagHeader) -> "GraphBuilder": - @classmethod - def build_standard_job( - cls, job_config: config.JobConfig, result_spec: JobResultSpec, - build_func: __JOB_BUILD_FUNC): + builder = GraphBuilder(self._job_config, JobResultSpec(save_result=False)) + builder._job_key = _util.object_key(job_id) + builder._job_namespace = NodeNamespace(builder._job_key) - # Set up the job context + return builder + + def build_job(self, job_def: meta.JobDefinition,) -> Graph: + + try: + + if job_def.jobType == meta.JobType.IMPORT_MODEL: + return self.build_standard_job(job_def, self.build_import_model_job) + + if job_def.jobType == meta.JobType.RUN_MODEL: + return self.build_standard_job(job_def, self.build_run_model_job) + + if job_def.jobType == meta.JobType.RUN_FLOW: + return self.build_standard_job(job_def, self.build_run_flow_job) + + if job_def.jobType in [meta.JobType.IMPORT_DATA, meta.JobType.EXPORT_DATA]: + return self.build_standard_job(job_def, self.build_import_export_data_job) + + if job_def.jobType == meta.JobType.JOB_GROUP: + return self.build_standard_job(job_def, self.build_job_group) + + self._error(_ex.EJobValidation(f"Job type [{job_def.jobType.name}] is not supported yet")) - job_key = _util.object_key(job_config.jobId) - job_namespace = NodeNamespace(job_key) + except Exception as e: - push_id = NodeId("trac_job_push", job_namespace, Bundle[tp.Any]) - push_node = ContextPushNode(push_id, job_namespace) + # If there are recorded, errors, assume unhandled exceptions are a result of those + # Only report the recorded errors, to reduce noise + if any(self._errors): + pass + + # If no errors are recorded, an exception here would be a bug + raise _ex.ETracInternal(f"Unexpected error preparing the job execution graph") from e + + finally: + + if any(self._errors): + + if len(self._errors) == 1: + raise self._errors[0] + else: + err_text = "\n".join(map(str, self._errors)) + raise _ex.EJobValidation("Invalid job configuration\n" + err_text) + + def build_standard_job(self, job_def: meta.JobDefinition, build_func: __JOB_BUILD_FUNC): + + # Set up the job context + + push_id = NodeId("trac_job_push", self._job_namespace, Bundle[tp.Any]) + push_node = ContextPushNode(push_id, self._job_namespace) push_section = GraphSection({push_id: push_node}, must_run=[push_id]) # Build the execution graphs for the main job and results recording - main_section = build_func(job_config, result_spec, job_namespace, push_id) - main_result_id = NodeId.of("trac_build_result", job_namespace, config.JobResult) + main_section = build_func(job_def, push_id) + main_result_id = NodeId.of("trac_job_result", self._job_namespace, config.JobResult) # Clean up the job context - global_result_id = NodeId.of(job_key, NodeNamespace.root(), config.JobResult) + global_result_id = NodeId.of(self._job_key, NodeNamespace.root(), config.JobResult) - pop_id = NodeId("trac_job_pop", job_namespace, Bundle[tp.Any]) + pop_id = NodeId("trac_job_pop", self._job_namespace, Bundle[tp.Any]) pop_mapping = {main_result_id: global_result_id} pop_node = ContextPopNode( - pop_id, job_namespace, pop_mapping, + pop_id, self._job_namespace, pop_mapping, explicit_deps=main_section.must_run, bundle=NodeNamespace.root()) - global_result_node = BundleItemNode(global_result_id, pop_id, job_key) + global_result_node = BundleItemNode(global_result_id, pop_id, self._job_key) pop_section = GraphSection({ pop_id: pop_node, global_result_id: global_result_node}) - job = cls._join_sections(push_section, main_section, pop_section) + job = self._join_sections(push_section, main_section, pop_section) return Graph(job.nodes, global_result_id) - @classmethod - def build_import_model_job( - cls, job_config: config.JobConfig, result_spec: JobResultSpec, - job_namespace: NodeNamespace, job_push_id: NodeId) \ - -> GraphSection: + def build_import_model_job(self, job_def: meta.JobDefinition, job_push_id: NodeId) -> GraphSection: # Main section: run the model import @@ -98,82 +134,142 @@ def build_import_model_job( new_model_id = _util.new_object_id(meta.ObjectType.MODEL) new_model_key = _util.object_key(new_model_id) - model_scope = _util.object_key(job_config.jobId) - import_details = job_config.job.importModel + model_scope = self._job_key + import_details = job_def.importModel - import_id = NodeId.of("trac_import_model", job_namespace, meta.ObjectDefinition) + import_id = NodeId.of("trac_import_model", self._job_namespace, meta.ObjectDefinition) import_node = ImportModelNode(import_id, model_scope, import_details, explicit_deps=[job_push_id]) main_section = GraphSection(nodes={import_id: import_node}) # Build job-level metadata outputs - result_section = cls.build_job_results( - job_config, job_namespace, result_spec, + result_section = self.build_job_results( objects={new_model_key: import_id}, explicit_deps=[job_push_id, *main_section.must_run]) - return cls._join_sections(main_section, result_section) + return self._join_sections(main_section, result_section) - @classmethod - def build_import_export_data_job( - cls, job_config: config.JobConfig, result_spec: JobResultSpec, - job_namespace: NodeNamespace, job_push_id: NodeId) \ - -> GraphSection: + def build_import_export_data_job(self, job_def: meta.JobDefinition, job_push_id: NodeId) -> GraphSection: # TODO: These are processed as regular calculation jobs for now # That might be ok, but is worth reviewing - if job_config.job.jobType == meta.JobType.IMPORT_DATA: - job_def = job_config.job.importData + if job_def.jobType == meta.JobType.IMPORT_DATA: + job_details = job_def.importData else: - job_def = job_config.job.exportData + job_details = job_def.exportData - target_selector = job_def.model - target_obj = _util.get_job_resource(target_selector, job_config) + target_selector = job_details.model + target_obj = _util.get_job_resource(target_selector, self._job_config) target_def = target_obj.model - return cls.build_calculation_job( - job_config, result_spec, job_namespace, job_push_id, - target_selector, target_def, job_def) + return self.build_calculation_job( + job_def, job_push_id, + target_selector, target_def, + job_details) - @classmethod - def build_run_model_job( - cls, job_config: config.JobConfig, result_spec: JobResultSpec, - job_namespace: NodeNamespace, job_push_id: NodeId) \ - -> GraphSection: + def build_run_model_job(self, job_def: meta.JobDefinition, job_push_id: NodeId) -> GraphSection: + + job_details = job_def.runModel - target_selector = job_config.job.runModel.model - target_obj = _util.get_job_resource(target_selector, job_config) + target_selector = job_details.model + target_obj = _util.get_job_resource(target_selector, self._job_config) target_def = target_obj.model - job_def = job_config.job.runModel - return cls.build_calculation_job( - job_config, result_spec, job_namespace, job_push_id, - target_selector, target_def, job_def) + return self.build_calculation_job( + job_def, job_push_id, + target_selector, target_def, + job_details) - @classmethod - def build_run_flow_job( - cls, job_config: config.JobConfig, result_spec: JobResultSpec, - job_namespace: NodeNamespace, job_push_id: NodeId) \ - -> GraphSection: + def build_run_flow_job(self, job_def: meta.JobDefinition, job_push_id: NodeId) -> GraphSection: + + job_details = job_def.runFlow - target_selector = job_config.job.runFlow.flow - target_obj = _util.get_job_resource(target_selector, job_config) + target_selector = job_details.flow + target_obj = _util.get_job_resource(target_selector, self._job_config) target_def = target_obj.flow - job_def = job_config.job.runFlow - return cls.build_calculation_job( - job_config, result_spec, job_namespace, job_push_id, - target_selector, target_def, job_def) + return self.build_calculation_job( + job_def, job_push_id, + target_selector, target_def, + job_details) + + def build_job_group(self, job_def: meta.JobDefinition, job_push_id: NodeId) -> GraphSection: + + job_group = job_def.jobGroup + + if job_group.jobGroupType == meta.JobGroupType.SEQUENTIAL_JOB_GROUP: + return self.build_sequential_job_group(job_group, job_push_id) + + if job_group.jobGroupType == meta.JobGroupType.PARALLEL_JOB_GROUP: + return self.build_parallel_job_group(job_group, job_push_id) + + else: + self._error(_ex.EJobValidation(f"Job group type [{job_group.jobGroupType.name}] is not supported yet")) + return GraphSection(dict(), inputs={job_push_id}) + + def build_sequential_job_group(self, job_group: meta.JobGroup, job_push_id: NodeId) -> GraphSection: + + nodes = dict() + prior_id = job_push_id + + for child_def in job_group.sequential.jobs: + + child_node = self.build_child_job(child_def, explicit_deps=[prior_id]) + nodes[child_node.id] = child_node + + prior_id = child_node.id + + # No real results from job groups yet (they cannot be executed from the platform) + job_result = cfg.JobResult() + result_id = NodeId.of("trac_job_result", self._job_namespace, cfg.JobResult) + result_node = StaticValueNode(result_id, job_result, explicit_deps=[prior_id]) + nodes[result_id] = result_node + + return GraphSection(nodes, inputs={job_push_id}, outputs={result_id}) + + def build_parallel_job_group(self, job_group: meta.JobGroup, job_push_id: NodeId) -> GraphSection: + + nodes = dict() + parallel_ids = [job_push_id] + + for child_def in job_group.parallel.jobs: + + child_node = self.build_child_job(child_def, explicit_deps=[job_push_id]) + nodes[child_node.id] = child_node + + parallel_ids.append(child_node.id) + + # No real results from job groups yet (they cannot be executed from the platform) + job_result = cfg.JobResult() + result_id = NodeId.of("trac_job_result", self._job_namespace, cfg.JobResult) + result_node = StaticValueNode(result_id, job_result, explicit_deps=parallel_ids) + nodes[result_id] = result_node + + return GraphSection(nodes, inputs={job_push_id}, outputs={result_id}) + + def build_child_job(self, child_job_def: meta.JobDefinition, explicit_deps) -> Node[config.JobResult]: + + child_job_id = _util.new_object_id(meta.ObjectType.JOB) + + child_builder = self._child_builder(child_job_id) + child_graph = child_builder.build_job(child_job_def) + + child_node_name = _util.object_key(child_job_id) + child_node_id = NodeId.of(child_node_name, self._job_namespace, cfg.JobResult) + + child_node = ChildJobNode( + child_node_id, child_job_id, child_job_def, + child_graph, explicit_deps) + + return child_node - @classmethod def build_calculation_job( - cls, job_config: config.JobConfig, result_spec: JobResultSpec, - job_namespace: NodeNamespace, job_push_id: NodeId, + self, job_def: meta.JobDefinition, job_push_id: NodeId, target_selector: meta.TagSelector, target_def: tp.Union[meta.ModelDefinition, meta.FlowDefinition], - job_def: tp.Union[meta.RunModelJob, meta.RunFlowJob]) \ + job_details: __JOB_DETAILS) \ -> GraphSection: # The main execution graph can run directly in the job context, no need to do a context push @@ -185,29 +281,30 @@ def build_calculation_job( required_inputs = target_def.inputs required_outputs = target_def.outputs - provided_params = job_def.parameters - provided_inputs = job_def.inputs - provided_outputs = job_def.outputs + provided_params = job_details.parameters + provided_inputs = job_details.inputs + provided_outputs = job_details.outputs - params_section = cls.build_job_parameters( - job_namespace, required_params, provided_params, + params_section = self.build_job_parameters( + required_params, provided_params, explicit_deps=[job_push_id]) - input_section = cls.build_job_inputs( - job_config, job_namespace, required_inputs, provided_inputs, + input_section = self.build_job_inputs( + required_inputs, provided_inputs, explicit_deps=[job_push_id]) - exec_obj = _util.get_job_resource(target_selector, job_config) + exec_namespace = self._job_namespace + exec_obj = _util.get_job_resource(target_selector, self._job_config) - exec_section = cls.build_model_or_flow( - job_config, job_namespace, exec_obj, + exec_section = self.build_model_or_flow( + exec_namespace, job_def, exec_obj, explicit_deps=[job_push_id]) - output_section = cls.build_job_outputs( - job_config, job_namespace, required_outputs, provided_outputs, + output_section = self.build_job_outputs( + required_outputs, provided_outputs, explicit_deps=[job_push_id]) - main_section = cls._join_sections(params_section, input_section, exec_section, output_section) + main_section = self._join_sections(params_section, input_section, exec_section, output_section) # Build job-level metadata outputs @@ -215,16 +312,14 @@ def build_calculation_job( nid for nid, n in main_section.nodes.items() if isinstance(n, DataResultNode)) - result_section = cls.build_job_results( - job_config, job_namespace, - result_spec, bundles=data_result_ids, + result_section = self.build_job_results( + bundles=data_result_ids, explicit_deps=[job_push_id, *main_section.must_run]) - return cls._join_sections(main_section, result_section) + return self._join_sections(main_section, result_section) - @classmethod def build_job_parameters( - cls, job_namespace: NodeNamespace, + self, required_params: tp.Dict[str, meta.ModelParameter], supplied_params: tp.Dict[str, meta.Value], explicit_deps: tp.Optional[tp.List[NodeId]] = None) \ @@ -240,18 +335,18 @@ def build_job_parameters( if param_schema.defaultValue is not None: param_def = param_schema.defaultValue else: - raise _ex.EJobValidation(f"Missing required parameter: [{param_name}]") + self._error(_ex.EJobValidation(f"Missing required parameter: [{param_name}]")) + continue - param_id = NodeId(param_name, job_namespace, meta.Value) + param_id = NodeId(param_name, self._job_namespace, meta.Value) param_node = StaticValueNode(param_id, param_def, explicit_deps=explicit_deps) nodes[param_id] = param_node return GraphSection(nodes, outputs=set(nodes.keys()), must_run=list(nodes.keys())) - @classmethod def build_job_inputs( - cls, job_config: config.JobConfig, job_namespace: NodeNamespace, + self, required_inputs: tp.Dict[str, meta.ModelInputSchema], supplied_inputs: tp.Dict[str, meta.TagSelector], explicit_deps: tp.Optional[tp.List[NodeId]] = None) \ @@ -267,20 +362,21 @@ def build_job_inputs( if data_selector is None: if input_schema.optional: - data_view_id = NodeId.of(input_name, job_namespace, _data.DataView) + data_view_id = NodeId.of(input_name, self._job_namespace, _data.DataView) nodes[data_view_id] = StaticValueNode(data_view_id, _data.DataView.create_empty()) outputs.add(data_view_id) continue else: - raise _ex.EJobValidation(f"Missing required input: [{input_name}]") + self._error(_ex.EJobValidation(f"Missing required input: [{input_name}]")) + continue # Build a data spec using metadata from the job config # For now we are always loading the root part, snap 0, delta 0 - data_def = _util.get_job_resource(data_selector, job_config).data - storage_def = _util.get_job_resource(data_def.storageId, job_config).storage + data_def = _util.get_job_resource(data_selector, self._job_config).data + storage_def = _util.get_job_resource(data_def.storageId, self._job_config).storage if data_def.schemaId: - schema_def = _util.get_job_resource(data_def.schemaId, job_config).schema + schema_def = _util.get_job_resource(data_def.schemaId, self._job_config).schema else: schema_def = data_def.schema @@ -289,16 +385,16 @@ def build_job_inputs( data_spec = _data.DataSpec(data_item, data_def, storage_def, schema_def) # Data spec node is static, using the assembled data spec - data_spec_id = NodeId.of(f"{input_name}:SPEC", job_namespace, _data.DataSpec) + data_spec_id = NodeId.of(f"{input_name}:SPEC", self._job_namespace, _data.DataSpec) data_spec_node = StaticValueNode(data_spec_id, data_spec, explicit_deps=explicit_deps) # Physical load of data items from disk # Currently one item per input, since inputs are single part/delta - data_load_id = NodeId.of(f"{input_name}:LOAD", job_namespace, _data.DataItem) + data_load_id = NodeId.of(f"{input_name}:LOAD", self._job_namespace, _data.DataItem) data_load_node = LoadDataNode(data_load_id, data_spec_id, explicit_deps=explicit_deps) # Input views assembled by mapping one root part to each view - data_view_id = NodeId.of(input_name, job_namespace, _data.DataView) + data_view_id = NodeId.of(input_name, self._job_namespace, _data.DataView) data_view_node = DataViewNode(data_view_id, schema_def, data_load_id) nodes[data_spec_id] = data_spec_node @@ -311,9 +407,8 @@ def build_job_inputs( return GraphSection(nodes, outputs=outputs, must_run=must_run) - @classmethod def build_job_outputs( - cls, job_config: config.JobConfig, job_namespace: NodeNamespace, + self, required_outputs: tp.Dict[str, meta.ModelOutputSchema], supplied_outputs: tp.Dict[str, meta.TagSelector], explicit_deps: tp.Optional[tp.List[NodeId]] = None) \ @@ -329,25 +424,27 @@ def build_job_outputs( if data_selector is None: if output_schema.optional: optional_info = "(configuration is required for all optional outputs, in case they are produced)" - raise _ex.EJobValidation(f"Missing optional output: [{output_name}] {optional_info}") + self._error(_ex.EJobValidation(f"Missing optional output: [{output_name}] {optional_info}")) + continue else: - raise _ex.EJobValidation(f"Missing required output: [{output_name}]") + self._error(_ex.EJobValidation(f"Missing required output: [{output_name}]")) + continue # Output data view must already exist in the namespace - data_view_id = NodeId.of(output_name, job_namespace, _data.DataView) - data_spec_id = NodeId.of(f"{output_name}:SPEC", job_namespace, _data.DataSpec) + data_view_id = NodeId.of(output_name, self._job_namespace, _data.DataView) + data_spec_id = NodeId.of(f"{output_name}:SPEC", self._job_namespace, _data.DataSpec) - data_obj = _util.get_job_resource(data_selector, job_config, optional=True) + data_obj = _util.get_job_resource(data_selector, self._job_config, optional=True) if data_obj is not None: # If data def for the output has been built in advance, use a static data spec data_def = data_obj.data - storage_def = _util.get_job_resource(data_def.storageId, job_config).storage + storage_def = _util.get_job_resource(data_def.storageId, self._job_config).storage if data_def.schemaId: - schema_def = _util.get_job_resource(data_def.schemaId, job_config).schema + schema_def = _util.get_job_resource(data_def.schemaId, self._job_config).schema else: schema_def = data_def.schema @@ -366,28 +463,28 @@ def build_job_outputs( # Dynamic data def will always use an embedded schema (this is no ID for an external schema) data_key = output_name + ":DATA" - data_id = job_config.resultMapping[data_key] + data_id = self._job_config.resultMapping[data_key] storage_key = output_name + ":STORAGE" - storage_id = job_config.resultMapping[storage_key] + storage_id = self._job_config.resultMapping[storage_key] data_spec_node = DynamicDataSpecNode( - data_spec_id, data_view_id, - data_id, storage_id, - prior_data_spec=None, - explicit_deps=explicit_deps) + data_spec_id, data_view_id, + data_id, storage_id, + prior_data_spec=None, + explicit_deps=explicit_deps) output_data_key = _util.object_key(data_id) output_storage_key = _util.object_key(storage_id) # Map one data item from each view, since outputs are single part/delta - data_item_id = NodeId(f"{output_name}:ITEM", job_namespace, _data.DataItem) + data_item_id = NodeId(f"{output_name}:ITEM", self._job_namespace, _data.DataItem) data_item_node = DataItemNode(data_item_id, data_view_id) # Create a physical save operation for the data item - data_save_id = NodeId.of(f"{output_name}:SAVE", job_namespace, None) + data_save_id = NodeId.of(f"{output_name}:SAVE", self._job_namespace, None) data_save_node = SaveDataNode(data_save_id, data_spec_id, data_item_id) - data_result_id = NodeId.of(f"{output_name}:RESULT", job_namespace, ObjectBundle) + data_result_id = NodeId.of(f"{output_name}:RESULT", self._job_namespace, ObjectBundle) data_result_node = DataResultNode( data_result_id, output_name, data_item_id, data_spec_id, data_save_id, @@ -406,6 +503,9 @@ def build_job_outputs( @classmethod def build_runtime_outputs(cls, output_names: tp.List[str], job_namespace: NodeNamespace): + # This method is called dynamically during job execution + # So it cannot use stateful information like self._job_config or self._job_namespace + # TODO: Factor out common logic with regular job outputs (including static / dynamic) nodes = {} @@ -462,22 +562,21 @@ def build_runtime_outputs(cls, output_names: tp.List[str], job_namespace: NodeNa return GraphSection(nodes, inputs=inputs, outputs={runtime_outputs_id}) - @classmethod def build_job_results( - cls, job_config: cfg.JobConfig, job_namespace: NodeNamespace, result_spec: JobResultSpec, + self, objects: tp.Dict[str, NodeId[meta.ObjectDefinition]] = None, bundles: tp.List[NodeId[ObjectBundle]] = None, explicit_deps: tp.Optional[tp.List[NodeId]] = None) \ -> GraphSection: - build_result_id = NodeId.of("trac_build_result", job_namespace, cfg.JobResult) + build_result_id = NodeId.of("trac_job_result", self._job_namespace, cfg.JobResult) if objects is not None: results_inputs = set(objects.values()) build_result_node = BuildJobResultNode( - build_result_id, job_config.jobId, + build_result_id, self._job_config.jobId, outputs = JobOutputs(objects=objects), explicit_deps=explicit_deps) @@ -486,17 +585,16 @@ def build_job_results( results_inputs = set(bundles) build_result_node = BuildJobResultNode( - build_result_id, job_config.jobId, + build_result_id, self._job_config.jobId, outputs = JobOutputs(bundles=bundles), explicit_deps=explicit_deps) else: raise _ex.EUnexpected() - save_result_id = NodeId("trac_save_result", job_namespace) - save_result_node = SaveJobResultNode(save_result_id, build_result_id, result_spec) - - if result_spec.save_result: + if self._result_spec.save_result: + save_result_id = NodeId("trac_save_result", self._job_namespace) + save_result_node = SaveJobResultNode(save_result_id, build_result_id, self._result_spec) result_nodes = {build_result_id: build_result_node, save_result_id: save_result_node} job_result_id = save_result_id else: @@ -505,10 +603,9 @@ def build_job_results( return GraphSection(result_nodes, inputs=results_inputs, must_run=[job_result_id]) - @classmethod def build_model_or_flow_with_context( - cls, job_config: config.JobConfig, namespace: NodeNamespace, - model_or_flow_name: str, model_or_flow: meta.ObjectDefinition, + self, namespace: NodeNamespace, model_or_flow_name: str, + job_def: meta.JobDefinition, model_or_flow: meta.ObjectDefinition, input_mapping: tp.Dict[str, NodeId], output_mapping: tp.Dict[str, NodeId], explicit_deps: tp.Optional[tp.List[NodeId]] = None) \ -> GraphSection: @@ -521,44 +618,45 @@ def build_model_or_flow_with_context( # Execute in the sub-context by doing PUSH, EXEC, POP # Note that POP node must be in the sub namespace too - push_section = cls.build_context_push( + push_section = self.build_context_push( sub_namespace, input_mapping, explicit_deps) - exec_section = cls.build_model_or_flow( - job_config, sub_namespace, model_or_flow, + exec_section = self.build_model_or_flow( + sub_namespace, job_def, model_or_flow, explicit_deps=push_section.must_run) - pop_section = cls.build_context_pop( + pop_section = self.build_context_pop( sub_namespace, output_mapping, explicit_deps=exec_section.must_run) - return cls._join_sections(push_section, exec_section, pop_section) + return self._join_sections(push_section, exec_section, pop_section) - @classmethod def build_model_or_flow( - cls, job_config: config.JobConfig, namespace: NodeNamespace, + self, namespace: NodeNamespace, + job_def: meta.JobDefinition, model_or_flow: meta.ObjectDefinition, explicit_deps: tp.Optional[tp.List[NodeId]] = None) \ -> GraphSection: if model_or_flow.objectType == meta.ObjectType.MODEL: - return cls.build_model(job_config, namespace, model_or_flow.model, explicit_deps) + return self.build_model(namespace, job_def, model_or_flow.model, explicit_deps) elif model_or_flow.objectType == meta.ObjectType.FLOW: - return cls.build_flow(job_config, namespace, model_or_flow.flow) + return self.build_flow(namespace, job_def, model_or_flow.flow) else: - raise _ex.EConfigParse("Invalid job config given to the execution engine") + message = f"Invalid job config, expected model or flow, got [{model_or_flow.objectType}]" + self._error(_ex.EJobValidation(message)) - @classmethod def build_model( - cls, job_config: config.JobConfig, namespace: NodeNamespace, + self, namespace: NodeNamespace, + job_def: meta.JobDefinition, model_def: meta.ModelDefinition, explicit_deps: tp.Optional[tp.List[NodeId]] = None) \ -> GraphSection: - cls.check_model_type(job_config, model_def) + self.check_model_type(job_def, model_def) def param_id(node_name): return NodeId(node_name, namespace, meta.Value) @@ -572,10 +670,10 @@ def data_id(node_name): output_ids = set(map(data_id, model_def.outputs)) # Set up storage access for import / export data jobs - if job_config.job.jobType == meta.JobType.IMPORT_DATA: - storage_access = job_config.job.importData.storageAccess - elif job_config.job.jobType == meta.JobType.EXPORT_DATA: - storage_access = job_config.job.exportData.storageAccess + if job_def.jobType == meta.JobType.IMPORT_DATA: + storage_access = job_def.importData.storageAccess + elif job_def.jobType == meta.JobType.EXPORT_DATA: + storage_access = job_def.exportData.storageAccess else: storage_access = None @@ -615,9 +713,9 @@ def data_id(node_name): # Assemble a graph to include the model and its outputs return GraphSection(nodes, inputs={*parameter_ids, *input_ids}, outputs=output_ids, must_run=[model_result_id]) - @classmethod def build_flow( - cls, job_config: config.JobConfig, namespace: NodeNamespace, + self, namespace: NodeNamespace, + job_def: meta.JobDefinition, flow_def: meta.FlowDefinition, explicit_deps: tp.Optional[tp.List[NodeId]] = None) \ -> GraphSection: @@ -650,11 +748,11 @@ def is_input(n): return n[1].nodeType in [meta.FlowNodeType.PARAMETER_NODE, meta node_name, node = reachable_nodes.popitem() - sub_section = cls.build_flow_node( - job_config, namespace, target_edges, + sub_section = self.build_flow_node( + namespace, job_def, target_edges, node_name, node, explicit_deps) - graph_section = cls._join_sections(graph_section, sub_section, allow_partial_inputs=True) + graph_section = self._join_sections(graph_section, sub_section, allow_partial_inputs=True) if node.nodeType != meta.FlowNodeType.OUTPUT_NODE: @@ -674,20 +772,18 @@ def is_input(n): return n[1].nodeType in [meta.FlowNodeType.PARAMETER_NODE, meta missing_targets = [edge.target for node in remaining_edges_by_target.values() for edge in node] missing_target_names = [f"{t.node}.{t.socket}" if t.socket else t.node for t in missing_targets] missing_nodes = list(map(lambda n: NodeId(n, namespace), missing_target_names)) - cls._invalid_graph_error(missing_nodes) + self._invalid_graph_error(missing_nodes) return graph_section - @classmethod def build_flow_node( - cls, job_config: config.JobConfig, namespace: NodeNamespace, + self, namespace: NodeNamespace, + job_def: meta.JobDefinition, target_edges: tp.Dict[meta.FlowSocket, meta.FlowEdge], node_name: str, node: meta.FlowNode, explicit_deps: tp.Optional[tp.List[NodeId]] = None) \ -> GraphSection: - flow_job = job_config.job.runFlow - def socket_key(socket): return f"{socket.node}.{socket.socket}" if socket.socket else socket.node @@ -700,7 +796,7 @@ def edge_mapping(node_: str, socket_: str = None, result_type=None): edge = target_edges.get(socket) # Report missing edges as a job consistency error (this might happen sometimes in dev mode) if edge is None: - raise _ex.EJobValidation(f"Inconsistent flow: Socket [{socket}] is not connected") + self._error(_ex.EJobValidation(f"Inconsistent flow: Socket [{socket}] is not connected")) return socket_id(edge.source.node, edge.source.socket, result_type) if node.nodeType == meta.FlowNodeType.PARAMETER_NODE: @@ -723,27 +819,27 @@ def edge_mapping(node_: str, socket_: str = None, result_type=None): push_mapping = {**input_mapping, **param_mapping} pop_mapping = output_mapping - model_selector = flow_job.models.get(node_name) - model_obj = _util.get_job_resource(model_selector, job_config) + model_selector = job_def.runFlow.models.get(node_name) + model_obj = _util.get_job_resource(model_selector, self._job_config) # Missing models in the job config is a job consistency error if model_obj is None or model_obj.objectType != meta.ObjectType.MODEL: - raise _ex.EJobValidation(f"No model was provided for flow node [{node_name}]") + self._error(_ex.EJobValidation(f"No model was provided for flow node [{node_name}]")) # Explicit check for model compatibility - report an error now, do not try build_model() - cls.check_model_compatibility(model_selector, model_obj.model, node_name, node) - cls.check_model_type(job_config, model_obj.model) + self.check_model_compatibility(model_selector, model_obj.model, node_name, node) + self.check_model_type(job_def, model_obj.model) - return cls.build_model_or_flow_with_context( - job_config, namespace, node_name, model_obj, - push_mapping, pop_mapping, explicit_deps) + return self.build_model_or_flow_with_context( + namespace, node_name, + job_def, model_obj, + push_mapping, pop_mapping, + explicit_deps) - # Missing / invalid node type - should be caught in static validation - raise _ex.ETracInternal(f"Flow node [{node_name}] has invalid node type [{node.nodeType}]") + self._error(_ex.EJobValidation(f"Flow node [{node_name}] has invalid node type [{node.nodeType}]")) - @classmethod def check_model_compatibility( - cls, model_selector: meta.TagSelector, + self, model_selector: meta.TagSelector, model_def: meta.ModelDefinition, node_name: str, flow_node: meta.FlowNode): model_params = list(sorted(model_def.parameters.keys())) @@ -756,22 +852,21 @@ def check_model_compatibility( if model_params != node_params or model_inputs != node_inputs or model_outputs != node_outputs: model_key = _util.object_key(model_selector) - raise _ex.EJobValidation(f"Incompatible model for flow node [{node_name}] (Model: [{model_key}])") + self._error(_ex.EJobValidation(f"Incompatible model for flow node [{node_name}] (Model: [{model_key}])")) - @classmethod - def check_model_type(cls, job_config: config.JobConfig, model_def: meta.ModelDefinition): + def check_model_type(self, job_def: meta.JobDefinition, model_def: meta.ModelDefinition): - if job_config.job.jobType == meta.JobType.IMPORT_DATA: + if job_def.jobType == meta.JobType.IMPORT_DATA: allowed_model_types = [meta.ModelType.DATA_IMPORT_MODEL] - elif job_config.job.jobType == meta.JobType.EXPORT_DATA: + elif job_def.jobType == meta.JobType.EXPORT_DATA: allowed_model_types = [meta.ModelType.DATA_EXPORT_MODEL] else: allowed_model_types = [meta.ModelType.STANDARD_MODEL] if model_def.modelType not in allowed_model_types: - job_type = job_config.job.jobType.name + job_type = job_def.jobType.name model_type = model_def.modelType.name - raise _ex.EJobValidation(f"Job type [{job_type}] cannot use model type [{model_type}]") + self._error(_ex.EJobValidation(f"Job type [{job_type}] cannot use model type [{model_type}]")) @staticmethod def build_context_push( @@ -833,8 +928,7 @@ def build_context_pop( outputs={*pop_mapping.values()}, must_run=[pop_id]) - @classmethod - def _join_sections(cls, *sections: GraphSection, allow_partial_inputs: bool = False): + def _join_sections(self, *sections: GraphSection, allow_partial_inputs: bool = False): n_sections = len(sections) first_section = sections[0] @@ -856,7 +950,7 @@ def _join_sections(cls, *sections: GraphSection, allow_partial_inputs: bool = Fa if allow_partial_inputs: inputs.update(requirements_not_met) else: - cls._invalid_graph_error(requirements_not_met) + self._invalid_graph_error(requirements_not_met) nodes.update(current_section.nodes) @@ -865,13 +959,12 @@ def _join_sections(cls, *sections: GraphSection, allow_partial_inputs: bool = Fa return GraphSection(nodes, inputs, last_section.outputs, must_run) - @classmethod - def _invalid_graph_error(cls, missing_dependencies: tp.Iterable[NodeId]): + def _invalid_graph_error(self, missing_dependencies: tp.Iterable[NodeId]): - missing_ids = ", ".join(map(cls._missing_item_display_name, missing_dependencies)) - message = f"Invalid job config: The execution graph has unsatisfied dependencies: [{missing_ids}]" + missing_ids = ", ".join(map(self._missing_item_display_name, missing_dependencies)) + message = f"The execution graph has unsatisfied dependencies: [{missing_ids}]" - raise _ex.EJobValidation(message) + self._error(_ex.EJobValidation(message)) @classmethod def _missing_item_display_name(cls, node_id: NodeId): @@ -886,3 +979,7 @@ def _missing_item_display_name(cls, node_id: NodeId): return node_id.name else: return f"{node_id.name} / {', '.join(components[:-1])}" + + def _error(self, error: Exception): + + self._errors.append(error) diff --git a/tracdap-runtime/python/src/tracdap/rt/_exec/runtime.py b/tracdap-runtime/python/src/tracdap/rt/_exec/runtime.py index 75fb541de..541a84cac 100644 --- a/tracdap-runtime/python/src/tracdap/rt/_exec/runtime.py +++ b/tracdap-runtime/python/src/tracdap/rt/_exec/runtime.py @@ -333,10 +333,8 @@ def load_job_config( config_file_name="job") if self._dev_mode: - job_config = _dev_mode.DevModeTranslator.translate_job_config( - self._sys_config, job_config, - self._scratch_dir, self._config_mgr, - model_class) + translator = _dev_mode.DevModeTranslator(self._sys_config, self._config_mgr, self._scratch_dir) + job_config = translator.translate_job_config(job_config, model_class) return job_config diff --git a/tracdap-runtime/python/test/tracdap_examples/test_tutorial.py b/tracdap-runtime/python/test/tracdap_examples/test_tutorial.py index 6725788f2..efb569302 100644 --- a/tracdap-runtime/python/test/tracdap_examples/test_tutorial.py +++ b/tracdap-runtime/python/test/tracdap_examples/test_tutorial.py @@ -169,3 +169,10 @@ def test_using_polars(self): sys_config = self.examples_root.joinpath("config/sys_config.yaml") launch.launch_model(UsingPolarsModel, job_config, sys_config, dev_mode=True) + + def test_group_import_process_export(self): + + job_config = self.examples_root.joinpath("config/job_group.yaml") + sys_config = self.examples_root.joinpath("config/sys_config.yaml") + + launch.launch_job(job_config, sys_config, dev_mode=True) diff --git a/tracdap-runtime/python/test/tracdap_test/rt/jobs/test_core_jobs.py b/tracdap-runtime/python/test/tracdap_test/rt/jobs/test_core_jobs.py index f8b63bc2f..1a0d17744 100644 --- a/tracdap-runtime/python/test/tracdap_test/rt/jobs/test_core_jobs.py +++ b/tracdap-runtime/python/test/tracdap_test/rt/jobs/test_core_jobs.py @@ -97,8 +97,8 @@ def test_run_model_job(self): scratch_dir = pathlib.Path(tmpdir) # Let dev mode translator sort out the data / storage definitions - job_config = dev_mode.DevModeTranslator.translate_job_config( - self.sys_config, job_config, scratch_dir, None, None) + translator = dev_mode.DevModeTranslator(self.sys_config, None, scratch_dir) # No config mgr + job_config = translator.translate_job_config(job_config) trac_runtime = runtime.TracRuntime( self.sys_config, @@ -120,8 +120,8 @@ def test_run_model_job_external_schemas(self): scratch_dir = pathlib.Path(tmpdir) # Let dev mode translator sort out the data / storage definitions - job_config = dev_mode.DevModeTranslator.translate_job_config( - self.sys_config, job_config, scratch_dir, None, None) + translator = dev_mode.DevModeTranslator(self.sys_config, None, scratch_dir) # No config mgr + job_config = translator.translate_job_config(job_config) # Make the input dataset use an external schema