Skip to content

Commit

Permalink
fix: HTTP requests when content length is missing
Browse files Browse the repository at this point in the history
Signed-off-by: Vaibhav <vrongmeal@gmail.com>
  • Loading branch information
vrongmeal committed Mar 19, 2024
1 parent 792a23f commit be09a6e
Showing 1 changed file with 245 additions and 31 deletions.
276 changes: 245 additions & 31 deletions crates/datasources/src/object_store/http.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,30 @@
use core::fmt;
use std::fmt::Display;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::file_format::FileFormat;
use datafusion::execution::context::SessionState;
use datafusion::execution::object_store::ObjectStoreUrl;
use object_store::http::HttpBuilder;
use futures::stream::BoxStream;
use object_store::http::{HttpBuilder, HttpStore};
use object_store::path::Path as ObjectStorePath;
use object_store::{ClientConfigKey, ObjectMeta, ObjectStore};
use object_store::{
ClientConfigKey,
GetOptions,
GetResult,
GetResultPayload,
ListResult,
MultipartId,
ObjectMeta,
ObjectStore,
PutOptions,
PutResult,
};
use tokio::io::AsyncWrite;
use url::Url;

use super::glob_util::ResolvedPattern;
Expand All @@ -23,6 +38,36 @@ pub struct HttpStoreAccess {
pub url: Url,
}

impl HttpStoreAccess {
    /// Sends a `HEAD` request to `u` and reports the advertised body length.
    ///
    /// Returns `Ok(None)` when neither the `Content-Length` header nor
    /// reqwest's own `content_length()` yields a value.
    ///
    /// # Errors
    ///
    /// Returns `InvalidHttpStatus` for any non-success status code; the
    /// message carries an extra hint when the URL contains a `*`. Request
    /// failures are propagated via `?`.
    async fn content_length(u: Url) -> Result<Option<u64>> {
        let response = reqwest::Client::new().head(u.clone()).send().await?;

        let status = response.status();
        if !status.is_success() {
            // Build the message first so both failure flavours share one return.
            let message = if u.as_str().contains('*') {
                format!(
                    "Unexpected status code '{}' for url: '{}'. \
                     Note that globbing is not supported for HTTP.",
                    status, u,
                )
            } else {
                format!("Unexpected status code '{}' for url: '{}'", status, u)
            };
            return Err(ObjectStoreSourceError::InvalidHttpStatus(message));
        }

        // reqwest doesn't check the content length header, instead looks at the contents
        // See: https://github.com/seanmonstar/reqwest/issues/843
        let from_header = response
            .headers()
            .get("Content-Length")
            .and_then(|value| value.to_str().ok())
            .and_then(|text| text.parse::<u64>().ok());

        // Prefer the parsed header; fall back to what reqwest reports.
        Ok(match from_header {
            Some(len) => Some(len),
            None => response.content_length(),
        })
    }
}

impl Display for HttpStoreAccess {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "HttpStoreAccess(url: {})", self.url)
Expand Down Expand Up @@ -54,8 +99,13 @@ impl ObjStoreAccess for HttpStoreAccess {
let builder = HttpBuilder::new()
.with_url(self.url.to_string())
.with_config(ClientConfigKey::AllowHttp, "true");

let build = builder.build()?;
Ok(Arc::new(build))

Ok(Arc::new(SimpleHttpStore {
url: self.url.clone(),
obj_store: build,
}))
}

fn path(&self, _location: &str) -> Result<ObjectStorePath> {
Expand All @@ -82,38 +132,12 @@ impl ObjStoreAccess for HttpStoreAccess {
_store: &Arc<dyn ObjectStore>,
location: &ObjectStorePath,
) -> Result<ObjectMeta> {
let res = reqwest::Client::new().head(self.url.clone()).send().await?;
let status = res.status();
if !status.is_success() {
if self.url.as_str().contains('*') {
return Err(ObjectStoreSourceError::InvalidHttpStatus(format!(
"Unexpected status code '{}' for url: '{}'. Note that globbing is not supported for HTTP.",
status, self.url
)));
}
return Err(ObjectStoreSourceError::InvalidHttpStatus(format!(
"Unexpected status code '{}' for url: '{}'",
status, self.url
)));
}
// reqwest doesn't check the content length header, instead looks at the contents
// See: https://github.com/seanmonstar/reqwest/issues/843
let len = res
.headers()
.get("Content-Length")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.or_else(|| res.content_length())
// TODO: Download the contents of the file and handle using local
// store (maybe outside of HTTP store).
.ok_or(ObjectStoreSourceError::Static(
"Missing content-length header",
))?;
let content_length = Self::content_length(self.url.clone()).await?;

Ok(ObjectMeta {
location: location.clone(),
last_modified: Utc::now(),
size: len as usize,
size: content_length.unwrap_or_default() as usize,
e_tag: None,
version: None,
})
Expand Down Expand Up @@ -144,3 +168,193 @@ impl ObjStoreAccess for HttpStoreAccess {
.await?)
}
}

