Skip to content

Commit

Permalink
fix: Allow HTTP object store for json/bson etc. (#2784)
Browse files Browse the repository at this point in the history
Fixes: #2756

---------

Signed-off-by: Vaibhav <vrongmeal@gmail.com>
  • Loading branch information
vrongmeal authored Mar 19, 2024
1 parent 589b9e3 commit 7ae7c75
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 41 deletions.
5 changes: 2 additions & 3 deletions crates/datasources/src/lake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use protogen::metastore::types::options::StorageOptions;
use crate::common::url::{DatasourceUrl, DatasourceUrlType};
use crate::object_store::azure::AzureStoreAccess;
use crate::object_store::gcs::GcsStoreAccess;
use crate::object_store::http::HttpStoreAccess;
use crate::object_store::local::LocalStoreAccess;
use crate::object_store::s3::S3StoreAccess;
use crate::object_store::ObjStoreAccess;
Expand Down Expand Up @@ -101,9 +102,7 @@ pub fn storage_options_into_store_access(
}))
}
DatasourceUrlType::File => Ok(Arc::new(LocalStoreAccess)),
DatasourceUrlType::Http => {
Err(LakeStorageOptionsError::UnsupportedObjectStore(url.clone()))
}
DatasourceUrlType::Http => Ok(Arc::new(HttpStoreAccess { url: url.as_url()? })),
}
}

Expand Down
327 changes: 294 additions & 33 deletions crates/datasources/src/object_store/http.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,35 @@
use core::fmt;
use std::fmt::Display;
use std::io::SeekFrom;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use chrono::Utc;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::file_format::FileFormat;
use datafusion::execution::context::SessionState;
use datafusion::execution::object_store::ObjectStoreUrl;
use object_store::http::HttpBuilder;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::http::{HttpBuilder, HttpStore};
use object_store::path::Path as ObjectStorePath;
use object_store::{ObjectMeta, ObjectStore};
use object_store::{
ClientConfigKey,
GetOptions,
GetResult,
GetResultPayload,
ListResult,
MultipartId,
ObjectMeta,
ObjectStore,
PutOptions,
PutResult,
};
use tempfile::tempdir;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
use url::Url;
use uuid::Uuid;

use super::glob_util::ResolvedPattern;
use super::ObjStoreAccess;
Expand All @@ -23,6 +42,36 @@ pub struct HttpStoreAccess {
pub url: Url,
}

impl HttpStoreAccess {
    /// Sends a `HEAD` request to `u` and reports the advertised content
    /// length, if any.
    ///
    /// # Errors
    ///
    /// Returns `ObjectStoreSourceError::InvalidHttpStatus` when the response
    /// status is not a success code. If the URL contains a `*`, the message
    /// additionally points out that globbing is not supported for HTTP.
    /// Failures while sending the request are propagated with `?`.
    async fn content_length(u: Url) -> Result<Option<u64>> {
        let response = reqwest::Client::new().head(u.clone()).send().await?;
        let status = response.status();

        if !status.is_success() {
            // Same failure either way; only the hint about globbing differs.
            let message = if u.as_str().contains('*') {
                format!(
                    "Unexpected status code '{}' for url: '{}'. \
                     Note that globbing is not supported for HTTP.",
                    status, u,
                )
            } else {
                format!("Unexpected status code '{}' for url: '{}'", status, u)
            };
            return Err(ObjectStoreSourceError::InvalidHttpStatus(message));
        }

        // reqwest doesn't check the content length header, instead looks at the contents
        // See: https://github.com/seanmonstar/reqwest/issues/843
        let from_header = response
            .headers()
            .get("Content-Length")
            .and_then(|raw| raw.to_str().ok())
            .and_then(|text| text.parse::<u64>().ok());

        // Prefer the explicit header; otherwise use whatever reqwest reports.
        Ok(match from_header {
            Some(length) => Some(length),
            None => response.content_length(),
        })
    }
}

