From f1f8f301cbf2028ad35db1bc371c597c844adfdf Mon Sep 17 00:00:00 2001 From: Daniel Mesejo Date: Fri, 27 Sep 2024 15:19:05 +0200 Subject: [PATCH] feat: expose http object store --- Cargo.toml | 2 +- python/datafusion/object_store.py | 8 ++--- python/datafusion/tests/test_sql.py | 11 ++++++- .../datafusion/tests/test_wrapper_coverage.py | 2 +- src/context.rs | 1 + src/store.rs | 30 +++++++++++++++++++ 6 files changed, 45 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 11dccc4f..2fc14232 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,7 @@ uuid = { version = "1.9", features = ["v4"] } mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] } async-trait = "0.1" futures = "0.3" -object_store = { version = "0.11.0", features = ["aws", "gcp", "azure"] } +object_store = { version = "0.11.0", features = ["aws", "gcp", "azure", "http"] } parking_lot = "0.12" regex-syntax = "0.8" syn = "2.0.68" diff --git a/python/datafusion/object_store.py b/python/datafusion/object_store.py index c927e761..3f51ad9d 100644 --- a/python/datafusion/object_store.py +++ b/python/datafusion/object_store.py @@ -22,13 +22,9 @@ GoogleCloud = object_store.GoogleCloud LocalFileSystem = object_store.LocalFileSystem MicrosoftAzure = object_store.MicrosoftAzure +Http = object_store.Http -__all__ = [ - "AmazonS3", - "GoogleCloud", - "LocalFileSystem", - "MicrosoftAzure", -] +__all__ = ["AmazonS3", "GoogleCloud", "LocalFileSystem", "MicrosoftAzure", "Http"] def __getattr__(name): diff --git a/python/datafusion/tests/test_sql.py b/python/datafusion/tests/test_sql.py index cbb2e9f5..26d2388b 100644 --- a/python/datafusion/tests/test_sql.py +++ b/python/datafusion/tests/test_sql.py @@ -22,7 +22,7 @@ from pyarrow.csv import write_csv import pyarrow.dataset as ds import pytest -from datafusion.object_store import LocalFileSystem +from datafusion.object_store import LocalFileSystem, Http from datafusion import udf, col @@ -104,6 
+104,15 @@ def test_register_csv(ctx, tmp_path): ctx.register_csv("csv4", path, file_compression_type="rar") +def test_register_http_csv(ctx): + url = "https://raw.githubusercontent.com/ibis-project/testing-data/refs/heads/master/csv/diamonds.csv" + ctx.register_object_store("", Http(url), None) + ctx.register_csv("remote", url) + assert ctx.table_exist("remote") + res, *_ = ctx.sql("SELECT COUNT(*) AS total FROM remote").to_pylist() + assert res["total"] > 0 + + def test_register_parquet(ctx, tmp_path): path = helpers.write_parquet(tmp_path / "a.parquet", helpers.data()) ctx.register_parquet("t", path) diff --git a/python/datafusion/tests/test_wrapper_coverage.py b/python/datafusion/tests/test_wrapper_coverage.py index c53a89c5..86f2d57f 100644 --- a/python/datafusion/tests/test_wrapper_coverage.py +++ b/python/datafusion/tests/test_wrapper_coverage.py @@ -55,7 +55,7 @@ def missing_exports(internal_obj, wrapped_obj) -> None: def test_datafusion_missing_exports() -> None: - """Check for any missing pythone exports. + """Check for any missing python exports. This test verifies that every exposed class, attribute, and function in the internal (pyo3) module is also exposed in our python wrappers. 
diff --git a/src/context.rs b/src/context.rs index 79db2e65..6d4508fe 100644 --- a/src/context.rs +++ b/src/context.rs @@ -307,6 +307,7 @@ impl PySessionContext { StorageContexts::GoogleCloudStorage(gcs) => (gcs.inner, gcs.bucket_name), StorageContexts::MicrosoftAzure(azure) => (azure.inner, azure.container_name), StorageContexts::LocalFileSystem(local) => (local.inner, "".to_string()), + StorageContexts::HTTP(http) => (http.store, http.url), }; // let users override the host to match the api signature from upstream diff --git a/src/store.rs b/src/store.rs index 846d96a6..c710db55 100644 --- a/src/store.rs +++ b/src/store.rs @@ -22,7 +22,9 @@ use pyo3::prelude::*; use object_store::aws::{AmazonS3, AmazonS3Builder}; use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder}; use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder}; +use object_store::http::{HttpBuilder, HttpStore}; use object_store::local::LocalFileSystem; +use url::Url; #[derive(FromPyObject)] pub enum StorageContexts { @@ -30,6 +32,7 @@ GoogleCloudStorage(PyGoogleCloudContext), MicrosoftAzure(PyMicrosoftAzureContext), LocalFileSystem(PyLocalFileSystemContext), + HTTP(PyHttpContext), } #[pyclass(name = "LocalFileSystem", module = "datafusion.store", subclass)] @@ -219,10 +222,37 @@ impl PyAmazonS3Context { } } +#[pyclass(name = "Http", module = "datafusion.store", subclass)] +#[derive(Debug, Clone)] +pub struct PyHttpContext { + pub url: String, + pub store: Arc<HttpStore>, +} + +#[pymethods] +impl PyHttpContext { + #[new] + fn new(url: String) -> Self { + let store = match Url::parse(url.as_str()) { + Ok(url) => HttpBuilder::new() + .with_url(url.origin().ascii_serialization()) + .build() + .unwrap(), + Err(_) => HttpBuilder::new().build().unwrap(), + }; + + Self { + url, + store: Arc::new(store), + } + } +} + pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::<PyAmazonS3Context>()?; m.add_class::<PyMicrosoftAzureContext>()?; m.add_class::<PyGoogleCloudContext>()?; m.add_class::<PyLocalFileSystemContext>()?; + 
 m.add_class::<PyHttpContext>()?; Ok(()) }