/// Object store for a single HTTP URL that wraps an [`HttpStore`].
///
/// Unconditional full-object reads fall back to a plain `GET` of `url` when a
/// `HEAD` probe reports no (or zero) content length; every other operation is
/// delegated to `obj_store`.
#[derive(Debug)]
struct SimpleHttpStore {
// URL used for the `HEAD` content-length probe and for the fallback `GET`.
url: Url,
// Wrapped store; used when content length is available and for all
// operations other than the fallback `GET`.
obj_store: HttpStore,
}

impl SimpleHttpStore {
    /// Downloads the full body at `self.url` with a plain `GET` and returns it
    /// as a [`GetResult`] whose payload is a stream yielding that body once.
    ///
    /// `location` is only recorded in the returned metadata; the request
    /// always targets `self.url`. The reported size and range cover the
    /// number of bytes actually received.
    ///
    /// # Errors
    ///
    /// Returns `InvalidHttpStatus` for a non-success status; request and body
    /// read failures are propagated via `?`.
    async fn simple_get_req(&self, location: ObjectStorePath) -> Result<GetResult> {
        let response = reqwest::get(self.url.clone()).await?;

        let status = response.status();
        if !status.is_success() {
            return Err(ObjectStoreSourceError::InvalidHttpStatus(format!(
                "getting data for '{}' resulted in error status: {}",
                self.url, status,
            )));
        }

        // TODO: Maybe write the byte stream to file?
        // Would only be useful when the returned bytes are too many.
        let body = response.bytes().await?;
        let size = body.len();

        let meta = ObjectMeta {
            location,
            last_modified: Utc::now(),
            size,
            e_tag: None,
            version: None,
        };

        // The whole body is already in memory, so the stream has one item.
        let chunks = async_stream::stream! {
            yield Ok::<Bytes, object_store::Error>(body);
        };

        Ok(GetResult {
            payload: GetResultPayload::Stream(Box::pin(chunks)),
            meta,
            range: 0..size,
        })
    }
}

/// Renders the store as `SimpleHttpStore(<url>)`.
impl fmt::Display for SimpleHttpStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("SimpleHttpStore({})", self.url))
    }
}

/// [`ObjectStore`] implementation that forwards to the wrapped [`HttpStore`],
/// except for unconditional full-object reads of resources that do not
/// advertise a content length (see [`SimpleHttpStore::get_opts`]).
#[async_trait]
impl ObjectStore for SimpleHttpStore {
    /// Delegates to the wrapped store.
    async fn put(
        &self,
        location: &ObjectStorePath,
        bytes: Bytes,
    ) -> Result<PutResult, object_store::Error> {
        self.obj_store.put(location, bytes).await
    }

    /// Delegates to the wrapped store.
    async fn put_opts(
        &self,
        location: &ObjectStorePath,
        bytes: Bytes,
        opts: PutOptions,
    ) -> Result<PutResult, object_store::Error> {
        self.obj_store.put_opts(location, bytes, opts).await
    }

    /// Delegates to the wrapped store.
    async fn put_multipart(
        &self,
        location: &ObjectStorePath,
    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>), object_store::Error> {
        self.obj_store.put_multipart(location).await
    }

    /// Delegates to the wrapped store.
    async fn abort_multipart(
        &self,
        location: &ObjectStorePath,
        multipart_id: &MultipartId,
    ) -> Result<(), object_store::Error> {
        // Must go through `obj_store`: calling `self.abort_multipart` here
        // would invoke this very method again and recurse without end.
        self.obj_store
            .abort_multipart(location, multipart_id)
            .await
    }