impl Display for HttpStoreAccess {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "HttpStoreAccess(url: {})", self.url)
Expand All @@ -43,15 +92,24 @@ impl ObjStoreAccess for HttpStoreAccess {
// To make path part of URL we make it a '/'.
.replace('/', "__slash__")
// TODO: Add more characters which might be invalid for domain.
.replace('%', "__percent__");
.replace('%', "__percent__")
.replace('?', "__question__")
.replace('=', "__equal__");

Ok(ObjectStoreUrl::parse(u)?)
}

fn create_store(&self) -> Result<Arc<dyn ObjectStore>> {
let builder = HttpBuilder::new().with_url(self.url.to_string());
let builder = HttpBuilder::new()
.with_url(self.url.to_string())
.with_config(ClientConfigKey::AllowHttp, "true");

let build = builder.build()?;
Ok(Arc::new(build))

Ok(Arc::new(SimpleHttpStore {
url: self.url.clone(),
obj_store: build,
}))
}

fn path(&self, _location: &str) -> Result<ObjectStorePath> {
Expand All @@ -78,38 +136,12 @@ impl ObjStoreAccess for HttpStoreAccess {
_store: &Arc<dyn ObjectStore>,
location: &ObjectStorePath,
) -> Result<ObjectMeta> {
let res = reqwest::Client::new().head(self.url.clone()).send().await?;
let status = res.status();
if !status.is_success() {
if self.url.as_str().contains('*') {
return Err(ObjectStoreSourceError::InvalidHttpStatus(format!(
"Unexpected status code '{}' for url: '{}'. Note that globbing is not supported for HTTP.",
status, self.url
)));
}
return Err(ObjectStoreSourceError::InvalidHttpStatus(format!(
"Unexpected status code '{}' for url: '{}'",
status, self.url
)));
}
// reqwest doesn't check the content length header, instead looks at the contents
// See: https://github.com/seanmonstar/reqwest/issues/843
let len: u64 = res
.headers()
.get("Content-Length")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse().ok())
.unwrap_or_else(|| res.content_length().unwrap_or(0));
if len == 0 {
return Err(ObjectStoreSourceError::Static(
"Missing content-length header",
));
}
let content_length = Self::content_length(self.url.clone()).await?;

Ok(ObjectMeta {
location: location.clone(),
last_modified: Utc::now(),
size: len as usize,
size: content_length.unwrap_or_default() as usize,
e_tag: None,
version: None,
})
Expand Down Expand Up @@ -140,3 +172,232 @@ impl ObjStoreAccess for HttpStoreAccess {
.await?)
}
}

/// Object store for a single HTTP resource, returned by
/// `HttpStoreAccess::create_store`.
///
/// It wraps an [`HttpStore`] and delegates to it, except that a plain
/// `get_opts` call falls back to downloading the whole body with a simple
/// `GET` when the `HEAD` probe does not yield a non-zero content length.
#[derive(Debug)]
struct SimpleHttpStore {
// URL of the resource. Used for the `HEAD` content-length probe, for the
// plain `GET` fallback, and in the `Display` output.
url: Url,
// Wrapped store that operations are delegated to. For reads it is used when
// the content length is available (non-zero) or when any `GetOptions` field
// is set.
obj_store: HttpStore,
}

