Refactor to move FileFormatConfig to the common crate as well
Jay Chia committed Sep 9, 2024
1 parent f1b0655 commit bf24f23
Showing 17 changed files with 150 additions and 135 deletions.
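The net effect: FileFormatConfig and its per-format source configs move out of daft-scan and into the common-file-formats crate, so consumers that only need file-format types no longer pull them through daft-scan. A minimal before/after sketch of the import change, with paths taken from the hunks below:

// Before this commit: config types were re-exported from daft-scan
use daft_scan::file_format::{FileFormatConfig, ParquetSourceConfig};

// After this commit: they live in the common crate
use common_file_formats::{FileFormatConfig, ParquetSourceConfig};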
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion src/common/file-formats/Cargo.toml
@@ -1,11 +1,14 @@
 [dependencies]
 common-error = {path = "../error", default-features = false}
 common-py-serde = {path = "../py-serde", default-features = false}
+# Remove this dependency once we move Field and TimeUnit out into daft-schema
+daft-core = {path = "../../daft-core", default-features = false}
 pyo3 = {workspace = true, optional = true}
 serde = {workspace = true}
+serde_json = {workspace = true, optional = true}
 
 [features]
-python = ["dep:pyo3", "common-error/python", "common-py-serde/python"]
+python = ["dep:pyo3", "dep:serde_json", "common-error/python", "common-py-serde/python", "daft-core/python"]
 
 [package]
 edition = {workspace = true}
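serde_json becomes an optional dependency enabled only through the `python` feature (`dep:serde_json`), so any code touching it has to be feature-gated. A sketch of the pattern, using a hypothetical helper that is not part of this diff:

// Hypothetical helper: compiles only when the `python` feature pulls in serde_json.
#[cfg(feature = "python")]
fn to_json<T: serde::Serialize>(value: &T) -> serde_json::Result<String> {
    serde_json::to_string(value)
}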
src/daft-scan/src/file_format.rs → src/common/file-formats/src/file_format_config.rs
@@ -1,4 +1,5 @@
-use common_file_formats::FileFormat;
+use crate::FileFormat;
+// TODO: Remove this once it is refactored to move into daft-schema
 use daft_core::datatypes::{Field, TimeUnit};
 use serde::{Deserialize, Serialize};
 use std::hash::Hash;
@@ -9,24 +10,11 @@ use common_py_serde::impl_bincode_py_state_serialization;
 #[cfg(feature = "python")]
 use {
     common_py_serde::{deserialize_py_object, serialize_py_object},
+    // TODO: Remove this once it is refactored to move into daft-schema
     daft_core::python::{datatype::PyTimeUnit, field::PyField},
-    pyo3::{pyclass, pyclass::CompareOp, pymethods, IntoPy, PyObject, PyResult, Python},
+    pyo3::{pyclass, pymethods, PyObject, PyResult, Python},
 };
 
-impl From<&FileFormatConfig> for FileFormat {
-    fn from(file_format_config: &FileFormatConfig) -> Self {
-        match file_format_config {
-            FileFormatConfig::Parquet(_) => Self::Parquet,
-            FileFormatConfig::Csv(_) => Self::Csv,
-            FileFormatConfig::Json(_) => Self::Json,
-            #[cfg(feature = "python")]
-            FileFormatConfig::Database(_) => Self::Database,
-            #[cfg(feature = "python")]
-            FileFormatConfig::PythonFunction => Self::Python,
-        }
-    }
-}
-
 /// Configuration for parsing a particular file format.
 #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub enum FileFormatConfig {
@@ -360,81 +348,3 @@ impl DatabaseSourceConfig {
 }
 
 impl_bincode_py_state_serialization!(DatabaseSourceConfig);
-
-/// Configuration for parsing a particular file format.
-#[derive(Clone, Debug, Serialize, Deserialize)]
-#[serde(transparent)]
-#[cfg_attr(
-    feature = "python",
-    pyclass(module = "daft.daft", name = "FileFormatConfig")
-)]
-pub struct PyFileFormatConfig(Arc<FileFormatConfig>);
-
-#[cfg(feature = "python")]
-#[pymethods]
-impl PyFileFormatConfig {
-    /// Create a Parquet file format config.
-    #[staticmethod]
-    fn from_parquet_config(config: ParquetSourceConfig) -> Self {
-        Self(Arc::new(FileFormatConfig::Parquet(config)))
-    }
-
-    /// Create a CSV file format config.
-    #[staticmethod]
-    fn from_csv_config(config: CsvSourceConfig) -> Self {
-        Self(Arc::new(FileFormatConfig::Csv(config)))
-    }
-
-    /// Create a JSON file format config.
-    #[staticmethod]
-    fn from_json_config(config: JsonSourceConfig) -> Self {
-        Self(Arc::new(FileFormatConfig::Json(config)))
-    }
-
-    /// Create a Database file format config.
-    #[staticmethod]
-    fn from_database_config(config: DatabaseSourceConfig) -> Self {
-        Self(Arc::new(FileFormatConfig::Database(config)))
-    }
-
-    /// Get the underlying data source config.
-    #[getter]
-    fn get_config(&self, py: Python) -> PyObject {
-        use FileFormatConfig::*;
-
-        match self.0.as_ref() {
-            Parquet(config) => config.clone().into_py(py),
-            Csv(config) => config.clone().into_py(py),
-            Json(config) => config.clone().into_py(py),
-            Database(config) => config.clone().into_py(py),
-            PythonFunction => py.None(),
-        }
-    }
-
-    /// Get the file format for this file format config.
-    fn file_format(&self) -> FileFormat {
-        self.0.as_ref().into()
-    }
-
-    fn __richcmp__(&self, other: &Self, op: CompareOp) -> bool {
-        match op {
-            CompareOp::Eq => self.0 == other.0,
-            CompareOp::Ne => !self.__richcmp__(other, CompareOp::Eq),
-            _ => unimplemented!("not implemented"),
-        }
-    }
-}
-
-impl_bincode_py_state_serialization!(PyFileFormatConfig);
-
-impl From<PyFileFormatConfig> for Arc<FileFormatConfig> {
-    fn from(file_format_config: PyFileFormatConfig) -> Self {
-        file_format_config.0
-    }
-}
-
-impl From<Arc<FileFormatConfig>> for PyFileFormatConfig {
-    fn from(file_format_config: Arc<FileFormatConfig>) -> Self {
-        Self(file_format_config)
-    }
-}
22 changes: 22 additions & 0 deletions src/common/file-formats/src/lib.rs
@@ -1,5 +1,27 @@
 mod file_format;
 pub use file_format::FileFormat;
 
+mod file_format_config;
+pub use file_format_config::{
+    CsvSourceConfig, FileFormatConfig, JsonSourceConfig, ParquetSourceConfig,
+};
+
+#[cfg(feature = "python")]
+pub use file_format_config::DatabaseSourceConfig;
+
 #[cfg(feature = "python")]
 pub mod python;
+
+impl From<&FileFormatConfig> for FileFormat {
+    fn from(file_format_config: &FileFormatConfig) -> Self {
+        match file_format_config {
+            FileFormatConfig::Parquet(_) => Self::Parquet,
+            FileFormatConfig::Csv(_) => Self::Csv,
+            FileFormatConfig::Json(_) => Self::Json,
+            #[cfg(feature = "python")]
+            FileFormatConfig::Database(_) => Self::Database,
+            #[cfg(feature = "python")]
+            FileFormatConfig::PythonFunction => Self::Python,
+        }
+    }
+}
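The relocated From impl maps each config onto its coarse FileFormat tag. A usage sketch; JsonSourceConfig::default() is assumed for illustration, since its constructor is not shown in this diff:

#[test]
fn format_tag_from_config() {
    // Assumed Default impl, for illustration only.
    let cfg = FileFormatConfig::Json(JsonSourceConfig::default());
    assert!(matches!(FileFormat::from(&cfg), FileFormat::Json));
}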
86 changes: 84 additions & 2 deletions src/common/file-formats/src/python.rs
@@ -1,6 +1,88 @@
-use pyo3::prelude::*;
+use std::sync::Arc;
 
-use crate::FileFormat;
+use common_py_serde::impl_bincode_py_state_serialization;
+use pyo3::{basic::CompareOp, prelude::*};
+use serde::{Deserialize, Serialize};
+
+use crate::{
+    file_format_config::DatabaseSourceConfig, CsvSourceConfig, FileFormat, FileFormatConfig,
+    JsonSourceConfig, ParquetSourceConfig,
+};
 
+/// Configuration for parsing a particular file format.
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(transparent)]
+#[cfg_attr(
+    feature = "python",
+    pyclass(module = "daft.daft", name = "FileFormatConfig")
+)]
+pub struct PyFileFormatConfig(Arc<FileFormatConfig>);
+
+#[pymethods]
+impl PyFileFormatConfig {
+    /// Create a Parquet file format config.
+    #[staticmethod]
+    fn from_parquet_config(config: ParquetSourceConfig) -> Self {
+        Self(Arc::new(FileFormatConfig::Parquet(config)))
+    }
+
+    /// Create a CSV file format config.
+    #[staticmethod]
+    fn from_csv_config(config: CsvSourceConfig) -> Self {
+        Self(Arc::new(FileFormatConfig::Csv(config)))
+    }
+
+    /// Create a JSON file format config.
+    #[staticmethod]
+    fn from_json_config(config: JsonSourceConfig) -> Self {
+        Self(Arc::new(FileFormatConfig::Json(config)))
+    }
+
+    /// Create a Database file format config.
+    #[staticmethod]
+    fn from_database_config(config: DatabaseSourceConfig) -> Self {
+        Self(Arc::new(FileFormatConfig::Database(config)))
+    }
+
+    /// Get the underlying data source config.
+    #[getter]
+    fn get_config(&self, py: Python) -> PyObject {
+        match self.0.as_ref() {
+            FileFormatConfig::Parquet(config) => config.clone().into_py(py),
+            FileFormatConfig::Csv(config) => config.clone().into_py(py),
+            FileFormatConfig::Json(config) => config.clone().into_py(py),
+            FileFormatConfig::Database(config) => config.clone().into_py(py),
+            FileFormatConfig::PythonFunction => py.None(),
+        }
+    }
+
+    /// Get the file format for this file format config.
+    fn file_format(&self) -> FileFormat {
+        self.0.as_ref().into()
+    }
+
+    fn __richcmp__(&self, other: &Self, op: CompareOp) -> bool {
+        match op {
+            CompareOp::Eq => self.0 == other.0,
+            CompareOp::Ne => !self.__richcmp__(other, CompareOp::Eq),
+            _ => unimplemented!("not implemented"),
+        }
+    }
+}
+
+impl_bincode_py_state_serialization!(PyFileFormatConfig);
+
+impl From<PyFileFormatConfig> for Arc<FileFormatConfig> {
+    fn from(file_format_config: PyFileFormatConfig) -> Self {
+        file_format_config.0
+    }
+}
+
+impl From<Arc<FileFormatConfig>> for PyFileFormatConfig {
+    fn from(file_format_config: Arc<FileFormatConfig>) -> Self {
+        Self(file_format_config)
+    }
+}
+
 pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
     parent.add_class::<FileFormat>()?;
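The pair of From impls at the bottom makes the Rust/Python handoff an Arc pointer copy rather than a deep clone of the config. A round-trip sketch, again assuming a Default constructor for illustration:

#[test]
fn py_wrapper_round_trip() {
    // Assumed Default impl, for illustration only.
    let rust_side = Arc::new(FileFormatConfig::Json(JsonSourceConfig::default()));
    let py_side = PyFileFormatConfig::from(rust_side.clone());
    let back: Arc<FileFormatConfig> = py_side.into();
    assert!(Arc::ptr_eq(&rust_side, &back)); // same allocation on both sides
}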
3 changes: 2 additions & 1 deletion src/daft-local-execution/Cargo.toml
@@ -2,6 +2,7 @@
 common-daft-config = {path = "../common/daft-config", default-features = false}
 common-display = {path = "../common/display", default-features = false}
 common-error = {path = "../common/error", default-features = false}
+common-file-formats = {path = "../common/file-formats", default-features = false}
 common-tracing = {path = "../common/tracing", default-features = false}
 daft-core = {path = "../daft-core", default-features = false}
 daft-csv = {path = "../daft-csv", default-features = false}
@@ -25,7 +26,7 @@ tokio-stream = {workspace = true}
 tracing = {workspace = true}
 
 [features]
-python = ["dep:pyo3", "common-daft-config/python", "common-error/python", "daft-dsl/python", "daft-io/python", "daft-micropartition/python", "daft-plan/python", "daft-scan/python", "common-display/python"]
+python = ["dep:pyo3", "common-daft-config/python", "common-file-formats/python", "common-error/python", "daft-dsl/python", "daft-io/python", "daft-micropartition/python", "daft-plan/python", "daft-scan/python", "common-display/python"]
 
 [package]
 edition = {workspace = true}
7 changes: 2 additions & 5 deletions src/daft-local-execution/src/sources/scan_task.rs
@@ -1,14 +1,11 @@
 use common_error::DaftResult;
+use common_file_formats::{FileFormatConfig, ParquetSourceConfig};
 use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions};
 use daft_io::IOStatsRef;
 use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions};
 use daft_micropartition::MicroPartition;
 use daft_parquet::read::ParquetSchemaInferenceOptions;
-use daft_scan::{
-    file_format::{FileFormatConfig, ParquetSourceConfig},
-    storage_config::StorageConfig,
-    ChunkSpec, ScanTask,
-};
+use daft_scan::{storage_config::StorageConfig, ChunkSpec, ScanTask};
 use futures::{Stream, StreamExt};
 use std::sync::Arc;
 use tokio_stream::wrappers::ReceiverStream;
3 changes: 2 additions & 1 deletion src/daft-micropartition/Cargo.toml
@@ -2,6 +2,7 @@
 arrow2 = {workspace = true}
 bincode = {workspace = true}
 common-error = {path = "../common/error", default-features = false}
+common-file-formats = {path = "../common/file-formats", default-features = false}
 daft-core = {path = "../daft-core", default-features = false}
 daft-csv = {path = "../daft-csv", default-features = false}
 daft-dsl = {path = "../daft-dsl", default-features = false}
@@ -16,7 +17,7 @@ pyo3 = {workspace = true, optional = true}
 snafu = {workspace = true}
 
 [features]
-python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-dsl/python", "daft-table/python", "daft-io/python", "daft-parquet/python", "daft-scan/python", "daft-stats/python"]
+python = ["dep:pyo3", "common-error/python", "common-file-formats/python", "daft-core/python", "daft-dsl/python", "daft-table/python", "daft-io/python", "daft-parquet/python", "daft-scan/python", "daft-stats/python"]
 
 [package]
 edition = {workspace = true}
15 changes: 6 additions & 9 deletions src/daft-micropartition/src/micropartition.rs
@@ -5,29 +5,29 @@ use std::{ops::Deref, sync::Mutex};

 use arrow2::io::parquet::read::schema::infer_schema_with_options;
 use common_error::DaftResult;
+use common_file_formats::{CsvSourceConfig, FileFormatConfig, ParquetSourceConfig};
 use daft_core::prelude::*;
 use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions};
 use daft_dsl::ExprRef;
 use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions};
 use daft_parquet::read::{
     read_parquet_bulk, read_parquet_metadata_bulk, ParquetSchemaInferenceOptions,
 };
-use daft_scan::file_format::{CsvSourceConfig, FileFormatConfig, ParquetSourceConfig};
 use daft_scan::storage_config::{NativeStorageConfig, StorageConfig};
 use daft_scan::{ChunkSpec, DataSource, Pushdowns, ScanTask};
 use daft_table::Table;
 
-use crate::{DaftCSVSnafu, DaftCoreComputeSnafu};
 use parquet2::metadata::FileMetaData;
 use snafu::ResultExt;
 
-#[cfg(feature = "python")]
-use crate::PyIOSnafu;
+use crate::{DaftCSVSnafu, DaftCoreComputeSnafu};
 
 use daft_io::{get_runtime, IOClient, IOConfig, IOStatsContext, IOStatsRef};
 use daft_stats::TableStatistics;
 use daft_stats::{PartitionSpec, TableMetadata};
 
+#[cfg(feature = "python")]
+use {crate::PyIOSnafu, common_file_formats::DatabaseSourceConfig};
+
 #[derive(Debug)]
 pub(crate) enum TableState {
     Unloaded(Arc<ScanTask>),
@@ -356,10 +356,7 @@ fn materialize_scan_task(
                     })
                     .collect::<crate::Result<Vec<_>>>()
             })?,
-            FileFormatConfig::Database(daft_scan::file_format::DatabaseSourceConfig {
-                sql,
-                conn,
-            }) => {
+            FileFormatConfig::Database(DatabaseSourceConfig { sql, conn }) => {
                 let predicate = scan_task
                     .pushdowns
                     .filters
7 changes: 3 additions & 4 deletions src/daft-plan/src/lib.rs
@@ -37,16 +37,15 @@ use pyo3::prelude::*;
 pub use sink_info::{DeltaLakeCatalogInfo, IcebergCatalogInfo, LanceCatalogInfo};
 #[cfg(feature = "python")]
 use {
-    daft_scan::file_format::{
-        CsvSourceConfig, JsonSourceConfig, ParquetSourceConfig, PyFileFormatConfig,
+    common_file_formats::{
+        python::PyFileFormatConfig, CsvSourceConfig, DatabaseSourceConfig, JsonSourceConfig,
+        ParquetSourceConfig,
     },
     daft_scan::storage_config::{NativeStorageConfig, PyStorageConfig, PythonStorageConfig},
 };
 
 #[cfg(feature = "python")]
 pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
-    use daft_scan::file_format::DatabaseSourceConfig;
-
     parent.add_class::<PyLogicalPlanBuilder>()?;
     parent.add_class::<PyFileFormatConfig>()?;
     parent.add_class::<ParquetSourceConfig>()?;