Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(python): reuse state in to_pyarrow_dataset #2485

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-python"
version = "0.17.3"
version = "0.17.4"
authors = ["Qingping Hou <dave2008713@gmail.com>", "Will Jones <willjones127@gmail.com>"]
homepage = "https://github.com/delta-io/delta-rs"
license = "Apache-2.0"
Expand Down
15 changes: 11 additions & 4 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -721,10 +721,17 @@ class DeltaFileSystemHandler:

def __init__(
self,
root: str,
options: dict[str, str] | None = None,
known_sizes: dict[str, int] | None = None,
table_uri: str,
options: Dict[str, str] | None = None,
known_sizes: Dict[str, int] | None = None,
) -> None: ...
@classmethod
def from_table(
cls,
table: RawDeltaTable,
options: Dict[str, str] | None = None,
known_sizes: Dict[str, int] | None = None,
) -> "DeltaFileSystemHandler": ...
def get_type_name(self) -> str: ...
def copy_file(self, src: str, dst: str) -> None:
"""Copy a file.
Expand Down Expand Up @@ -776,7 +783,7 @@ class DeltaFileSystemHandler:
def open_input_file(self, path: str) -> ObjectInputFile:
"""Open an input file for random access reading."""
def open_output_stream(
self, path: str, metadata: dict[str, str] | None = None
self, path: str, metadata: Dict[str, str] | None = None
) -> ObjectOutputStream:
"""Open an output stream for sequential writing."""

Expand Down
108 changes: 97 additions & 11 deletions python/deltalake/fs.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,102 @@
from typing import Dict, List, Optional
from typing import Any, Dict, List, Mapping, Optional

import pyarrow as pa
from pyarrow.fs import FileInfo, FileSelector, FileSystemHandler

from ._internal import DeltaFileSystemHandler
from ._internal import DeltaFileSystemHandler, RawDeltaTable


# NOTE we need to inherit from FileSystemHandler to pass pyarrow's internal type checks.
class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler):
class DeltaStorageHandler(FileSystemHandler):
"""
DeltaStorageHandler is a concrete implementations of a PyArrow FileSystemHandler.
"""

def __init__(
    self,
    table_uri: str,
    options: Optional[Dict[str, str]] = None,
    known_sizes: Optional[Dict[str, int]] = None,
):
    """Create a storage handler rooted at ``table_uri``.

    Args:
        table_uri: Root URI of the Delta table.
        options: Storage options forwarded to the native handler.
        known_sizes: Presumably a mapping of file path to size in
            bytes used to skip size lookups — TODO confirm against
            the native handler.
    """
    handler = DeltaFileSystemHandler(
        table_uri=table_uri,
        options=options,
        known_sizes=known_sizes,
    )
    self._handler = handler

@classmethod
def from_table(
    cls,
    table: RawDeltaTable,
    options: Optional[Dict[str, str]] = None,
    known_sizes: Optional[Dict[str, int]] = None,
) -> "DeltaStorageHandler":
    """Build a handler that reuses ``table``'s existing storage state.

    ``__init__`` is deliberately bypassed (via ``cls.__new__``) so the
    wrapped native handler is produced by
    ``DeltaFileSystemHandler.from_table`` rather than rebuilt from a URI.
    """
    instance = cls.__new__(cls)
    instance._handler = DeltaFileSystemHandler.from_table(
        table, options, known_sizes
    )
    return instance

def get_type_name(self) -> str:
    """Return the type-name string reported by the wrapped handler."""
    name = self._handler.get_type_name()
    return name

def copy_file(self, src: str, dst: str) -> None:
    """Copy the file at ``src`` to ``dst``.

    An existing file at ``dst`` is replaced; copying onto an existing
    directory is an error.
    """
    return self._handler.copy_file(src=src, dst=dst)

def create_dir(self, path: str, recursive: bool = True) -> None:
    """Create the directory ``path`` (and parents, when ``recursive``).

    Succeeds even if the directory is already present.
    """
    return self._handler.create_dir(path, recursive)

def delete_dir(self, path: str) -> None:
    """Recursively delete the directory ``path`` and everything in it."""
    return self._handler.delete_dir(path)

def delete_file(self, path: str) -> None:
    """Delete the single file at ``path``."""
    return self._handler.delete_file(path)

def equals(self, other: Any) -> bool:
    """Delegate equality testing to the wrapped native handler."""
    is_equal = self._handler.equals(other)
    return is_equal

def delete_dir_contents(
    self, path: str, *, accept_root_dir: bool = False, missing_dir_ok: bool = False
) -> None:
    """Recursively delete the contents of ``path``, keeping ``path`` itself.

    Like ``delete_dir`` except the directory itself is preserved.
    ``accept_root_dir`` and ``missing_dir_ok`` are forwarded verbatim
    to the native handler.
    """
    return self._handler.delete_dir_contents(
        path=path,
        accept_root_dir=accept_root_dir,
        missing_dir_ok=missing_dir_ok,
    )

def delete_root_dir_contents(self) -> None:
    """Recursively delete everything beneath the filesystem root."""
    return self._handler.delete_root_dir_contents()

def get_file_info(self, paths: List[str]) -> List[FileInfo]:
    """Return one ``FileInfo`` per entry in ``paths``.

    A missing or unreachable path yields a ``FileInfo`` with type
    ``NotFound``; only genuinely exceptional conditions (low-level
    I/O errors, etc.) raise.
    """
    return self._handler.get_file_info(paths)

def move(self, src: str, dest: str) -> None:
    """Move or rename ``src`` to ``dest``.

    If ``dest`` exists: a non-empty directory there is an error; an
    entry of the same type as ``src`` is replaced; anything else is
    implementation-defined.
    """
    self._handler.move_file(src=src, dest=dest)

def normalize_path(self, path: str) -> str:
    """Return ``path`` normalized by the wrapped handler."""
    normalized = self._handler.normalize_path(path)
    return normalized

def open_input_file(self, path: str) -> pa.PythonFile:
"""
Open an input file for random access reading.
Expand All @@ -22,7 +107,7 @@ def open_input_file(self, path: str) -> pa.PythonFile:
Returns:
NativeFile
"""
return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path))
return pa.PythonFile(self._handler.open_input_file(path))

def open_input_stream(self, path: str) -> pa.PythonFile:
"""
Expand All @@ -34,7 +119,7 @@ def open_input_stream(self, path: str) -> pa.PythonFile:
Returns:
NativeFile
"""
return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path))
return pa.PythonFile(self._handler.open_input_file(path))

def open_output_stream(
self, path: str, metadata: Optional[Dict[str, str]] = None
Expand All @@ -51,11 +136,9 @@ def open_output_stream(
Returns:
NativeFile
"""
return pa.PythonFile(
DeltaFileSystemHandler.open_output_stream(self, path, metadata)
)
return pa.PythonFile(self._handler.open_output_stream(path, metadata))

