Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: rewrite chunked file #21

Merged
merged 1 commit into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/depot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::fmt;

/// Depot if for store temp data of current request. Each handler can read or write data to it.
///
#[derive(Default)]
pub struct Depot {
data: HashMap<String, Box<dyn Any + Send>>,
}
Expand Down
95 changes: 42 additions & 53 deletions core/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,30 @@ pub use named_file::*;

use bytes::BytesMut;
use futures::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{cmp, io};
use tokio::io::{AsyncRead, AsyncSeek};
use std::{
cmp,
io::{self, Read, Seek},
};

pub(crate) enum ChunkedState<T> {
File(Option<T>),
Future(tokio::task::JoinHandle<Result<(T, BytesMut), io::Error>>),
}

pub struct FileChunk<T> {
chunk_size: u64,
read_size: u64,
buffer_size: u64,
offset: u64,
file: T,
state: ChunkedState<T>,
}

impl<T> Stream for FileChunk<T>
where
T: AsyncRead + AsyncSeek + Unpin,
T: Read + Seek + Unpin + Send + 'static,
{
type Item = Result<BytesMut, io::Error>;

Expand All @@ -27,63 +35,44 @@ where
return Poll::Ready(None);
}

let max_bytes = cmp::min(self.chunk_size.saturating_sub(self.read_size), self.buffer_size) as usize;
let offset = self.offset;
match self.state {
ChunkedState::File(ref mut file) => {
let mut file = file.take().expect("ChunkedReadFile polled after completion");
let max_bytes = cmp::min(self.chunk_size.saturating_sub(self.read_size), self.buffer_size) as usize;
let offset = self.offset;
let fut = tokio::task::spawn_blocking(move || {
let mut buf = BytesMut::with_capacity(max_bytes);
// safety: it has max bytes capacity, and we don't read it
unsafe {
buf.set_len(max_bytes);
}
file.seek(io::SeekFrom::Start(offset))?;

// must call poll_complete before start_seek, and call poll_complete to confirm seek finished
// https://docs.rs/tokio/1.4.0/tokio/io/trait.AsyncSeek.html#errors
futures::ready!(Pin::new(&mut self.file).poll_complete(cx))?;
Pin::new(&mut self.file).start_seek(io::SeekFrom::Start(offset))?;
futures::ready!(Pin::new(&mut self.file).poll_complete(cx))?;
file.by_ref().read_exact(&mut buf)?;

let mut data = BytesMut::with_capacity(max_bytes);
// safety: it has max bytes capacity, and we don't read it
unsafe {
data.set_len(max_bytes);
}
// Temporary index
let mut read_num = 0;

loop {
let mut buf = tokio::io::ReadBuf::new(&mut data.as_mut()[read_num..]);
match Pin::new(&mut self.file).poll_read(cx, &mut buf) {
Poll::Ready(Ok(())) => {
// we only read this size data from the file
let filled = buf.filled().len();
if filled == 0 {
return Poll::Ready(Some(Err(std::io::ErrorKind::UnexpectedEof.into())));
} else {
self.offset += filled as u64;
self.read_size += filled as u64;
read_num += filled;
// read to end
if read_num == max_bytes {
return Poll::Ready(Some(Ok(data)));
} else {
// try read more
continue;
}
}
}
Poll::Pending => {
// have read some buf, but pending here
// so return read these data
if read_num != 0 {
data.truncate(read_num);
return Poll::Ready(Some(Ok(data)));
} else {
return Poll::Pending;
}
}
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
Ok((file, buf))
});

self.state = ChunkedState::Future(fut);
self.poll_next(cx)
}
ChunkedState::Future(ref mut fut) => {
let (file, buf) = futures::ready!(Pin::new(fut).poll(cx))
.map_err(|_| io::Error::new(io::ErrorKind::Other, "BlockingErr"))??;
self.state = ChunkedState::File(Some(file));

self.offset += buf.len() as u64;
self.read_size += buf.len() as u64;

Poll::Ready(Some(Ok(buf)))
}
}
}
}

#[cfg(test)]
mod test {
use super::FileChunk;
use super::{ChunkedState, FileChunk};
use futures::stream::StreamExt;
use std::io::Cursor;

Expand All @@ -97,7 +86,7 @@ mod test {
read_size: 0,
buffer_size: 65535,
offset: 0,
file: mock.clone(),
state: ChunkedState::File(Some(mock.clone())),
};

let mut result = bytes::BytesMut::with_capacity(SIZE as usize);
Expand Down
6 changes: 3 additions & 3 deletions core/src/fs/named_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use bitflags::bitflags;
use headers::*;
use mime_guess::from_path;

use super::FileChunk;
use super::{ChunkedState, FileChunk};
use crate::http::header;
use crate::http::header::{CONTENT_DISPOSITION, CONTENT_ENCODING};
use crate::http::range::HttpRange;
Expand Down Expand Up @@ -404,7 +404,7 @@ impl Writer for NamedFile {
offset,
chunk_size: cmp::min(length, self.metadata.len()),
read_size: 0,
file: self.file,
state: ChunkedState::File(Some(self.file.into_std().await)),
buffer_size: self.buffer_size,
};
res.headers_mut().typed_insert(ContentLength(reader.chunk_size));
Expand All @@ -413,7 +413,7 @@ impl Writer for NamedFile {
res.set_status_code(StatusCode::OK);
let reader = FileChunk {
offset,
file: self.file,
state: ChunkedState::File(Some(self.file.into_std().await)),
chunk_size: length,
read_size: 0,
buffer_size: self.buffer_size,
Expand Down
2 changes: 1 addition & 1 deletion extra/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ compression = ["async-compression", "tokio", "tokio-stream", "tokio-util"]
cors = []
size_limiter = []
proxy = ["hyper", "hyper-tls"]
serve = ["chrono", "mime", "percent-encoding"]
serve = ["chrono", "mime", "percent-encoding", "tokio"]
sse = ["futures", "pin-project", "tokio"]
ws = ["futures", "tokio", "tokio-tungstenite"]

Expand Down
24 changes: 11 additions & 13 deletions extra/src/serve/dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use chrono::prelude::*;
use percent_encoding::{utf8_percent_encode, CONTROLS};
use serde_json::json;
use std::collections::HashMap;
use std::fs::{self, Metadata};
use std::fs::Metadata;
use std::path::{Path, PathBuf};
use std::time::SystemTime;

Expand Down Expand Up @@ -188,18 +188,16 @@ impl Handler for StaticDir {
}
}
//list the dir
if let Ok(entries) = fs::read_dir(&path) {
for entry in entries {
if let Ok(entry) = entry {
if let Ok(metadata) = entry.metadata() {
if metadata.is_dir() {
dirs.entry(entry.file_name().into_string().unwrap_or_else(|_| "".to_owned()))
.or_insert(metadata);
} else {
files
.entry(entry.file_name().into_string().unwrap_or_else(|_| "".to_owned()))
.or_insert(metadata);
}
if let Ok(mut entries) = tokio::fs::read_dir(&path).await {
while let Ok(Some(entry)) = entries.next_entry().await {
if let Ok(metadata) = entry.metadata().await {
if metadata.is_dir() {
dirs.entry(entry.file_name().into_string().unwrap_or_else(|_| "".to_owned()))
.or_insert(metadata);
} else {
files
.entry(entry.file_name().into_string().unwrap_or_else(|_| "".to_owned()))
.or_insert(metadata);
}
}
}
Expand Down