From 9cf0e053be18066eb96a1fe370ab554581b06418 Mon Sep 17 00:00:00 2001
From: Ulada Zakharava
Date: Mon, 4 Dec 2023 14:30:16 +0000
Subject: [PATCH] Fix DataprocSubmitJobOperator to retrieve failed job error message

---
 airflow/providers/google/cloud/operators/dataproc.py | 7 ++++---
 airflow/providers/google/cloud/triggers/dataproc.py  | 7 ++-----
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index b489a79dc8ee8..477ddbf7d0a72 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -2152,7 +2152,7 @@ class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
         Service Account Token Creator IAM role to the directly preceding identity, with first
         account from the list granting this role to the originating account (templated).
     :param asynchronous: Flag to return after submitting the job to the Dataproc API.
-        This is useful for submitting long running jobs and
+        This is useful for submitting long-running jobs and
         waiting on them asynchronously using the DataprocJobSensor
     :param deferrable: Run operator in the deferrable mode
     :param polling_interval_seconds: time in seconds between polling for job completion.
@@ -2267,10 +2267,11 @@ def execute_complete(self, context, event=None) -> None:
         """
         job_state = event["job_state"]
         job_id = event["job_id"]
+        job = event["job"]
         if job_state == JobStatus.State.ERROR:
-            raise AirflowException(f"Job failed:\n{job_id}")
+            raise AirflowException(f"Job {job_id} failed:\n{job}")
         if job_state == JobStatus.State.CANCELLED:
-            raise AirflowException(f"Job was cancelled:\n{job_id}")
+            raise AirflowException(f"Job {job_id} was cancelled:\n{job}")
         self.log.info("%s completed successfully.", self.task_id)
         return job_id
 
diff --git a/airflow/providers/google/cloud/triggers/dataproc.py b/airflow/providers/google/cloud/triggers/dataproc.py
index 7d7215ebc25e5..e03f7a14ca689 100644
--- a/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/airflow/providers/google/cloud/triggers/dataproc.py
@@ -25,7 +25,6 @@
 from google.api_core.exceptions import NotFound
 from google.cloud.dataproc_v1 import Batch, ClusterStatus, JobStatus
 
-from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.hooks.dataproc import DataprocAsyncHook
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 
@@ -98,12 +97,10 @@ async def run(self):
             )
             state = job.status.state
             self.log.info("Dataproc job: %s is in state: %s", self.job_id, state)
-            if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED):
+            if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR):
                 break
-            elif state == JobStatus.State.ERROR:
-                raise AirflowException(f"Dataproc job execution failed {self.job_id}")
             await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"job_id": self.job_id, "job_state": state})
+        yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": job})
 
 
 class DataprocClusterTrigger(DataprocBaseTrigger):
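
Note (not part of the patch): a minimal sketch of the failure path this change enables, assuming the deferrable operator's trigger fires with state ERROR. The job id and error details below are fabricated for illustration; only the final `if`/`raise` mirrors the patched `execute_complete()` logic.

```python
from airflow.exceptions import AirflowException
from google.cloud.dataproc_v1 import Job, JobStatus

# Fabricated job resource standing in for what the trigger fetched from the
# Dataproc API (the id and details strings are made up for this example).
job = Job()
job.status.state = JobStatus.State.ERROR
job.status.details = "Job failed: java.lang.ArithmeticException: / by zero"

# Shape of the event DataprocSubmitTrigger now yields: the full job resource
# rides along with the id and terminal state.
event = {"job_id": "job-1234-abcd", "job_state": JobStatus.State.ERROR, "job": job}

# Same check execute_complete() performs after this patch: the exception
# message carries the whole job, so job.status.details lands in the task log.
if event["job_state"] == JobStatus.State.ERROR:
    raise AirflowException(f"Job {event['job_id']} failed:\n{event['job']}")
```

Before this change the trigger raised inside `run()`, so the operator only ever saw the job id; by yielding the ERROR state together with the full job, the exception raised in `execute_complete()` now includes Dataproc's own error details.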