From 002a38ca14382415d0c7f2be16515982f9497a8c Mon Sep 17 00:00:00 2001 From: Elad Galili Date: Wed, 28 Jun 2023 14:53:42 +0300 Subject: [PATCH 01/11] Fixing issue - Fix payload parameter of amazon LambdaCreateFunctionOperator --- airflow/providers/amazon/aws/hooks/lambda_function.py | 2 +- airflow/providers/amazon/aws/operators/lambda_function.py | 7 ++++++- tests/providers/amazon/aws/hooks/test_lambda_function.py | 4 ++-- tests/system/providers/amazon/aws/example_lambda.py | 2 +- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/lambda_function.py b/airflow/providers/amazon/aws/hooks/lambda_function.py index 2d61f0751f7c..c96cf077f59a 100644 --- a/airflow/providers/amazon/aws/hooks/lambda_function.py +++ b/airflow/providers/amazon/aws/hooks/lambda_function.py @@ -48,7 +48,7 @@ def invoke_lambda( invocation_type: str | None = None, log_type: str | None = None, client_context: str | None = None, - payload: str | None = None, + payload: bytes | None = None, qualifier: str | None = None, ): """ diff --git a/airflow/providers/amazon/aws/operators/lambda_function.py b/airflow/providers/amazon/aws/operators/lambda_function.py index 93907634c12d..252fe3adb435 100644 --- a/airflow/providers/amazon/aws/operators/lambda_function.py +++ b/airflow/providers/amazon/aws/operators/lambda_function.py @@ -150,7 +150,7 @@ def __init__( qualifier: str | None = None, invocation_type: str | None = None, client_context: str | None = None, - payload: str | None = None, + payload: str | bytes | None = None, aws_conn_id: str = "aws_default", **kwargs, ): @@ -163,6 +163,11 @@ def __init__( self.client_context = client_context self.aws_conn_id = aws_conn_id + if type(payload) == 'str': + payload = payload.encode() + + self.payload = payload + @cached_property def hook(self) -> LambdaHook: return LambdaHook(aws_conn_id=self.aws_conn_id) diff --git a/tests/providers/amazon/aws/hooks/test_lambda_function.py b/tests/providers/amazon/aws/hooks/test_lambda_function.py index f21c000ea6d9..a967bd4474ef 100644 --- a/tests/providers/amazon/aws/hooks/test_lambda_function.py +++ b/tests/providers/amazon/aws/hooks/test_lambda_function.py @@ -50,11 +50,11 @@ def test_get_conn_returns_a_boto3_connection(self, hook): ) def test_invoke_lambda(self, mock_conn): hook = LambdaHook() - hook.invoke_lambda(function_name=FUNCTION_NAME, payload=PAYLOAD) + hook.invoke_lambda(function_name=FUNCTION_NAME, payload=PAYLOAD.encode()) mock_conn().invoke.assert_called_once_with( FunctionName=FUNCTION_NAME, - Payload=PAYLOAD, + Payload=PAYLOAD.encode(), ) @pytest.mark.parametrize( diff --git a/tests/system/providers/amazon/aws/example_lambda.py b/tests/system/providers/amazon/aws/example_lambda.py index b4951799c8b0..ee07e44186dc 100644 --- a/tests/system/providers/amazon/aws/example_lambda.py +++ b/tests/system/providers/amazon/aws/example_lambda.py @@ -103,7 +103,7 @@ def delete_lambda(function_name: str): invoke_lambda_function = LambdaInvokeFunctionOperator( task_id="invoke_lambda_function", function_name=lambda_function_name, - payload=json.dumps({"SampleEvent": {"SampleData": {"Name": "XYZ", "DoB": "1993-01-01"}}}), + payload=json.dumps({"SampleEvent": {"SampleData": {"Name": "XYZ", "DoB": "1993-01-01"}}}).encode(), ) # [END howto_operator_invoke_lambda_function] From 3ab3ae3bc0751889f59492754ae7a8c8791242bf Mon Sep 17 00:00:00 2001 From: Elad Galili Date: Thu, 29 Jun 2023 14:34:40 +0300 Subject: [PATCH 02/11] Remove redundant line --- airflow/providers/amazon/aws/operators/lambda_function.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/lambda_function.py b/airflow/providers/amazon/aws/operators/lambda_function.py index 252fe3adb435..7ab39bd9e047 100644 --- a/airflow/providers/amazon/aws/operators/lambda_function.py +++ b/airflow/providers/amazon/aws/operators/lambda_function.py @@ -156,7 +156,6 @@ def __init__( ): super().__init__(**kwargs) self.function_name = function_name - self.payload = payload self.log_type = log_type self.qualifier = qualifier self.invocation_type = invocation_type From 7e7d741c199c39a0991719c1dc9288f8afdf8886 Mon Sep 17 00:00:00 2001 From: Elad Galili Date: Thu, 29 Jun 2023 15:28:15 +0300 Subject: [PATCH 03/11] Fix type checking --- airflow/providers/amazon/aws/operators/lambda_function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/lambda_function.py b/airflow/providers/amazon/aws/operators/lambda_function.py index 7ab39bd9e047..df6bae06845b 100644 --- a/airflow/providers/amazon/aws/operators/lambda_function.py +++ b/airflow/providers/amazon/aws/operators/lambda_function.py @@ -162,7 +162,7 @@ def __init__( self.client_context = client_context self.aws_conn_id = aws_conn_id - if type(payload) == 'str': + if type(payload) == str: payload = payload.encode() self.payload = payload From 84f789b57278426daf101fc9e3cda04e88d0b255 Mon Sep 17 00:00:00 2001 From: Elad Galili Date: Thu, 29 Jun 2023 18:17:51 +0300 Subject: [PATCH 04/11] Fix breaking change --- airflow/providers/amazon/aws/hooks/lambda_function.py | 5 ++++- airflow/providers/amazon/aws/operators/lambda_function.py | 6 +----- tests/providers/amazon/aws/hooks/test_lambda_function.py | 4 ++-- tests/system/providers/amazon/aws/example_lambda.py | 2 +- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/lambda_function.py b/airflow/providers/amazon/aws/hooks/lambda_function.py index c96cf077f59a..97f9a1273243 100644 --- a/airflow/providers/amazon/aws/hooks/lambda_function.py +++ b/airflow/providers/amazon/aws/hooks/lambda_function.py @@ -48,7 +48,7 @@ def invoke_lambda( invocation_type: str | None = None, log_type: str | None = None, client_context: str | None = None, - payload: bytes | None = None, + payload: bytes | str | None = None, qualifier: str | None = None, ): """ @@ -65,6 +65,9 @@ def invoke_lambda( :param payload: The JSON that you want to provide to your Lambda function as input. :param qualifier: AWS Lambda Function Version or Alias Name """ + if type(payload) == str: + payload = payload.encode() + invoke_args = { "FunctionName": function_name, "InvocationType": invocation_type, diff --git a/airflow/providers/amazon/aws/operators/lambda_function.py b/airflow/providers/amazon/aws/operators/lambda_function.py index df6bae06845b..9351d7be6702 100644 --- a/airflow/providers/amazon/aws/operators/lambda_function.py +++ b/airflow/providers/amazon/aws/operators/lambda_function.py @@ -156,17 +156,13 @@ def __init__( ): super().__init__(**kwargs) self.function_name = function_name + self.payload = payload self.log_type = log_type self.qualifier = qualifier self.invocation_type = invocation_type self.client_context = client_context self.aws_conn_id = aws_conn_id - if type(payload) == str: - payload = payload.encode() - - self.payload = payload - @cached_property def hook(self) -> LambdaHook: return LambdaHook(aws_conn_id=self.aws_conn_id) diff --git a/tests/providers/amazon/aws/hooks/test_lambda_function.py b/tests/providers/amazon/aws/hooks/test_lambda_function.py index a967bd4474ef..f21c000ea6d9 100644 --- a/tests/providers/amazon/aws/hooks/test_lambda_function.py +++ b/tests/providers/amazon/aws/hooks/test_lambda_function.py @@ -50,11 +50,11 @@ def test_get_conn_returns_a_boto3_connection(self, hook): ) def test_invoke_lambda(self, mock_conn): hook = LambdaHook() - hook.invoke_lambda(function_name=FUNCTION_NAME, payload=PAYLOAD.encode()) + hook.invoke_lambda(function_name=FUNCTION_NAME, payload=PAYLOAD) mock_conn().invoke.assert_called_once_with( FunctionName=FUNCTION_NAME, - Payload=PAYLOAD.encode(), + Payload=PAYLOAD, ) @pytest.mark.parametrize( diff --git a/tests/system/providers/amazon/aws/example_lambda.py b/tests/system/providers/amazon/aws/example_lambda.py index ee07e44186dc..b4951799c8b0 100644 --- a/tests/system/providers/amazon/aws/example_lambda.py +++ b/tests/system/providers/amazon/aws/example_lambda.py @@ -103,7 +103,7 @@ def delete_lambda(function_name: str): invoke_lambda_function = LambdaInvokeFunctionOperator( task_id="invoke_lambda_function", function_name=lambda_function_name, - payload=json.dumps({"SampleEvent": {"SampleData": {"Name": "XYZ", "DoB": "1993-01-01"}}}).encode(), + payload=json.dumps({"SampleEvent": {"SampleData": {"Name": "XYZ", "DoB": "1993-01-01"}}}), ) # [END howto_operator_invoke_lambda_function] From 50994239d52306b93e451faea74cb921ec653e18 Mon Sep 17 00:00:00 2001 From: Elad Galili Date: Thu, 29 Jun 2023 18:19:05 +0300 Subject: [PATCH 05/11] Reorder --- airflow/providers/amazon/aws/operators/lambda_function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/operators/lambda_function.py b/airflow/providers/amazon/aws/operators/lambda_function.py index 9351d7be6702..28b631320422 100644 --- a/airflow/providers/amazon/aws/operators/lambda_function.py +++ b/airflow/providers/amazon/aws/operators/lambda_function.py @@ -150,7 +150,7 @@ def __init__( qualifier: str | None = None, invocation_type: str | None = None, client_context: str | None = None, - payload: str | bytes | None = None, + payload: bytes | str | None = None, aws_conn_id: str = "aws_default", **kwargs, ): From 2c7410ea5b89f5fd87aa982f9324c8bafd561e0e Mon Sep 17 00:00:00 2001 From: Elad Galili Date: Sat, 1 Jul 2023 14:09:11 +0300 Subject: [PATCH 06/11] Use isinstance() --- airflow/providers/amazon/aws/hooks/lambda_function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/hooks/lambda_function.py b/airflow/providers/amazon/aws/hooks/lambda_function.py index 97f9a1273243..58ecac8bcccb 100644 --- a/airflow/providers/amazon/aws/hooks/lambda_function.py +++ b/airflow/providers/amazon/aws/hooks/lambda_function.py @@ -65,7 +65,7 @@ def invoke_lambda( :param payload: The JSON that you want to provide to your Lambda function as input. :param qualifier: AWS Lambda Function Version or Alias Name """ - if type(payload) == str: + if isinstance(payload, str): payload = payload.encode() invoke_args = { From 6af5709c1b57a6a24cab87d208ef85539484b75a Mon Sep 17 00:00:00 2001 From: Elad Galili Date: Sun, 2 Jul 2023 14:22:24 +0300 Subject: [PATCH 07/11] Add tests for binary lambda payload --- .../amazon/aws/hooks/test_lambda_function.py | 9 +++++---- .../aws/operators/test_lambda_function.py | 20 +++++++++++++------ 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/tests/providers/amazon/aws/hooks/test_lambda_function.py b/tests/providers/amazon/aws/hooks/test_lambda_function.py index f21c000ea6d9..7584d4bc9957 100644 --- a/tests/providers/amazon/aws/hooks/test_lambda_function.py +++ b/tests/providers/amazon/aws/hooks/test_lambda_function.py @@ -25,7 +25,7 @@ from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook FUNCTION_NAME = "test_function" -PAYLOAD = '{"hello": "airflow"}' +PAYLOADS = ['{"hello": "airflow"}', b'{"hello": "airflow"}'] RUNTIME = "python3.9" ROLE = "role" HANDLER = "handler" @@ -48,13 +48,14 @@ def test_get_conn_returns_a_boto3_connection(self, hook): @mock.patch( "airflow.providers.amazon.aws.hooks.lambda_function.LambdaHook.conn", new_callable=mock.PropertyMock ) - def test_invoke_lambda(self, mock_conn): + @pytest.mark.parametrize("payload", PAYLOADS) + def test_invoke_lambda(self, mock_conn, payload): hook = LambdaHook() - hook.invoke_lambda(function_name=FUNCTION_NAME, payload=PAYLOAD) + hook.invoke_lambda(function_name=FUNCTION_NAME, payload=payload) mock_conn().invoke.assert_called_once_with( FunctionName=FUNCTION_NAME, - Payload=PAYLOAD, + Payload=payload, ) @pytest.mark.parametrize( diff --git a/tests/providers/amazon/aws/operators/test_lambda_function.py b/tests/providers/amazon/aws/operators/test_lambda_function.py index 6f1d98e8ac8b..064be0dd9a8c 100644 --- a/tests/providers/amazon/aws/operators/test_lambda_function.py +++ b/tests/providers/amazon/aws/operators/test_lambda_function.py @@ -70,29 +70,37 @@ def test_create_lambda_with_wait_for_completion(self, mock_hook_conn, mock_hook_ class TestLambdaInvokeFunctionOperator: - def test_init(self): + @pytest.mark.parametrize( + "payload", + ['{"TestInput": "Testdata"}', b'{"TestInput": "Testdata"}'], + ) + def test_init(self, payload): lambda_operator = LambdaInvokeFunctionOperator( task_id="test", function_name="test", - payload=json.dumps({"TestInput": "Testdata"}), + payload=payload, log_type="None", aws_conn_id="aws_conn_test", ) assert lambda_operator.task_id == "test" assert lambda_operator.function_name == "test" - assert lambda_operator.payload == json.dumps({"TestInput": "Testdata"}) + assert lambda_operator.payload == payload assert lambda_operator.log_type == "None" assert lambda_operator.aws_conn_id == "aws_conn_test" @patch.object(LambdaInvokeFunctionOperator, "hook", new_callable=mock.PropertyMock) - def test_invoke_lambda(self, hook_mock): + @pytest.mark.parametrize( + "payload", + ["e", b"e"], + ) + def test_invoke_lambda(self, hook_mock, payload): operator = LambdaInvokeFunctionOperator( task_id="task_test", function_name="a", invocation_type="b", log_type="c", client_context="d", - payload="e", + payload=payload, qualifier="f", ) returned_payload = Mock() @@ -111,7 +119,7 @@ def test_invoke_lambda(self, hook_mock): invocation_type="b", log_type="c", client_context="d", - payload="e", + payload=payload, qualifier="f", ) From 2be5f7f75f1775d6444cd331e7122f93062dfa1f Mon Sep 17 00:00:00 2001 From: Elad Galili Date: Sun, 2 Jul 2023 14:57:52 +0300 Subject: [PATCH 08/11] Fix tests --- tests/providers/amazon/aws/hooks/test_lambda_function.py | 3 ++- tests/providers/amazon/aws/operators/test_lambda_function.py | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/providers/amazon/aws/hooks/test_lambda_function.py b/tests/providers/amazon/aws/hooks/test_lambda_function.py index 7584d4bc9957..56068e14e023 100644 --- a/tests/providers/amazon/aws/hooks/test_lambda_function.py +++ b/tests/providers/amazon/aws/hooks/test_lambda_function.py @@ -53,9 +53,10 @@ def test_invoke_lambda(self, mock_conn, payload): hook = LambdaHook() hook.invoke_lambda(function_name=FUNCTION_NAME, payload=payload) + invoke_payload = payload.encode() if isinstance(payload, str) else payload mock_conn().invoke.assert_called_once_with( FunctionName=FUNCTION_NAME, - Payload=payload, + Payload=invoke_payload, ) @pytest.mark.parametrize( diff --git a/tests/providers/amazon/aws/operators/test_lambda_function.py b/tests/providers/amazon/aws/operators/test_lambda_function.py index 064be0dd9a8c..26ea0f27983b 100644 --- a/tests/providers/amazon/aws/operators/test_lambda_function.py +++ b/tests/providers/amazon/aws/operators/test_lambda_function.py @@ -114,12 +114,14 @@ def test_invoke_lambda(self, hook_mock, payload): value = operator.execute(None) assert value == "data was read" + + invoke_payload = payload.encode() if isinstance(payload, str) else payload hook_mock().invoke_lambda.assert_called_once_with( function_name="a", invocation_type="b", log_type="c", client_context="d", - payload=payload, + payload=invoke_payload, qualifier="f", ) From 9a30e61cd98ef14e71e8053ac8257048dd94b186 Mon Sep 17 00:00:00 2001 From: Elad Galili Date: Sun, 2 Jul 2023 15:10:23 +0300 Subject: [PATCH 09/11] Fix tests --- .../amazon/aws/hooks/test_lambda_function.py | 11 +++++++---- .../amazon/aws/operators/test_lambda_function.py | 7 +++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/tests/providers/amazon/aws/hooks/test_lambda_function.py b/tests/providers/amazon/aws/hooks/test_lambda_function.py index 56068e14e023..caaf164be47b 100644 --- a/tests/providers/amazon/aws/hooks/test_lambda_function.py +++ b/tests/providers/amazon/aws/hooks/test_lambda_function.py @@ -25,7 +25,8 @@ from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook FUNCTION_NAME = "test_function" -PAYLOADS = ['{"hello": "airflow"}', b'{"hello": "airflow"}'] +PAYLOAD = '{"hello": "airflow"}' +BYTES_PAYLOAD = b'{"hello": "airflow"}' RUNTIME = "python3.9" ROLE = "role" HANDLER = "handler" @@ -48,12 +49,14 @@ def test_get_conn_returns_a_boto3_connection(self, hook): @mock.patch( "airflow.providers.amazon.aws.hooks.lambda_function.LambdaHook.conn", new_callable=mock.PropertyMock ) - @pytest.mark.parametrize("payload", PAYLOADS) - def test_invoke_lambda(self, mock_conn, payload): + @pytest.mark.parametrize( + "payload, invoke_payload", + [(PAYLOAD, BYTES_PAYLOAD), (BYTES_PAYLOAD, BYTES_PAYLOAD)], + ) + def test_invoke_lambda(self, mock_conn, payload, invoke_payload): hook = LambdaHook() hook.invoke_lambda(function_name=FUNCTION_NAME, payload=payload) - invoke_payload = payload.encode() if isinstance(payload, str) else payload mock_conn().invoke.assert_called_once_with( FunctionName=FUNCTION_NAME, Payload=invoke_payload, diff --git a/tests/providers/amazon/aws/operators/test_lambda_function.py b/tests/providers/amazon/aws/operators/test_lambda_function.py index 26ea0f27983b..6059c6969640 100644 --- a/tests/providers/amazon/aws/operators/test_lambda_function.py +++ b/tests/providers/amazon/aws/operators/test_lambda_function.py @@ -90,10 +90,10 @@ def test_init(self, payload): @patch.object(LambdaInvokeFunctionOperator, "hook", new_callable=mock.PropertyMock) @pytest.mark.parametrize( - "payload", - ["e", b"e"], + "payload, invoke_payload", + [("e", b"e"), (b"e", b"e")], ) - def test_invoke_lambda(self, hook_mock, payload): + def test_invoke_lambda(self, hook_mock, payload, invoke_payload): operator = LambdaInvokeFunctionOperator( task_id="task_test", function_name="a", @@ -115,7 +115,6 @@ def test_invoke_lambda(self, hook_mock, payload): assert value == "data was read" - invoke_payload = payload.encode() if isinstance(payload, str) else payload hook_mock().invoke_lambda.assert_called_once_with( function_name="a", invocation_type="b", From 960b40563980836b5b6782b15186e6002a51a0e3 Mon Sep 17 00:00:00 2001 From: Elad Galili Date: Sun, 2 Jul 2023 15:22:26 +0300 Subject: [PATCH 10/11] Standardize tests --- .../providers/amazon/aws/operators/test_lambda_function.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_lambda_function.py b/tests/providers/amazon/aws/operators/test_lambda_function.py index 6059c6969640..191c120d4716 100644 --- a/tests/providers/amazon/aws/operators/test_lambda_function.py +++ b/tests/providers/amazon/aws/operators/test_lambda_function.py @@ -30,6 +30,8 @@ ) FUNCTION_NAME = "function_name" +PAYLOAD = '{"hello": "airflow"}' +BYTES_PAYLOAD = b'{"hello": "airflow"}' ROLE_ARN = "role_arn" IMAGE_URI = "image_uri" @@ -72,7 +74,7 @@ def test_create_lambda_with_wait_for_completion(self, mock_hook_conn, mock_hook_ class TestLambdaInvokeFunctionOperator: @pytest.mark.parametrize( "payload", - ['{"TestInput": "Testdata"}', b'{"TestInput": "Testdata"}'], + [PAYLOAD, BYTES_PAYLOAD], ) def test_init(self, payload): lambda_operator = LambdaInvokeFunctionOperator( @@ -91,7 +93,7 @@ def test_init(self, payload): @patch.object(LambdaInvokeFunctionOperator, "hook", new_callable=mock.PropertyMock) @pytest.mark.parametrize( "payload, invoke_payload", - [("e", b"e"), (b"e", b"e")], + [(PAYLOAD, BYTES_PAYLOAD), (BYTES_PAYLOAD, BYTES_PAYLOAD)], ) def test_invoke_lambda(self, hook_mock, payload, invoke_payload): operator = LambdaInvokeFunctionOperator( @@ -114,7 +116,6 @@ def test_invoke_lambda(self, hook_mock, payload, invoke_payload): value = operator.execute(None) assert value == "data was read" - hook_mock().invoke_lambda.assert_called_once_with( function_name="a", invocation_type="b", From f395a0922bc30c2ff78fd7e7dd1a0ca9d402899a Mon Sep 17 00:00:00 2001 From: Elad Galili Date: Sun, 2 Jul 2023 19:23:47 +0300 Subject: [PATCH 11/11] Fix tests --- .../amazon/aws/operators/test_lambda_function.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_lambda_function.py b/tests/providers/amazon/aws/operators/test_lambda_function.py index 191c120d4716..f0b4b834eb00 100644 --- a/tests/providers/amazon/aws/operators/test_lambda_function.py +++ b/tests/providers/amazon/aws/operators/test_lambda_function.py @@ -17,7 +17,6 @@ # under the License. from __future__ import annotations -import json from unittest import mock from unittest.mock import Mock, patch @@ -92,10 +91,10 @@ def test_init(self, payload): @patch.object(LambdaInvokeFunctionOperator, "hook", new_callable=mock.PropertyMock) @pytest.mark.parametrize( - "payload, invoke_payload", - [(PAYLOAD, BYTES_PAYLOAD), (BYTES_PAYLOAD, BYTES_PAYLOAD)], + "payload", + [PAYLOAD, BYTES_PAYLOAD], ) - def test_invoke_lambda(self, hook_mock, payload, invoke_payload): + def test_invoke_lambda(self, hook_mock, payload): operator = LambdaInvokeFunctionOperator( task_id="task_test", function_name="a", @@ -121,7 +120,7 @@ def test_invoke_lambda(self, hook_mock, payload, invoke_payload): invocation_type="b", log_type="c", client_context="d", - payload=invoke_payload, + payload=payload, qualifier="f", )