diff --git a/crates/datasources/src/lake/mod.rs b/crates/datasources/src/lake/mod.rs index 3142e2ca5..9e6c914ce 100644 --- a/crates/datasources/src/lake/mod.rs +++ b/crates/datasources/src/lake/mod.rs @@ -16,6 +16,7 @@ use protogen::metastore::types::options::StorageOptions; use crate::common::url::{DatasourceUrl, DatasourceUrlType}; use crate::object_store::azure::AzureStoreAccess; use crate::object_store::gcs::GcsStoreAccess; +use crate::object_store::http::HttpStoreAccess; use crate::object_store::local::LocalStoreAccess; use crate::object_store::s3::S3StoreAccess; use crate::object_store::ObjStoreAccess; @@ -101,9 +102,7 @@ pub fn storage_options_into_store_access( })) } DatasourceUrlType::File => Ok(Arc::new(LocalStoreAccess)), - DatasourceUrlType::Http => { - Err(LakeStorageOptionsError::UnsupportedObjectStore(url.clone())) - } + DatasourceUrlType::Http => Ok(Arc::new(HttpStoreAccess { url: url.as_url()? })), } } diff --git a/crates/datasources/src/object_store/http.rs b/crates/datasources/src/object_store/http.rs index 61125077d..03205d5e7 100644 --- a/crates/datasources/src/object_store/http.rs +++ b/crates/datasources/src/object_store/http.rs @@ -1,16 +1,35 @@ +use core::fmt; use std::fmt::Display; +use std::io::SeekFrom; use std::sync::Arc; use async_trait::async_trait; +use bytes::{Bytes, BytesMut}; use chrono::Utc; use datafusion::arrow::datatypes::SchemaRef; use datafusion::datasource::file_format::FileFormat; use datafusion::execution::context::SessionState; use datafusion::execution::object_store::ObjectStoreUrl; -use object_store::http::HttpBuilder; +use futures::stream::BoxStream; +use futures::StreamExt; +use object_store::http::{HttpBuilder, HttpStore}; use object_store::path::Path as ObjectStorePath; -use object_store::{ObjectMeta, ObjectStore}; +use object_store::{ + ClientConfigKey, + GetOptions, + GetResult, + GetResultPayload, + ListResult, + MultipartId, + ObjectMeta, + ObjectStore, + PutOptions, + PutResult, +}; +use tempfile::tempdir; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; use url::Url; +use uuid::Uuid; use super::glob_util::ResolvedPattern; use super::ObjStoreAccess; @@ -23,6 +42,36 @@ pub struct HttpStoreAccess { pub url: Url, } +impl HttpStoreAccess { + async fn content_length(u: Url) -> Result> { + let res = reqwest::Client::new().head(u.clone()).send().await?; + let status = res.status(); + if !status.is_success() { + if u.as_str().contains('*') { + return Err(ObjectStoreSourceError::InvalidHttpStatus(format!( + "Unexpected status code '{}' for url: '{}'. \ + Note that globbing is not supported for HTTP.", + status, u, + ))); + } + return Err(ObjectStoreSourceError::InvalidHttpStatus(format!( + "Unexpected status code '{}' for url: '{}'", + status, u, + ))); + } + // reqwest doesn't check the content length header, instead looks at the contents + // See: https://github.com/seanmonstar/reqwest/issues/843 + let len = res + .headers() + .get("Content-Length") + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.parse::().ok()) + .or_else(|| res.content_length()); + + Ok(len) + } +} + impl Display for HttpStoreAccess { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "HttpStoreAccess(url: {})", self.url) @@ -43,15 +92,24 @@ impl ObjStoreAccess for HttpStoreAccess { // To make path part of URL we make it a '/'. .replace('/', "__slash__") // TODO: Add more characters which might be invalid for domain. - .replace('%', "__percent__"); + .replace('%', "__percent__") + .replace('?', "__question__") + .replace('=', "__equal__"); Ok(ObjectStoreUrl::parse(u)?) } fn create_store(&self) -> Result> { - let builder = HttpBuilder::new().with_url(self.url.to_string()); + let builder = HttpBuilder::new() + .with_url(self.url.to_string()) + .with_config(ClientConfigKey::AllowHttp, "true"); + let build = builder.build()?; - Ok(Arc::new(build)) + + Ok(Arc::new(SimpleHttpStore { + url: self.url.clone(), + obj_store: build, + })) } fn path(&self, _location: &str) -> Result { @@ -78,38 +136,12 @@ impl ObjStoreAccess for HttpStoreAccess { _store: &Arc, location: &ObjectStorePath, ) -> Result { - let res = reqwest::Client::new().head(self.url.clone()).send().await?; - let status = res.status(); - if !status.is_success() { - if self.url.as_str().contains('*') { - return Err(ObjectStoreSourceError::InvalidHttpStatus(format!( - "Unexpected status code '{}' for url: '{}'. Note that globbing is not supported for HTTP.", - status, self.url - ))); - } - return Err(ObjectStoreSourceError::InvalidHttpStatus(format!( - "Unexpected status code '{}' for url: '{}'", - status, self.url - ))); - } - // reqwest doesn't check the content length header, instead looks at the contents - // See: https://github.com/seanmonstar/reqwest/issues/843 - let len: u64 = res - .headers() - .get("Content-Length") - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.parse().ok()) - .unwrap_or_else(|| res.content_length().unwrap_or(0)); - if len == 0 { - return Err(ObjectStoreSourceError::Static( - "Missing content-length header", - )); - } + let content_length = Self::content_length(self.url.clone()).await?; Ok(ObjectMeta { location: location.clone(), last_modified: Utc::now(), - size: len as usize, + size: content_length.unwrap_or_default() as usize, e_tag: None, version: None, }) @@ -140,3 +172,232 @@ impl ObjStoreAccess for HttpStoreAccess { .await?) } } + +#[derive(Debug)] +struct SimpleHttpStore { + url: Url, + // Used when content length available. + obj_store: HttpStore, +} + +impl SimpleHttpStore { + async fn simple_get_req(&self, location: ObjectStorePath) -> Result { + let res = reqwest::get(self.url.clone()).await?; + if !res.status().is_success() { + return Err(ObjectStoreSourceError::InvalidHttpStatus(format!( + "getting data for '{}' resulted in error status: {}", + self.url, + res.status(), + ))); + } + + let download_file_dir = tempdir()?; + + let download_file_path = download_file_dir.path().join(format!("{}", Uuid::new_v4())); + + let mut contents = res.bytes_stream(); + + let mut download_file = tokio::fs::OpenOptions::new() + .create_new(true) + .write(true) + .read(true) + .open(&download_file_path) + .await?; + + // We unfortunately need to download the contents first to get the + // content length since it is unknown. + while let Some(b) = contents.next().await { + let b = b?; + download_file.write_all(&b).await?; + } + + let file_meta = download_file.metadata().await?; + let size = file_meta.len() as usize; + + // Seek the downloaded file to start (before reading). + download_file.seek(SeekFrom::Start(0)).await?; + + let stream = async_stream::stream! { + // Delete when the stream is dropped. + let _download_file_dir = download_file_dir; + + // Create a chunked stream. + loop { + let mut buf = BytesMut::with_capacity(8 * 1024); + + let bytes_read = download_file + .read_buf(&mut buf) + .await + .map_err(|e| object_store::Error::Generic { + store: "HTTP", + source: Box::new(e), + })?; + + if bytes_read == 0 { + break; + } + + yield Ok(buf.freeze()); + } + }; + + let payload = GetResultPayload::Stream(Box::pin(stream)); + + Ok(GetResult { + payload, + meta: ObjectMeta { + location, + last_modified: Utc::now(), + size, + e_tag: None, + version: None, + }, + range: 0..size, + }) + } +} + +impl fmt::Display for SimpleHttpStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SimpleHttpStore({})", self.url) + } +} + +#[async_trait] +impl ObjectStore for SimpleHttpStore { + async fn put( + &self, + location: &ObjectStorePath, + bytes: Bytes, + ) -> Result { + self.obj_store.put(location, bytes).await + } + + async fn put_opts( + &self, + location: &ObjectStorePath, + bytes: bytes::Bytes, + opts: PutOptions, + ) -> Result { + self.obj_store.put_opts(location, bytes, opts).await + } + + async fn put_multipart( + &self, + location: &ObjectStorePath, + ) -> Result<(MultipartId, Box), object_store::Error> { + self.obj_store.put_multipart(location).await + } + + async fn abort_multipart( + &self, + location: &ObjectStorePath, + multipart_id: &MultipartId, + ) -> Result<(), object_store::Error> { + self.abort_multipart(location, multipart_id).await + } + + // This uses the default impl for `get`, `get_range`, `get_ranges`, `head` + // (from our custom get_opts impl). + + async fn get_opts( + &self, + location: &ObjectStorePath, + options: GetOptions, + ) -> Result { + if options.if_match.is_some() + || options.if_none_match.is_some() + || options.if_modified_since.is_some() + || options.if_unmodified_since.is_some() + || options.range.is_some() + || options.version.is_some() + || options.head + { + // Let the default implementation handle everything weird. + self.obj_store.get_opts(location, options).await + } else { + // Try to get the content length. + let content_length = HttpStoreAccess::content_length(self.url.clone()) + .await + .ok() + .flatten() + .unwrap_or_default(); + + if content_length != 0 { + self.obj_store.get_opts(location, options).await + } else { + self.simple_get_req(location.clone()).await.map_err(|e| { + object_store::Error::Generic { + store: "HTTP", + source: Box::new(e), + } + }) + } + } + } + + async fn delete(&self, location: &ObjectStorePath) -> Result<(), object_store::Error> { + self.obj_store.delete(location).await + } + + fn delete_stream<'a>( + &'a self, + locations: BoxStream<'a, Result>, + ) -> BoxStream<'a, Result> { + self.obj_store.delete_stream(locations) + } + + fn list( + &self, + prefix: Option<&ObjectStorePath>, + ) -> BoxStream<'_, Result> { + self.obj_store.list(prefix) + } + + fn list_with_offset( + &self, + prefix: Option<&ObjectStorePath>, + offset: &ObjectStorePath, + ) -> BoxStream<'_, Result> { + self.obj_store.list_with_offset(prefix, offset) + } + + async fn list_with_delimiter( + &self, + prefix: Option<&ObjectStorePath>, + ) -> Result { + self.obj_store.list_with_delimiter(prefix).await + } + + async fn copy( + &self, + from: &ObjectStorePath, + to: &ObjectStorePath, + ) -> Result<(), object_store::Error> { + self.obj_store.copy(from, to).await + } + + async fn rename( + &self, + from: &ObjectStorePath, + to: &ObjectStorePath, + ) -> Result<(), object_store::Error> { + self.obj_store.rename(from, to).await + } + + async fn copy_if_not_exists( + &self, + from: &ObjectStorePath, + to: &ObjectStorePath, + ) -> Result<(), object_store::Error> { + self.obj_store.copy_if_not_exists(from, to).await + } + + async fn rename_if_not_exists( + &self, + from: &ObjectStorePath, + to: &ObjectStorePath, + ) -> Result<(), object_store::Error> { + self.obj_store.rename_if_not_exists(from, to).await + } +} diff --git a/crates/sqlbuiltins/src/functions/table/mod.rs b/crates/sqlbuiltins/src/functions/table/mod.rs index 81ca5b5bd..ce6018e7f 100644 --- a/crates/sqlbuiltins/src/functions/table/mod.rs +++ b/crates/sqlbuiltins/src/functions/table/mod.rs @@ -184,6 +184,7 @@ pub fn table_location_and_opts( let mut storage_options = StorageOptions::default(); match (source_url.datasource_url_type(), maybe_cred_opts) { (DatasourceUrlType::File, None) => {} // no options fine in this case + (DatasourceUrlType::Http, None) => {} // no options fine in this case (DatasourceUrlType::File, _) => { return Err(ExtensionError::String( "Credentials incorrectly provided when accessing local delta table".to_string(), @@ -224,11 +225,6 @@ pub fn table_location_and_opts( creds.access_key, ); } - (DatasourceUrlType::Http, _) => { - return Err(ExtensionError::String( - "Accessing delta tables over http not supported".to_string(), - )) - } (datasource, creds) => { return Err(ExtensionError::String(format!( "Invalid credentials for {datasource}, got {} creds", diff --git a/testdata/sqllogictests/http.slt b/testdata/sqllogictests/http.slt index 58b7bd133..392960521 100644 --- a/testdata/sqllogictests/http.slt +++ b/testdata/sqllogictests/http.slt @@ -4,3 +4,8 @@ select * from 'http://host.com/path/*.parquet' statement error Error during planning: missing file extension: http://host.com/path/* select * from 'http://host.com/path/*' +# Querying a source without "Content-Length" information. +query I +SELECT count(*) FROM read_json('https://opdb.org/api/search/typeahead?q=*'); +---- +100