diff --git a/crates/datasources/src/object_store/http.rs b/crates/datasources/src/object_store/http.rs index 1d810bf16..03205d5e7 100644 --- a/crates/datasources/src/object_store/http.rs +++ b/crates/datasources/src/object_store/http.rs @@ -1,9 +1,10 @@ use core::fmt; use std::fmt::Display; +use std::io::SeekFrom; use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use chrono::Utc; use datafusion::arrow::datatypes::SchemaRef; use datafusion::datasource::file_format::FileFormat; @@ -25,7 +26,8 @@ use object_store::{ PutOptions, PutResult, }; -use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tempfile::tempdir; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; use url::Url; use uuid::Uuid; @@ -189,7 +191,9 @@ impl SimpleHttpStore { ))); } - let download_file_path = std::env::temp_dir().join(format!("{}", Uuid::new_v4())); + let download_file_dir = tempdir()?; + + let download_file_path = download_file_dir.path().join(format!("{}", Uuid::new_v4())); let mut contents = res.bytes_stream(); @@ -200,16 +204,44 @@ impl SimpleHttpStore { .open(&download_file_path) .await?; + // We unfortunately need to download the contents first to get the + // content length since it is unknown. while let Some(b) = contents.next().await { let b = b?; download_file.write_all(&b).await?; } let file_meta = download_file.metadata().await?; - let size = file_meta.len() as usize; - let payload = GetResultPayload::File(download_file.into_std().await, download_file_path); + // Seek the downloaded file to start (before reading). + download_file.seek(SeekFrom::Start(0)).await?; + + let stream = async_stream::stream! { + // Delete when the stream is dropped. + let _download_file_dir = download_file_dir; + + // Create a chunked stream. + loop { + let mut buf = BytesMut::with_capacity(8 * 1024); + + let bytes_read = download_file + .read_buf(&mut buf) + .await + .map_err(|e| object_store::Error::Generic { + store: "HTTP", + source: Box::new(e), + })?; + + if bytes_read == 0 { + break; + } + + yield Ok(buf.freeze()); + } + }; + + let payload = GetResultPayload::Stream(Box::pin(stream)); Ok(GetResult { payload,