feat(back): #1155 support batch pipelines
- implement compute-on-batch with python
- add support for batch pipelines
- update compute-on-batch docs

Signed-off-by: Daniel F. Murcia Rivera <danmur97@outlook.com>
danmur97 committed Sep 19, 2023
1 parent 094fc8c commit 65f9db1
Showing 33 changed files with 1,670 additions and 170 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
__pycache__
.vscode
49 changes: 30 additions & 19 deletions docs/src/api/builtins/deploy.md
@@ -2,70 +2,81 @@

Submit a job to an [AWS Batch](https://aws.amazon.com/batch/) queue.

When used as a Makes declaration (as makes.nix attrs):

- computeOnAwsBatch: `attrsOf JobType` (Optional Attr)
  Job groups to submit.
  Defaults to `{ }`.

Types:

- computeOnAwsBatch: `JobType -> SourceAble`
  Sourceable batch file that sends jobs to AWS Batch.

- `JobType` = `attrs`
    - allowDuplicates: `bool` (Optional Attr)
        Set to `false` in order to prevent submitting the job
        if there is already a job in the queue with the same name.
        Defaults to `false`.
    - attempts: `positiveInt` (Optional Attr)
        If the value of attempts is greater than one,
        the job is retried on failure up to that many times.
        Defaults to `1`.
    - attemptDurationSeconds: `positiveInt`
        The time duration in seconds
        (measured from the job attempt's startedAt timestamp)
        after which Batch terminates your jobs
        if they have not finished.
    - command: `listOf str`
        The command to send to the container.
        It overrides the one specified
        in the Batch job definition.
        Additional arguments can be propagated when running this module output.
    - definition: `str`
        Name of the Batch job definition
        that we will use as base for submitting the job.
        In general, a Batch job definition is required
        in order to specify which container image
        our job is going to run on.
    - environment: `listOf str` (Optional Attr)
        Names of the environment variables
        whose names and values should be copied from the machine running Makes
        to the machine on Batch running the job.
        Defaults to `[ ]`.
    - includePositionalArgsInName: `bool` (Optional Attr)
        Enable to make positional arguments part of the job name.
        This is useful for identifying jobs
        in the Batch console
        more easily.
        Defaults to `true`.
    - nextJob: `attrs` (Optional Attr)
        The next job to execute after its parent finishes.
        You must provide a `name` attribute and all the required
        attrs of `JobType`.
        Defaults to `{ }`.
    - memory: `positiveInt`
        Amount of memory, in MiB, that is reserved for the job.
    - parallel: `positiveInt` (Optional Attr)
        Number of parallel jobs to trigger using
        [Batch Array Jobs](https://docs.aws.amazon.com/batch/latest/userguide/array_jobs.html).
        Defaults to `1`.
    - propagateTags: `bool` (Optional Attr)
        Enable tags to be propagated into the ECS tasks.
        Defaults to `true`.
    - queue: `nullOr str`
        Name of the Batch queue we should submit the job to.
        If `null`, the queue is read from
        the `MAKES_COMPUTE_ON_AWS_BATCH_QUEUE` environment variable at runtime.
    - setup: `listOf SourceAble`
        [Makes Environment][makes_environment]
        or [Makes Secrets][makes_secrets]
        to `source` (as in Bash's `source`)
        before anything else.
        Defaults to `[ ]`.
    - tags: `attrsOf str` (Optional Attr)
        Tags to apply to the Batch job.
        Defaults to `{ }`.
    - vcpus: `positiveInt`
        Number of virtual CPUs reserved for the job.

Example:
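A minimal sketch of a makes.nix declaration (the group name `helloWorld` and all attribute values below are hypothetical placeholders, not taken from this commit; the attribute semantics are those listed above):

```nix
# makes.nix -- hypothetical values for illustration
{
  computeOnAwsBatch = {
    helloWorld = {
      attempts = 1;
      attemptDurationSeconds = 3600;
      command = [ "echo" "Hello from Batch!" ];
      definition = "example-definition";  # an existing Batch job definition
      environment = [ "AWS_DEFAULT_REGION" ];
      memory = 512;
      queue = "example-queue";
      setup = [ ];
      tags = { };
      vcpus = 1;
      nextJob = {
        name = "helloWorldAgain";  # runs after helloWorld finishes
        # ...plus all required JobType attrs, elided here for brevity
      };
    };
  };
}
```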
@@ -0,0 +1,9 @@
from ._logger import (
    setup_logger,
)
from fa_purity.cmd import (
    unsafe_unwrap,
)

__version__ = "1.0.0"
unsafe_unwrap(setup_logger(__name__))
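`setup_logger` returns a `Cmd[None]`: fa_purity defers side effects inside `Cmd` values, and `unsafe_unwrap` explicitly executes one, here once at import time. A minimal sketch of the pattern (the `greet` value is an invented example, not part of this commit):

```python
from fa_purity import Cmd
from fa_purity.cmd import unsafe_unwrap

# Building the Cmd prints nothing: it merely wraps the thunk.
greet: Cmd[None] = Cmd.from_cmd(lambda: print("hello"))

# The effect runs only when the Cmd is explicitly unwrapped.
unsafe_unwrap(greet)
```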
107 changes: 107 additions & 0 deletions src/args/compute-on-aws-batch/batch-client/batch_client/_cli.py
@@ -0,0 +1,107 @@
from . import (
    actions,
    decode,
    utils,
)
from .api import (
    new_client,
)
from .core import (
    EnvVarPointer,
    QueueName,
)
import click
from fa_purity import (
    Cmd,
    FrozenList,
)
from fa_purity.json_2.value import (
    JsonObj,
    UnfoldedFactory,
)
from fa_purity.pure_iter import (
    PureIterFactory,
)
import itertools
from typing import (
    NoReturn,
)


def _decode_json(file_path: str) -> Cmd[JsonObj]:
    def _action() -> JsonObj:
        with open(file_path, "r", encoding="utf-8") as file:
            raw = UnfoldedFactory.load(file).unwrap()
            return raw

    return Cmd.from_cmd(_action)


@click.command()  # type: ignore[misc]
@click.option("--pipeline", type=click.Path(exists=True), required=True)  # type: ignore[misc]
@click.argument("args", nargs=-1)  # type: ignore[misc]
def submit_job(
    pipeline: str,
    args: FrozenList[str],
) -> NoReturn:
    # The queue is read from the environment when not set per job
    _queue_from_env = (
        EnvVarPointer("MAKES_COMPUTE_ON_AWS_BATCH_QUEUE")
        .get_value()
        .map(
            lambda m: m.map(QueueName)
            .to_result()
            .alt(
                lambda _: Exception(
                    "Required env var: MAKES_COMPUTE_ON_AWS_BATCH_QUEUE"
                )
            )
        )
    )
    # Decode the pipeline definition file into a raw job draft
    _root = (
        _decode_json(pipeline)
        .map(decode.decode_raw_draft)
        .map(lambda r: r.unwrap().unwrap())
    )

    def _sep(item: str) -> bool:
        return item == "---"

    # Positional args are split at "---" separators; each group becomes
    # the extra arguments of one job in the pipeline.
    _arg_groups = tuple(
        list(g) for k, g in itertools.groupby(args, _sep) if not k
    )
    arg_groups = (
        PureIterFactory.from_list(_arg_groups)
        .map(lambda x: tuple(x))
        .to_list()
    )
    drafts = _queue_from_env.bind(
        lambda queue: _root.map(
            lambda root: decode.decode_all_drafts(root, arg_groups, queue)
        )
    ).map(
        lambda t: (
            t[0].map(
                lambda r: r.alt(
                    lambda e: Exception(f"Invalid job draft i.e. {e}")
                ).unwrap()
            ),
            t[1],
        )
    )
    cmd: Cmd[None] = drafts.bind(
        lambda d: new_client().bind(
            lambda c: utils.extract_single(d[0]).map(
                lambda j: actions.send_single_job(c, j, d[1]),
                lambda p: actions.send_pipeline(c, p),
            )
        )
    )
    cmd.compute()


@click.group()  # type: ignore[misc]
def main() -> None:
    pass


main.add_command(submit_job)
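The `itertools.groupby` call above splits the trailing CLI arguments into one group per pipeline job, using `---` as the separator. A standalone sketch of that splitting behavior (the argument values are hypothetical):

```python
import itertools

args = ("a", "b", "---", "c", "---", "d", "e")

# Runs of arguments between "---" separators become the positional
# args of successive jobs; the separators themselves are dropped.
groups = tuple(
    tuple(g) for k, g in itertools.groupby(args, lambda a: a == "---") if not k
)
assert groups == (("a", "b"), ("c",), ("d", "e"))
```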
17 changes: 17 additions & 0 deletions src/args/compute-on-aws-batch/batch-client/batch_client/_logger.py
@@ -0,0 +1,17 @@
from fa_purity import (
    Cmd,
)
import logging
import sys


def setup_logger(name: str) -> Cmd[None]:
    def _action() -> None:
        handler = logging.StreamHandler(sys.stderr)
        formatter = logging.Formatter("[%(levelname)s] %(message)s")
        handler.setFormatter(formatter)
        log = logging.getLogger(name)
        log.addHandler(handler)
        log.setLevel(logging.INFO)

    return Cmd.from_cmd(_action)
78 changes: 78 additions & 0 deletions src/args/compute-on-aws-batch/batch-client/batch_client/actions.py
@@ -0,0 +1,78 @@
from .api import (
    ApiClient,
)
from batch_client.core import (
    AllowDuplicates,
    DependentJobDraft,
    JobDependencies,
    JobDraft,
    JobId,
    JobStatus,
)
from fa_purity import (
    Cmd,
    Maybe,
    PureIter,
)
from fa_purity.cmd import (
    CmdUnwrapper,
)
import logging

LOG = logging.getLogger(__name__)


def send_single_job(
    client: ApiClient,
    draft: JobDraft,
    allow_duplicates: AllowDuplicates,
) -> Cmd[None]:
    dup_msg = Cmd.from_cmd(lambda: LOG.info("Detecting duplicates..."))
    skipped_msg = Cmd.from_cmd(
        lambda: LOG.warning("Duplicated job detected. Skipping job submission")
    )
    allow_send = (
        Cmd.from_cmd(lambda: LOG.warning("Duplicated jobs are allowed")).map(
            lambda _: True
        )
        if allow_duplicates
        else dup_msg
        + client.list_jobs(draft.name, draft.queue, JobStatus.RUNNABLE)
        .find_first(lambda _: True)
        .map(lambda m: m.map(lambda _: False).value_or(True))
    )
    return allow_send.bind(
        lambda b: Cmd.from_cmd(
            lambda: LOG.info("Submitting job: %s", draft.name.raw)
        )
        + client.send_job(DependentJobDraft(draft, Maybe.empty())).bind(
            lambda j: Cmd.from_cmd(
                lambda: LOG.info("Job sent! id=%s arn=%s", j[0].raw, j[1].raw)
            )
        )
        if b
        else skipped_msg
    )


def send_pipeline(
    client: ApiClient, pipeline: PureIter[JobDraft]
) -> Cmd[None]:
    def _action(unwrapper: CmdUnwrapper) -> None:
        prev: Maybe[JobId] = Maybe.empty()
        LOG.info("Submitting jobs pipeline...")
        for draft in pipeline:
            # Chain each job to the previously submitted one so that
            # Batch executes the pipeline sequentially.
            send = client.send_job(
                DependentJobDraft(
                    draft,
                    prev.map(
                        lambda j: JobDependencies.new(frozenset([j])).unwrap()
                    ),
                )
            )
            LOG.info("Submitting job: %s", draft.name.raw)
            sent_id = unwrapper.act(send)
            LOG.info("Job sent! id=%s arn=%s", sent_id[0].raw, sent_id[1].raw)
            prev = Maybe.from_value(sent_id[0])

    return Cmd.new_cmd(_action)
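`send_pipeline` relies on `Cmd.new_cmd`, fa_purity's escape hatch for imperative sequencing: the action receives a `CmdUnwrapper` whose `act` runs other `Cmd` values inside the deferred computation, which is what lets the loop thread each job id into the next submission. A generic sketch of the same pattern (the `twice` helper is hypothetical):

```python
from fa_purity import Cmd
from fa_purity.cmd import CmdUnwrapper


def twice(cmd: Cmd[int]) -> Cmd[int]:
    def _action(unwrapper: CmdUnwrapper) -> int:
        # `act` executes a Cmd now, inside this deferred action,
        # allowing ordinary loops, locals, and mutation.
        first = unwrapper.act(cmd)
        second = unwrapper.act(cmd)
        return first + second

    return Cmd.new_cmd(_action)
```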
@@ -0,0 +1,11 @@
from ._client_1 import (
    new_client,
)
from ._core import (
    ApiClient,
)

__all__ = [
    "ApiClient",
    "new_client",
]
@@ -0,0 +1,33 @@
from . import (
    _list_jobs,
    _send_job,
)
from batch_client.api._core import (
    ApiClient,
)
import boto3
from fa_purity import (
    Cmd,
)
from fa_purity.date_time import (
    DatetimeFactory,
)
from mypy_boto3_batch.client import (
    BatchClient,
)


def _new_batch_client() -> Cmd[BatchClient]:
    def _action() -> BatchClient:
        return boto3.client("batch")

    return Cmd.from_cmd(_action)


def new_client() -> Cmd[ApiClient]:
    return _new_batch_client().map(
        lambda client: ApiClient(
            lambda n, q, s: _list_jobs.list_jobs(client, n, q, s),
            lambda j: _send_job.send_job(client, j),
        )
    )
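A hedged usage sketch of the resulting client (the `JobName` wrapper and the name/queue values are assumptions for illustration; `list_jobs` and `find_first` are composed exactly as in `actions.py` above):

```python
from batch_client.api import new_client
from batch_client.core import (  # JobName is assumed to live here
    JobName,
    JobStatus,
    QueueName,
)
from fa_purity import Cmd

# Hypothetical lookup: find a RUNNABLE job on a queue and print it.
# Nothing touches AWS until .compute() runs the composed Cmd.
cmd: Cmd[None] = new_client().bind(
    lambda client: client.list_jobs(
        JobName("example-job"),
        QueueName("example-queue"),
        JobStatus.RUNNABLE,
    )
    .find_first(lambda _: True)
    .bind(lambda found: Cmd.from_cmd(lambda: print(found)))
)
cmd.compute()
```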