Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Allow HTTP object store for json/bson etc. #2784

Merged
merged 5 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions crates/datasources/src/lake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use protogen::metastore::types::options::StorageOptions;
use crate::common::url::{DatasourceUrl, DatasourceUrlType};
use crate::object_store::azure::AzureStoreAccess;
use crate::object_store::gcs::GcsStoreAccess;
use crate::object_store::http::HttpStoreAccess;
use crate::object_store::local::LocalStoreAccess;
use crate::object_store::s3::S3StoreAccess;
use crate::object_store::ObjStoreAccess;
Expand Down Expand Up @@ -101,9 +102,7 @@ pub fn storage_options_into_store_access(
}))
}
DatasourceUrlType::File => Ok(Arc::new(LocalStoreAccess)),
DatasourceUrlType::Http => {
Err(LakeStorageOptionsError::UnsupportedObjectStore(url.clone()))
}
DatasourceUrlType::Http => Ok(Arc::new(HttpStoreAccess { url: url.as_url()? })),
}
}

Expand Down
327 changes: 294 additions & 33 deletions crates/datasources/src/object_store/http.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,35 @@
use core::fmt;
use std::fmt::Display;
use std::io::SeekFrom;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use chrono::Utc;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::file_format::FileFormat;
use datafusion::execution::context::SessionState;
use datafusion::execution::object_store::ObjectStoreUrl;
use object_store::http::HttpBuilder;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::http::{HttpBuilder, HttpStore};
use object_store::path::Path as ObjectStorePath;
use object_store::{ObjectMeta, ObjectStore};
use object_store::{
ClientConfigKey,
GetOptions,
GetResult,
GetResultPayload,
ListResult,
MultipartId,
ObjectMeta,
ObjectStore,
PutOptions,
PutResult,
};
use tempfile::tempdir;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
use url::Url;
use uuid::Uuid;

use super::glob_util::ResolvedPattern;
use super::ObjStoreAccess;
Expand All @@ -23,6 +42,36 @@ pub struct HttpStoreAccess {
pub url: Url,
}

impl HttpStoreAccess {
async fn content_length(u: Url) -> Result<Option<u64>> {
let res = reqwest::Client::new().head(u.clone()).send().await?;
let status = res.status();
if !status.is_success() {
if u.as_str().contains('*') {
return Err(ObjectStoreSourceError::InvalidHttpStatus(format!(
"Unexpected status code '{}' for url: '{}'. \
Note that globbing is not supported for HTTP.",
status, u,
)));
}
Comment on lines +50 to +56
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have imagined that we'd do this somewhere other than the check for the content length?

return Err(ObjectStoreSourceError::InvalidHttpStatus(format!(
"Unexpected status code '{}' for url: '{}'",
status, u,
)));
}
// reqwest doesn't check the content length header, instead looks at the contents
// See: https://github.com/seanmonstar/reqwest/issues/843
let len = res
.headers()
.get("Content-Length")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.or_else(|| res.content_length());

Ok(len)
}
}

impl Display for HttpStoreAccess {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "HttpStoreAccess(url: {})", self.url)
Expand All @@ -43,15 +92,24 @@ impl ObjStoreAccess for HttpStoreAccess {
// To make path part of URL we make it a '/'.
.replace('/', "__slash__")
// TODO: Add more characters which might be invalid for domain.
.replace('%', "__percent__");
.replace('%', "__percent__")
.replace('?', "__question__")
.replace('=', "__equal__");

Ok(ObjectStoreUrl::parse(u)?)
}

fn create_store(&self) -> Result<Arc<dyn ObjectStore>> {
let builder = HttpBuilder::new().with_url(self.url.to_string());
let builder = HttpBuilder::new()
.with_url(self.url.to_string())
.with_config(ClientConfigKey::AllowHttp, "true");

let build = builder.build()?;
Ok(Arc::new(build))

Ok(Arc::new(SimpleHttpStore {
url: self.url.clone(),
obj_store: build,
}))
}

fn path(&self, _location: &str) -> Result<ObjectStorePath> {
Expand All @@ -78,38 +136,12 @@ impl ObjStoreAccess for HttpStoreAccess {
_store: &Arc<dyn ObjectStore>,
location: &ObjectStorePath,
) -> Result<ObjectMeta> {
let res = reqwest::Client::new().head(self.url.clone()).send().await?;
let status = res.status();
if !status.is_success() {
if self.url.as_str().contains('*') {
return Err(ObjectStoreSourceError::InvalidHttpStatus(format!(
"Unexpected status code '{}' for url: '{}'. Note that globbing is not supported for HTTP.",
status, self.url
)));
}
return Err(ObjectStoreSourceError::InvalidHttpStatus(format!(
"Unexpected status code '{}' for url: '{}'",
status, self.url
)));
}
// reqwest doesn't check the content length header, instead looks at the contents
// See: https://github.com/seanmonstar/reqwest/issues/843
let len: u64 = res
.headers()
.get("Content-Length")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse().ok())
.unwrap_or_else(|| res.content_length().unwrap_or(0));
if len == 0 {
return Err(ObjectStoreSourceError::Static(
"Missing content-length header",
));
}
let content_length = Self::content_length(self.url.clone()).await?;

Ok(ObjectMeta {
location: location.clone(),
last_modified: Utc::now(),
size: len as usize,
size: content_length.unwrap_or_default() as usize,
e_tag: None,
version: None,
})
Expand Down Expand Up @@ -140,3 +172,232 @@ impl ObjStoreAccess for HttpStoreAccess {
.await?)
}
}

#[derive(Debug)]
struct SimpleHttpStore {
url: Url,
// Used when content length available.
obj_store: HttpStore,
}

impl SimpleHttpStore {
async fn simple_get_req(&self, location: ObjectStorePath) -> Result<GetResult> {
let res = reqwest::get(self.url.clone()).await?;
if !res.status().is_success() {
return Err(ObjectStoreSourceError::InvalidHttpStatus(format!(
"getting data for '{}' resulted in error status: {}",
self.url,
res.status(),
)));
}

let download_file_dir = tempdir()?;

let download_file_path = download_file_dir.path().join(format!("{}", Uuid::new_v4()));

let mut contents = res.bytes_stream();

let mut download_file = tokio::fs::OpenOptions::new()
.create_new(true)
.write(true)
.read(true)
.open(&download_file_path)
.await?;

// We unfortunately need to download the contents first to get the
// content length since it is unknown.
while let Some(b) = contents.next().await {
let b = b?;
download_file.write_all(&b).await?;
}

let file_meta = download_file.metadata().await?;
let size = file_meta.len() as usize;

// Seek the downloaded file to start (before reading).
download_file.seek(SeekFrom::Start(0)).await?;

let stream = async_stream::stream! {
// Delete when the stream is dropped.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we need to call this with move? I've been tending to use the futures::stream::once() rather than the macro to make this more clear, but if it's in an Arc then I think it ends up working ok?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The macro expands to async move. Moreover, the compiler would just error if it didn't here.

let _download_file_dir = download_file_dir;

// Create a chunked stream.
loop {
let mut buf = BytesMut::with_capacity(8 * 1024);

let bytes_read = download_file
.read_buf(&mut buf)
.await
.map_err(|e| object_store::Error::Generic {
store: "HTTP",
source: Box::new(e),
})?;

if bytes_read == 0 {
break;
}

yield Ok(buf.freeze());
}
};

let payload = GetResultPayload::Stream(Box::pin(stream));

Ok(GetResult {
payload,
meta: ObjectMeta {
location,
last_modified: Utc::now(),
size,
e_tag: None,
version: None,
},
range: 0..size,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this an array for every byte?

})
}
}

impl fmt::Display for SimpleHttpStore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SimpleHttpStore({})", self.url)
}
}

#[async_trait]
impl ObjectStore for SimpleHttpStore {
async fn put(
&self,
location: &ObjectStorePath,
bytes: Bytes,
) -> Result<PutResult, object_store::Error> {
self.obj_store.put(location, bytes).await
}

async fn put_opts(
&self,
location: &ObjectStorePath,
bytes: bytes::Bytes,
opts: PutOptions,
) -> Result<PutResult, object_store::Error> {
self.obj_store.put_opts(location, bytes, opts).await
}

async fn put_multipart(
&self,
location: &ObjectStorePath,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>), object_store::Error> {
self.obj_store.put_multipart(location).await
}

async fn abort_multipart(
&self,
location: &ObjectStorePath,
multipart_id: &MultipartId,
) -> Result<(), object_store::Error> {
self.abort_multipart(location, multipart_id).await
}

// This uses the default impl for `get`, `get_range`, `get_ranges`, `head`
// (from our custom get_opts impl).

async fn get_opts(
&self,
location: &ObjectStorePath,
options: GetOptions,
) -> Result<GetResult, object_store::Error> {
if options.if_match.is_some()
|| options.if_none_match.is_some()
|| options.if_modified_since.is_some()
|| options.if_unmodified_since.is_some()
|| options.range.is_some()
|| options.version.is_some()
|| options.head
{
// Let the default implementation handle everything weird.
self.obj_store.get_opts(location, options).await
} else {
// Try to get the content length.
let content_length = HttpStoreAccess::content_length(self.url.clone())
.await
.ok()
.flatten()
.unwrap_or_default();

if content_length != 0 {
self.obj_store.get_opts(location, options).await
} else {
self.simple_get_req(location.clone()).await.map_err(|e| {
object_store::Error::Generic {
store: "HTTP",
source: Box::new(e),
}
})
}
}
}

async fn delete(&self, location: &ObjectStorePath) -> Result<(), object_store::Error> {
self.obj_store.delete(location).await
}

fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, Result<ObjectStorePath, object_store::Error>>,
) -> BoxStream<'a, Result<ObjectStorePath, object_store::Error>> {
self.obj_store.delete_stream(locations)
}

fn list(
&self,
prefix: Option<&ObjectStorePath>,
) -> BoxStream<'_, Result<ObjectMeta, object_store::Error>> {
self.obj_store.list(prefix)
}

fn list_with_offset(
&self,
prefix: Option<&ObjectStorePath>,
offset: &ObjectStorePath,
) -> BoxStream<'_, Result<ObjectMeta, object_store::Error>> {
self.obj_store.list_with_offset(prefix, offset)
}

async fn list_with_delimiter(
&self,
prefix: Option<&ObjectStorePath>,
) -> Result<ListResult, object_store::Error> {
self.obj_store.list_with_delimiter(prefix).await
}

async fn copy(
&self,
from: &ObjectStorePath,
to: &ObjectStorePath,
) -> Result<(), object_store::Error> {
self.obj_store.copy(from, to).await
}

async fn rename(
&self,
from: &ObjectStorePath,
to: &ObjectStorePath,
) -> Result<(), object_store::Error> {
self.obj_store.rename(from, to).await
}

async fn copy_if_not_exists(
&self,
from: &ObjectStorePath,
to: &ObjectStorePath,
) -> Result<(), object_store::Error> {
self.obj_store.copy_if_not_exists(from, to).await
}

async fn rename_if_not_exists(
&self,
from: &ObjectStorePath,
to: &ObjectStorePath,
) -> Result<(), object_store::Error> {
self.obj_store.rename_if_not_exists(from, to).await
}
}
Loading