diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..f53d18e0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__ +.vscode diff --git a/docs/src/api/builtins/deploy.md b/docs/src/api/builtins/deploy.md index b4ac8961..6a633df9 100644 --- a/docs/src/api/builtins/deploy.md +++ b/docs/src/api/builtins/deploy.md @@ -2,70 +2,81 @@ Submit a job to an [AWS Batch](https://aws.amazon.com/batch/) queue. -Types: +When used as a Makes declaration (in makes.nix attrs): -- computeOnAwsBatch (`attrsOf jobType`): Optional. +- computeOnAwsBatch: `attrsOf JobType` (Optional Attr) Job groups to submit. Defaults to `{ }`. -- jobType (`submodule`): - - allowDuplicates (`bool`): Optional. + +Types: + +- computeOnAwsBatch: `JobType -> SourceAble` + Source-able batch file that sends jobs to AWS Batch. + +- `JobType` = `attrs` + - allowDuplicates: `bool` (Optional Attr) Set to `false` in order to prevent submitting the job if there is already a job in the queue with the same name. Defaults to `false`. - - attempts (`ints.positive`): Optional. + - attempts: `positiveInt` (Optional Attr) If the value of attempts is greater than one, the job is retried on failure up to that number of attempts. Defaults to `1`. - - attemptDurationSeconds (`ints.positive`): Optional. + - attemptDurationSeconds: `positiveInt` The time duration in seconds (measured from the job attempt's startedAt timestamp) after which Batch terminates your jobs if they have not finished. - - command (`listOf str`): + - command: `listOf str` The command to send to the container. It overrides the one specified in the Batch job definition. Additional arguments can be propagated when running this module output. - - definition (`str`): + - definition: `str` Name of the Batch job definition that we will use as base for submitting the job. In general a Batch job definition is required in order to specify which container image our job is going to run on. - - environment (`listOf str`): Optional. + - environment: `listOf str` (Optional Attr) Names of the environment variables whose names and values should be copied from the machine running Makes to the machine on Batch running the job. Defaults to `[ ]`. - - includePositionalArgsInName (`bool`): Optional. + - includePositionalArgsInName: `bool` (Optional Attr). Enable to make positional arguments part of the job name. This is useful for identifying jobs in the Batch console more easily. Defaults to `true`. - - memory (`ints.positive`): + - nextJob: `attrs` (Optional Attr) + The next job that will be executed after its parent finishes. + You must provide a `name` attribute and all the required + attrs of `JobType`. + Defaults to `{ }`. + - memory: `positiveInt` Amount of memory, in MiB that is reserved for the job. - - parallel (`ints.positive`): Optional. + - parallel: `positiveInt` (Optional Attr) Number of parallel jobs to trigger using [Batch Array Jobs](https://docs.aws.amazon.com/batch/latest/userguide/array_jobs.html). + Defaults to `1`. - - propagateTags (`bool`): Optional. + - propagateTags: `bool` (Optional Attr) Enable tags to be propagated into the ECS tasks. Defaults to `true`. - - queue (`nullOr str`): + - queue: `nullOr str` Name of the Batch queue we should submit the job to. - It can be set to `null`, - causing Makes to read + If `null`, the queue is fetched from + the `MAKES_COMPUTE_ON_AWS_BATCH_QUEUE` environment variable at runtime.
- - setup (`listOf package`): + - setup: `listOf SourceAble` [Makes Environment][makes_environment] or [Makes Secrets][makes_secrets] to `source` (as in Bash's `source`) before anything else. Defaults to `[ ]`. - - tags (`attrsOf str`): Optional. + - tags: `attrsOf str` (Optional Attr). Tags to apply to the batch job. Defaults to `{ }`. - - vcpus (`ints.positive`): + - vcpus: `positiveInt` Amount of virtual CPUs that is reserved for the job. Example: diff --git a/src/args/compute-on-aws-batch/batch-client/batch_client/__init__.py b/src/args/compute-on-aws-batch/batch-client/batch_client/__init__.py new file mode 100644 index 00000000..88eb2046 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/batch_client/__init__.py @@ -0,0 +1,9 @@ +from ._logger import ( + setup_logger, +) +from fa_purity.cmd import ( + unsafe_unwrap, +) + +__version__ = "1.0.0" +unsafe_unwrap(setup_logger(__name__)) diff --git a/src/args/compute-on-aws-batch/batch-client/batch_client/_cli.py b/src/args/compute-on-aws-batch/batch-client/batch_client/_cli.py new file mode 100644 index 00000000..e5310b98 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/batch_client/_cli.py @@ -0,0 +1,107 @@ +from . import ( + actions, + decode, + utils, +) +from .api import ( + new_client, +) +from .core import ( + EnvVarPointer, + QueueName, +) +import click +from fa_purity import ( + Cmd, + FrozenList, +) +from fa_purity.json_2.value import ( + JsonObj, + UnfoldedFactory, +) +from fa_purity.pure_iter import ( + PureIterFactory, +) +import itertools +from typing import ( + NoReturn, +) + + +def _decode_json(file_path: str) -> Cmd[JsonObj]: + def _action() -> JsonObj: + with open(file_path, "r", encoding="utf-8") as file: + raw = UnfoldedFactory.load(file).unwrap() + return raw + + return Cmd.from_cmd(_action) + + +@click.command() # type: ignore[misc] +@click.option("--pipeline", type=click.Path(exists=True), required=True) # type: ignore[misc] +@click.argument("args", nargs=-1) # type: ignore[misc] +def submit_job( + pipeline: str, + args: FrozenList[str], +) -> NoReturn: + _queue_from_env = ( + EnvVarPointer("MAKES_COMPUTE_ON_AWS_BATCH_QUEUE") + .get_value() + .map( + lambda m: m.map(QueueName) + .to_result() + .alt( + lambda _: Exception( + "Required env var: MAKES_COMPUTE_ON_AWS_BATCH_QUEUE" + ) + ) + ) + ) + _root = ( + _decode_json(pipeline) + .map(decode.decode_raw_draft) + .map(lambda r: r.unwrap().unwrap()) + ) + + def _sep(item: str) -> bool: + return item == "---" + + _arg_groups = tuple( + list(g) for k, g in itertools.groupby(args, _sep) if not k + ) + arg_groups = ( + PureIterFactory.from_list(_arg_groups) + .map(lambda x: tuple(x)) + .to_list() + ) + drafts = _queue_from_env.bind( + lambda queue: _root.map( + lambda root: decode.decode_all_drafts(root, arg_groups, queue) + ) + ).map( + lambda t: ( + t[0].map( + lambda r: r.alt( + lambda e: Exception(f"Invalid job draft i.e. 
{e}") ).unwrap() ), t[1], ) ) cmd: Cmd[None] = drafts.bind( lambda d: new_client().bind( lambda c: utils.extract_single(d[0]).map( lambda j: actions.send_single_job(c, j, d[1]), lambda p: actions.send_pipeline(c, p), ) ) ) cmd.compute() @click.group() # type: ignore[misc] def main() -> None: pass main.add_command(submit_job) diff --git a/src/args/compute-on-aws-batch/batch-client/batch_client/_logger.py b/src/args/compute-on-aws-batch/batch-client/batch_client/_logger.py new file mode 100644 index 00000000..4a56a2f7 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/batch_client/_logger.py @@ -0,0 +1,17 @@ +from fa_purity import ( + Cmd, +) +import logging +import sys + + +def setup_logger(name: str) -> Cmd[None]: + def _action() -> None: + handler = logging.StreamHandler(sys.stderr) + formatter = logging.Formatter("[%(levelname)s] %(message)s") + handler.setFormatter(formatter) + log = logging.getLogger(name) + log.addHandler(handler) + log.setLevel(logging.INFO) + + return Cmd.from_cmd(_action) diff --git a/src/args/compute-on-aws-batch/batch-client/batch_client/actions.py b/src/args/compute-on-aws-batch/batch-client/batch_client/actions.py new file mode 100644 index 00000000..9dbc36be --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/batch_client/actions.py @@ -0,0 +1,78 @@ +from .api import ( + ApiClient, +) +from batch_client.core import ( + AllowDuplicates, + DependentJobDraft, + JobDependencies, + JobDraft, + JobId, + JobStatus, +) +from fa_purity import ( + Cmd, + Maybe, + PureIter, +) +from fa_purity.cmd import ( + CmdUnwrapper, +) +import logging + +LOG = logging.getLogger(__name__) + + +def send_single_job( + client: ApiClient, + draft: JobDraft, + allow_duplicates: AllowDuplicates, +) -> Cmd[None]: + dup_msg = Cmd.from_cmd(lambda: LOG.info("Detecting duplicates...")) + skipped_msg = Cmd.from_cmd( + lambda: LOG.warning("Duplicated job detected. Skipping job submission") + ) + allow_send = ( + Cmd.from_cmd(lambda: LOG.warning("Duplicated jobs are allowed")).map( + lambda _: True + ) + if allow_duplicates.value + else dup_msg + + client.list_jobs(draft.name, draft.queue, JobStatus.RUNNABLE) + .find_first(lambda _: True) + .map(lambda m: m.map(lambda _: False).value_or(True)) + ) + return allow_send.bind( + lambda b: Cmd.from_cmd( + lambda: LOG.info("Submitting job: %s", draft.name.raw) + ) + + client.send_job(DependentJobDraft(draft, Maybe.empty())).bind( + lambda j: Cmd.from_cmd( + lambda: LOG.info("Job sent! id=%s arn=%s", j[0].raw, j[1].raw) + ) + ) + if b + else skipped_msg + ) + + +def send_pipeline( + client: ApiClient, pipeline: PureIter[JobDraft] +) -> Cmd[None]: + def _action(unwrapper: CmdUnwrapper) -> None: + prev: Maybe[JobId] = Maybe.empty() + LOG.info("Submitting jobs pipeline...") + for draft in pipeline: + send = client.send_job( + DependentJobDraft( + draft, + prev.map( + lambda j: JobDependencies.new(frozenset([j])).unwrap() + ), + ) + ) + LOG.info("Submitting job: %s", draft.name.raw) + sent_id = unwrapper.act(send) + LOG.info("Job sent!
id=%s arn=%s", sent_id[0].raw, sent_id[1].raw) + prev = Maybe.from_value(sent_id[0]) + + return Cmd.new_cmd(_action) diff --git a/src/args/compute-on-aws-batch/batch-client/batch_client/api/__init__.py b/src/args/compute-on-aws-batch/batch-client/batch_client/api/__init__.py new file mode 100644 index 00000000..72291cbc --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/batch_client/api/__init__.py @@ -0,0 +1,11 @@ +from ._client_1 import ( + new_client, +) +from ._core import ( + ApiClient, +) + +__all__ = [ + "ApiClient", + "new_client", +] diff --git a/src/args/compute-on-aws-batch/batch-client/batch_client/api/_client_1/__init__.py b/src/args/compute-on-aws-batch/batch-client/batch_client/api/_client_1/__init__.py new file mode 100644 index 00000000..7d585db9 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/batch_client/api/_client_1/__init__.py @@ -0,0 +1,33 @@ +from . import ( + _list_jobs, + _send_job, +) +from batch_client.api._core import ( + ApiClient, +) +import boto3 +from fa_purity import ( + Cmd, +) +from mypy_boto3_batch.client import ( + BatchClient, +) + + +def _new_batch_client() -> Cmd[BatchClient]: + def _action() -> BatchClient: + return boto3.client("batch") + + return Cmd.from_cmd(_action) + + +def new_client() -> Cmd[ApiClient]: + return _new_batch_client().map( + lambda client: ApiClient( + lambda n, q, s: _list_jobs.list_jobs(client, n, q, s), + lambda j: _send_job.send_job(client, j), + ) + ) diff --git a/src/args/compute-on-aws-batch/batch-client/batch_client/api/_client_1/_list_jobs.py b/src/args/compute-on-aws-batch/batch-client/batch_client/api/_client_1/_list_jobs.py new file mode 100644 index 00000000..d9416e08 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/batch_client/api/_client_1/_list_jobs.py @@ -0,0 +1,136 @@ +from batch_client import ( + utils, +) +from batch_client.core import ( + BatchJob, + BatchJobObj, + JobArn, + JobId, + JobName, + JobStatus, + QueueName, +) +from dataclasses import ( + dataclass, +) +from fa_purity import ( + Cmd, + FrozenList, + Maybe, + ResultE, + Stream, +) +from fa_purity.pure_iter import ( + PureIterFactory, +) +from fa_purity.result.transform import ( + all_ok, +) +from fa_purity.stream import ( + StreamFactory, + StreamTransform, +) +from fa_purity.utils import ( + raise_exception, +) +from mypy_boto3_batch.client import ( + BatchClient, +) +from mypy_boto3_batch.type_defs import ( + JobSummaryTypeDef, + KeyValuesPairTypeDef, + ListJobsResponseTypeDef, +) + + +def _decode_job(raw: JobSummaryTypeDef) -> ResultE[BatchJob]: + def _inner() -> ResultE[BatchJob]: + return JobStatus.to_status(raw["status"]).map( + lambda status: BatchJob( + raw["createdAt"], + status, + Maybe.from_optional(raw.get("statusReason")), + Maybe.from_optional(raw.get("startedAt")), + Maybe.from_optional(raw.get("stoppedAt")), + ) + ) + + return utils.handle_key_error(_inner).bind(lambda x: x) + + +def _decode_job_obj( + raw: JobSummaryTypeDef, +) -> ResultE[BatchJobObj]: + def _inner() -> ResultE[BatchJobObj]: + _arn = JobArn(raw["jobArn"]) + _id = JobId(raw["jobId"]) + _name = JobName.new(raw["jobName"]) + return _name.bind( + lambda name: _decode_job(raw).map( + lambda j: BatchJobObj(_id, _arn, name, j) + ) + ) + + return utils.handle_key_error(_inner).bind(lambda x: x) + + +@dataclass +class JobsPage: + items: FrozenList[BatchJobObj] + next_item: Maybe[str] + + +def _decode_response(response: ListJobsResponseTypeDef) -> ResultE[JobsPage]: + def _inner() -> ResultE[JobsPage]: + items = PureIterFactory.from_list(response["jobSummaryList"]).map( + _decode_job_obj + ) + _next = Maybe.from_optional(response.get("nextToken")) + return all_ok(items.to_list()).map(lambda i: JobsPage(i, _next)) + + return utils.handle_key_error(_inner).bind(lambda x: x) + + +def _list_jobs_page( + client: BatchClient, + queue: QueueName, + name: JobName, + _next: Maybe[str], +) -> Cmd[JobsPage]: + def _action() -> JobsPage: + _filter: FrozenList[KeyValuesPairTypeDef] = ( + {"name": "JOB_NAME", "values": [name.raw]}, + ) + result = _next.map( + lambda n: client.list_jobs( + jobQueue=queue.raw, filters=_filter, nextToken=n + ) + ).or_else_call( + lambda: client.list_jobs(jobQueue=queue.raw, filters=_filter) + ) + return _decode_response(result).alt(raise_exception).unwrap() + + return Cmd.from_cmd(_action) + + +def list_jobs( + client: BatchClient, + name: JobName, + queue: QueueName, + status: JobStatus, +) -> Stream[BatchJobObj]: + def _extract(page: JobsPage) -> Maybe[Maybe[str]]: + return page.next_item.map(lambda n: Maybe.from_value(n)) + + def _cmd(index: Maybe[str]) -> Cmd[JobsPage]: + return _list_jobs_page(client, queue, name, index) + + return ( + StreamFactory.generate(_cmd, _extract, Maybe.empty(str)) + .map(lambda j: PureIterFactory.from_list(j.items)) + .transform(lambda x: StreamTransform.chain(x)) + .filter(lambda j: j.job.status == status) + ) diff --git a/src/args/compute-on-aws-batch/batch-client/batch_client/api/_client_1/_send_job.py b/src/args/compute-on-aws-batch/batch-client/batch_client/api/_client_1/_send_job.py new file mode 100644 index 00000000..86635150 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/batch_client/api/_client_1/_send_job.py @@ -0,0 +1,129 @@ +from batch_client import ( + utils, +) +from batch_client.core import ( + DependentJobDraft, + JobArn, + JobDependencies, + JobId, + Manifest, + ResourceRequirement, +) +from fa_purity import ( + Cmd, + FrozenList, +) +from fa_purity.cmd import ( + CmdUnwrapper, +) +from fa_purity.pure_iter import ( + PureIterFactory, +) +import logging +from mypy_boto3_batch.client import ( + BatchClient, +) +from mypy_boto3_batch.type_defs import ( + JobDependencyTypeDef, + KeyValuePairTypeDef, + ResourceRequirementTypeDef, + SubmitJobResponseTypeDef, +) +from typing import ( + List, + Tuple, +) + +LOG = logging.getLogger(__name__) + + +def _encode_req(req: ResourceRequirement) -> ResourceRequirementTypeDef: + return { + "type": req.resource.value, + "value": utils.int_to_str(req.value.to_int), + } + + +def _to_pair(key: str, val: str) -> KeyValuePairTypeDef: + return { + "name": key, + "value": val, + } + + +def _get_envs(manifest: Manifest) -> Cmd[FrozenList[KeyValuePairTypeDef]]: + def _action(unwrapper: CmdUnwrapper) -> FrozenList[KeyValuePairTypeDef]: + return ( + PureIterFactory.from_list(tuple(manifest.environment.items())) + .map( + lambda t: _to_pair( + t[0], + t[1] + if isinstance(t[1], str) + else unwrapper.act(t[1].get_value()).value_or(""), + ), + ) + .to_list() + ) + + return Cmd.new_cmd(_action) + + +def _decode_response( + response: SubmitJobResponseTypeDef, +) -> Tuple[JobId, JobArn]: + return (JobId(response["jobId"]), JobArn(response["jobArn"])) + + +def _encode_deps(deps: JobDependencies) -> List[JobDependencyTypeDef]: + def _transform(_id: JobId) -> JobDependencyTypeDef: + return {"jobId": _id.raw} + + result = ( PureIterFactory.from_list(tuple(deps.items)).map(_transform).to_list() + ) + return list(result) + + +def send_job( + client: BatchClient, full_draft: DependentJobDraft +) -> Cmd[Tuple[JobId, JobArn]]: + draft = full_draft.draft + + def _action(unwrapper: CmdUnwrapper) -> Tuple[JobId, JobArn]: + env = unwrapper.act(_get_envs(draft.manifest)) + response = ( + client.submit_job( # pylint: disable=assignment-from-no-return + arrayProperties={"size": draft.parallel.size.to_int} + if draft.parallel.size.to_int > 1 + else {}, + containerOverrides={ + "command": draft.command.raw, + "environment": env, + "resourceRequirements": PureIterFactory.from_list( + draft.manifest.resources + ) + .map(_encode_req) + .to_list(), + }, + jobDefinition=draft.job_def.raw, + jobName=draft.name.raw, + jobQueue=draft.queue.raw, + retryStrategy={ + "attempts": draft.retries.maximum.to_int, + }, + tags=dict(draft.tags.items), + timeout={ + "attemptDurationSeconds": draft.timeout.seconds.to_int + }, + propagateTags=draft.propagate_tags, + dependsOn=full_draft.dependencies.map(_encode_deps).value_or( + [] + ), + ) + ) + return _decode_response(response) + + return Cmd.new_cmd(_action) diff --git a/src/args/compute-on-aws-batch/batch-client/batch_client/api/_core.py b/src/args/compute-on-aws-batch/batch-client/batch_client/api/_core.py new file mode 100644 index 00000000..8da8383b --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/batch_client/api/_core.py @@ -0,0 +1,28 @@ +from batch_client.core import ( + BatchJobObj, + DependentJobDraft, + JobArn, + JobId, + JobName, + JobStatus, + QueueName, +) +from collections.abc import ( + Callable, +) +from dataclasses import ( + dataclass, +) +from fa_purity import ( + Cmd, + Stream, +) +from typing import ( + Tuple, +) + + +@dataclass(frozen=True) +class ApiClient: + list_jobs: Callable[[JobName, QueueName, JobStatus], Stream[BatchJobObj]] + send_job: Callable[[DependentJobDraft], Cmd[Tuple[JobId, JobArn]]] diff --git a/src/args/compute-on-aws-batch/batch-client/batch_client/core.py b/src/args/compute-on-aws-batch/batch-client/batch_client/core.py new file mode 100644 index 00000000..7ce0578d --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/batch_client/core.py @@ -0,0 +1,268 @@ +from __future__ import ( + annotations, +) + +from batch_client import ( + utils, +) +from batch_client.utils import ( + Natural, +) +from dataclasses import ( + dataclass, + field, +) +from enum import ( + Enum, +) +from fa_purity import ( + Cmd, + FrozenDict, + FrozenList, + Maybe, + Result, + ResultE, +) +from fa_purity.pure_iter import ( + PureIterFactory, + PureIterTransform, +) +import os +from typing import ( + FrozenSet, +) + + +@dataclass(frozen=True) +class _Private: + pass + + +@dataclass(frozen=True) +class QueueName: + raw: str + + +@dataclass(frozen=True) +class Attempts: + _private: _Private = field(repr=False, hash=False, compare=False) + maximum: Natural + + @staticmethod + def new(raw: Natural) -> ResultE[Attempts]: + if raw.to_int <= 10: + return Result.success(Attempts(_Private(), raw)) + err = ValueError("Attempts must be a Natural <= 10") + return Result.failure(Exception(err)) + + +@dataclass(frozen=True) +class Timeout: + _private: _Private = field(repr=False, hash=False, compare=False) + seconds: Natural + + @staticmethod + def new(raw: Natural) -> ResultE[Timeout]: + if raw.to_int >= 60: + return Result.success(Timeout(_Private(), raw)) + err = ValueError("Timeout must be a Natural >= 60") + return Result.failure(Exception(err)) + +
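+# Note: `Attempts` and `Timeout` (above) share a smart-constructor pattern:
+# the `_Private` marker field discourages direct construction, so the
+# validated `new` is the intended entry point, e.g. (illustrative values):
+#
+#   Natural.assert_natural(3).bind(Attempts.new)   # success: 3 <= 10
+#   Natural.assert_natural(30).bind(Timeout.new)   # failure: 30 < 60
+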
+@dataclass(frozen=True) +class Command: + raw: FrozenList[str] + + +@dataclass(frozen=True) +class JobDefinition: + raw: str + + +@dataclass(frozen=True) +class EnvVarPointer: + name: str + + def get_value(self) -> Cmd[Maybe[str]]: + return Cmd.from_cmd( + lambda: Maybe.from_optional(os.environ.get(self.name)) + ) + + +class ResourceType(Enum): + VCPU = "VCPU" + MEMORY = "MEMORY" + + @staticmethod + def to_req_type(raw: str) -> ResultE[ResourceType]: + return utils.handle_value_error(lambda: ResourceType(raw.upper())) + + +@dataclass(frozen=True) +class ResourceRequirement: + resource: ResourceType + value: Natural + + +@dataclass(frozen=True) +class Tags: + items: FrozenDict[str, str] + + +@dataclass(frozen=True) +class Manifest: + environment: FrozenDict[str, str | EnvVarPointer] + resources: FrozenList[ResourceRequirement] + + +@dataclass(frozen=True) +class JobSize: + size: Natural + + @staticmethod + def new(raw: Natural) -> ResultE[JobSize]: + if raw.to_int >= 1 and raw.to_int <= 10000: + return Result.success(JobSize(raw)) + err = ValueError("JobSize must be a Natural between 1 and 10000") + return Result.failure(Exception(err)) + + +class JobStatus(Enum): + SUBMITTED = "SUBMITTED" + PENDING = "PENDING" + RUNNABLE = "RUNNABLE" + STARTING = "STARTING" + RUNNING = "RUNNING" + SUCCEEDED = "SUCCEEDED" + FAILED = "FAILED" + + @staticmethod + def to_status(raw: str) -> ResultE[JobStatus]: + return utils.handle_value_error(lambda: JobStatus(raw.upper())) + + +@dataclass(frozen=True) +class JobName: + _private: _Private = field(repr=False, hash=False, compare=False) + raw: str + + @staticmethod + def new(raw: str) -> ResultE[JobName]: + def _check(index: int, char: str) -> bool: + if index == 1: + return char.isalnum() + return char.isalnum() or char in ["_", "-"] + + validation = ( + PureIterFactory.from_list(tuple(raw)) + .enumerate(1) + .map(lambda t: _check(*t)) + ) + if len(raw) <= 128 and all(validation): + return Result.success(JobName(_Private(), raw)) + err = ValueError("JobName does not fulfill naming rules") + return Result.failure(Exception(err)) + + @staticmethod + def normalize(raw: str) -> JobName: + def _normalize(index: int, char: str) -> str: + if index == 1 and not char.isalnum(): + return "X" + if char.isalnum() or char in ["_"]: + return char + else: + return "-" + + text = ( + PureIterFactory.from_list(tuple(raw)) + .enumerate(1) + .map(lambda t: (t[0], _normalize(*t))) + ) + truncated = PureIterTransform.until_none( + text.map(lambda t: t[1] if t[0] <= 128 else None) + ) + return JobName(_Private(), "".join(truncated)) + + +@dataclass(frozen=True) +class JobId: + raw: str + + +@dataclass(frozen=True) +class JobArn: + raw: str + + +@dataclass(frozen=True) +class JobDependencies: + _private: _Private = field(repr=False, hash=False, compare=False) + items: FrozenSet[JobId] + + @staticmethod + def new(items: FrozenSet[JobId]) -> ResultE[JobDependencies]: + if len(items) >= 1 and len(items) <= 20: + return Result.success(JobDependencies(_Private(), items)) + err = ValueError("The maximum number of dependencies for a job is 20") + return Result.failure(Exception(err)) + + +@dataclass(frozen=True) +class BatchJob: + created_at: int + status: JobStatus + status_reason: Maybe[str] + started_at: Maybe[int] + stopped_at: Maybe[int] + + +@dataclass(frozen=True) +class BatchJobObj: + job_id: JobId + arn: JobArn + name: JobName + job: BatchJob + + +@dataclass(frozen=True) +class RawJobDraft: + name: str + queue: Maybe[str] + command: FrozenList[str] + job_def: str +
retries: int + timeout: int + env: FrozenList[FrozenDict[str, str]] + memory: int + vcpus: int + size: int + tags: FrozenDict[str, str] + allow_duplicates: bool + args_in_name: bool + propagate_tags: bool + next_job: Maybe[RawJobDraft] + + +@dataclass(frozen=True) +class JobDraft: + name: JobName + queue: QueueName + parallel: JobSize + job_def: JobDefinition + retries: Attempts + timeout: Timeout + command: Command + manifest: Manifest + tags: Tags + propagate_tags: bool + + +@dataclass(frozen=True) +class DependentJobDraft: + draft: JobDraft + dependencies: Maybe[JobDependencies] + + +@dataclass(frozen=True) +class AllowDuplicates: + value: bool diff --git a/src/args/compute-on-aws-batch/batch-client/batch_client/decode.py b/src/args/compute-on-aws-batch/batch-client/batch_client/decode.py new file mode 100644 index 00000000..fa0377b0 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/batch_client/decode.py @@ -0,0 +1,237 @@ +from __future__ import ( + annotations, +) + +from batch_client import ( + utils, +) +from batch_client.core import ( + AllowDuplicates, + Attempts, + Command, + EnvVarPointer, + JobDefinition, + JobDraft, + JobName, + JobSize, + Manifest, + QueueName, + RawJobDraft, + ResourceRequirement, + ResourceType, + Tags, + Timeout, +) +from batch_client.utils import ( + Natural, + ResultUnwrapper, +) +from collections.abc import ( + Callable, +) +from fa_purity import ( + FrozenDict, + FrozenList, + Maybe, + PureIter, + Result, + ResultE, +) +from fa_purity.json_2 import ( + JsonObj, + JsonPrimitiveUnfolder, + JsonUnfolder, + JsonValue, + Unfolder, +) +from fa_purity.pure_iter import ( + PureIterFactory, + PureIterTransform, +) +from fa_purity.result import ( + ResultFactory, +) +from fa_purity.result.transform import ( + all_ok, +) +from typing import ( + Tuple, + TypeVar, +) + +_T = TypeVar("_T") + + +def _require( + item: JsonObj, key: str, transform: Callable[[JsonValue], ResultE[_T]] +) -> ResultE[_T]: + return JsonUnfolder.require(item, key, transform).alt( + lambda e: Exception(f"Error at key `{key}` i.e. 
{e}") + ) + + +def _to_str(raw: JsonValue) -> ResultE[str]: + return Unfolder.to_primitive(raw).bind(JsonPrimitiveUnfolder.to_str) + + +def _to_int(raw: JsonValue) -> ResultE[int]: + return Unfolder.to_primitive(raw).bind(JsonPrimitiveUnfolder.to_int) + + +def _to_bool(raw: JsonValue) -> ResultE[bool]: + return Unfolder.to_primitive(raw).bind(JsonPrimitiveUnfolder.to_bool) + + +def _decode_env_pair( + raw: FrozenDict[str, str] +) -> ResultE[Tuple[str, str | EnvVarPointer]]: + name = ( + Maybe.from_optional(raw.get("name")) + .to_result() + .alt(lambda _: KeyError("name")) + .alt(Exception) + ) + value = Maybe.from_optional(raw.get("value")) + return name.map(lambda n: (n, value.value_or(EnvVarPointer(n)))) + + +def _decode_envs( + raw: FrozenList[FrozenDict[str, str]], +) -> ResultE[FrozenDict[str, str | EnvVarPointer]]: + _decoded = PureIterFactory.from_list(raw).map(_decode_env_pair) + return all_ok(_decoded.to_list()).map(lambda i: FrozenDict(dict(i))) + + +def _decode_raw_draft(raw: JsonObj, unwrapper: ResultUnwrapper) -> RawJobDraft: + _name = _require(raw, "name", _to_str) + _queue = _require( + raw, + "queue", + lambda j: Unfolder.to_primitive(j).bind( + JsonPrimitiveUnfolder.to_opt_str + ), + ).map(lambda x: Maybe.from_optional(x)) + _command = _require( + raw, "command", lambda e: Unfolder.to_list_of(e, _to_str) + ) + _job_def = _require(raw, "definition", _to_str) + _retries = _require(raw, "attempts", _to_int) + _timeout = _require(raw, "attemptDurationSeconds", _to_int) + _env = _require( + raw, + "environment", + lambda e: Unfolder.to_list_of( + e, lambda j: Unfolder.to_dict_of(j, _to_str) + ), + ) + _memory = _require(raw, "memory", _to_int) + _vcpus = _require(raw, "vcpus", _to_int) + _size = _require(raw, "parallel", _to_int) + _tags = _require(raw, "tags", lambda j: Unfolder.to_dict_of(j, _to_str)) + _allow_duplicates = _require(raw, "allowDuplicates", _to_bool) + _args_in_name = _require(raw, "includePositionalArgsInName", _to_bool) + _propagate_tags = _require(raw, "propagateTags", _to_bool) + _next = _require(raw, "nextJob", Unfolder.to_json).bind(decode_raw_draft) + return RawJobDraft( + unwrapper.unwrap(_name), + unwrapper.unwrap(_queue), + unwrapper.unwrap(_command), + unwrapper.unwrap(_job_def), + unwrapper.unwrap(_retries), + unwrapper.unwrap(_timeout), + unwrapper.unwrap(_env), + unwrapper.unwrap(_memory), + unwrapper.unwrap(_vcpus), + unwrapper.unwrap(_size), + unwrapper.unwrap(_tags), + unwrapper.unwrap(_allow_duplicates), + unwrapper.unwrap(_args_in_name), + unwrapper.unwrap(_propagate_tags), + unwrapper.unwrap(_next), + ) + + +def decode_raw_draft(raw: JsonObj) -> ResultE[Maybe[RawJobDraft]]: + if raw == FrozenDict({}): + return Result.success(Maybe.empty()) + return ( + utils.unwrap_context(lambda u: _decode_raw_draft(raw, u)) + .map(lambda x: Maybe.from_value(x)) + .alt( + lambda e: Exception( + f"decode_raw_draft failed i.e. 
{e} with data {Unfolder.dumps(raw)}" + ) + ) + ) + + +def _from_raw_draft( + raw: RawJobDraft, args: FrozenList[str], queue_from_env: ResultE[QueueName] +) -> ResultE[JobDraft]: + name = JobName.normalize( + raw.name + ("-" if len(args) > 0 else "") + "-".join(args) + if raw.args_in_name + else raw.name + ) + _factory: ResultFactory[QueueName, Exception] = ResultFactory() + _queue = raw.queue.map( + lambda q: _factory.success(QueueName(q)) + if len(q) > 0 + else queue_from_env + ).value_or(queue_from_env) + size = JobSize(Natural.abs(raw.size)) + job_def = JobDefinition(raw.job_def) + _attempts = Attempts.new(Natural.abs(raw.retries)) + _timeout = Timeout.new(Natural.abs(raw.timeout)) + command = Command(raw.command + args) + reqs = ( + ResourceRequirement(ResourceType.MEMORY, Natural.abs(raw.memory)), + ResourceRequirement(ResourceType.VCPU, Natural.abs(raw.vcpus)), + ) + _manifest = _decode_envs(raw.env).map( + lambda envs: Manifest( + envs, + reqs, + ) + ) + + def _context(unwrapper: ResultUnwrapper) -> JobDraft: + return JobDraft( + name, + unwrapper.unwrap(_queue), + size, + job_def, + unwrapper.unwrap(_attempts), + unwrapper.unwrap(_timeout), + command, + unwrapper.unwrap(_manifest), + Tags(raw.tags), + raw.propagate_tags, + ) + + return utils.unwrap_context(_context) + + +def _raw_jobs(raw: RawJobDraft) -> PureIter[RawJobDraft]: + return PureIterFactory.infinite_gen( + lambda m: m.bind(lambda r: r.next_job), Maybe.from_value(raw) + ).transform(lambda i: PureIterTransform.until_empty(i)) + + +def decode_all_drafts( + root: RawJobDraft, + args: FrozenList[FrozenList[str]], + queue_from_env: ResultE[QueueName], +) -> Tuple[PureIter[ResultE[JobDraft]], AllowDuplicates]: + items = ( + _raw_jobs(root) + .enumerate(0) + .map( + lambda t: _from_raw_draft( + t[1], + utils.get_index(args, t[0]).value_or(tuple([])), + queue_from_env, + ) + ) + ) + return (items, AllowDuplicates(root.allow_duplicates)) diff --git a/src/args/compute-on-aws-batch/batch-client/batch_client/py.typed b/src/args/compute-on-aws-batch/batch-client/batch_client/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/src/args/compute-on-aws-batch/batch-client/batch_client/utils.py b/src/args/compute-on-aws-batch/batch-client/batch_client/utils.py new file mode 100644 index 00000000..cf0ff0a4 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/batch_client/utils.py @@ -0,0 +1,151 @@ +from __future__ import ( + annotations, +) + +from collections.abc import ( + Callable, +) +from dataclasses import ( + dataclass, + field, +) +from fa_purity import ( + FrozenList, + Maybe, + PureIter, + Result, + ResultE, +) +from fa_purity.union import ( + Coproduct, + CoproductFactory, +) +from fa_purity.utils import ( + raise_exception, +) +from typing import ( + NoReturn, + TypeVar, +) + +_T = TypeVar("_T") +_S = TypeVar("_S") +_F = TypeVar("_F") + + +@dataclass(frozen=True) +class _Private: + pass + + +@dataclass(frozen=True) +class LibraryBug(Exception): + traceback: Exception + + def __str__(self) -> str: + return "If raised then there is a bug in the `batch_client` library" + + +def str_to_int(raw: str) -> ResultE[int]: + try: + return Result.success(int(raw)) + except ValueError as err: + return Result.failure(Exception(err)) + + +def int_to_str(item: int) -> str: + return str(item) + + +def handle_value_error(transform: Callable[[], _T | NoReturn]) -> ResultE[_T]: + try: + return Result.success(transform()) + except ValueError
as err: + return Result.failure(Exception(err)) + + +def handle_key_error(transform: Callable[[], _T | NoReturn]) -> ResultE[_T]: + try: + return Result.success(transform()) + except KeyError as err: + return Result.failure(Exception(err)) + + +def handle_index_error(transform: Callable[[], _T | NoReturn]) -> ResultE[_T]: + try: + return Result.success(transform()) + except IndexError as err: + return Result.failure(Exception(err)) + + +def get_index(items: FrozenList[_T], index: int) -> Maybe[_T]: + return Maybe.from_result( + handle_index_error(lambda: items[index]).alt(lambda _: None) + ) + + +@dataclass(frozen=True) +class Natural: + _private: _Private = field(repr=False, hash=False, compare=False) + to_int: int + + @staticmethod + def assert_natural(raw: int) -> ResultE[Natural]: + if raw >= 0: + return Result.success(Natural(_Private(), raw)) + err = ValueError("The supplied integer is not a natural number") + return Result.failure(Exception(err)) + + @classmethod + def abs(cls, raw: int) -> Natural: + return ( + cls.assert_natural(abs(raw)) + .alt(LibraryBug) + .alt(raise_exception) + .unwrap() + ) + + +def extract_single(items: PureIter[_T]) -> Coproduct[_T, PureIter[_T]]: + _factory: CoproductFactory[_T, PureIter[_T]] = CoproductFactory() + single_element = ( + items.enumerate(1) + .find_first(lambda t: t[0] >= 2) + .map(lambda _: False) + .value_or(True) + ) + if single_element: + return _factory.inl( + items.enumerate(1) + .find_first(lambda t: t[0] == 1) + .to_result() + .alt(lambda _: LibraryBug(Exception("no first element"))) + .unwrap()[1] + ) + return _factory.inr(items) + + +class HandledException(Exception): + pass + + +@dataclass(frozen=True) +class ResultUnwrapper: + _inner: _Private = field(repr=False, hash=False, compare=False) + + def unwrap(self, item: Result[_S, _F]) -> _S: + return item.alt(HandledException).alt(raise_exception).unwrap() + + +def unwrap_context( + unwrap_block: Callable[[ResultUnwrapper], _T] +) -> ResultE[_T]: + try: + return Result.success(unwrap_block(ResultUnwrapper(_Private()))) + except HandledException as err: + return Result.failure(Exception(err)) diff --git a/src/args/compute-on-aws-batch/batch-client/build/default.nix b/src/args/compute-on-aws-batch/batch-client/build/default.nix new file mode 100644 index 00000000..2895ecb4 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/build/default.nix @@ -0,0 +1,32 @@ +{ + makePythonPyprojectPackage, + nixpkgs, + python_version, + src, +}: let + deps = import ./deps { + inherit nixpkgs python_version; + }; + pkgDeps = { + runtime_deps = with deps.python_pkgs; [ + boto3 + click + fa-purity + pathos + mypy-boto3-batch + types-boto3 + ]; + build_deps = with deps.python_pkgs; [flit-core]; + test_deps = with deps.python_pkgs; [ + arch-lint + mypy + pylint + pytest + ]; + }; + packages = makePythonPyprojectPackage { + inherit (deps.lib) buildEnv buildPythonPackage; + inherit pkgDeps src; + }; +in + packages diff --git a/src/args/compute-on-aws-batch/batch-client/build/deps/arch_lint.nix b/src/args/compute-on-aws-batch/batch-client/build/deps/arch_lint.nix new file mode 100644 index 00000000..ed6ad0e4 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/build/deps/arch_lint.nix @@ -0,0 +1,13 @@ +{ + nixpkgs, + python_version, +}: let + src = builtins.fetchGit { + url = "https://gitlab.com/dmurciaatfluid/arch_lint"; + rev = "72a495bb933f052ad812292b468ca3e18fd9dde4"; + ref = "refs/tags/2.4.0+1"; + }; +in + import "${src}/build" { + inherit nixpkgs python_version src; + } diff --git 
a/src/args/compute-on-aws-batch/batch-client/build/deps/boto3/batch-stubs.nix b/src/args/compute-on-aws-batch/batch-client/build/deps/boto3/batch-stubs.nix new file mode 100644 index 00000000..6a950258 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/build/deps/boto3/batch-stubs.nix @@ -0,0 +1,14 @@ +{ + lib, + python_pkgs, +}: +lib.buildPythonPackage rec { + pname = "mypy-boto3-batch"; + version = "1.28.36"; + src = lib.fetchPypi { + inherit pname version; + sha256 = "SEDD3Fjd4y337atj+RVUKIvpUd0oCvje8gOF1/Rg7Gs="; + }; + nativeBuildInputs = with python_pkgs; [boto3]; + propagatedBuildInputs = with python_pkgs; [botocore typing-extensions]; +} diff --git a/src/args/compute-on-aws-batch/batch-client/build/deps/boto3/stubs.nix b/src/args/compute-on-aws-batch/batch-client/build/deps/boto3/stubs.nix new file mode 100644 index 00000000..2e90d968 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/build/deps/boto3/stubs.nix @@ -0,0 +1,32 @@ +{ + lib, + python_pkgs, +}: let + botocore-stubs = lib.buildPythonPackage rec { + pname = "botocore-stubs"; + version = "1.26.3"; + src = lib.fetchPypi { + inherit pname version; + sha256 = "k7aYzFM9eh1kYxLBdeM+9jr2SCHxrmDl6AWEdOmu0ZU="; + }; + propagatedBuildInputs = [python_pkgs.typing-extensions]; + }; + boto3-stubs = lib.buildPythonPackage rec { + pname = "boto3-stubs"; + version = "1.22.10"; + src = lib.fetchPypi { + inherit pname version; + sha256 = "P3d9/x6lABlPx5vCGICtb9yuowDsiB4vr/c8rdYlew4="; + }; + propagatedBuildInputs = [botocore-stubs]; + }; +in + lib.buildPythonPackage rec { + pname = "types-boto3"; + version = "1.0.2"; + src = lib.fetchPypi { + inherit pname version; + sha256 = "FfP/rQMU5AoHCP7CX5SJFBT5MmAgJCK/ixm2kThTyYM="; + }; + propagatedBuildInputs = [boto3-stubs]; + } diff --git a/src/args/compute-on-aws-batch/batch-client/build/deps/default.nix b/src/args/compute-on-aws-batch/batch-client/build/deps/default.nix new file mode 100644 index 00000000..e5e0b0c1 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/build/deps/default.nix @@ -0,0 +1,39 @@ +{ + nixpkgs, + python_version, +}: let + lib = { + buildEnv = nixpkgs."${python_version}".buildEnv.override; + inherit (nixpkgs."${python_version}".pkgs) buildPythonPackage; + inherit (nixpkgs.python3Packages) fetchPypi; + }; + + utils = import ./override_utils.nix; + pkgs_overrides = override: python_pkgs: builtins.mapAttrs (_: override python_pkgs) python_pkgs; + + arch-lint = import ./arch_lint.nix {inherit nixpkgs python_version;}; + fa-purity = let + core = import ./fa_purity.nix {inherit nixpkgs python_version;}; + in { + "${python_version}" = core; + }; + + layer_1 = python_pkgs: + python_pkgs + // { + arch-lint = arch-lint.pkg; + fa-purity = fa-purity."${python_version}".pkg; + mypy-boto3-batch = import ./boto3/batch-stubs.nix {inherit lib python_pkgs;}; + types-boto3 = import ./boto3/stubs.nix {inherit lib python_pkgs;}; + }; + + fa_purity_override = python_pkgs: utils.replace_pkg ["fa_purity"] python_pkgs.fa-purity; + overrides = map pkgs_overrides [ + fa_purity_override + (_: utils.no_check_override) + ]; + + python_pkgs = utils.compose ([layer_1] ++ overrides) nixpkgs."${python_version}Packages"; +in { + inherit lib python_pkgs; +} diff --git a/src/args/compute-on-aws-batch/batch-client/build/deps/fa_purity.nix b/src/args/compute-on-aws-batch/batch-client/build/deps/fa_purity.nix new file mode 100644 index 00000000..9c46d34d --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/build/deps/fa_purity.nix @@ -0,0 +1,13 @@ +{ + nixpkgs, + 
python_version, +}: let + src = builtins.fetchGit { + url = "https://gitlab.com/dmurciaatfluid/purity"; + rev = "e0b5cf459a16eb92d86ca6c024edbedd52d72589"; + ref = "refs/tags/v1.38.0"; + }; +in + import "${src}/build" { + inherit src nixpkgs python_version; + } diff --git a/src/args/compute-on-aws-batch/batch-client/build/deps/override_utils.nix b/src/args/compute-on-aws-batch/batch-client/build/deps/override_utils.nix new file mode 100644 index 00000000..901a5827 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/build/deps/override_utils.nix @@ -0,0 +1,46 @@ +let + recursive_python_pkg_override = is_pkg: override: let + # is_pkg: Derivation -> Bool + # override: Derivation -> Derivation + self = recursive_python_pkg_override is_pkg override; + in + pkg: + if is_pkg pkg + then override pkg + else if pkg ? overridePythonAttrs && pkg ? pname + then + pkg.overridePythonAttrs ( + builtins.mapAttrs (_: value: + if builtins.isList value + then map self value + else self value) + ) + else pkg; + + # no_check_override: Derivation -> Derivation + no_check_override = recursive_python_pkg_override (pkg: pkg ? overridePythonAttrs && pkg ? pname) ( + pkg: + pkg.overridePythonAttrs ( + old: + ( + builtins.mapAttrs (_: value: + if builtins.isList value + then map no_check_override value + else no_check_override value) + old + ) + // { + doCheck = false; + } + ) + ); + + # replace_pkg: List[str] -> Derivation -> Derivation + replace_pkg = names: new_pkg: + recursive_python_pkg_override ( + x: x ? overridePythonAttrs && x ? pname && builtins.elem x.pname names + ) (_: new_pkg); +in { + inherit recursive_python_pkg_override no_check_override replace_pkg; + compose = functions: val: builtins.foldl' (x: f: f x) val functions; +} diff --git a/src/args/compute-on-aws-batch/batch-client/entrypoint.nix b/src/args/compute-on-aws-batch/batch-client/entrypoint.nix new file mode 100644 index 00000000..b20f2c32 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/entrypoint.nix @@ -0,0 +1,26 @@ +{ + makePythonPyprojectPackage, + nixpkgs, +}: let + nix-filter = let + src = builtins.fetchGit { + url = "https://github.com/numtide/nix-filter"; + rev = "fc282c5478e4141842f9644c239a41cfe9586732"; + }; + in + import src; + python_version = "python311"; + out = import ./build { + inherit makePythonPyprojectPackage nixpkgs python_version; + src = nix-filter { + root = ./.; + include = [ + "batch_client" + "tests" + "pyproject.toml" + "mypy.ini" + ]; + }; + }; +in + out diff --git a/src/args/compute-on-aws-batch/batch-client/mypy.ini b/src/args/compute-on-aws-batch/batch-client/mypy.ini new file mode 100644 index 00000000..9cc773d7 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/mypy.ini @@ -0,0 +1,30 @@ +[mypy] +disallow_any_decorated = True +disallow_any_expr = True +disallow_any_explicit = True +disallow_any_generics = True +disallow_any_unimported = True +disallow_incomplete_defs = True +disallow_subclassing_any = True +disallow_untyped_calls = True +disallow_untyped_decorators = True +disallow_untyped_defs = True + +check_untyped_defs = True +no_implicit_optional = True +strict = True +strict_equality = True +strict_optional = True + +warn_redundant_casts = True +warn_unused_ignores = True +warn_no_return = True +warn_return_any = True +warn_unreachable = True + +ignore_errors = False +ignore_missing_imports = False +allow_untyped_globals = False +allow_redefinition = False +local_partial_types = False +implicit_reexport = False diff --git 
a/src/args/compute-on-aws-batch/batch-client/pyproject.toml b/src/args/compute-on-aws-batch/batch-client/pyproject.toml new file mode 100644 index 00000000..0d40f6ce --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/pyproject.toml @@ -0,0 +1,22 @@ +[project] +name = "batch_client" +authors = [ + {name = "Product Team", email = "development@fluidattacks.com"}, +] +requires-python = ">=3.11" +dependencies = [ + "click >=8.1.3, <9.0.0", + "fa_purity >=1.38.0, <2.0.0", + "pathos >=0.2.8, <1.0.0", + "mypy-boto3-batch >=1.28.36, <2.0.0", + "types-boto3 >=1.0.2, <2.0.0", +] +description = "AWS Batch Python client" +dynamic = ["version"] + +[project.scripts] +batch-client = 'batch_client._cli:main' + +[build-system] +requires = ["flit_core >=3.2,<4"] +build-backend = "flit_core.buildapi" diff --git a/src/args/compute-on-aws-batch/batch-client/tests/__init__.py b/src/args/compute-on-aws-batch/batch-client/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/args/compute-on-aws-batch/batch-client/tests/arch/__init__.py b/src/args/compute-on-aws-batch/batch-client/tests/arch/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/args/compute-on-aws-batch/batch-client/tests/arch/arch.py b/src/args/compute-on-aws-batch/batch-client/tests/arch/arch.py new file mode 100644 index 00000000..11ac3f51 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/tests/arch/arch.py @@ -0,0 +1,49 @@ +from arch_lint.dag import ( + DagMap, +) +from arch_lint.graph import ( + FullPathModule, +) +from typing import ( + Dict, + FrozenSet, + NoReturn, + Tuple, + TypeVar, + Union, +) + +_T = TypeVar("_T") + + +def raise_or_return(item: _T | Exception) -> _T | NoReturn: + if isinstance(item, Exception): + raise item + return item + + +def _module(path: str) -> FullPathModule | NoReturn: + return raise_or_return(FullPathModule.from_raw(path)) + + +_dag: Dict[str, Tuple[Union[Tuple[str, ...], str], ...]] = { + "batch_client": ( + "_cli", + ("decode", "actions", "api"), + "core", + ("utils", "_logger"), + ), + "batch_client.api": ("_client_1", "_core"), + "batch_client.api._client_1": (("_list_jobs", "_send_job"),), +} + + +def project_dag() -> DagMap: + return raise_or_return(DagMap.new(_dag)) + + +def forbidden_allowlist() -> Dict[FullPathModule, FrozenSet[FullPathModule]]: + _raw: Dict[str, FrozenSet[str]] = {} + return { + _module(k): frozenset(_module(i) for i in v) for k, v in _raw.items() + } diff --git a/src/args/compute-on-aws-batch/batch-client/tests/arch/test_arch.py b/src/args/compute-on-aws-batch/batch-client/tests/arch/test_arch.py new file mode 100644 index 00000000..d526b204 --- /dev/null +++ b/src/args/compute-on-aws-batch/batch-client/tests/arch/test_arch.py @@ -0,0 +1,42 @@ +from .arch import ( + forbidden_allowlist, + project_dag, +) +from arch_lint.dag.check import ( + dag_map_completeness, +) +from arch_lint.forbidden import ( + check_forbidden, +) +from arch_lint.graph import ( + ImportGraph, +) +from arch_lint.private import ( + check_private, +) + +root = "batch_client" + + +def test_dag_creation() -> None: + project_dag() + + +def test_dag_completeness() -> None: + graph = ImportGraph.build_graph(root, True) + dag_map_completeness(project_dag(), graph, next(iter(graph.roots))) + + +def test_forbidden_creation() -> None: + forbidden_allowlist() + + +def test_forbidden() -> None: + graph = ImportGraph.build_graph(root, True) + allowlist_map = forbidden_allowlist() + check_forbidden(allowlist_map, graph) + + +def test_private() -> None:
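+    # `check_private` (arch_lint) asserts, as I understand the library,
+    # that `_`-prefixed modules are imported only from within their
+    # own package, which the `_cli`/`_logger` naming here relies on.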
graph = ImportGraph.build_graph(root, False) + check_private(graph, next(iter(graph.roots))) diff --git a/src/args/compute-on-aws-batch/batch-client/tests/py.typed b/src/args/compute-on-aws-batch/batch-client/tests/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/src/args/compute-on-aws-batch/default.nix b/src/args/compute-on-aws-batch/default.nix index b13ce18b..959be866 100644 --- a/src/args/compute-on-aws-batch/default.nix +++ b/src/args/compute-on-aws-batch/default.nix @@ -1,5 +1,6 @@ { __nixpkgs__, + makePythonPyprojectPackage, makeScript, toFileJson, ... }: @@ -11,77 +12,82 @@ definition, environment, includePositionalArgsInName, + name, + nextJob, memory, parallel, propagateTags, queue, - name, setup, tags, vcpus, -}: -makeScript { - name = "compute-on-aws-batch-for-${name}"; - replace = { - __argAllowDuplicates__ = allowDuplicates; - __argAttempts__ = attempts; - __argAttemptDurationSeconds__ = attemptDurationSeconds; - __argCommand__ = toFileJson "command.json" command; - __argDefinition__ = definition; - __argIncludePositionalArgsInName__ = includePositionalArgsInName; - __argManifest__ = toFileJson "manifest.json" { - environment = builtins.concatLists [ - [ - { - name = "CI"; - value = "true"; - } - ] - [ - { - name = "MAKES_AWS_BATCH_COMPAT"; - value = "true"; - } - ] - (builtins.map - (name: { - inherit name; - value = "\${${name}}"; - }) - environment) - ]; - resourceRequirements = [ - { - type = "VCPU"; - value = toString vcpus; - } - { - type = "MEMORY"; - value = toString memory; - } - ]; - }; - __argName__ = name; - __argParallel__ = parallel; - __argPropagate__ = propagateTags; - __argQueue__ = queue; - __argTags__ = let - tag_names = builtins.attrNames tags; - encode_tag = key: "${key}=${tags."${key}"}"; - encoded = map encode_tag tag_names; - encoded_tags = builtins.concatStringsSep "," encoded; - in - encoded_tags; +} @ self: let + batch-client = import ./batch-client/entrypoint.nix { + inherit makePythonPyprojectPackage; + nixpkgs = __nixpkgs__; + }; + + ci_env_var = { + name = "CI"; + value = "true"; }; - searchPaths = { - bin = [ - __nixpkgs__.awscli - __nixpkgs__.gnugrep - __nixpkgs__.envsubst - __nixpkgs__.gnused - __nixpkgs__.jq + compat_env_var = { + name = "MAKES_AWS_BATCH_COMPAT"; + value = "true"; + }; + encode_envs = envs: + builtins.concatLists [ + [ci_env_var] + [compat_env_var] + (builtins.map (name: {inherit name;}) envs) + # An env var without a value represents a reference to it; + # the actual value is resolved on the machine running the job ]; - source = setup; + + # These defaults must match the ones declared in the module options + apply_defaults = { + allowDuplicates ? false, + attempts ? 1, + attemptDurationSeconds, + command, + definition, + environment ? [], + includePositionalArgsInName ? true, + name, + nextJob ? {}, + memory, + parallel ? 1, + propagateTags ? true, + queue, + tags ?
{}, + vcpus, + } @ result: { + inherit allowDuplicates attempts attemptDurationSeconds command definition environment; + inherit includePositionalArgsInName name nextJob memory parallel propagateTags queue tags vcpus; + }; + encode_draft = _draft: let + draft = apply_defaults (removeAttrs _draft ["setup"]); + in { + inherit (draft) allowDuplicates attempts attemptDurationSeconds command; + inherit (draft) definition includePositionalArgsInName memory parallel; + inherit (draft) propagateTags queue name tags vcpus; + environment = encode_envs draft.environment; + nextJob = + if draft.nextJob == {} + then {} + else encode_draft draft.nextJob; }; - entrypoint = ./entrypoint.sh; -} +in + makeScript { + name = "compute-on-aws-batch-for-${name}"; + replace = { + __argJobs__ = toFileJson "jobs.json" (encode_draft self); + }; + searchPaths = { + bin = [ + batch-client.env.runtime + ]; + source = setup; + }; + entrypoint = ./entrypoint.sh; + } diff --git a/src/args/compute-on-aws-batch/entrypoint.sh b/src/args/compute-on-aws-batch/entrypoint.sh index f981707c..a132d347 100644 --- a/src/args/compute-on-aws-batch/entrypoint.sh +++ b/src/args/compute-on-aws-batch/entrypoint.sh @@ -1,89 +1,3 @@ # shellcheck shell=bash -function subst_env_vars { - envsubst -no-empty -no-unset < "${1}" -} - -function normalize_job_name { - echo "${1}" | sed -E 's|[^a-zA-Z0-9_-]|-|g' | grep -oP '^.{0,128}' -} - -function main { - local attempts="__argAttempts__" - local attempt_duration_seconds="__argAttemptDurationSeconds__" - local container_overrides - local command="__argCommand__" - local definition="__argDefinition__" - local manifest="__argManifest__" - local name="__argName__" - local queue="__argQueue__" - local parallel="__argParallel__" - local propagate="__argPropagate__" - local tags="__argTags__" - local submit_job_args - - : \ - && if test -z "${queue}"; then - require_env_var MAKES_COMPUTE_ON_AWS_BATCH_QUEUE \ - && queue="${MAKES_COMPUTE_ON_AWS_BATCH_QUEUE}" - fi \ - && if test -n "__argIncludePositionalArgsInName__"; then - for arg in "${@}"; do - name="${name}-${arg}" - done - fi \ - && name="$(normalize_job_name "${name}")" \ - && if test -z "__argAllowDuplicates__"; then - info Checking if job "${name}" is already in the queue to avoid duplicates \ - && is_already_in_queue=$( - aws batch list-jobs \ - --job-queue "${queue}" \ - --job-status RUNNABLE \ - --query 'jobSummaryList[*].jobName' \ - | jq -r --arg name "${name}" '. 
| contains([$name])' - ) \ - && if test "${is_already_in_queue}" = true; then - info Job "${name}" is already in queue, we skipped sending it \ - && return 0 - fi - fi \ - && info Sending job \ - && command="$(subst_env_vars "${command}")" \ - && command="$( - jq -enr \ - --argjson command "${command}" \ - --args \ - '($command + $ARGS.positional)' \ - -- \ - "${@}" - )" \ - && manifest="$(subst_env_vars "${manifest}")" \ - && container_overrides="$( - jq -enr \ - --argjson manifest "${manifest}" \ - --argjson command "${command}" \ - --args \ - '$manifest * { - command: $command - }' - )" \ - && submit_job_args=( - --container-overrides "${container_overrides}" - --job-name "${name}" - --job-queue "${queue}" - --job-definition "${definition}" - --retry-strategy "attempts=${attempts}" - --timeout "attemptDurationSeconds=${attempt_duration_seconds}" - --tags "${tags}" - ) \ - && if test -n "${propagate}"; then - submit_job_args+=(--propagate-tags) - fi \ - && if [ "${parallel}" -gt "1" ]; then - submit_job_args+=(--array-properties "size=${parallel}") - fi \ - && aws batch submit-job "${submit_job_args[@]}" \ - && info Job "${name}" has been successfully sent -} - -main "${@}" +batch-client submit-job --pipeline "__argJobs__" "${@}" diff --git a/src/evaluator/modules/compute-on-aws-batch/default.nix b/src/evaluator/modules/compute-on-aws-batch/default.nix index 8fc124dd..b928234c 100644 --- a/src/evaluator/modules/compute-on-aws-batch/default.nix +++ b/src/evaluator/modules/compute-on-aws-batch/default.nix @@ -25,6 +25,7 @@ inherit (config) setup; inherit (config) tags; inherit (config) vcpus; + inherit (config) nextJob; }; }; in { @@ -58,6 +59,10 @@ in { default = true; type = lib.types.bool; }; + nextJob = lib.mkOption { + default = {}; + type = lib.types.attrs; + }; memory = lib.mkOption { type = lib.types.ints.positive; };
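For reference, a minimal sketch of the pipeline document that `encode_draft` emits to `jobs.json` and that `batch-client submit-job --pipeline` then consumes. Field names mirror `apply_defaults` above; the queue, definition, command, and resource values are illustrative, and it assumes the `batch-client` entrypoint (declared in pyproject.toml) is on PATH with AWS credentials configured:

    # Sketch: build a one-job pipeline file and submit it through the new CLI.
    import json
    import subprocess
    import tempfile

    job = {
        "name": "example",
        "queue": None,  # null: resolved from MAKES_COMPUTE_ON_AWS_BATCH_QUEUE
        "command": ["echo", "hello"],
        "definition": "example-definition",  # an existing Batch job definition
        "attempts": 1,
        "attemptDurationSeconds": 3600,
        "environment": [{"name": "CI", "value": "true"}],
        "memory": 1800,
        "vcpus": 1,
        "parallel": 1,
        "tags": {},
        "allowDuplicates": False,
        "includePositionalArgsInName": True,
        "propagateTags": True,
        "nextJob": {},  # empty attrs mark the end of the pipeline
    }

    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as file:
        json.dump(job, file)

    subprocess.run(["batch-client", "submit-job", "--pipeline", file.name], check=True)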