diff --git a/airflow/providers/amazon/aws/hooks/quicksight.py b/airflow/providers/amazon/aws/hooks/quicksight.py index 9fcb54be9f64..11ea728e5ef8 100644 --- a/airflow/providers/amazon/aws/hooks/quicksight.py +++ b/airflow/providers/amazon/aws/hooks/quicksight.py @@ -113,10 +113,26 @@ def get_status(self, aws_account_id: str, data_set_id: str, ingestion_id: str) - AwsAccountId=aws_account_id, DataSetId=data_set_id, IngestionId=ingestion_id ) return describe_ingestion_response["Ingestion"]["IngestionStatus"] - except KeyError: - raise AirflowException("Could not get status of the Amazon QuickSight Ingestion") - except ClientError: - raise AirflowException("AWS request failed, check logs for more info") + except KeyError as e: + raise AirflowException(f"Could not get status of the Amazon QuickSight Ingestion: {e}") + except ClientError as e: + raise AirflowException(f"AWS request failed: {e}") + + def get_error_info(self, aws_account_id: str, data_set_id: str, ingestion_id: str) -> dict | None: + """ + Gets info about the error if any. + + :param aws_account_id: An AWS Account ID + :param data_set_id: QuickSight Data Set ID + :param ingestion_id: QuickSight Ingestion ID + :return: Error info dict containing the error type (key 'Type') and message (key 'Message') + if available. Else, returns None. + """ + describe_ingestion_response = self.get_conn().describe_ingestion( + AwsAccountId=aws_account_id, DataSetId=data_set_id, IngestionId=ingestion_id + ) + # using .get() to get None if the key is not present, instead of an exception. + return describe_ingestion_response["Ingestion"].get("ErrorInfo") def wait_for_state( self, @@ -141,12 +157,14 @@ def wait_for_state( status = self.get_status(aws_account_id, data_set_id, ingestion_id) while status in self.NON_TERMINAL_STATES and status != target_state: self.log.info("Current status is %s", status) - time.sleep(check_interval) - sec += check_interval if status in self.FAILED_STATES: - raise AirflowException("The Amazon QuickSight Ingestion failed!") + info = self.get_error_info(aws_account_id, data_set_id, ingestion_id) + raise AirflowException(f"The Amazon QuickSight Ingestion failed. Error info: {info}") if status == "CANCELLED": raise AirflowException("The Amazon QuickSight SPICE ingestion cancelled!") + # wait and try again + time.sleep(check_interval) + sec += check_interval status = self.get_status(aws_account_id, data_set_id, ingestion_id) self.log.info("QuickSight Ingestion completed") diff --git a/airflow/providers/amazon/aws/sensors/quicksight.py b/airflow/providers/amazon/aws/sensors/quicksight.py index 7c71bb24e995..9145e886bf26 100644 --- a/airflow/providers/amazon/aws/sensors/quicksight.py +++ b/airflow/providers/amazon/aws/sensors/quicksight.py @@ -77,7 +77,8 @@ def poke(self, context: Context) -> bool: ) self.log.info("QuickSight Status: %s", quicksight_ingestion_state) if quicksight_ingestion_state in self.errored_statuses: - raise AirflowException("The QuickSight Ingestion failed!") + error = self.quicksight_hook.get_error_info(aws_account_id, self.data_set_id, self.ingestion_id) + raise AirflowException(f"The QuickSight Ingestion failed. Error info: {error}") return quicksight_ingestion_state == self.success_status @cached_property diff --git a/tests/providers/amazon/aws/sensors/test_quicksight.py b/tests/providers/amazon/aws/sensors/test_quicksight.py index 562a986c8897..e85c21e89654 100644 --- a/tests/providers/amazon/aws/sensors/test_quicksight.py +++ b/tests/providers/amazon/aws/sensors/test_quicksight.py @@ -49,7 +49,8 @@ def test_poke_success(self, mock_get_status): @mock_sts @mock.patch.object(QuickSightHook, "get_status") - def test_poke_cancelled(self, mock_get_status): + @mock.patch.object(QuickSightHook, "get_error_info") + def test_poke_cancelled(self, _, mock_get_status): mock_get_status.return_value = "CANCELLED" with pytest.raises(AirflowException): self.sensor.poke({}) @@ -57,7 +58,8 @@ def test_poke_cancelled(self, mock_get_status): @mock_sts @mock.patch.object(QuickSightHook, "get_status") - def test_poke_failed(self, mock_get_status): + @mock.patch.object(QuickSightHook, "get_error_info") + def test_poke_failed(self, _, mock_get_status): mock_get_status.return_value = "FAILED" with pytest.raises(AirflowException): self.sensor.poke({})