impl SimpleHttpStore {
    /// Downloads the resource at `self.url` with a plain `GET` and returns it
    /// as a streaming [`GetResult`] whose metadata carries the real size.
    ///
    /// The body is first spooled to a file inside a fresh temporary directory
    /// because the content length is not known up front; the returned stream
    /// then reads that file back in chunks. The temporary directory is owned
    /// by the stream and is removed when the stream is dropped.
    ///
    /// `location` is only used to fill in the returned [`ObjectMeta`]; the
    /// request always targets `self.url`.
    ///
    /// # Errors
    ///
    /// Returns `ObjectStoreSourceError::InvalidHttpStatus` for a non-success
    /// response status, and propagates request and file I/O errors.
    async fn simple_get_req(&self, location: ObjectStorePath) -> Result<GetResult> {
        let res = reqwest::get(self.url.clone()).await?;
        if !res.status().is_success() {
            return Err(ObjectStoreSourceError::InvalidHttpStatus(format!(
                "getting data for '{}' resulted in error status: {}",
                self.url,
                res.status(),
            )));
        }

        let download_file_dir = tempdir()?;

        let download_file_path = download_file_dir.path().join(Uuid::new_v4().to_string());

        let mut contents = res.bytes_stream();

        let mut download_file = tokio::fs::OpenOptions::new()
            .create_new(true)
            .write(true)
            .read(true)
            .open(&download_file_path)
            .await?;

        // We unfortunately need to download the contents first to get the
        // content length since it is unknown.
        while let Some(b) = contents.next().await {
            let b = b?;
            download_file.write_all(&b).await?;
        }

        // Writes to a tokio file may return before the data has actually been
        // written. Wait for them to finish so the size read from the metadata
        // below reflects the full download.
        download_file.flush().await?;

        let file_meta = download_file.metadata().await?;
        let size = file_meta.len() as usize;

        // Seek the downloaded file to start (before reading).
        download_file.seek(SeekFrom::Start(0)).await?;

        let stream = async_stream::stream! {
            // Delete when the stream is dropped.
            let _download_file_dir = download_file_dir;

            // Create a chunked stream.
            loop {
                let mut buf = BytesMut::with_capacity(8 * 1024);

                let bytes_read = download_file
                    .read_buf(&mut buf)
                    .await
                    .map_err(|e| object_store::Error::Generic {
                        store: "HTTP",
                        source: Box::new(e),
                    })?;

                // A zero-length read means end of file.
                if bytes_read == 0 {
                    break;
                }

                yield Ok(buf.freeze());
            }
        };

        let payload = GetResultPayload::Stream(Box::pin(stream));

        Ok(GetResult {
            payload,
            meta: ObjectMeta {
                location,
                // The time of the download is used as the modification time.
                last_modified: Utc::now(),
                size,
                e_tag: None,
                version: None,
            },
            range: 0..size,
        })
    }
}

impl fmt::Display for SimpleHttpStore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SimpleHttpStore({})", self.url)
}
}

// `ObjectStore` implementation for `SimpleHttpStore`.
//
// Every operation other than `get_opts` is forwarded unchanged to the wrapped
// `HttpStore`. `get_opts` adds a fallback for resources whose content length
// cannot be determined with a `HEAD` request.
#[async_trait]
impl ObjectStore for SimpleHttpStore {
    /// Forwards to the wrapped store.
    async fn put(
        &self,
        location: &ObjectStorePath,
        bytes: Bytes,
    ) -> Result<PutResult, object_store::Error> {
        self.obj_store.put(location, bytes).await
    }

    /// Forwards to the wrapped store.
    async fn put_opts(
        &self,
        location: &ObjectStorePath,
        bytes: bytes::Bytes,
        opts: PutOptions,
    ) -> Result<PutResult, object_store::Error> {
        self.obj_store.put_opts(location, bytes, opts).await
    }

    /// Forwards to the wrapped store.
    async fn put_multipart(
        &self,
        location: &ObjectStorePath,
    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>), object_store::Error> {
        self.obj_store.put_multipart(location).await
    }

    /// Forwards to the wrapped store.
    async fn abort_multipart(
        &self,
        location: &ObjectStorePath,
        multipart_id: &MultipartId,
    ) -> Result<(), object_store::Error> {
        // Must go through `obj_store`: calling `self.abort_multipart` here
        // would recurse into this very method without ever terminating.
        self.obj_store
            .abort_multipart(location, multipart_id)
            .await
    }

    // This uses the default impl for `get`, `get_range`, `get_ranges`, `head`
    // (from our custom get_opts impl).

