Commit

Merge pull request #50 from blumenstiel/main
Updated grid wrapper code
romeokienzler authored Mar 9, 2024
2 parents 6b411a7 + 1ef40c0 commit c3d3f2e
Showing 10 changed files with 579 additions and 342 deletions.
54 changes: 20 additions & 34 deletions GettingStarted.md
@@ -553,7 +553,7 @@ def grid_process(batch_id, parameter1, parameter2, *args, **kwargs):

You might want to add `*args, **kwargs` to avoid errors, if not all interface variables are used in the grid process.
Note that the operator script is imported by the grid wrapper script. Therefore, all code in the script is executed.
It is recommended to avoid executions in the code and to use a main block if the script is also used as a single operator.
If the script is also used as a single operator, it is recommended to check for `__main__` to avoid executions when the code is imported by the grid wrapper.

```python
if __name__ == '__main__':
@@ -564,56 +564,42 @@ Note that the grid computing is currently not implemented for R scripts.

### 5.2 Compile a grid wrapper with C3

The compilation is similar to an operator. Additionally, the name of the grid process is passed to `create_grid_wrapper.py` using `--process` or `-p`.
The compilation is similar to an operator. Additionally, the name of the grid process is passed to `create_gridwrapper.py` using `--process` or `-p` (default: `"grid_process"`)
and a backend for the coordinator is selected with `--backend` or `-b` (default: `"local"`).

```sh
c3_create_gridwrapper -r "<registry>/<namespace>" --process "grid_process" "<my-operator-script>.py" "<additional_file1>" "<additional_file2>"
c3_create_gridwrapper -r "<registry>/<namespace>" -p "grid_process" -b "local" "<my-operator-script>.py" "<additional_file1>" "<additional_file2>"
```

C3 also includes a grid computing pattern for Cloud Object Storage (COS). You can create a COS grid wrapper by adding a `--cos` flag.
The COS grid wrapper downloads all files of a batch to local storage, computes the process, and uploads the output files to COS.
Note that the COS grid wrapper requires the file paths to include the batch id to be identified, see details in the next subsection.
C3 supports three backends for the coordination: Coordinator files on a shared local storage (`"local"`), on COS (`"cos"`), or as a key-value storage on S3 (`"s3kv"`).

The created files include a `gw_<my-operator-script>.py` file that includes the generated code for the grid wrapper (`cgw_<my-operator-script>.py` for the COS version).
Note that the backend `"legacy_cos"` also handles downloading and uploading files from COS. We removed this functionality to simplify the grid wrapper.

The grid wrapper creates a temporary file `gw_<my-operator-script>.py` which is copied to the container image and deleted.
Similar to an operator, `gw_<my-operator-script>.yaml`, `gw_<my-operator-script>.cwl`, and `gw_<my-operator-script>.job.yaml` are created.


### 5.3 Apply grid wrappers

The grid wrapper uses coordinator files to split up the batch processes between different pods.
Therefore, each pod needs access to a shared persistent volume, see [storage](#storage).
Alternatively, you can use the COS grid wrapper which uses a coordinator path in COS.
Alternatively, you can use the COS or S3kv grid wrapper which uses a coordinator in S3.

The grid wrapper adds specific variables to the `job.yaml`, that define the batches and some coordination settings.

First, you can define the list of batches in a file and pass `gw_batch_file` to the grid wrapper.
You can use either a txt file with a comma-separated list of strings or a json file with the keys being the batch ids.
Alternatively, the batch ids can be defined by a file name pattern via `gw_file_path_pattern` and `gw_group_by`.
You can provide multiple patterns via a comma-separated list and the patterns can include wildcards like `*` or `?` to find all relevant files.
`gw_group_by` is code that extracts the batch id from a file name by merging the file name string with the code string and passing it to `eval()`.
Assuming, we have the file names `file-from-batch-42-metadata.json` and `second_file-42-image.png`.
The code `gw_group_by = ".split('-')[-2]"` extracts the batch `42` from both files.
You can also use something like `"[-15:-10]"` or `".split('/')[-1].split('.')[0]"`.
`gw_group_by` is ignored if you provide `gw_batch_file`.
Be aware that the file names need to include the batch name if you are using `gw_group_by` or the COS version
(because files are downloaded based on a match with the batch id).
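
The `gw_group_by` mechanism described above can be illustrated with a minimal sketch. The helper name `extract_batch_id` is hypothetical and this is not the grid wrapper's actual code; it only assumes, as the text states, that the file name string is merged with the code string and passed to `eval()`.

```python
def extract_batch_id(file_name: str, group_by: str) -> str:
    """Hypothetical helper: append the group_by snippet to the quoted file name and evaluate it."""
    # repr() turns the file name into a valid Python string literal.
    expression = repr(file_name) + group_by
    # eval() runs arbitrary code, so group_by must come from a trusted source.
    return eval(expression)


file_names = ["file-from-batch-42-metadata.json", "second_file-42-image.png"]

# Both file names map to the same batch id, so they end up in one batch.
batch_ids = {extract_batch_id(name, ".split('-')[-2]") for name in file_names}
print(batch_ids)
```

Because the snippet is evaluated as code, any expression that is valid after a string literal works, for example a slice or a chain of `split()` calls.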

Second, you need to define `gw_coordinator_path` and optionally other coordinator variables.
The `gw_coordinator_path` is a path to a persistent and shared directory that is used by the pods to lock batches and mark them as processed.
`gw_lock_file_suffix` and similar variables are the suffixes for coordinator files (default: `.lock`, `.processed`, and `.err`).
`gw_lock_timeout` defines the time in seconds until other pods remove the `.lock` file from batches that might be struggling (default `3600`).
You need to increase `gw_lock_timeout` to avoid multiple processing if batch processes run very long.
By default, pods skip batches with `.err` files. You can set `gw_ignore_error_files` to `True` after you have fixed the error.
First, you can define the list of batch ids in a file and pass `gw_batch_file` to the grid wrapper.
You can use either a `txt` file with a comma-separated list of strings, a `json` file with the keys being the batch ids, or a `csv` file with `gw_batch_file_col_name` being the column with the batch ids.
`gw_batch_file` can be a local path, a path within the coordinator bucket or a COS connection to a file (`cos://<access_key_id>:<access_secret_key>@<endpoint>/<bucket>/<path_to>/<batch_file>`).
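
As an illustration of the three batch file formats, the following sketch reads batch ids from a local `txt`, `json`, or `csv` file. The function `load_batch_ids` and its `col_name` argument are assumptions made for this example (the latter mirrors `gw_batch_file_col_name`); paths in a bucket or COS connections are not covered, and the grid wrapper templates may parse these files differently.

```python
import csv
import json
import tempfile
from pathlib import Path


def load_batch_ids(batch_file, col_name=None):
    """Hypothetical loader: return the batch ids listed in a local txt, json, or csv file."""
    path = Path(batch_file)
    suffix = path.suffix.lower()
    if suffix == ".json":
        # The keys of the JSON object are the batch ids.
        with open(path) as f:
            return list(json.load(f).keys())
    if suffix == ".csv":
        if col_name is None:
            raise ValueError("A column name is required for csv batch files.")
        with open(path, newline="") as f:
            return [row[col_name] for row in csv.DictReader(f)]
    # Fall back to a comma-separated list of strings.
    return [item.strip() for item in path.read_text().split(",") if item.strip()]


# Small demonstration with temporary files.
with tempfile.TemporaryDirectory() as tmp:
    txt_file = Path(tmp) / "batches.txt"
    txt_file.write_text("a, b,c\n")
    json_file = Path(tmp) / "batches.json"
    json_file.write_text(json.dumps({"a": {}, "b": {}}))
    csv_file = Path(tmp) / "batches.csv"
    csv_file.write_text("tile,region\na,eu\nb,us\n")

    from_txt = load_batch_ids(txt_file)
    from_json = load_batch_ids(json_file)
    from_csv = load_batch_ids(csv_file, col_name="tile")
```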

If you are using the COS grid wrapper, further variables are required.
You can provide a comma-separated list of additional files that should be downloaded from COS using `gw_additional_source_files`.
All batch files and additional files are downloaded to an input directory, defined via `gw_local_input_path` (default: `input`).
Similarly, all files in `gw_local_target_path` are uploaded to COS after the batch processing (default: `target`).
Second, you need to define a `gw_coordinator_path` or `gw_coordinator_connection`.
The `gw_coordinator_path` is used in the `local` version. It is a path to a persistent and shared directory that is used by the pods to lock batches and mark them as processed.
`gw_coordinator_connection` is used in the `cos` and `s3kv` version. It defines a connection to a directory on COS: `cos://<access_key_id>:<access_secret_key>@<endpoint>/<bucket>/<path_to_directory>`.
The coordinator uses files with specific suffixes: `.lock`, `.processed`, and `.err`.
`gw_lock_timeout` defines the time in seconds until other pods remove the `.lock` file from batches that might be struggling (default `10800`).
If your processes run very long, you can increase `gw_lock_timeout` to avoid duplicated processing of batches.
By default, pods skip batches with `.err` files. You can set `gw_ignore_error_files` to `True` after you have fixed the error.
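
The coordination with `.lock`, `.processed`, and `.err` files can be sketched for a shared local directory as follows. This is a simplified illustration under stated assumptions, not the code from the grid wrapper templates: the functions `try_claim` and `mark_processed` are hypothetical, and the lock is created with `os.open` and `O_EXCL`, which may behave differently on some network file systems.

```python
import os
import tempfile
import time
from pathlib import Path


def try_claim(coordinator_path, batch_id, lock_timeout=10800, ignore_error_files=False):
    """Hypothetical helper: return True if this pod obtained the lock for batch_id."""
    base = Path(coordinator_path)
    lock_file = base / f"{batch_id}.lock"
    if (base / f"{batch_id}.processed").exists():
        return False  # Batch is already done.
    if (base / f"{batch_id}.err").exists() and not ignore_error_files:
        return False  # Batch failed before and errors are not ignored.
    try:
        # Remove a lock that is older than the timeout (the owner may be stuck).
        if time.time() - lock_file.stat().st_mtime > lock_timeout:
            lock_file.unlink(missing_ok=True)
    except FileNotFoundError:
        pass  # No lock file exists yet.
    try:
        # O_EXCL makes the creation fail if another pod created the lock first.
        fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    os.close(fd)
    return True


def mark_processed(coordinator_path, batch_id):
    """Hypothetical helper: mark the batch as processed and release its lock."""
    base = Path(coordinator_path)
    (base / f"{batch_id}.processed").touch()
    (base / f"{batch_id}.lock").unlink(missing_ok=True)


with tempfile.TemporaryDirectory() as tmp:
    first = try_claim(tmp, "42")   # No coordinator files yet, so the lock is created.
    second = try_claim(tmp, "42")  # A fresh lock exists, so the batch is skipped.
    mark_processed(tmp, "42")
    third = try_claim(tmp, "42")   # The batch is processed, so it is skipped.
```

The sketch also shows why a `gw_lock_timeout` shorter than the batch runtime leads to duplicated processing: another pod would treat the still valid lock as stale and claim the batch again.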

Furthermore, `gw_source_access_key_id`, `gw_source_secret_access_key`, `gw_source_endpoint`, and `gw_source_bucket` define the COS bucket to the source files.
You can specify other buckets for the coordinator and target files.
If the buckets are similar to the source bucket, you just need to provide `gw_target_path` and `gw_coordinator_path` and remove the other variables from the `job.yaml`.
It is recommended to use [secrets](#secrets) for the access key and secret.
The grid wrapper currently does not support [secrets](#secrets) for the access key and secret within a connection.
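
To make the connection format `cos://<access_key_id>:<access_secret_key>@<endpoint>/<bucket>/<path>` more concrete, the following sketch splits such a string into its parts. The names `CosConnection` and `parse_connection` are hypothetical, and the pattern assumes that the key and secret contain no `:`, `@`, or `/` characters; the grid wrapper itself may parse connections differently.

```python
import re
from typing import NamedTuple


class CosConnection(NamedTuple):
    access_key_id: str
    secret_access_key: str
    endpoint: str
    bucket: str
    path: str


# Assumption: key and secret contain no ':', '@', or '/' characters.
_CONNECTION_PATTERN = re.compile(
    r"^cos://(?P<key>[^:@/]+):(?P<secret>[^@/]+)@"
    r"(?P<endpoint>[^/]+)/(?P<bucket>[^/]+)/?(?P<path>.*)$"
)


def parse_connection(connection: str) -> CosConnection:
    """Hypothetical helper: split a cos:// connection string into its components."""
    match = _CONNECTION_PATTERN.match(connection)
    if match is None:
        # Do not echo the connection string, it contains credentials.
        raise ValueError("Connection does not match cos://<key>:<secret>@<endpoint>/<bucket>/<path>")
    return CosConnection(
        access_key_id=match["key"],
        secret_access_key=match["secret"],
        endpoint=match["endpoint"],
        bucket=match["bucket"],
        path=match["path"],
    )


connection = parse_connection("cos://KEY:SECRET@s3.example.com/my-bucket/coordinator/run1")
print(connection.endpoint, connection.bucket, connection.path)
```

Since the credentials are part of the string, a connection should be treated like a secret itself, for example by keeping it out of logs.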

Lastly, you want to add the number of parallel pods by adding `parallelism : <num pods>` to the `job.yaml`.

21 changes: 12 additions & 9 deletions src/c3/create_gridwrapper.py
@@ -1,4 +1,3 @@

import logging
import os
import argparse
@@ -24,17 +23,20 @@ def wrap_component(component_path,

logging.info(f'Using backend: {backend}')


backends = {
'cos_grid_wrapper' : c3.templates.cos_grid_wrapper_template,
'grid_wrapper' : c3.templates.grid_wrapper_template,
's3kv_grid_wrapper': c3.templates.s3kv_grid_wrapper_template,
'local': c3.templates.grid_wrapper_template,
'cos': c3.templates.cos_grid_wrapper_template,
'legacy_cos': c3.templates.legacy_cos_grid_wrapper_template,
's3kv': c3.templates.s3kv_grid_wrapper_template,
'grid_wrapper': c3.templates.grid_wrapper_template,
'cos_grid_wrapper': c3.templates.cos_grid_wrapper_template,
'legacy_cos_grid_wrapper': c3.templates.legacy_cos_grid_wrapper_template,
's3kv_grid_wrapper': c3.templates.s3kv_grid_wrapper_template,
}
gw_template = backends.get(backend)

logging.debug(f'Using backend template: {gw_template}')


grid_wrapper_code = gw_template.substitute(
component_name=component_name,
component_description=component_description,
@@ -74,7 +76,8 @@ def get_component_elements(file_path):
type_to_func = {'String': '', 'Boolean': 'bool', 'Integer': 'int', 'Float': 'float'}
for variable, d in inputs.items():
interface += f"# {d['description']}\n"
if d['type'] == 'String' and d['default'] is not None and d['default'][0] not in '\'\"':
if (d['type'] == 'String' and d['default'] is not None and
(d['default'] == '' or d['default'][0] not in '\'\"')):
# Add quotation marks
d['default'] = "'" + d['default'] + "'"
interface += f"component_{variable} = {type_to_func[d['type']]}(os.getenv('{variable}', {d['default']}))\n"
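
The effect of the changed quoting condition in this hunk can be checked in isolation. The sketch below reduces it to the default value alone (the `String` type check is left out), and the function names `old_needs_quotes`, `new_needs_quotes`, and `quote_default` are hypothetical. It shows that indexing `default[0]` fails for an empty default, which the added `default == ''` check avoids.

```python
QUOTES = '\'"'


def old_needs_quotes(default):
    # Condition before the change: indexes default[0] even for an empty string.
    return default is not None and default[0] not in QUOTES


def new_needs_quotes(default):
    # Condition after the change: an empty default is handled before indexing.
    return default is not None and (default == '' or default[0] not in QUOTES)


def quote_default(default):
    """Add quotation marks unless the default is None or already quoted."""
    return "'" + default + "'" if new_needs_quotes(default) else default


try:
    old_needs_quotes('')
    old_raised = False
except IndexError:
    old_raised = True

print(old_raised, repr(quote_default('')), repr(quote_default('abc')))
```

With the new condition, an empty default is rendered as an empty string literal in the generated `os.getenv()` call instead of raising an error during compilation.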
@@ -159,8 +162,8 @@ def main():
help='List of paths to additional files to include in the container image')
parser.add_argument('-p', '--component_process', type=str, default='grid_process',
help='Name of the component sub process that is executed for each batch.')
parser.add_argument('-b', '--backend', type=str, default='s3kv_grid_wrapper',
help='Define backend. Default: s3kv_grid_wrapper. Others: grid_wrapper, cos_grid_wrapper')
parser.add_argument('-b', '--backend', type=str, default='local',
help='Define backend. Default: local. Others: cos, s3kv, legacy_cos (with automatic file download/upload)')
parser.add_argument('-r', '--repository', type=str, default=None,
help='Container registry address, e.g. docker.io/<username>')
parser.add_argument('-v', '--version', type=str, default=None,
3 changes: 3 additions & 0 deletions src/c3/pythonscript.py
@@ -37,10 +37,13 @@ def _get_input_vars(self):
logging.debug(f'Interface: No description for variable {env_name} provided.')
if re.search(r'=\s*int\(\s*os', line):
type = 'Integer'
default = default.strip('\"\'')
elif re.search(r'=\s*float\(\s*os', line):
type = 'Float'
default = default.strip('\"\'')
elif re.search(r'=\s*bool\(\s*os', line):
type = 'Boolean'
default = default.strip('\"\'')
else:
type = 'String'
return_value[env_name] = {
4 changes: 4 additions & 0 deletions src/c3/templates/__init__.py
@@ -14,6 +14,7 @@
CWL_COMPONENT_FILE = 'cwl_component_template.cwl'
GRID_WRAPPER_FILE = 'grid_wrapper_template.py'
COS_GRID_WRAPPER_FILE = 'cos_grid_wrapper_template.py'
LEGACY_COS_GRID_WRAPPER_FILE = 'legacy_cos_grid_wrapper_template.py'
S3KV_GRID_WRAPPER_FILE = 's3kv_grid_wrapper_template.py'

# load templates
@@ -49,5 +50,8 @@
with open(template_path / COS_GRID_WRAPPER_FILE, 'r') as f:
cos_grid_wrapper_template = Template(f.read())

with open(template_path / LEGACY_COS_GRID_WRAPPER_FILE, 'r') as f:
legacy_cos_grid_wrapper_template = Template(f.read())

with open(template_path / S3KV_GRID_WRAPPER_FILE, 'r') as f:
s3kv_grid_wrapper_template = Template(f.read())