Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add more info to quicksight errors #30466

Merged
merged 3 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions airflow/providers/amazon/aws/hooks/quicksight.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,17 @@ def get_status(self, aws_account_id: str, data_set_id: str, ingestion_id: str) -
AwsAccountId=aws_account_id, DataSetId=data_set_id, IngestionId=ingestion_id
)
return describe_ingestion_response["Ingestion"]["IngestionStatus"]
except KeyError:
raise AirflowException("Could not get status of the Amazon QuickSight Ingestion")
except ClientError:
raise AirflowException("AWS request failed, check logs for more info")
except KeyError as e:
raise AirflowException(f"Could not get status of the Amazon QuickSight Ingestion: {e}")
except ClientError as e:
raise AirflowException(f"AWS request failed: {e}")

def get_error_info(self, aws_account_id: str, data_set_id: str, ingestion_id: str) -> dict | None:
"""If the ingestion failed, returns the error info. Else, returns None."""
vandonr-amz marked this conversation as resolved.
Show resolved Hide resolved
describe_ingestion_response = self.get_conn().describe_ingestion(
AwsAccountId=aws_account_id, DataSetId=data_set_id, IngestionId=ingestion_id
)
return describe_ingestion_response["Ingestion"].get("ErrorInfo", None)
vandonr-amz marked this conversation as resolved.
Show resolved Hide resolved

def wait_for_state(
self,
Expand All @@ -141,12 +148,14 @@ def wait_for_state(
status = self.get_status(aws_account_id, data_set_id, ingestion_id)
while status in self.NON_TERMINAL_STATES and status != target_state:
self.log.info("Current status is %s", status)
time.sleep(check_interval)
sec += check_interval
if status in self.FAILED_STATES:
raise AirflowException("The Amazon QuickSight Ingestion failed!")
info = self.get_error_info(aws_account_id, data_set_id, ingestion_id)
raise AirflowException(f"The Amazon QuickSight Ingestion failed. Error info: {info}")
if status == "CANCELLED":
raise AirflowException("The Amazon QuickSight SPICE ingestion cancelled!")
# wait and try again
time.sleep(check_interval)
sec += check_interval
vandonr-amz marked this conversation as resolved.
Show resolved Hide resolved
status = self.get_status(aws_account_id, data_set_id, ingestion_id)

self.log.info("QuickSight Ingestion completed")
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/amazon/aws/sensors/quicksight.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ def poke(self, context: Context) -> bool:
)
self.log.info("QuickSight Status: %s", quicksight_ingestion_state)
if quicksight_ingestion_state in self.errored_statuses:
raise AirflowException("The QuickSight Ingestion failed!")
error = self.quicksight_hook.get_error_info(aws_account_id, self.data_set_id, self.ingestion_id)
raise AirflowException(f"The QuickSight Ingestion failed. Error info: {error}")
return quicksight_ingestion_state == self.success_status

@cached_property
Expand Down
6 changes: 4 additions & 2 deletions tests/providers/amazon/aws/sensors/test_quicksight.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,17 @@ def test_poke_success(self, mock_get_status):

@mock_sts
@mock.patch.object(QuickSightHook, "get_status")
def test_poke_cancelled(self, mock_get_status):
@mock.patch.object(QuickSightHook, "get_error_info")
def test_poke_cancelled(self, mock_get_status, _):
mock_get_status.return_value = "CANCELLED"
with pytest.raises(AirflowException):
self.sensor.poke({})
mock_get_status.assert_called_once_with(DEFAULT_ACCOUNT_ID, DATA_SET_ID, INGESTION_ID)

@mock_sts
@mock.patch.object(QuickSightHook, "get_status")
def test_poke_failed(self, mock_get_status):
@mock.patch.object(QuickSightHook, "get_error_info")
def test_poke_failed(self, mock_get_status, _):
mock_get_status.return_value = "FAILED"
with pytest.raises(AirflowException):
self.sensor.poke({})
Expand Down