def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]: # type: ignore
def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]:
"""
Get info for the files defined by FileSelector.

Expand All @@ -65,6 +148,9 @@ def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]: # t
Returns:
list of file info objects
"""
return DeltaFileSystemHandler.get_file_info_selector(
self, selector.base_dir, selector.allow_not_found, selector.recursive
return self._handler.get_file_info_selector(
selector.base_dir, selector.allow_not_found, selector.recursive
)

def open_append_stream(self, path: str, metadata: Mapping[str, str]) -> None:
    """Open an output stream for appending.

    Appending is not supported by this filesystem handler, so this
    method unconditionally raises ``NotImplementedError``.
    """
    raise NotImplementedError
6 changes: 4 additions & 2 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1095,8 +1095,10 @@ def to_pyarrow_dataset(
x: y for x, y in zip(file_sizes["path"], file_sizes["size_bytes"])
}
filesystem = pa_fs.PyFileSystem(
DeltaStorageHandler(
self._table.table_uri(), self._storage_options, file_sizes
DeltaStorageHandler.from_table(
self._table,
self._storage_options,
file_sizes,
)
)
format = ParquetFileFormat(
Expand Down
10 changes: 9 additions & 1 deletion python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,18 @@ def write_deltalake(
"schema_mode 'merge' is not supported in pyarrow engine. Use engine=rust"
)
# We need to write against the latest table version
filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options))

def sort_arrow_schema(schema: pa.schema) -> pa.schema:
    """Return ``schema`` with its fields sorted by ``(name, str(type))``.

    Used to compare two schemas for equality irrespective of field
    order; ``str(f.type)`` acts as a deterministic tiebreaker for
    fields that share a name.
    """
    # A pyarrow Schema is itself iterable over its fields, so the
    # redundant iter() wrapper around it has been removed.
    sorted_cols = sorted(schema, key=lambda f: (f.name, str(f.type)))
    return pa.schema(sorted_cols)

if table: # already exists
filesystem = pa_fs.PyFileSystem(
DeltaStorageHandler.from_table(
table=table._table, options=storage_options
)
)

if sort_arrow_schema(schema) != sort_arrow_schema(
table.schema().to_pyarrow(as_large_types=large_dtypes)
) and not (mode == "overwrite" and schema_mode == "overwrite"):
Expand All @@ -370,6 +375,9 @@ def sort_arrow_schema(schema: pa.schema) -> pa.schema:
partition_by = table.metadata().partition_columns

else: # creating a new table
filesystem = pa_fs.PyFileSystem(
DeltaStorageHandler(table_uri, options=storage_options)
)
current_version = -1

dtype_map = {
Expand Down
34 changes: 27 additions & 7 deletions python/src/filesystem.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::error::PythonError;
use crate::utils::{delete_dir, rt, walk_tree};
use crate::RawDeltaTable;
use deltalake::storage::{DynObjectStore, ListResult, MultipartId, ObjectStoreError, Path};
use deltalake::DeltaTableBuilder;
use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{IntoPyDict, PyBytes};
use pyo3::types::{IntoPyDict, PyBytes, PyType};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt};

const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024;
Expand Down Expand Up @@ -43,19 +43,39 @@ impl DeltaFileSystemHandler {
#[new]
#[pyo3(signature = (table_uri, options = None, known_sizes = None))]
fn new(
table_uri: &str,
table_uri: String,
options: Option<HashMap<String, String>>,
known_sizes: Option<HashMap<String, i64>>,
) -> PyResult<Self> {
let storage = DeltaTableBuilder::from_uri(table_uri)
let storage = DeltaTableBuilder::from_uri(&table_uri)
.with_storage_options(options.clone().unwrap_or_default())
.build_storage()
.map_err(PythonError::from)?
.object_store();

Ok(Self {
inner: storage,
config: FsConfig {
root_url: table_uri,
options: options.unwrap_or_default(),
},
known_sizes,
})
}

#[classmethod]
#[pyo3(signature = (table, options = None, known_sizes = None))]
fn from_table(
_cls: &PyType,
table: &RawDeltaTable,
options: Option<HashMap<String, String>>,
known_sizes: Option<HashMap<String, i64>>,
) -> PyResult<Self> {
let storage = table._table.object_store();
Ok(Self {
inner: storage,
config: FsConfig {
root_url: table_uri.into(),
root_url: table._table.table_uri(),
options: options.unwrap_or_default(),
},
known_sizes,
Expand Down
2 changes: 1 addition & 1 deletion python/tests/test_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_s3_authenticated_read_write(s3_localstack_creds, monkeypatch):
# Create unauthenticated handler
storage_handler = DeltaStorageHandler(
"s3://deltars/",
{
options={
"AWS_ENDPOINT_URL": s3_localstack_creds["AWS_ENDPOINT_URL"],
# Grants anonymous access. If we don't do this, will timeout trying
# to reading from EC2 instance provider.
Expand Down
Loading