feat: Dataset API add save method #180

Merged
10 changes: 6 additions & 4 deletions kedro-datasets/RELEASE.md
@@ -3,6 +3,8 @@
## Major features and improvements:
* Added pandas 2.0 support.
* Added SQLAlchemy 2.0 support (and dropped support for versions below 1.4).
* Added a `save` method to the `APIDataSet`.
* Reduced constructor arguments for `APIDataSet` by replacing most arguments with a single constructor argument `load_args`. This makes it more consistent with other Kedro DataSets and the underlying `requests` API, and automatically enables the full configuration domain: stream, certificates, proxies, and more (see the sketch below).
* Relaxed Kedro version pin to `>=0.16`

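A minimal sketch of the consolidated interface (the endpoint URL and query parameters are illustrative, and `APIDataSet` is assumed to be importable from `kedro_datasets.api`):

```python
from kedro_datasets.api import APIDataSet

# All request options now live in a single `load_args` dictionary and are
# forwarded to `requests.request` unchanged (params, timeout, stream, ...).
data_set = APIDataSet(
    url="https://example.com/api/data",
    method="GET",
    load_args={"params": {"year": 2000}, "timeout": 60},
)
response = data_set.load()  # a `requests.Response`
```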
@@ -42,10 +44,10 @@ Many thanks to the following Kedroids for contributing PRs to this release:

* Added the following new datasets:

| Type | Description | Location |
| -------------------------------- | --------------------------------------------------------------------------------------------------------------------- | -------------------------- |
| `polars.CSVDataSet`              | A `CSVDataSet` backed by [polars](https://www.pola.rs/), a lightning-fast dataframe package built entirely in Rust.    | `kedro_datasets.polars`    |
| `snowflake.SnowparkTableDataSet` | Work with [Snowpark](https://www.snowflake.com/en/data-cloud/snowpark/) DataFrames from tables in Snowflake. | `kedro_datasets.snowflake` |

## Bug fixes and other changes
* Added `mssql` backend to the `SQLQueryDataSet` using the `pyodbc` library.
138 changes: 113 additions & 25 deletions kedro-datasets/kedro_datasets/api/api_dataset.py
@@ -1,7 +1,9 @@
"""``APIDataSet`` loads the data from HTTP(S) APIs.
It uses the Python requests library: https://requests.readthedocs.io/en/latest/
"""
import json as json_ # make pylint happy
from copy import deepcopy
from typing import Any, Dict, List, Tuple, Union

import requests
from kedro.io.core import AbstractDataSet, DataSetError
@@ -14,11 +16,10 @@


class APIDataSet(AbstractDataSet[None, requests.Response]):
"""``APIDataSet`` loads the data from HTTP(S) APIs.
"""``APIDataSet`` loads/saves data from/to HTTP(S) APIs.
It uses the python requests library: https://requests.readthedocs.io/en/latest/

Example usage for the `YAML API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#use-the-data-catalog-with-the-yaml-api>`_:

.. code-block:: yaml
@@ -34,10 +35,8 @@ class APIDataSet(AbstractDataSet[None, requests.Response]):
agg_level_desc: STATE,
year: 2000

Example usage for the `Python API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#use-the-data-catalog-with-the-code-api>`_: ::

>>> from kedro.extras.datasets.api import APIDataSet
>>>
@@ -57,49 +56,101 @@ class APIDataSet(AbstractDataSet[None, requests.Response]):
>>> credentials=("username", "password")
>>> )
>>> data = data_set.load()

``APIDataSet`` can also be used to save output on a remote server using HTTP(S)
methods.

>>> example_table = '{"col1":["val1", "val2"], "col2":["val3", "val4"]}'

>>> data_set = APIDataSet(
method = "POST"
url = "url_of_remote_server",
save_args = {"chunk_size":1}
)
>>> data_set.save(example_table)

On initialisation, all the necessary parameters can be specified in the
``save_args`` dictionary. The default HTTP(S) method is POST, but PUT is also
supported. Two important parameters to keep in mind are ``timeout`` and
``chunk_size``: ``timeout`` defines how long the program waits for a response
after a request, while ``chunk_size`` is only used when the input to the save
method is a list, in which case the request is divided into chunks of
``chunk_size`` rows. With ``chunk_size=1``, for instance, a list of two rows
would be sent as two requests of one row each. If the data passed to the save
method is not a list, ``APIDataSet`` checks whether it can be loaded as JSON.
If so, it sends the data unchanged in a single request. Otherwise, the
``_save`` method tries to dump the data to JSON and executes the request.
"""

DEFAULT_SAVE_ARGS = {
"params": None,
"headers": None,
"auth": None,
"json": None,
"timeout": 60,
"chunk_size": 100,
}
# pylint: disable=too-many-arguments

def __init__(
self,
url: str,
method: str = "GET",
load_args: Dict[str, Any] = None,
save_args: Dict[str, Any] = None,
credentials: Union[Tuple[str, str], List[str], AuthBase] = None,
) -> None:
"""Creates a new instance of ``APIDataSet`` to fetch data from an API endpoint.

Args:
url: The API URL endpoint.
method: The method of the request. GET, POST and PUT are the only
supported methods.
load_args: Additional parameters to be fed to requests.request.
https://requests.readthedocs.io/en/latest/api/#requests.request
credentials: Allows specifying secrets in credentials.yml.
Expected format is ``('login', 'password')`` if given as a tuple or
list. An ``AuthBase`` instance can be provided for more complex cases.
save_args: Options for saving data on the server. Accepts the same
parameters as the load method, plus an optional ``chunk_size``, which
determines the number of records sent in each request.
Raises:
ValueError: if both ``auth`` in ``load_args`` and ``credentials`` are
specified.
"""
super().__init__()

# GET method means load
if method == "GET":
self._params = load_args or {}

# PUT, POST, DELETE means save
elif method in ["PUT", "POST"]:
self._params = deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._params.update(save_args)
self._chunk_size = self._params.pop("chunk_size", 1)
else:
raise ValueError("Only GET, POST and PUT methods are supported")

self._param_auth = self._params.pop("auth", None)

if credentials is not None and self._param_auth is not None:
raise ValueError("Cannot specify both auth and credentials.")

self._auth = credentials or self._param_auth

if "cert" in self._load_args:
self._load_args["cert"] = self._convert_type(self._load_args["cert"])
if "cert" in self._params:
self._params["cert"] = self._convert_type(self._params["cert"])

if "timeout" in self._load_args:
self._load_args["timeout"] = self._convert_type(self._load_args["timeout"])
if "timeout" in self._params:
self._params["timeout"] = self._convert_type(self._params["timeout"])

self._request_args: Dict[str, Any] = {
"url": url,
"method": method,
"auth": self._convert_type(self._auth),
**self._params,
}

@staticmethod
@@ -131,11 +182,48 @@ def _execute_request(self, session: Session) -> requests.Response:
return response

def _load(self) -> requests.Response:
if self._request_args["method"] == "GET":
with sessions.Session() as session:
return self._execute_request(session)

raise DataSetError("Only GET method is supported for load")

def _execute_save_with_chunks(
self,
json_data: List[Dict[str, Any]],
) -> requests.Response:
chunk_size = self._chunk_size
# Ceiling division, so the last, possibly shorter, chunk is still sent
# and no empty trailing request is issued when the data divides evenly.
n_chunks = (len(json_data) + chunk_size - 1) // chunk_size

for i in range(n_chunks):
    send_data = json_data[i * chunk_size : (i + 1) * chunk_size]
    response = self._execute_save_request(json_data=send_data)

# Only the response to the last chunk is returned.
return response

def _execute_save_request(self, json_data: Any) -> requests.Response:
try:
    # Data that is already valid JSON is sent unchanged.
    json_.loads(json_data)
    self._request_args["data"] = json_data
except TypeError:
    # Otherwise let ``requests`` serialise the object to JSON.
    self._request_args["json"] = json_data
try:
response = requests.request(**self._request_args)
response.raise_for_status()
except requests.exceptions.HTTPError as exc:
raise DataSetError("Failed to send data", exc) from exc

except OSError as exc:
raise DataSetError("Failed to connect to the remote server") from exc
return response

def _save(self, data: Any) -> requests.Response:
if self._request_args["method"] in ["PUT", "POST"]:
if isinstance(data, list):
return self._execute_save_with_chunks(json_data=data)

return self._execute_save_request(json_data=data)

raise DataSetError("Use PUT or POST methods for save")

def _exists(self) -> bool:
with sessions.Session() as session:
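To see the new save path end to end, here is a minimal sketch of chunked saving (the target URL is illustrative; any endpoint accepting JSON POST requests would do):

```python
from kedro_datasets.api import APIDataSet

# POST a three-record list in chunks of two: two requests are sent, the
# first carrying two records and the second the remaining one.
data_set = APIDataSet(
    url="https://example.com/api/ingest",
    method="POST",
    save_args={"chunk_size": 2, "timeout": 10},
)
data_set.save([{"id": 1}, {"id": 2}, {"id": 3}])
```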
2 changes: 0 additions & 2 deletions kedro-datasets/kedro_datasets/pandas/generic_dataset.py
@@ -181,7 +181,6 @@ def _ensure_file_system_target(self) -> None:
)

def _load(self) -> pd.DataFrame:

self._ensure_file_system_target()

load_path = get_filepath_str(self._get_load_path(), self._protocol)
@@ -196,7 +195,6 @@
)

def _save(self, data: pd.DataFrame) -> None:

self._ensure_file_system_target()

save_path = get_filepath_str(self._get_save_path(), self._protocol)
1 change: 0 additions & 1 deletion kedro-datasets/kedro_datasets/spark/spark_jdbc_dataset.py
@@ -126,7 +126,6 @@ def __init__(

# Update properties in load_args and save_args with credentials.
if credentials is not None:

# Check credentials for bad inputs.
for cred_key, cred_value in credentials.items():
if cred_value is None: