Skip to content

Commit

Permalink
feat: support http chunk response
Browse files Browse the repository at this point in the history
  • Loading branch information
vicanso committed Apr 1, 2024
1 parent eb9a7e7 commit 0e08836
Show file tree
Hide file tree
Showing 14 changed files with 335 additions and 78 deletions.
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pingap"
version = "0.1.5"
version = "0.1.6"
authors = ["Tree Xie <tree.xie@outlook.com>"]
edition = "2021"
categories = ["network-programming", "web-programming::http-server"]
Expand Down Expand Up @@ -30,6 +30,7 @@ http = "1.1.0"
humantime = "2.1.0"
humantime-serde = "1.1.1"
log = "0.4.21"
mime_guess = "2.0.4"
num_cpus = "1.16.0"
once_cell = "1.19.0"
path-absolutize = "3.1.1"
Expand All @@ -41,12 +42,15 @@ serde_json = "1.0.115"
snafu = "0.8.2"
substring = "1.4.5"
tempfile = "3.10.1"
tokio = { version = "1.37.0", features = ["fs"] }
toml = "0.8.12"
url = "2.5.0"
urlencoding = "2.1.3"

[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports"] }
pretty_assertions = "1.4.0"
tokio-test = "0.4.4"

[profile.release]
codegen-units = 1
Expand Down
2 changes: 2 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@
- [x] support get pingap start time
- [ ] custom error for pingora error
- [ ] authentication for admin page
- [x] static file serve
- [ ] set priority for location
4 changes: 2 additions & 2 deletions benches/bench.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, Criterion};
use http::{HeaderName, HeaderValue, StatusCode};
use pingap::cache::{convert_headers, HttpResponse};
use pingap::config::{LocationConf, UpstreamConf};
use pingap::http_extra::{convert_headers, HttpResponse};
use pingap::proxy::{Location, Upstream};
use pingora::http::ResponseHeader;
use std::sync::Arc;
Expand Down Expand Up @@ -78,7 +78,7 @@ fn get_response_header(c: &mut Criterion) {
.as_secs()
- 10,
),
private: Some(true),
cache_private: Some(true),
headers: Some(
convert_headers(&[
"Contont-Type: application/json".to_string(),
Expand Down
3 changes: 2 additions & 1 deletion conf/pingap.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ idle_timeout = "120s"

# Anther upstream using all default config.
[upstreams.diving]
addrs = ["127.0.0.1:5001"]
# static file
addrs = ["file://~/Downloads"]


# Location config list, it will defined as [locations.name]
Expand Down
23 changes: 2 additions & 21 deletions src/config/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ use crate::utils;
use base64::{engine::general_purpose::STANDARD, Engine};
use glob::glob;
use http::HeaderValue;
use path_absolutize::*;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt, Snafu};
use std::collections::HashMap;
use std::path::Path;
use std::time::Duration;
use substring::Substring;
use toml::{map::Map, Value};
use url::Url;

Expand Down Expand Up @@ -238,23 +236,6 @@ struct TomlConfig {
work_stealing: Option<bool>,
}

fn resolve_path(path: &str) -> String {
if path.is_empty() {
return "".to_string();
}
let mut p = path.to_string();
if p.starts_with('~') {
if let Some(home) = dirs::home_dir() {
p = home.to_string_lossy().to_string() + p.substring(1, p.len());
};
}
if let Ok(p) = Path::new(&p).absolutize() {
p.to_string_lossy().to_string()
} else {
p
}
}

fn format_toml(value: &Value) -> String {
if let Some(value) = value.as_table() {
value.to_string()
Expand All @@ -268,7 +249,7 @@ fn format_toml(value: &Value) -> String {
/// Validate the config before save.
pub fn save_config(path: &str, conf: &PingapConf) -> Result<()> {
conf.validate()?;
let filepath = resolve_path(path);
let filepath = utils::resolve_path(path);
let buf = toml::to_string_pretty(conf).context(SerSnafu)?;
std::fs::write(&filepath, buf).context(IoSnafu { file: filepath })?;

Expand All @@ -277,7 +258,7 @@ pub fn save_config(path: &str, conf: &PingapConf) -> Result<()> {

/// Load the config from path.
pub fn load_config(path: &str, admin: bool) -> Result<PingapConf> {
let filepath = resolve_path(path);
let filepath = utils::resolve_path(path);
ensure!(
!filepath.is_empty(),
InvalidSnafu {
Expand Down
7 changes: 7 additions & 0 deletions src/http_extra/http_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ pub static HTTP_HEADER_CONTENT_JSON: Lazy<HttpHeader> = Lazy::new(|| {
)
});

pub static HTTP_HEADER_TRANSFER_CHUNKED: Lazy<HttpHeader> = Lazy::new(|| {
(
header::TRANSFER_ENCODING,
HeaderValue::from_str("chunked").unwrap(),
)
});

#[cfg(test)]
mod tests {
use super::convert_headers;
Expand Down
123 changes: 100 additions & 23 deletions src/http_extra/http_response.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::HTTP_HEADER_TRANSFER_CHUNKED;
use super::{HttpHeader, HTTP_HEADER_CONTENT_JSON, HTTP_HEADER_NO_STORE};
use bytes::Bytes;
use http::header;
Expand All @@ -6,9 +7,26 @@ use log::error;
use pingora::http::ResponseHeader;
use pingora::proxy::Session;
use serde::Serialize;
use std::pin::Pin;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::io::AsyncReadExt;

#[derive(Default, Debug, Clone)]
fn get_cache_control(max_age: Option<u32>, cache_private: Option<bool>) -> HttpHeader {
if let Some(max_age) = max_age {
let category = if cache_private.unwrap_or_default() {
"private"
} else {
"public"
};
if let Ok(value) = header::HeaderValue::from_str(&format!("{category}, max-age={max_age}"))
{
return (header::CACHE_CONTROL, value);
}
}
HTTP_HEADER_NO_STORE.clone()
}

#[derive(Default, Clone)]
pub struct HttpResponse {
// http response status
pub status: StatusCode,
Expand All @@ -19,33 +37,44 @@ pub struct HttpResponse {
// created time of http response
pub created_at: Option<u64>,
// private for cache control
pub private: Option<bool>,
pub cache_private: Option<bool>,
// headers for http response
pub headers: Option<Vec<HttpHeader>>,
}

impl HttpResponse {
pub fn no_content() -> HttpResponse {
pub fn no_content() -> Self {
HttpResponse {
status: StatusCode::NO_CONTENT,
headers: Some(vec![HTTP_HEADER_NO_STORE.clone()]),
..Default::default()
}
}
pub fn not_found() -> HttpResponse {
pub fn not_found() -> Self {
HttpResponse {
status: StatusCode::NOT_FOUND,
headers: Some(vec![HTTP_HEADER_NO_STORE.clone()]),
body: Bytes::from("Not Found"),
..Default::default()
}
}
pub fn unknown_error() -> HttpResponse {
pub fn unknown_error() -> Self {
HttpResponse {
status: StatusCode::INTERNAL_SERVER_ERROR,
headers: Some(vec![HTTP_HEADER_NO_STORE.clone()]),
body: Bytes::from("Unknown error"),
..Default::default()
}
}
pub fn try_from_json<T>(value: &T) -> pingora::Result<HttpResponse>
pub fn try_from_json_status<T>(value: &T, status: StatusCode) -> pingora::Result<Self>
where
T: ?Sized + Serialize,
{
let mut resp = HttpResponse::try_from_json(value)?;
resp.status = status;
Ok(resp)
}
pub fn try_from_json<T>(value: &T) -> pingora::Result<Self>
where
T: ?Sized + Serialize,
{
Expand All @@ -68,24 +97,11 @@ impl HttpResponse {
.as_ref()
.map_or_else(|| fix_size, |headers| headers.len() + fix_size);
let mut resp = ResponseHeader::build(self.status, Some(size))?;
resp.insert_header(http::header::CONTENT_LENGTH, self.body.len().to_string())?;
resp.insert_header(header::CONTENT_LENGTH, self.body.len().to_string())?;

// set cache control
if let Some(max_age) = self.max_age {
let category = if self.private.unwrap_or_default() {
"private"
} else {
"public"
};
if let Ok(value) =
header::HeaderValue::from_str(&format!("{category}, max-age={max_age}"))
{
resp.insert_header(header::CACHE_CONTROL, value)?;
}
} else {
let h = HTTP_HEADER_NO_STORE.clone();
resp.insert_header(h.0, h.1)?;
}
let cache_control = get_cache_control(self.max_age, self.cache_private);
resp.insert_header(cache_control.0, cache_control.1)?;

if let Some(created_at) = self.created_at {
let secs = SystemTime::now()
Expand Down Expand Up @@ -115,6 +131,67 @@ impl HttpResponse {
}
}

pub struct HttpChunkResponse<'r, R> {
pub reader: Pin<&'r mut R>,
pub chunk_size: usize,
// max age of http response
pub max_age: Option<u32>,
// private for cache control
pub cache_private: Option<bool>,
// headers for http response
pub headers: Option<Vec<HttpHeader>>,
}

impl<'r, R> HttpChunkResponse<'r, R>
where
R: tokio::io::AsyncRead + std::marker::Unpin,
{
pub fn new(r: &'r mut R) -> Self {
Self {
reader: Pin::new(r),
chunk_size: 5 * 1024,
max_age: None,
headers: None,
cache_private: None,
}
}
pub async fn send(&mut self, session: &mut Session) -> pingora::Result<usize> {
let mut sent = 0;
let mut resp = ResponseHeader::build(StatusCode::OK, Some(4))?;
if let Some(headers) = &self.headers {
for (name, value) in headers {
resp.insert_header(name.to_owned(), value)?;
}
}

let chunked = HTTP_HEADER_TRANSFER_CHUNKED.clone();
resp.insert_header(chunked.0, chunked.1)?;

let cache_control = get_cache_control(self.max_age, self.cache_private);
resp.insert_header(cache_control.0, cache_control.1)?;

session.write_response_header(Box::new(resp)).await?;

let mut buffer = vec![0; self.chunk_size.max(512)];
loop {
let size = self.reader.read(&mut buffer).await.map_err(|e| {
error!("Read data fail: {e}");
pingora::Error::new_str("Read data fail")
})?;
if size == 0 {
break;
}
session
.write_response_body(Bytes::copy_from_slice(&buffer[..size]))
.await?;
sent += size
}
session.finish_body().await?;

Ok(sent)
}
}

#[cfg(test)]
mod tests {
use super::HttpResponse;
Expand All @@ -136,7 +213,7 @@ mod tests {
.as_secs()
- 10,
),
private: Some(true),
cache_private: Some(true),
headers: Some(
convert_headers(&[
"Contont-Type: application/json".to_string(),
Expand Down
Loading

0 comments on commit 0e08836

Please sign in to comment.