Skip to content

Commit

Permalink
download data to a file and read from it
Browse files Browse the repository at this point in the history
Signed-off-by: Vaibhav <vrongmeal@gmail.com>
  • Loading branch information
vrongmeal committed Mar 19, 2024
1 parent 0428ced commit e5879a2
Showing 1 changed file with 21 additions and 10 deletions.
31 changes: 21 additions & 10 deletions crates/datasources/src/object_store/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use datafusion::datasource::file_format::FileFormat;
use datafusion::execution::context::SessionState;
use datafusion::execution::object_store::ObjectStoreUrl;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::http::{HttpBuilder, HttpStore};
use object_store::path::Path as ObjectStorePath;
use object_store::{
Expand All @@ -24,8 +25,9 @@ use object_store::{
PutOptions,
PutResult,
};
use tokio::io::AsyncWrite;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use url::Url;
use uuid::Uuid;

use super::glob_util::ResolvedPattern;
use super::ObjStoreAccess;
Expand Down Expand Up @@ -187,18 +189,27 @@ impl SimpleHttpStore {
)));
}

// TODO: Maybe write the byte stream to file?
// Would only be useful when the returned bytes are too many.
let contents = res.bytes().await?;
let download_file_path = std::env::temp_dir().join(format!("{}", Uuid::new_v4()));

let size = contents.len();
let mut contents = res.bytes_stream();

let stream = async_stream::stream! {
let res: Result<Bytes, object_store::Error> = Ok(contents);
yield res;
};
let mut download_file = tokio::fs::OpenOptions::new()
.create_new(true)
.write(true)
.read(true)
.open(&download_file_path)
.await?;

let payload = GetResultPayload::Stream(Box::pin(stream));
while let Some(b) = contents.next().await {
let b = b?;
download_file.write_all(&b).await?;
}

let file_meta = download_file.metadata().await?;

let size = file_meta.len() as usize;

let payload = GetResultPayload::File(download_file.into_std().await, download_file_path);

Ok(GetResult {
payload,
Expand Down

0 comments on commit e5879a2

Please sign in to comment.