From 2ff617c3214ea129718833f506d156b104562afa Mon Sep 17 00:00:00 2001 From: Joffrey Bienvenu Date: Tue, 21 Nov 2023 08:34:40 +0100 Subject: [PATCH 1/8] feat: Restrict `data` parameter typing Follows the hook's typing --- airflow/providers/http/operators/http.py | 2 +- airflow/providers/http/triggers/http.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/http/operators/http.py b/airflow/providers/http/operators/http.py index 96415ed9771c9..2c672201fa1c2 100644 --- a/airflow/providers/http/operators/http.py +++ b/airflow/providers/http/operators/http.py @@ -101,7 +101,7 @@ def __init__( *, endpoint: str | None = None, method: str = "POST", - data: Any = None, + data: dict[str, Any] | str | None = None, headers: dict[str, str] | None = None, pagination_function: Callable[..., Any] | None = None, response_check: Callable[..., bool] | None = None, diff --git a/airflow/providers/http/triggers/http.py b/airflow/providers/http/triggers/http.py index 89aa9ca606ce3..b4598984f3740 100644 --- a/airflow/providers/http/triggers/http.py +++ b/airflow/providers/http/triggers/http.py @@ -56,7 +56,7 @@ def __init__( method: str = "POST", endpoint: str | None = None, headers: dict[str, str] | None = None, - data: Any = None, + data: dict[str, Any] | str | None = None, extra_options: dict[str, Any] | None = None, ): super().__init__() From 4b3d1ae4248ae20daa56821474dc185f3a4ceac4 Mon Sep 17 00:00:00 2001 From: Joffrey Bienvenu Date: Tue, 21 Nov 2023 08:35:28 +0100 Subject: [PATCH 2/8] feat: Implement `data` override when string --- airflow/providers/http/operators/http.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/airflow/providers/http/operators/http.py b/airflow/providers/http/operators/http.py index 2c672201fa1c2..28d63dc56b4ac 100644 --- a/airflow/providers/http/operators/http.py +++ b/airflow/providers/http/operators/http.py @@ -20,7 +20,7 @@ import base64 import pickle import warnings -from typing import TYPE_CHECKING, Any, Callable, Sequence +from typing import TYPE_CHECKING, Any, Callable, Sequence, cast from requests import Response @@ -271,9 +271,16 @@ def _merge_next_page_parameters(self, next_page_params: dict) -> dict: :param next_page_params: A dictionary containing the parameters for the next page. :return: A dictionary containing the merged parameters. """ + + next_page_data_param: dict | str | None = next_page_params.get("data") + if isinstance(self.data, dict) and isinstance(next_page_data_param, dict): + data = merge_dicts(self.data, cast(dict, next_page_data_param)), + else: + data = next_page_data_param or self.data + return dict( endpoint=next_page_params.get("endpoint") or self.endpoint, - data=merge_dicts(self.data, next_page_params.get("data", {})), + data=data, headers=merge_dicts(self.headers, next_page_params.get("headers", {})), extra_options=merge_dicts(self.extra_options, next_page_params.get("extra_options", {})), ) From 31f325ad5aca929317292d2e6f3354497b36df7e Mon Sep 17 00:00:00 2001 From: Joffrey Bienvenu Date: Tue, 21 Nov 2023 08:36:09 +0100 Subject: [PATCH 3/8] feat: Improve docstring about merging and overriding behavior --- airflow/providers/http/operators/http.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/airflow/providers/http/operators/http.py b/airflow/providers/http/operators/http.py index 28d63dc56b4ac..38b482d72d6ec 100644 --- a/airflow/providers/http/operators/http.py +++ b/airflow/providers/http/operators/http.py @@ -56,12 +56,17 @@ class HttpOperator(BaseOperator): :param pagination_function: A callable that generates the parameters used to call the API again, based on the previous response. Typically used when the API is paginated and returns for e.g a cursor, a 'next page id', or a 'next page URL'. When provided, the Operator will call the API - repeatedly until this callable returns None. Also, the result of the Operator will become by - default a list of Response.text objects (instead of a single response object). Same with the - other injected functions (like response_check, response_filter, ...) which will also receive a - list of Response object. This function receives a Response object form previous call, and should - return a dict of parameters (`endpoint`, `data`, `headers`, `extra_options`), which will be merged - and will override the one used in the initial API call. + repeatedly until this callable returns None. The result of the Operator will become by default a + list of Response.text objects (instead of a single response object). Same with the other injected + functions (like response_check, response_filter, ...) which will also receive a list of Response + objects. This function receives a Response object form previous call, and should return a nested + dictionary with the following optional keys: `endpoint`, `data`, `headers` and `extra_options. + Those keys will be merged and/or override the parameters provided into the HttpOperator declaration. + Parameters are merged when they are both a dictionary (e.g.: HttpOperator.headers will be merged + with the `headers` dict provided by this function). When merging, dict items returned by this + function will override initial ones (e.g: if both HttpOperator.headers and `headers` has a 'cookie' + item, the one provided by `headers` is kept). Parameters are simply overridden when any of them are + string (e.g.: HttpOperator.endpoint is overridden by `endpoint`). :param response_check: A check against the 'requests' response object. The callable takes the response object as the first positional argument and optionally any number of keyword arguments available in the context dictionary. From 018fdcc84d3f71113fd34b819c29410e79f8f003 Mon Sep 17 00:00:00 2001 From: Joffrey Bienvenu Date: Tue, 21 Nov 2023 18:23:14 +0100 Subject: [PATCH 4/8] fix: Add correct typing for mypy --- airflow/providers/http/operators/http.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/providers/http/operators/http.py b/airflow/providers/http/operators/http.py index 38b482d72d6ec..185a39332bd02 100644 --- a/airflow/providers/http/operators/http.py +++ b/airflow/providers/http/operators/http.py @@ -64,7 +64,7 @@ class HttpOperator(BaseOperator): Those keys will be merged and/or override the parameters provided into the HttpOperator declaration. Parameters are merged when they are both a dictionary (e.g.: HttpOperator.headers will be merged with the `headers` dict provided by this function). When merging, dict items returned by this - function will override initial ones (e.g: if both HttpOperator.headers and `headers` has a 'cookie' + function will override initial ones (e.g: if both HttpOperator.headers and `headers` have a 'cookie' item, the one provided by `headers` is kept). Parameters are simply overridden when any of them are string (e.g.: HttpOperator.endpoint is overridden by `endpoint`). :param response_check: A check against the 'requests' response object. @@ -276,10 +276,10 @@ def _merge_next_page_parameters(self, next_page_params: dict) -> dict: :param next_page_params: A dictionary containing the parameters for the next page. :return: A dictionary containing the merged parameters. """ - - next_page_data_param: dict | str | None = next_page_params.get("data") + data: str | dict | None = None # makes mypy happy + next_page_data_param = next_page_params.get("data") if isinstance(self.data, dict) and isinstance(next_page_data_param, dict): - data = merge_dicts(self.data, cast(dict, next_page_data_param)), + data = merge_dicts(self.data, next_page_data_param) else: data = next_page_data_param or self.data From b4fb9cf222fea4389c615e660b1c4bff95807ab5 Mon Sep 17 00:00:00 2001 From: Joffrey Bienvenu Date: Tue, 21 Nov 2023 18:23:21 +0100 Subject: [PATCH 5/8] feat: add test --- tests/providers/http/operators/test_http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/providers/http/operators/test_http.py b/tests/providers/http/operators/test_http.py index 451cd93d44144..5e48bff1d820a 100644 --- a/tests/providers/http/operators/test_http.py +++ b/tests/providers/http/operators/test_http.py @@ -127,7 +127,7 @@ def pagination_function(response: Response) -> dict | None: iterations += 1 return dict( endpoint=response.json()["endpoint"], - data={}, + data="", headers={}, extra_options={}, ) From d904a7224435d79784bbf139af29d71354538433 Mon Sep 17 00:00:00 2001 From: Joffrey Bienvenu Date: Tue, 21 Nov 2023 21:53:12 +0100 Subject: [PATCH 6/8] fix: remove unused imports --- airflow/providers/http/operators/http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/http/operators/http.py b/airflow/providers/http/operators/http.py index 185a39332bd02..38ab4a2e3279a 100644 --- a/airflow/providers/http/operators/http.py +++ b/airflow/providers/http/operators/http.py @@ -20,7 +20,7 @@ import base64 import pickle import warnings -from typing import TYPE_CHECKING, Any, Callable, Sequence, cast +from typing import TYPE_CHECKING, Any, Callable, Sequence from requests import Response From 39396efcc420d48deb6d9608b6c321fc0bd1f480 Mon Sep 17 00:00:00 2001 From: Joffrey Bienvenu Date: Wed, 22 Nov 2023 13:29:05 +0100 Subject: [PATCH 7/8] fix: Update SimpleHttpOperator docstring --- airflow/providers/http/operators/http.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/airflow/providers/http/operators/http.py b/airflow/providers/http/operators/http.py index 38ab4a2e3279a..524de8c5850e2 100644 --- a/airflow/providers/http/operators/http.py +++ b/airflow/providers/http/operators/http.py @@ -306,14 +306,20 @@ class SimpleHttpOperator(HttpOperator): :param data: The data to pass. POST-data in POST/PUT and params in the URL for a GET request. (templated) :param headers: The HTTP headers to be added to the GET request - :param pagination_function: A callable that generates the parameters used to call the API again. - Typically used when the API is paginated and returns for e.g a cursor, a 'next page id', or - a 'next page URL'. When provided, the Operator will call the API repeatedly until this callable - returns None. Also, the result of the Operator will become by default a list of Response.text - objects (instead of a single response object). Same with the other injected functions (like - response_check, response_filter, ...) which will also receive a list of Response object. This - function should return a dict of parameters (`endpoint`, `data`, `headers`, `extra_options`), - which will be merged and override the one used in the initial API call. + :param pagination_function: A callable that generates the parameters used to call the API again, + based on the previous response. Typically used when the API is paginated and returns for e.g a + cursor, a 'next page id', or a 'next page URL'. When provided, the Operator will call the API + repeatedly until this callable returns None. The result of the Operator will become by default a + list of Response.text objects (instead of a single response object). Same with the other injected + functions (like response_check, response_filter, ...) which will also receive a list of Response + objects. This function receives a Response object form previous call, and should return a nested + dictionary with the following optional keys: `endpoint`, `data`, `headers` and `extra_options. + Those keys will be merged and/or override the parameters provided into the HttpOperator declaration. + Parameters are merged when they are both a dictionary (e.g.: HttpOperator.headers will be merged + with the `headers` dict provided by this function). When merging, dict items returned by this + function will override initial ones (e.g: if both HttpOperator.headers and `headers` have a 'cookie' + item, the one provided by `headers` is kept). Parameters are simply overridden when any of them are + string (e.g.: HttpOperator.endpoint is overridden by `endpoint`). :param response_check: A check against the 'requests' response object. The callable takes the response object as the first positional argument and optionally any number of keyword arguments available in the context dictionary. From 1a0badec67040f8cd9b72671e6b1268807c8c16c Mon Sep 17 00:00:00 2001 From: Joffrey Bienvenu Date: Wed, 22 Nov 2023 13:30:32 +0100 Subject: [PATCH 8/8] feat: Correctly test parameters overriding --- tests/providers/http/operators/test_http.py | 63 ++++++++++++++++----- 1 file changed, 49 insertions(+), 14 deletions(-) diff --git a/tests/providers/http/operators/test_http.py b/tests/providers/http/operators/test_http.py index 5e48bff1d820a..dfd82a17aecd8 100644 --- a/tests/providers/http/operators/test_http.py +++ b/tests/providers/http/operators/test_http.py @@ -24,6 +24,7 @@ import pytest from requests import Response +from requests.models import RequestEncodingMixin from airflow.exceptions import AirflowException, TaskDeferred from airflow.providers.http.operators.http import HttpOperator @@ -112,41 +113,75 @@ def test_async_execute_successfully(self, requests_mock): ) assert result == "content" - def test_paginated_responses(self, requests_mock): + @pytest.mark.parametrize( + "data, headers, extra_options, pagination_data, pagination_headers, pagination_extra_options", + [ + ({"data": 1}, {"x-head": "1"}, {"verify": False}, {"data": 2}, {"x-head": "0"}, {"verify": True}), + ("data foo", {"x-head": "1"}, {"verify": False}, {"data": 2}, {"x-head": "0"}, {"verify": True}), + ("data foo", {"x-head": "1"}, {"verify": False}, "data bar", {"x-head": "0"}, {"verify": True}), + ({"data": 1}, {"x-head": "1"}, {"verify": False}, "data foo", {"x-head": "0"}, {"verify": True}), + ], + ) + def test_pagination( + self, + requests_mock, + data, + headers, + extra_options, + pagination_data, + pagination_headers, + pagination_extra_options, + ): """ Test that the HttpOperator calls repetitively the API when a pagination_function is provided, and as long as this function returns a dictionary that override previous' call parameters. """ - iterations: int = 0 + is_second_call: bool = False def pagination_function(response: Response) -> dict | None: """Paginated function which returns None at the second call.""" - nonlocal iterations - if iterations < 2: - iterations += 1 + nonlocal is_second_call + if not is_second_call: + is_second_call = True return dict( endpoint=response.json()["endpoint"], - data="", - headers={}, - extra_options={}, + data=pagination_data, + headers=pagination_headers, + extra_options=pagination_extra_options, ) return None - requests_mock.get("http://www.example.com/foo", json={"value": 5, "endpoint": "bar"}) - requests_mock.get("http://www.example.com/bar", json={"value": 10, "endpoint": "foo"}) + first_endpoint = requests_mock.post("http://www.example.com/1", json={"value": 5, "endpoint": "2"}) + second_endpoint = requests_mock.post("http://www.example.com/2", json={"value": 10, "endpoint": "3"}) operator = HttpOperator( task_id="test_HTTP_op", - method="GET", - endpoint="/foo", + method="POST", + endpoint="/1", + data=data, + headers=headers, + extra_options=extra_options, http_conn_id="HTTP_EXAMPLE", pagination_function=pagination_function, response_filter=lambda resp: [entry.json()["value"] for entry in resp], ) result = operator.execute({}) - assert result == [5, 10, 5] - def test_async_paginated_responses(self, requests_mock): + # Ensure the initial call is made with parameters passed to the Operator + first_call = first_endpoint.request_history[0] + assert first_call.headers.items() >= headers.items() + assert first_call.body == RequestEncodingMixin._encode_params(data) + assert first_call.verify is extra_options["verify"] + + # Ensure the second - paginated - call is made with parameters merged from the pagination function + second_call = second_endpoint.request_history[0] + assert second_call.headers.items() >= pagination_headers.items() + assert second_call.body == RequestEncodingMixin._encode_params(pagination_data) + assert second_call.verify is pagination_extra_options["verify"] + + assert result == [5, 10] + + def test_async_pagination(self, requests_mock): """ Test that the HttpOperator calls asynchronously and repetitively the API when a pagination_function is provided, and as long as this function