Skip to content

Commit

Permalink
Add deferrable BatchOperator apache#29300
Browse files Browse the repository at this point in the history
  • Loading branch information
tgo-netizen committed Apr 11, 2023
1 parent d23a3bb commit 3c1bc9a
Showing 1 changed file with 49 additions and 0 deletions.
49 changes: 49 additions & 0 deletions airflow/providers/amazon/aws/operators/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
if TYPE_CHECKING:
from airflow.utils.context import Context

import boto3
import botocore.waiter

class BatchOperator(BaseOperator):
"""
Expand Down Expand Up @@ -363,3 +365,50 @@ def execute(self, context: Context):
self.hook.client.create_compute_environment(**trim_none_values(kwargs))

self.log.info("AWS Batch compute environment created successfully")


def submit_batch_job(job_name, job_definition, job_queue, command):
client = boto3.client("batch")
response = client.submit_job(
jobName=job_name,
jobQueue=job_queue,
jobDefinition=job_definition,
containerOverrides={"command": command},
)
job_id = response["jobId"]
return job_id

class JobStatusWaiter(botocore.waiter.Waiter):
def __init__(self, client, job_id, desired_status):
super().__init__(
client=client,
waiter_name="JobStatusWaiter",
delay=10,
max_attempts=60,
operation_name="DescribeJobs",
acceptors=[
{
"expected": desired_status,
"matcher": "path",
"state": "success",
"argument": "jobs[].status",
},
{
"expected": "FAILED",
"matcher": "path",
"state": "failure",
"argument": "jobs[].status",
},
{
"expected": "SUCCEEDED",
"matcher": "path",
"state": "failure",
"argument": "jobs[].status",
},
],
)
self.job_id = job_id

def wait(self):
self._wait(JobIds=[self.job_id])

0 comments on commit 3c1bc9a

Please sign in to comment.