Commit 7b47717

feat: Added DataFrameWriteOptions option when writing as csv, json, parquet.

allinux committed Sep 6, 2024
1 parent fe0738a · commit 7b47717
Showing 2 changed files with 88 additions and 11 deletions.
python/datafusion/dataframe.py (35 changes: 30 additions & 5 deletions)

@@ -409,37 +409,62 @@ def except_all(self, other: DataFrame) -> DataFrame:
"""
return DataFrame(self.df.except_all(other.df))

def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None:
def write_csv(
self,

Check failure on line 413 in python/datafusion/dataframe.py

View workflow job for this annotation

GitHub Actions / build

Ruff (W291)

python/datafusion/dataframe.py:413:14: W291 Trailing whitespace
path: str | pathlib.Path,

Check failure on line 414 in python/datafusion/dataframe.py

View workflow job for this annotation

GitHub Actions / build

Ruff (W291)

python/datafusion/dataframe.py:414:34: W291 Trailing whitespace
with_header: bool = False,
write_options_overwrite: bool = False,
write_options_single_file_output: bool = False,
write_options_partition_by: List = [],
) -> None:
"""Execute the :py:class:`DataFrame` and write the results to a CSV file.
Args:
path: Path of the CSV file to write.
with_header: If true, output the CSV header row.
write_options_overwrite: Controls if existing data should be overwritten
write_options_single_file_output: Controls if all partitions should be coalesced into a single output file. Generally will have slower performance when set to true.

Check failure on line 426 in python/datafusion/dataframe.py

View workflow job for this annotation

GitHub Actions / build

Ruff (W505)

python/datafusion/dataframe.py:426:89: W505 Doc line too long (176 > 88)
write_options_partition_by: Sets which columns should be used for hive-style partitioned writes by name. Can be set to empty vec![] for non-partitioned writes.

Check failure on line 427 in python/datafusion/dataframe.py

View workflow job for this annotation

GitHub Actions / build

Ruff (W505)

python/datafusion/dataframe.py:427:89: W505 Doc line too long (171 > 88)
"""
self.df.write_csv(str(path), with_header)
self.df.write_csv(str(path), with_header, write_options_overwrite, write_options_single_file_output, write_options_partition_by)
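A minimal usage sketch of the new CSV options (the in-memory data and output path are hypothetical; the keyword names follow the diff above):

    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]})

    # Replace any existing output and coalesce all partitions into one file.
    df.write_csv(
        "out.csv",
        with_header=True,
        write_options_overwrite=True,
        write_options_single_file_output=True,
    )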

     def write_parquet(
         self,
         path: str | pathlib.Path,
         compression: str = "uncompressed",
         compression_level: int | None = None,
+        write_options_overwrite: bool = False,
+        write_options_single_file_output: bool = False,
+        write_options_partition_by: List[str] = [],
     ) -> None:
         """Execute the :py:class:`DataFrame` and write the results to a Parquet file.
 
         Args:
             path: Path of the Parquet file to write.
             compression: Compression type to use.
             compression_level: Compression level to use.
+            write_options_overwrite: Controls if existing data should be
+                overwritten.
+            write_options_single_file_output: Controls if all partitions should
+                be coalesced into a single output file. Generally will have
+                slower performance when set to true.
+            write_options_partition_by: Sets which columns should be used for
+                hive-style partitioned writes by name. Can be set to an empty
+                list for non-partitioned writes.
         """
-        self.df.write_parquet(str(path), compression, compression_level)
+        self.df.write_parquet(
+            str(path),
+            compression,
+            compression_level,
+            write_options_overwrite,
+            write_options_single_file_output,
+            write_options_partition_by,
+        )
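Hive-style partitioning pairs naturally with Parquet. A hedged sketch reusing the df from the CSV example above (the column name "b" and the zstd settings are illustrative):

    # Produces directories such as out_parquet/b=x/ and out_parquet/b=y/,
    # each holding only the rows for that partition value.
    df.write_parquet(
        "out_parquet",
        compression="zstd",
        compression_level=4,
        write_options_partition_by=["b"],
    )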

-    def write_json(self, path: str | pathlib.Path) -> None:
+    def write_json(
+        self,
+        path: str | pathlib.Path,
+        write_options_overwrite: bool = False,
+        write_options_single_file_output: bool = False,
+        write_options_partition_by: List[str] = [],
+    ) -> None:
         """Execute the :py:class:`DataFrame` and write the results to a JSON file.
 
         Args:
             path: Path of the JSON file to write.
+            write_options_overwrite: Controls if existing data should be
+                overwritten.
+            write_options_single_file_output: Controls if all partitions should
+                be coalesced into a single output file. Generally will have
+                slower performance when set to true.
+            write_options_partition_by: Sets which columns should be used for
+                hive-style partitioned writes by name. Can be set to an empty
+                list for non-partitioned writes.
         """
-        self.df.write_json(str(path))
+        self.df.write_json(
+            str(path),
+            write_options_overwrite,
+            write_options_single_file_output,
+            write_options_partition_by,
+        )
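The JSON writer takes the same three options; a brief sketch (the output path is illustrative):

    # Write newline-delimited JSON, overwriting any previous run's output.
    df.write_json(
        "out_json",
        write_options_overwrite=True,
    )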

     def to_arrow_table(self) -> pa.Table:
         """Execute the :py:class:`DataFrame` and convert it into an Arrow Table.
src/dataframe.rs (64 changes: 58 additions & 6 deletions)

@@ -402,7 +402,24 @@ impl PyDataFrame {
     }
 
     /// Write a `DataFrame` to a CSV file.
-    fn write_csv(&self, path: &str, with_header: bool, py: Python) -> PyResult<()> {
+    #[pyo3(signature = (
+        path,
+        with_header=false,
+        write_options_overwrite=false,
+        write_options_single_file_output=false,
+        write_options_partition_by=vec![],
+    ))]
+    fn write_csv(
+        &self,
+        path: &str,
+        with_header: bool,
+        write_options_overwrite: bool,
+        write_options_single_file_output: bool,
+        write_options_partition_by: Vec<String>,
+        py: Python,
+    ) -> PyResult<()> {
         let csv_options = CsvOptions {
             has_header: Some(with_header),
             ..Default::default()
@@ -411,7 +428,10 @@ impl PyDataFrame {
             py,
             self.df.as_ref().clone().write_csv(
                 path,
-                DataFrameWriteOptions::new(),
+                DataFrameWriteOptions::default()
+                    .with_overwrite(write_options_overwrite)
+                    .with_single_file_output(write_options_single_file_output)
+                    .with_partition_by(write_options_partition_by),
                 Some(csv_options),
             ),
         )?;
@@ -422,13 +442,21 @@ impl PyDataFrame {
     #[pyo3(signature = (
         path,
         compression="uncompressed",
-        compression_level=None
+        compression_level=None,
+        write_options_overwrite=false,
+        write_options_single_file_output=false,
+        write_options_partition_by=vec![],
     ))]
     fn write_parquet(
         &self,
         path: &str,
         compression: &str,
         compression_level: Option<u32>,
+        write_options_overwrite: bool,
+        write_options_single_file_output: bool,
+        write_options_partition_by: Vec<String>,
         py: Python,
     ) -> PyResult<()> {
         fn verify_compression_level(cl: Option<u32>) -> Result<u32, PyErr> {
@@ -472,21 +500,45 @@ impl PyDataFrame {
             py,
             self.df.as_ref().clone().write_parquet(
                 path,
-                DataFrameWriteOptions::new(),
+                DataFrameWriteOptions::default()
+                    .with_overwrite(write_options_overwrite)
+                    .with_single_file_output(write_options_single_file_output)
+                    .with_partition_by(write_options_partition_by),
                 Option::from(options),
             ),
         )?;
         Ok(())
     }

     /// Executes a query and writes the results to a partitioned JSON file.
-    fn write_json(&self, path: &str, py: Python) -> PyResult<()> {
+    #[pyo3(signature = (
+        path,
+        write_options_overwrite=false,
+        write_options_single_file_output=false,
+        write_options_partition_by=vec![],
+    ))]
+    fn write_json(
+        &self,
+        path: &str,
+        write_options_overwrite: bool,
+        write_options_single_file_output: bool,
+        write_options_partition_by: Vec<String>,
+        py: Python,
+    ) -> PyResult<()> {
         wait_for_future(
             py,
             self.df
                 .as_ref()
                 .clone()
-                .write_json(path, DataFrameWriteOptions::new(), None),
+                .write_json(
+                    path,
+                    DataFrameWriteOptions::default()
+                        .with_overwrite(write_options_overwrite)
+                        .with_single_file_output(write_options_single_file_output)
+                        .with_partition_by(write_options_partition_by),
+                    None,
+                ),
         )?;
         Ok(())
     }
