Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DataprocSubmitJobOperator to retrieve failed job error message #36053

Merged
merged 1 commit into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions airflow/providers/google/cloud/operators/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2152,7 +2152,7 @@ class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:param asynchronous: Flag to return after submitting the job to the Dataproc API.
This is useful for submitting long running jobs and
This is useful for submitting long-running jobs and
waiting on them asynchronously using the DataprocJobSensor
:param deferrable: Run operator in the deferrable mode
:param polling_interval_seconds: time in seconds between polling for job completion.
Expand Down Expand Up @@ -2267,10 +2267,11 @@ def execute_complete(self, context, event=None) -> None:
"""
job_state = event["job_state"]
job_id = event["job_id"]
job = event["job"]
if job_state == JobStatus.State.ERROR:
raise AirflowException(f"Job failed:\n{job_id}")
raise AirflowException(f"Job {job_id} failed:\n{job}")
if job_state == JobStatus.State.CANCELLED:
raise AirflowException(f"Job was cancelled:\n{job_id}")
raise AirflowException(f"Job {job_id} was cancelled:\n{job}")
self.log.info("%s completed successfully.", self.task_id)
return job_id

Expand Down
7 changes: 2 additions & 5 deletions airflow/providers/google/cloud/triggers/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from google.api_core.exceptions import NotFound
from google.cloud.dataproc_v1 import Batch, ClusterStatus, JobStatus

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.dataproc import DataprocAsyncHook
from airflow.triggers.base import BaseTrigger, TriggerEvent

Expand Down Expand Up @@ -98,12 +97,10 @@ async def run(self):
)
state = job.status.state
self.log.info("Dataproc job: %s is in state: %s", self.job_id, state)
if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED):
if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR):
break
elif state == JobStatus.State.ERROR:
raise AirflowException(f"Dataproc job execution failed {self.job_id}")
await asyncio.sleep(self.polling_interval_seconds)
yield TriggerEvent({"job_id": self.job_id, "job_state": state})
yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": job})


class DataprocClusterTrigger(DataprocBaseTrigger):
Expand Down