From 06e3dfd3651f75e8179c1334afced3bc887b81c2 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Mon, 23 Sep 2024 15:51:51 +0800 Subject: [PATCH] fix(providers/amazon): handle ClientError raised after key is missing during table.get_item --- .../providers/amazon/aws/sensors/dynamodb.py | 30 ++++++++++++++----- .../amazon/aws/sensors/test_dynamodb.py | 17 +++++++++++ 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/airflow/providers/amazon/aws/sensors/dynamodb.py b/airflow/providers/amazon/aws/sensors/dynamodb.py index dbb7f973041e..ead8c123a621 100644 --- a/airflow/providers/amazon/aws/sensors/dynamodb.py +++ b/airflow/providers/amazon/aws/sensors/dynamodb.py @@ -18,6 +18,8 @@ from typing import TYPE_CHECKING, Any, Iterable, Sequence +from botocore.exceptions import ClientError + from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor from airflow.providers.amazon.aws.utils.mixins import aws_template_fields @@ -102,14 +104,26 @@ def poke(self, context: Context) -> bool: table = self.hook.conn.Table(self.table_name) self.log.info("Table: %s", table) self.log.info("Key: %s", key) - response = table.get_item(Key=key) + try: - item_attribute_value = response["Item"][self.attribute_name] - self.log.info("Response: %s", response) - self.log.info("Want: %s = %s", self.attribute_name, self.attribute_value) - self.log.info("Got: {response['Item'][self.attribute_name]} = %s", item_attribute_value) - return item_attribute_value in ( - [self.attribute_value] if isinstance(self.attribute_value, str) else self.attribute_value + response = table.get_item(Key=key) + except ClientError as err: + self.log.error( + "Couldn't get %s from table %s.\nError Code: %s\nError Message: %s", + key, + self.table_name, + err.response["Error"]["Code"], + err.response["Error"]["Message"], ) - except KeyError: return False + else: + try: + item_attribute_value = response["Item"][self.attribute_name] + self.log.info("Response: %s", response) + self.log.info("Want: %s = %s", self.attribute_name, self.attribute_value) + self.log.info("Got: {response['Item'][self.attribute_name]} = %s", item_attribute_value) + return item_attribute_value in ( + [self.attribute_value] if isinstance(self.attribute_value, str) else self.attribute_value + ) + except KeyError: + return False diff --git a/tests/providers/amazon/aws/sensors/test_dynamodb.py b/tests/providers/amazon/aws/sensors/test_dynamodb.py index d8b31b48c5e3..93ca01d26275 100644 --- a/tests/providers/amazon/aws/sensors/test_dynamodb.py +++ b/tests/providers/amazon/aws/sensors/test_dynamodb.py @@ -104,6 +104,23 @@ def test_sensor_with_pk_and_sk(self): assert self.sensor_pk_sk.poke(None) + @mock_aws + def test_sensor_with_client_error(self): + hook = DynamoDBHook(table_name=self.table_name, table_keys=[self.pk_name]) + + hook.conn.create_table( + TableName=self.table_name, + KeySchema=[{"AttributeName": self.pk_name, "KeyType": "HASH"}], + AttributeDefinitions=[{"AttributeName": self.pk_name, "AttributeType": "S"}], + ProvisionedThroughput={"ReadCapacityUnits": 10, "WriteCapacityUnits": 10}, + ) + + items = [{self.pk_name: self.pk_value, self.attribute_name: self.attribute_value}] + hook.write_batch_data(items) + + self.sensor_pk.partition_key_name = "no such key" + assert self.sensor_pk.poke(None) is False + class TestDynamoDBMultipleValuesSensor: def setup_method(self):