diff --git a/.changes/unreleased/Under the Hood-20220924-143713.yaml b/.changes/unreleased/Under the Hood-20220924-143713.yaml
new file mode 100644
index 000000000..c537a9395
--- /dev/null
+++ b/.changes/unreleased/Under the Hood-20220924-143713.yaml
@@ -0,0 +1,7 @@
+kind: Under the Hood
+body: Convert df to pyspark DataFrame if it is koalas before writing
+time: 2022-09-24T14:37:13.100404-06:00
+custom:
+  Author: dbeatty10 ueshin
+  Issue: "473"
+  PR: "474"
diff --git a/dbt/include/spark/macros/materializations/table.sql b/dbt/include/spark/macros/materializations/table.sql
index 25d70c722..5721bd25e 100644
--- a/dbt/include/spark/macros/materializations/table.sql
+++ b/dbt/include/spark/macros/materializations/table.sql
@@ -46,6 +46,7 @@ import importlib.util
 
 pandas_available = False
 pyspark_available = False
+koalas_available = False
 
 # make sure pandas exists before using it
 if importlib.util.find_spec("pandas"):
@@ -57,17 +58,26 @@ if importlib.util.find_spec("pyspark.pandas"):
     import pyspark.pandas
     pyspark_available = True
 
-# preferentially convert pandas DataFrames to pandas-on-Spark DataFrames first
+# make sure databricks.koalas exists before using it
+if importlib.util.find_spec("databricks.koalas"):
+    import databricks.koalas
+    koalas_available = True
+
+# preferentially convert pandas DataFrames to pandas-on-Spark or Koalas DataFrames first
 # since they know how to convert pandas DataFrames better than `spark.createDataFrame(df)`
 # and converting from pandas-on-Spark to Spark DataFrame has no overhead
 if pyspark_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
     df = pyspark.pandas.frame.DataFrame(df)
+elif koalas_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
+    df = databricks.koalas.frame.DataFrame(df)
 
 # convert to pyspark.sql.dataframe.DataFrame
 if isinstance(df, pyspark.sql.dataframe.DataFrame):
     pass  # since it is already a Spark DataFrame
 elif pyspark_available and isinstance(df, pyspark.pandas.frame.DataFrame):
     df = df.to_spark()
+elif koalas_available and isinstance(df, databricks.koalas.frame.DataFrame):
+    df = df.to_spark()
 elif pandas_available and isinstance(df, pandas.core.frame.DataFrame):
     df = spark.createDataFrame(df)
 else:
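
Note for reviewers: for readers without the surrounding macro context, below is a minimal standalone sketch of the conversion cascade the patched template performs. The function name to_spark_dataframe is hypothetical, introduced only for illustration; in the macro this logic is inlined, with df and spark supplied by the Python model runtime. Like the macro, the sketch assumes pyspark itself is installed, and probing "databricks.koalas" assumes a runtime where the databricks namespace package can be imported (e.g. older Databricks runtimes that ship Koalas).

import importlib.util

from pyspark.sql import DataFrame as SparkDataFrame, SparkSession


def to_spark_dataframe(df, spark: SparkSession) -> SparkDataFrame:
    # Hypothetical helper mirroring the macro's inlined cascade.
    # Probe for optional packages before importing them.
    pandas_available = importlib.util.find_spec("pandas") is not None
    pyspark_pandas_available = importlib.util.find_spec("pyspark.pandas") is not None
    koalas_available = importlib.util.find_spec("databricks.koalas") is not None

    if pandas_available:
        import pandas
    if pyspark_pandas_available:
        import pyspark.pandas
    if koalas_available:
        import databricks.koalas

    # Step 1: lift a plain pandas DataFrame onto the cluster first,
    # preferring pyspark.pandas (Spark >= 3.2) over the legacy Koalas
    # package, since both convert pandas input more efficiently than
    # spark.createDataFrame(df) and unwrapping them afterward is free.
    if pyspark_pandas_available and pandas_available and isinstance(df, pandas.DataFrame):
        df = pyspark.pandas.DataFrame(df)
    elif koalas_available and pandas_available and isinstance(df, pandas.DataFrame):
        df = databricks.koalas.DataFrame(df)

    # Step 2: unwrap whatever we now have into a plain Spark DataFrame.
    if isinstance(df, SparkDataFrame):
        return df  # already a Spark DataFrame
    if pyspark_pandas_available and isinstance(df, pyspark.pandas.DataFrame):
        return df.to_spark()
    if koalas_available and isinstance(df, databricks.koalas.DataFrame):
        return df.to_spark()
    if pandas_available and isinstance(df, pandas.DataFrame):
        return spark.createDataFrame(df)
    raise TypeError(f"Unsupported DataFrame type: {type(df)}")

The ordering matters: Koalas is only a fallback when pyspark.pandas is unavailable, and both branches converge on .to_spark(), which is why the patch can add Koalas support with just two new elif arms rather than a separate write path.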