Skip to content

Commit

Permalink
tempdir
Browse files Browse the repository at this point in the history
Signed-off-by: Vaibhav <vrongmeal@gmail.com>
  • Loading branch information
vrongmeal committed Mar 19, 2024
1 parent 2016eb1 commit b5edcca
Showing 1 changed file with 37 additions and 5 deletions.
42 changes: 37 additions & 5 deletions crates/datasources/src/object_store/http.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use core::fmt;
use std::fmt::Display;
use std::io::SeekFrom;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use chrono::Utc;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::file_format::FileFormat;
Expand All @@ -25,7 +26,8 @@ use object_store::{
PutOptions,
PutResult,
};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tempfile::tempdir;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
use url::Url;
use uuid::Uuid;

Expand Down Expand Up @@ -189,7 +191,9 @@ impl SimpleHttpStore {
)));
}

let download_file_path = std::env::temp_dir().join(format!("{}", Uuid::new_v4()));
let download_file_dir = tempdir()?;

let download_file_path = download_file_dir.path().join(format!("{}", Uuid::new_v4()));

let mut contents = res.bytes_stream();

Expand All @@ -200,16 +204,44 @@ impl SimpleHttpStore {
.open(&download_file_path)
.await?;

// We unfortunately need to download the contents first to get the
// content length since it is unknown.
while let Some(b) = contents.next().await {
let b = b?;
download_file.write_all(&b).await?;
}

let file_meta = download_file.metadata().await?;

let size = file_meta.len() as usize;

let payload = GetResultPayload::File(download_file.into_std().await, download_file_path);
// Seek the downloaded file to start (before reading).
download_file.seek(SeekFrom::Start(0)).await?;

let stream = async_stream::stream! {
// Delete when the stream is dropped.
let _download_file_dir = download_file_dir;

// Create a chunked stream.
loop {
let mut buf = BytesMut::with_capacity(8 * 1024);

let bytes_read = download_file
.read_buf(&mut buf)
.await
.map_err(|e| object_store::Error::Generic {
store: "HTTP",
source: Box::new(e),
})?;

if bytes_read == 0 {
break;
}

yield Ok(buf.freeze());
}
};

let payload = GetResultPayload::Stream(Box::pin(stream));

Ok(GetResult {
payload,
Expand Down

0 comments on commit b5edcca

Please sign in to comment.