Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return timing data in response header(s) #438

Merged
merged 11 commits into from
Jun 23, 2023
35 changes: 29 additions & 6 deletions src/frontend/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use datafusion::error::DataFusionError;

use std::fmt::Debug;
use std::io::Write;
use std::time::Instant;
use std::{net::SocketAddr, sync::Arc};
use warp::{hyper, Rejection};

Expand Down Expand Up @@ -34,6 +35,7 @@ use warp::multipart::{FormData, Part};
use warp::reply::{with_header, Response};
use warp::{hyper::header, hyper::StatusCode, Filter, Reply};

use super::http_utils::{handle_rejection, into_response, ApiError};
use crate::auth::{token_to_principal, AccessPolicy, Action, UserContext};
use crate::catalog::DEFAULT_DB;
use crate::config::schema::{AccessSettings, MEBIBYTES};
Expand All @@ -44,13 +46,12 @@ use crate::{
},
};

use super::http_utils::{handle_rejection, into_response, ApiError};

const QUERY_HEADER: &str = "X-Seafowl-Query";
const BEARER_PREFIX: &str = "Bearer ";
// We have a very lax CORS on this, so we don't mind browsers
// caching it for as long as possible.
const CORS_MAXAGE: u32 = 86400;
const QUERY_TIME_HEADER: &str = "X-Seafowl-Query-Time";

// Vary on Origin, as warp's CORS responds with Access-Control-Allow-Origin: [origin],
// so we can't cache the response in the browser if the origin changes.
Expand Down Expand Up @@ -155,6 +156,8 @@ pub async fn uncached_read_write_query(
query: String,
mut context: Arc<DefaultSeafowlContext>,
) -> Result<Response, ApiError> {
let timer = Instant::now();

// If a specific DB name was used as a parameter in the route, scope the context to it,
// effectively making it the default DB for the duration of the session.
if database_name != context.database {
Expand Down Expand Up @@ -215,6 +218,12 @@ pub async fn uncached_read_write_query(
.headers_mut()
.insert(header::CONTENT_TYPE, content_type_with_schema(schema));
}

let elapsed = timer.elapsed().as_millis().to_string();
response
.headers_mut()
.insert(QUERY_TIME_HEADER, elapsed.parse().unwrap());

Ok(response)
}

Expand Down Expand Up @@ -285,6 +294,8 @@ pub async fn cached_read_query(
if_none_match: Option<String>,
mut context: Arc<DefaultSeafowlContext>,
) -> Result<Response, ApiError> {
let timer = Instant::now();

// Ignore dots at the end
let query_or_hash = query_or_hash.split('.').next().unwrap();

Expand Down Expand Up @@ -346,6 +357,10 @@ pub async fn cached_read_query(
let schema = physical.schema().clone();
let mut response = plan_to_response(context, physical).await?;

let elapsed = timer.elapsed().as_millis().to_string();
response
.headers_mut()
.insert(QUERY_TIME_HEADER, elapsed.parse().unwrap());
response
.headers_mut()
.insert(header::ETAG, etag.parse().unwrap());
Expand Down Expand Up @@ -477,7 +492,6 @@ pub fn filters(
.max_age(CORS_MAXAGE);

let log = warp::log(module_path!());

// Cached read query
let ctx = context.clone();
let cached_read_query_route = warp::path!(String / "q" / String)
Expand Down Expand Up @@ -605,10 +619,10 @@ pub mod tests {

use crate::catalog::DEFAULT_DB;
use crate::config::schema::{str_to_hex_hash, HttpFrontend};
use crate::testutils::schema_from_header;
use crate::testutils::{assert_header_is_float, schema_from_header};
use crate::{
context::{test_utils::in_memory_context, DefaultSeafowlContext, SeafowlContext},
frontend::http::{filters, QUERY_HEADER},
frontend::http::{filters, QUERY_HEADER, QUERY_TIME_HEADER},
};

fn http_config_from_access_policy_and_cache_control(
Expand Down Expand Up @@ -1518,7 +1532,10 @@ pub mod tests {
)]
#[case::uncached_post("POST", "/q")]
#[tokio::test]
async fn test_http_type_conversion(#[case] method: &str, #[case] path: &str) {
async fn test_http_type_conversion_and_timing_header(
#[case] method: &str,
#[case] path: &str,
) {
let context = Arc::new(in_memory_context().await);
let handler = filters(
context.clone(),
Expand Down Expand Up @@ -1572,5 +1589,11 @@ SELECT
"#
)
);

// Assert the "request-to-response" time header is present
assert!(resp.headers().contains_key(QUERY_TIME_HEADER));
// Assert that it's a float
let header_value = resp.headers().get(QUERY_TIME_HEADER).unwrap();
assert_header_is_float(header_value);
}
}
7 changes: 7 additions & 0 deletions src/testutils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::cmp::min;
use std::str::FromStr;
use std::sync::Arc;

use arrow::array::Int32Array;
Expand Down Expand Up @@ -161,3 +162,9 @@ pub fn schema_from_header(headers: &HeaderMap<HeaderValue>) -> Schema {

schema_from_json(&schema_json).expect("arrow schema reconstructable from JSON")
}

pub fn assert_header_is_float(header: &HeaderValue) -> bool {
let float_str = header.to_str().unwrap();
let parsed_float = f64::from_str(float_str).unwrap();
parsed_float.is_finite()
}