Skip to content

Commit

Permalink
updated docstring for ManagedTableDataSet
Browse files Browse the repository at this point in the history
Signed-off-by: Danny Farah <danny_farah@mckinsey.com>
  • Loading branch information
dannyrfar committed Mar 21, 2023
1 parent 498b548 commit 9b43324
Showing 1 changed file with 56 additions and 18 deletions.
74 changes: 56 additions & 18 deletions kedro-datasets/kedro_datasets/databricks/managed_table_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,18 @@ def schema(self) -> StructType:

class ManagedTableDataSet(AbstractVersionedDataSet):
"""``ManagedTableDataSet`` loads and saves data into managed delta tables on Databricks.
Load and save can be in Spark or Pandas dataframes, specified in dataframe_type.
When saving data, you can specify one of three modes: overwtire(default), append,
or upsert. Upsert requires you to specify the primary_column parameter which
will be used as part of the join condition. This dataset works best with
the databricks kedro starter. That starter comes with hooks that allow this
dataset to function properly. Follow the instructions in that starter to
setup your project for this dataset.
Example usage for the
`YAML API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#use-the-data-catalog-with-the-yaml-api>`_:
.. code-block:: yaml
names_and_ages@spark:
Expand All @@ -167,23 +175,24 @@ class ManagedTableDataSet(AbstractVersionedDataSet):
`Python API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#use-the-data-catalog-with-the-code-api>`_:
::
Launch a pyspark session with the following configs:
% pyspark --packages io.delta:delta-core_2.12:1.2.1
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import (StructField, StringType,
IntegerType, StructType)
>>> from kedro_datasets.databricks import ManagedTableDataSet
>>> schema = StructType([StructField("name", StringType(), True),
StructField("age", IntegerType(), True)])
>>> data = [('Alex', 31), ('Bob', 12), ('Clarke', 65), ('Dave', 29)]
>>> spark_df = SparkSession.builder.getOrCreate().createDataFrame(data, schema)
>>> data_set = ManagedTableDataSet(table="names_and_ages")
>>> data_set.save(spark_df)
>>> reloaded = data_set.load()
>>> reloaded.take(4)"""
% pyspark --packages io.delta:delta-core_2.12:1.2.1
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import (StructField, StringType,
IntegerType, StructType)
>>> from kedro_datasets.databricks import ManagedTableDataSet
>>> schema = StructType([StructField("name", StringType(), True),
StructField("age", IntegerType(), True)])
>>> data = [('Alex', 31), ('Bob', 12), ('Clarke', 65), ('Dave', 29)]
>>> spark_df = SparkSession.builder.getOrCreate().createDataFrame(data, schema)
>>> data_set = ManagedTableDataSet(table="names_and_ages")
>>> data_set.save(spark_df)
>>> reloaded = data_set.load()
>>> reloaded.take(4)
"""

# this dataset cannot be used with ``ParallelRunner``,
# therefore it has the attribute ``_SINGLE_PROCESS = True``
Expand All @@ -207,7 +216,36 @@ def __init__( # pylint: disable=R0913
partition_columns: List[str] = None,
owner_group: str = None,
) -> None:
"""Creates a new instance of ``ManagedTableDataSet``."""
"""Creates a new instance of ``ManagedTableDataSet``
Args:
table (str): the name of the table
catalog (str, optional): the name of the catalog in Unity.
Defaults to None.
database (str, optional): the name of the database
(also referred to as schema). Defaults to "default".
write_mode (str, optional): the mode to write the data into the table.
Options are:["overwrite", "append", "upsert"].
"upsert" mode requires primary_key field to be populated.
Defaults to "overwrite".
dataframe_type (str, optional): "pandas" or "spark" dataframe.
Defaults to "spark".
primary_key (Union[str, List[str]], optional): the primary key of the table.
Can be in the form of a list. Defaults to None.
version (Version, optional): kedro.io.core.Version instance to load the data.
Defaults to None.
schema (Dict[str, Any], optional): the schema of the table in JSON form.
Dataframes will be truncated to match the schema if provided.
Used by the hooks to create the table if the schema is provided
Defaults to None.
partition_columns (List[str], optional): the columns to use for partitioning the table.
Used by the hooks. Defaults to None.
owner_group (str, optional): if table access control is enabled in your workspace,
specifying owner_group will transfer ownership of the table and database to
this owner. All databases should have the same owner_group. Defaults to None.
Raises:
DataSetError: Invalid configuration supplied (through ManagedTable validation)
"""

self._table = ManagedTable(
database=database,
Expand Down

0 comments on commit 9b43324

Please sign in to comment.