    /// Fetches the object, choosing between the wrapped store and a plain
    /// `GET` download.
    ///
    /// * If any field of `options` is set, the request is passed to the
    ///   wrapped store as is.
    /// * Otherwise the content length is probed with a `HEAD` request. A
    ///   non-zero length means the wrapped store handles the request; a
    ///   missing or zero length, or a failed probe, falls back to
    ///   `simple_get_req`, which downloads the whole body.
    ///
    /// Errors from the fallback path are wrapped in
    /// `object_store::Error::Generic` with store name `"HTTP"`.
    async fn get_opts(
        &self,
        location: &ObjectStorePath,
        options: GetOptions,
    ) -> Result<GetResult, object_store::Error> {
        if options.if_match.is_some()
            || options.if_none_match.is_some()
            || options.if_modified_since.is_some()
            || options.if_unmodified_since.is_some()
            || options.range.is_some()
            || options.version.is_some()
            || options.head
        {
            // Let the default implementation handle everything weird.
            self.obj_store.get_opts(location, options).await
        } else {
            // Try to get the content length. A failed probe is deliberately
            // treated the same as an unknown length.
            let content_length = HttpStoreAccess::content_length(self.url.clone())
                .await
                .ok()
                .flatten()
                .unwrap_or_default();

            if content_length != 0 {
                self.obj_store.get_opts(location, options).await
            } else {
                self.simple_get_req(location.clone()).await.map_err(|e| {
                    object_store::Error::Generic {
                        store: "HTTP",
                        source: Box::new(e),
                    }
                })
            }
        }
    }

    /// Forwards to the wrapped store.
    async fn delete(&self, location: &ObjectStorePath) -> Result<(), object_store::Error> {
        self.obj_store.delete(location).await
    }

    /// Forwards to the wrapped store.
    fn delete_stream<'a>(
        &'a self,
        locations: BoxStream<'a, Result<ObjectStorePath, object_store::Error>>,
    ) -> BoxStream<'a, Result<ObjectStorePath, object_store::Error>> {
        self.obj_store.delete_stream(locations)
    }

    /// Forwards to the wrapped store.
    fn list(
        &self,
        prefix: Option<&ObjectStorePath>,
    ) -> BoxStream<'_, Result<ObjectMeta, object_store::Error>> {
        self.obj_store.list(prefix)
    }

    /// Forwards to the wrapped store.
    fn list_with_offset(
        &self,
        prefix: Option<&ObjectStorePath>,
        offset: &ObjectStorePath,
    ) -> BoxStream<'_, Result<ObjectMeta, object_store::Error>> {
        self.obj_store.list_with_offset(prefix, offset)
    }

    /// Forwards to the wrapped store.
    async fn list_with_delimiter(
        &self,
        prefix: Option<&ObjectStorePath>,
    ) -> Result<ListResult, object_store::Error> {
        self.obj_store.list_with_delimiter(prefix).await
    }

    /// Forwards to the wrapped store.
    async fn copy(
        &self,
        from: &ObjectStorePath,
        to: &ObjectStorePath,
    ) -> Result<(), object_store::Error> {
        self.obj_store.copy(from, to).await
    }

    /// Forwards to the wrapped store.
    async fn rename(
        &self,
        from: &ObjectStorePath,
        to: &ObjectStorePath,
    ) -> Result<(), object_store::Error> {
        self.obj_store.rename(from, to).await
    }

    /// Forwards to the wrapped store.
    async fn copy_if_not_exists(
        &self,
        from: &ObjectStorePath,
        to: &ObjectStorePath,
    ) -> Result<(), object_store::Error> {
        self.obj_store.copy_if_not_exists(from, to).await
    }

    /// Forwards to the wrapped store.
    async fn rename_if_not_exists(
        &self,
        from: &ObjectStorePath,
        to: &ObjectStorePath,
    ) -> Result<(), object_store::Error> {
        self.obj_store.rename_if_not_exists(from, to).await
    }
}
Loading

0 comments on commit 7ae7c75

Please sign in to comment.