Fix HttpOperator pagination with str data #35782

Merged
28 changes: 20 additions & 8 deletions airflow/providers/http/operators/http.py
@@ -56,12 +56,17 @@ class HttpOperator(BaseOperator):
:param pagination_function: A callable that generates the parameters used to call the API again,
based on the previous response. Typically used when the API is paginated and returns for e.g a
cursor, a 'next page id', or a 'next page URL'. When provided, the Operator will call the API
repeatedly until this callable returns None. Also, the result of the Operator will become by
default a list of Response.text objects (instead of a single response object). Same with the
other injected functions (like response_check, response_filter, ...) which will also receive a
list of Response object. This function receives a Response object form previous call, and should
return a dict of parameters (`endpoint`, `data`, `headers`, `extra_options`), which will be merged
and will override the one used in the initial API call.
repeatedly until this callable returns None. The result of the Operator will become by default a
list of Response.text objects (instead of a single response object). Same with the other injected
functions (like response_check, response_filter, ...) which will also receive a list of Response
objects. This function receives a Response object from the previous call, and should return a nested
dictionary with the following optional keys: `endpoint`, `data`, `headers` and `extra_options`.
Those keys will be merged and/or override the parameters provided into the HttpOperator declaration.
Parameters are merged when they are both a dictionary (e.g.: HttpOperator.headers will be merged
with the `headers` dict provided by this function). When merging, dict items returned by this
function will override initial ones (e.g: if both HttpOperator.headers and `headers` have a 'cookie'
item, the one provided by `headers` is kept). Parameters are simply overridden when any of them are
string (e.g.: HttpOperator.endpoint is overridden by `endpoint`).
:param response_check: A check against the 'requests' response object.
The callable takes the response object as the first positional argument
and optionally any number of keyword arguments available in the context dictionary.
@@ -101,7 +106,7 @@ def __init__(
*,
endpoint: str | None = None,
method: str = "POST",
data: Any = None,
data: dict[str, Any] | str | None = None,
headers: dict[str, str] | None = None,
pagination_function: Callable[..., Any] | None = None,
response_check: Callable[..., bool] | None = None,
@@ -271,9 +276,16 @@ def _merge_next_page_parameters(self, next_page_params: dict) -> dict:
:param next_page_params: A dictionary containing the parameters for the next page.
:return: A dictionary containing the merged parameters.
"""
data: str | dict | None = None # makes mypy happy
next_page_data_param = next_page_params.get("data")
if isinstance(self.data, dict) and isinstance(next_page_data_param, dict):
data = merge_dicts(self.data, next_page_data_param)
else:
data = next_page_data_param or self.data

return dict(
endpoint=next_page_params.get("endpoint") or self.endpoint,
data=merge_dicts(self.data, next_page_params.get("data", {})),
data=data,
headers=merge_dicts(self.headers, next_page_params.get("headers", {})),
extra_options=merge_dicts(self.extra_options, next_page_params.get("extra_options", {})),
)
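
The `data` handling in the hunk above can be sketched in isolation. This is a minimal illustration, not the provider's code: `merge_data` is a hypothetical name, and the local `merge_dicts` is an assumed stand-in for the Airflow helper, written here only so the example runs without Airflow installed.

```python
from __future__ import annotations

from typing import Any


def merge_dicts(base: dict, override: dict) -> dict:
    """Assumed stand-in for Airflow's helper: recursive merge where `override` wins."""
    merged = base.copy()
    for key, value in override.items():
        if isinstance(merged.get(key), dict) and isinstance(value, dict):
            merged[key] = merge_dicts(merged[key], value)
        else:
            merged[key] = value
    return merged


def merge_data(
    initial: dict[str, Any] | str | None,
    next_page: dict[str, Any] | str | None,
) -> dict[str, Any] | str | None:
    """Hypothetical helper mirroring the branch added in this hunk."""
    # Two dicts are merged; keys from the pagination function win.
    if isinstance(initial, dict) and isinstance(next_page, dict):
        return merge_dicts(initial, next_page)
    # Otherwise the next-page value replaces the initial one,
    # unless it is falsy (None, "" or {}), in which case the initial value is kept.
    return next_page or initial


print(merge_data({"a": 1, "b": 2}, {"b": 3}))  # dict + dict: merged
print(merge_data("q=1", "q=2"))                # str + str: overridden
print(merge_data({"a": 1}, ""))                # falsy next-page value: initial kept
```

One consequence of the `or` fallback is that a pagination function cannot clear `data` by returning an empty string or an empty dict; the value from the operator declaration is used instead.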
2 changes: 1 addition & 1 deletion airflow/providers/http/triggers/http.py
@@ -56,7 +56,7 @@ def __init__(
method: str = "POST",
endpoint: str | None = None,
headers: dict[str, str] | None = None,
data: Any = None,
data: dict[str, Any] | str | None = None,
extra_options: dict[str, Any] | None = None,
):
super().__init__()
2 changes: 1 addition & 1 deletion tests/providers/http/operators/test_http.py
@@ -127,7 +127,7 @@ def pagination_function(response: Response) -> dict | None:
iterations += 1
return dict(
endpoint=response.json()["endpoint"],
data={},
data="",
headers={},
extra_options={},
)
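
For illustration, a pagination function that returns `data` as a string might look like the sketch below. Everything in it is assumed rather than taken from the PR: `FakeResponse` is a hypothetical stand-in for `requests.Response` that exposes only `json()`, and the `next_cursor` field and `cursor` form parameter are made-up API details.

```python
from __future__ import annotations

from urllib.parse import urlencode


class FakeResponse:
    """Hypothetical stand-in for requests.Response, exposing only json()."""

    def __init__(self, payload: dict):
        self._payload = payload

    def json(self) -> dict:
        return self._payload


def paginate(response) -> dict | None:
    """Return parameters for the next call, or None to stop paginating."""
    cursor = response.json().get("next_cursor")  # assumed field name
    if not cursor:
        return None
    # `data` is a form-encoded string here, not a dict.
    return {"data": urlencode({"cursor": cursor})}


print(paginate(FakeResponse({"next_cursor": "abc"})))
print(paginate(FakeResponse({})))
```

Only the keys that change need to be returned; `endpoint`, `headers` and `extra_options` fall back to the values given in the operator declaration.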