feat: expose http object store
mesejo committed Sep 27, 2024
1 parent 8b36aac commit f1f8f30
Showing 6 changed files with 45 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -46,7 +46,7 @@ uuid = { version = "1.9", features = ["v4"] }
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
async-trait = "0.1"
futures = "0.3"
object_store = { version = "0.11.0", features = ["aws", "gcp", "azure"] }
object_store = { version = "0.11.0", features = ["aws", "gcp", "azure", "http"] }
parking_lot = "0.12"
regex-syntax = "0.8"
syn = "2.0.68"
8 changes: 2 additions & 6 deletions python/datafusion/object_store.py
@@ -22,13 +22,9 @@
GoogleCloud = object_store.GoogleCloud
LocalFileSystem = object_store.LocalFileSystem
MicrosoftAzure = object_store.MicrosoftAzure
Http = object_store.Http

__all__ = [
    "AmazonS3",
    "GoogleCloud",
    "LocalFileSystem",
    "MicrosoftAzure",
]
__all__ = ["AmazonS3", "GoogleCloud", "LocalFileSystem", "MicrosoftAzure", "Http"]


def __getattr__(name):
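
Taken together with the Http class exported above, end-to-end usage from Python could look like the minimal sketch below. It is illustrative only: the URL and table name are placeholders (any CSV reachable over HTTP(S) should work), and it assumes a fresh SessionContext rather than the test fixture used later in this commit.

from datafusion import SessionContext
from datafusion.object_store import Http

url = "https://example.com/data/diamonds.csv"  # placeholder CSV endpoint

ctx = SessionContext()
# Register the HTTP store (empty scheme key, no host override), then the CSV by URL.
ctx.register_object_store("", Http(url), None)
ctx.register_csv("remote_csv", url)
print(ctx.sql("SELECT COUNT(*) AS total FROM remote_csv").to_pandas())
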
11 changes: 10 additions & 1 deletion python/datafusion/tests/test_sql.py
@@ -22,7 +22,7 @@
from pyarrow.csv import write_csv
import pyarrow.dataset as ds
import pytest
from datafusion.object_store import LocalFileSystem
from datafusion.object_store import LocalFileSystem, Http

from datafusion import udf, col

@@ -104,6 +104,15 @@ def test_register_csv(ctx, tmp_path):
ctx.register_csv("csv4", path, file_compression_type="rar")


def test_register_http_csv(ctx):
    url = "https://raw.githubusercontent.com/ibis-project/testing-data/refs/heads/master/csv/diamonds.csv"
    ctx.register_object_store("", Http(url), None)
    ctx.register_csv("remote", url)
    assert ctx.table_exist("remote")
    res, *_ = ctx.sql("SELECT COUNT(*) AS total FROM remote").to_pylist()
    assert res["total"] > 0


def test_register_parquet(ctx, tmp_path):
path = helpers.write_parquet(tmp_path / "a.parquet", helpers.data())
ctx.register_parquet("t", path)
2 changes: 1 addition & 1 deletion python/datafusion/tests/test_wrapper_coverage.py
@@ -55,7 +55,7 @@ def missing_exports(internal_obj, wrapped_obj) -> None:


def test_datafusion_missing_exports() -> None:
"""Check for any missing pythone exports.
"""Check for any missing python exports.
This test verifies that every exposed class, attribute, and function in
the internal (pyo3) module is also exposed in our python wrappers.
1 change: 1 addition & 0 deletions src/context.rs
@@ -307,6 +307,7 @@ impl PySessionContext {
    StorageContexts::GoogleCloudStorage(gcs) => (gcs.inner, gcs.bucket_name),
    StorageContexts::MicrosoftAzure(azure) => (azure.inner, azure.container_name),
    StorageContexts::LocalFileSystem(local) => (local.inner, "".to_string()),
    StorageContexts::HTTP(http) => (http.store, http.url),
};

// let users override the host to match the api signature from upstream
30 changes: 30 additions & 0 deletions src/store.rs
@@ -22,14 +22,17 @@ use pyo3::prelude::*;
use object_store::aws::{AmazonS3, AmazonS3Builder};
use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
use object_store::http::{HttpBuilder, HttpStore};
use object_store::local::LocalFileSystem;
use url::Url;

#[derive(FromPyObject)]
pub enum StorageContexts {
    AmazonS3(PyAmazonS3Context),
    GoogleCloudStorage(PyGoogleCloudContext),
    MicrosoftAzure(PyMicrosoftAzureContext),
    LocalFileSystem(PyLocalFileSystemContext),
    HTTP(PyHttpContext),
}

#[pyclass(name = "LocalFileSystem", module = "datafusion.store", subclass)]
@@ -219,10 +222,37 @@ impl PyAmazonS3Context {
    }
}

#[pyclass(name = "Http", module = "datafusion.store", subclass)]
#[derive(Debug, Clone)]
pub struct PyHttpContext {
    pub url: String,
    pub store: Arc<HttpStore>,
}

#[pymethods]
impl PyHttpContext {
    #[new]
    fn new(url: String) -> Self {
        let store = match Url::parse(url.as_str()) {
            Ok(url) => HttpBuilder::new()
                .with_url(url.origin().ascii_serialization())
                .build()
                .unwrap(),
            Err(_) => HttpBuilder::new().build().unwrap(),
        };

        Self {
            url,
            store: Arc::new(store),
        }
    }
}

pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<PyAmazonS3Context>()?;
    m.add_class::<PyMicrosoftAzureContext>()?;
    m.add_class::<PyGoogleCloudContext>()?;
    m.add_class::<PyLocalFileSystemContext>()?;
    m.add_class::<PyHttpContext>()?;
    Ok(())
}
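
PyHttpContext::new keeps the full URL string on the Python object but configures the underlying HttpStore with only the URL's origin (scheme plus host), falling back to a default builder when the string does not parse. A rough Python sketch of that origin extraction, using urlsplit as a stand-in for Rust's Url::origin (the URL and variable names are illustrative):

from urllib.parse import urlsplit

from datafusion.object_store import Http

url = "https://example.com/data/diamonds.csv"  # illustrative URL
parts = urlsplit(url)
origin = f"{parts.scheme}://{parts.netloc}"    # -> "https://example.com"

# Http(url) builds its store against the origin; the full URL string is kept
# on the object and, per context.rs above, passed along as the host when the
# store is registered.
store = Http(url)
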