    // This uses the default impl for `get`, `get_range`, `get_ranges`, `head`
    // (from our custom get_opts impl).

    /// Reads an object, falling back to a plain `GET` of the store URL when
    /// the resource does not advertise a content length.
    ///
    /// Any request carrying a precondition, range, version, or the `head`
    /// flag is forwarded to the wrapped store unchanged. Otherwise a `HEAD`
    /// probe decides: a non-zero content length goes to the wrapped store,
    /// anything else (missing, zero, or a failed probe) goes through
    /// `simple_get_req`.
    ///
    /// # Errors
    ///
    /// Errors from the wrapped store are returned as-is; errors from the
    /// fallback `GET` are wrapped in `object_store::Error::Generic`.
    async fn get_opts(
        &self,
        location: &ObjectStorePath,
        options: GetOptions,
    ) -> Result<GetResult, object_store::Error> {
        if options.if_match.is_some()
            || options.if_none_match.is_some()
            || options.if_modified_since.is_some()
            || options.if_unmodified_since.is_some()
            || options.range.is_some()
            || options.version.is_some()
            || options.head
        {
            // Let the wrapped store handle everything that is not a plain,
            // unconditional full read.
            self.obj_store.get_opts(location, options).await
        } else {
            // Try to get the content length. A failed probe is deliberately
            // treated the same as "unknown" so the plain GET gets a chance.
            let content_length = HttpStoreAccess::content_length(self.url.clone())
                .await
                .ok()
                .flatten()
                .unwrap_or_default();

            if content_length != 0 {
                self.obj_store.get_opts(location, options).await
            } else {
                self.simple_get_req(location.clone()).await.map_err(|e| {
                    object_store::Error::Generic {
                        store: "HTTP",
                        source: Box::new(e),
                    }
                })
            }
        }
    }

    /// Delegates to the wrapped store.
    async fn delete(&self, location: &ObjectStorePath) -> Result<(), object_store::Error> {
        self.obj_store.delete(location).await
    }

    /// Delegates to the wrapped store.
    fn delete_stream<'a>(
        &'a self,
        locations: BoxStream<'a, Result<ObjectStorePath, object_store::Error>>,
    ) -> BoxStream<'a, Result<ObjectStorePath, object_store::Error>> {
        self.obj_store.delete_stream(locations)
    }

    /// Delegates to the wrapped store.
    fn list(
        &self,
        prefix: Option<&ObjectStorePath>,
    ) -> BoxStream<'_, Result<ObjectMeta, object_store::Error>> {
        self.obj_store.list(prefix)
    }

    /// Delegates to the wrapped store.
    fn list_with_offset(
        &self,
        prefix: Option<&ObjectStorePath>,
        offset: &ObjectStorePath,
    ) -> BoxStream<'_, Result<ObjectMeta, object_store::Error>> {
        self.obj_store.list_with_offset(prefix, offset)
    }

    /// Delegates to the wrapped store.
    async fn list_with_delimiter(
        &self,
        prefix: Option<&ObjectStorePath>,
    ) -> Result<ListResult, object_store::Error> {
        self.obj_store.list_with_delimiter(prefix).await
    }

    /// Delegates to the wrapped store.
    async fn copy(
        &self,
        from: &ObjectStorePath,
        to: &ObjectStorePath,
    ) -> Result<(), object_store::Error> {
        self.obj_store.copy(from, to).await
    }

    /// Delegates to the wrapped store.
    async fn rename(
        &self,
        from: &ObjectStorePath,
        to: &ObjectStorePath,
    ) -> Result<(), object_store::Error> {
        self.obj_store.rename(from, to).await
    }

    /// Delegates to the wrapped store.
    async fn copy_if_not_exists(
        &self,
        from: &ObjectStorePath,
        to: &ObjectStorePath,
    ) -> Result<(), object_store::Error> {
        self.obj_store.copy_if_not_exists(from, to).await
    }

    /// Delegates to the wrapped store.
    async fn rename_if_not_exists(
        &self,
        from: &ObjectStorePath,
        to: &ObjectStorePath,
    ) -> Result<(), object_store::Error> {
        self.obj_store.rename_if_not_exists(from, to).await
    }
}

0 comments on commit be09a6e

Please sign in to comment.