diff --git a/python/Cargo.toml b/python/Cargo.toml index c2d5f3fd27..0ba5b5ed9e 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-python" -version = "0.17.3" +version = "0.17.4" authors = ["Qingping Hou ", "Will Jones "] homepage = "https://github.com/delta-io/delta-rs" license = "Apache-2.0" diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 59e3d93d88..d97331c094 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -721,10 +721,17 @@ class DeltaFileSystemHandler: def __init__( self, - root: str, - options: dict[str, str] | None = None, - known_sizes: dict[str, int] | None = None, + table_uri: str, + options: Dict[str, str] | None = None, + known_sizes: Dict[str, int] | None = None, ) -> None: ... + @classmethod + def from_table( + cls, + table: RawDeltaTable, + options: Dict[str, str] | None = None, + known_sizes: Dict[str, int] | None = None, + ) -> "DeltaFileSystemHandler": ... def get_type_name(self) -> str: ... def copy_file(self, src: str, dst: str) -> None: """Copy a file. @@ -776,7 +783,7 @@ class DeltaFileSystemHandler: def open_input_file(self, path: str) -> ObjectInputFile: """Open an input file for random access reading.""" def open_output_stream( - self, path: str, metadata: dict[str, str] | None = None + self, path: str, metadata: Dict[str, str] | None = None ) -> ObjectOutputStream: """Open an output stream for sequential writing.""" diff --git a/python/deltalake/fs.py b/python/deltalake/fs.py index 12e33f40e3..cf1780036a 100644 --- a/python/deltalake/fs.py +++ b/python/deltalake/fs.py @@ -1,17 +1,102 @@ -from typing import Dict, List, Optional +from typing import Any, Dict, List, Mapping, Optional import pyarrow as pa from pyarrow.fs import FileInfo, FileSelector, FileSystemHandler -from ._internal import DeltaFileSystemHandler +from ._internal import DeltaFileSystemHandler, RawDeltaTable # NOTE we need to inherit form FileSystemHandler to pass pyarrow's internal type checks. -class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler): +class DeltaStorageHandler(FileSystemHandler): """ DeltaStorageHandler is a concrete implementations of a PyArrow FileSystemHandler. """ + def __init__( + self, + table_uri: str, + options: Optional[Dict[str, str]] = None, + known_sizes: Optional[Dict[str, int]] = None, + ): + self._handler = DeltaFileSystemHandler( + table_uri=table_uri, options=options, known_sizes=known_sizes + ) + + @classmethod + def from_table( + cls, + table: RawDeltaTable, + options: Optional[Dict[str, str]] = None, + known_sizes: Optional[Dict[str, int]] = None, + ) -> "DeltaStorageHandler": + self = cls.__new__(cls) + self._handler = DeltaFileSystemHandler.from_table(table, options, known_sizes) + return self + + def get_type_name(self) -> str: + return self._handler.get_type_name() + + def copy_file(self, src: str, dst: str) -> None: + """Copy a file. + + If the destination exists and is a directory, an error is returned. Otherwise, it is replaced. + """ + return self._handler.copy_file(src=src, dst=dst) + + def create_dir(self, path: str, recursive: bool = True) -> None: + """Create a directory and subdirectories. + + This function succeeds if the directory already exists. + """ + return self._handler.create_dir(path, recursive) + + def delete_dir(self, path: str) -> None: + """Delete a directory and its contents, recursively.""" + return self._handler.delete_dir(path) + + def delete_file(self, path: str) -> None: + """Delete a file.""" + return self._handler.delete_file(path) + + def equals(self, other: Any) -> bool: + return self._handler.equals(other) + + def delete_dir_contents( + self, path: str, *, accept_root_dir: bool = False, missing_dir_ok: bool = False + ) -> None: + """Delete a directory's contents, recursively. + + Like delete_dir, but doesn't delete the directory itself. + """ + return self._handler.delete_dir_contents( + path=path, accept_root_dir=accept_root_dir, missing_dir_ok=missing_dir_ok + ) + + def delete_root_dir_contents(self) -> None: + """Delete the root directory contents, recursively.""" + return self._handler.delete_root_dir_contents() + + def get_file_info(self, paths: List[str]) -> List[FileInfo]: + """Get info for the given files. + + A non-existing or unreachable file returns a FileStat object and has a FileType of value NotFound. + An exception indicates a truly exceptional condition (low-level I/O error, etc.). + """ + return self._handler.get_file_info(paths) + + def move(self, src: str, dest: str) -> None: + """Move / rename a file or directory. + + If the destination exists: - if it is a non-empty directory, an error is returned - otherwise, + if it has the same type as the source, it is replaced - otherwise, behavior is + unspecified (implementation-dependent). + """ + self._handler.move_file(src=src, dest=dest) + + def normalize_path(self, path: str) -> str: + """Normalize filesystem path.""" + return self._handler.normalize_path(path) + def open_input_file(self, path: str) -> pa.PythonFile: """ Open an input file for random access reading. @@ -22,7 +107,7 @@ def open_input_file(self, path: str) -> pa.PythonFile: Returns: NativeFile """ - return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path)) + return pa.PythonFile(self._handler.open_input_file(path)) def open_input_stream(self, path: str) -> pa.PythonFile: """ @@ -34,7 +119,7 @@ def open_input_stream(self, path: str) -> pa.PythonFile: Returns: NativeFile """ - return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path)) + return pa.PythonFile(self._handler.open_input_file(path)) def open_output_stream( self, path: str, metadata: Optional[Dict[str, str]] = None @@ -51,11 +136,9 @@ def open_output_stream( Returns: NativeFile """ - return pa.PythonFile( - DeltaFileSystemHandler.open_output_stream(self, path, metadata) - ) + return pa.PythonFile(self._handler.open_output_stream(path, metadata)) - def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]: # type: ignore + def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]: """ Get info for the files defined by FileSelector. @@ -65,6 +148,9 @@ def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]: # t Returns: list of file info objects """ - return DeltaFileSystemHandler.get_file_info_selector( - self, selector.base_dir, selector.allow_not_found, selector.recursive + return self._handler.get_file_info_selector( + selector.base_dir, selector.allow_not_found, selector.recursive ) + + def open_append_stream(self, path: str, metadata: Mapping[str, str]) -> None: + raise NotImplementedError diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 58b5297a77..31e71e8fc6 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -1095,8 +1095,10 @@ def to_pyarrow_dataset( x: y for x, y in zip(file_sizes["path"], file_sizes["size_bytes"]) } filesystem = pa_fs.PyFileSystem( - DeltaStorageHandler( - self._table.table_uri(), self._storage_options, file_sizes + DeltaStorageHandler.from_table( + self._table, + self._storage_options, + file_sizes, ) ) format = ParquetFileFormat( diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index e4dd3fd42d..eaf95650ea 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -339,13 +339,18 @@ def write_deltalake( "schema_mode 'merge' is not supported in pyarrow engine. Use engine=rust" ) # We need to write against the latest table version - filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options)) def sort_arrow_schema(schema: pa.schema) -> pa.schema: sorted_cols = sorted(iter(schema), key=lambda x: (x.name, str(x.type))) return pa.schema(sorted_cols) if table: # already exists + filesystem = pa_fs.PyFileSystem( + DeltaStorageHandler.from_table( + table=table._table, options=storage_options + ) + ) + if sort_arrow_schema(schema) != sort_arrow_schema( table.schema().to_pyarrow(as_large_types=large_dtypes) ) and not (mode == "overwrite" and schema_mode == "overwrite"): @@ -370,6 +375,9 @@ def sort_arrow_schema(schema: pa.schema) -> pa.schema: partition_by = table.metadata().partition_columns else: # creating a new table + filesystem = pa_fs.PyFileSystem( + DeltaStorageHandler(table_uri, options=storage_options) + ) current_version = -1 dtype_map = { diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 7a3872c660..2825bf9092 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -1,14 +1,14 @@ -use std::collections::HashMap; -use std::sync::Arc; - use crate::error::PythonError; use crate::utils::{delete_dir, rt, walk_tree}; +use crate::RawDeltaTable; use deltalake::storage::{DynObjectStore, ListResult, MultipartId, ObjectStoreError, Path}; use deltalake::DeltaTableBuilder; use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError}; use pyo3::prelude::*; -use pyo3::types::{IntoPyDict, PyBytes}; +use pyo3::types::{IntoPyDict, PyBytes, PyType}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; use tokio::io::{AsyncWrite, AsyncWriteExt}; const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024; @@ -43,19 +43,39 @@ impl DeltaFileSystemHandler { #[new] #[pyo3(signature = (table_uri, options = None, known_sizes = None))] fn new( - table_uri: &str, + table_uri: String, options: Option>, known_sizes: Option>, ) -> PyResult { - let storage = DeltaTableBuilder::from_uri(table_uri) + let storage = DeltaTableBuilder::from_uri(&table_uri) .with_storage_options(options.clone().unwrap_or_default()) .build_storage() .map_err(PythonError::from)? .object_store(); + + Ok(Self { + inner: storage, + config: FsConfig { + root_url: table_uri, + options: options.unwrap_or_default(), + }, + known_sizes, + }) + } + + #[classmethod] + #[pyo3(signature = (table, options = None, known_sizes = None))] + fn from_table( + _cls: &PyType, + table: &RawDeltaTable, + options: Option>, + known_sizes: Option>, + ) -> PyResult { + let storage = table._table.object_store(); Ok(Self { inner: storage, config: FsConfig { - root_url: table_uri.into(), + root_url: table._table.table_uri(), options: options.unwrap_or_default(), }, known_sizes, diff --git a/python/tests/test_fs.py b/python/tests/test_fs.py index 547e697e4b..245d97a89a 100644 --- a/python/tests/test_fs.py +++ b/python/tests/test_fs.py @@ -50,7 +50,7 @@ def test_s3_authenticated_read_write(s3_localstack_creds, monkeypatch): # Create unauthenticated handler storage_handler = DeltaStorageHandler( "s3://deltars/", - { + options={ "AWS_ENDPOINT_URL": s3_localstack_creds["AWS_ENDPOINT_URL"], # Grants anonymous access. If we don't do this, will timeout trying # to reading from EC2 instance provider.