diff --git a/docs/pageserver-thread-mgmt.md b/docs/pageserver-thread-mgmt.md
index 0cc897f1542f..b91193352892 100644
--- a/docs/pageserver-thread-mgmt.md
+++ b/docs/pageserver-thread-mgmt.md
@@ -52,9 +52,7 @@ completion, or shield the rest of the code from surprise cancellations
by spawning a separate task. The code that handles incoming HTTP
requests, for example, spawns a separate task for each request,
because Hyper will drop the request-handling Future if the HTTP
-connection is lost. (FIXME: our HTTP handlers do not do that
-currently, but we should fix that. See [issue
-3478](https://github.com/neondatabase/neon/issues/3478)).
+connection is lost.
#### How to cancel, then?
diff --git a/libs/utils/src/http/endpoint.rs b/libs/utils/src/http/endpoint.rs
index db3642b507e7..7cb96d909458 100644
--- a/libs/utils/src/http/endpoint.rs
+++ b/libs/utils/src/http/endpoint.rs
@@ -40,6 +40,12 @@ struct RequestId(String);
///
/// This also handles errors, logging them and converting them to an HTTP error response.
///
+/// NB: If the client disconnects, Hyper will drop the Future, without polling it to
+/// completion. In other words, the handler must be async cancellation safe! request_span
+/// prints a warning to the log when that happens, so that you have some trace of it in
+/// the log.
+///
+///
/// There could be other ways to implement similar functionality:
///
/// * procmacros placed on top of all handler methods
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 61028e23fea0..22dedbe5b29a 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -1,3 +1,6 @@
+//!
+//! Management HTTP API
+//!
use std::collections::HashMap;
use std::sync::Arc;
@@ -46,7 +49,6 @@ use utils::{
};
// Imports only used for testing APIs
-#[cfg(feature = "testing")]
use super::models::ConfigureFailpointsRequest;
struct State {
@@ -290,13 +292,19 @@ fn build_timeline_info_common(
}
// healthcheck handler
-async fn status_handler(request: Request
) -> Result, ApiError> {
+async fn status_handler(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
check_permission(&request, None)?;
let config = get_config(&request);
json_response(StatusCode::OK, StatusResponse { id: config.id })
}
-async fn timeline_create_handler(mut request: Request) -> Result, ApiError> {
+async fn timeline_create_handler(
+ mut request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_id))?;
@@ -332,7 +340,10 @@ async fn timeline_create_handler(mut request: Request) -> Result) -> Result, ApiError> {
+async fn timeline_list_handler(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let include_non_incremental_logical_size: Option =
parse_query_param(&request, "include-non-incremental-logical-size")?;
@@ -366,7 +377,10 @@ async fn timeline_list_handler(request: Request) -> Result,
json_response(StatusCode::OK, response_data)
}
-async fn timeline_detail_handler(request: Request) -> Result, ApiError> {
+async fn timeline_detail_handler(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let include_non_incremental_logical_size: Option =
@@ -400,7 +414,10 @@ async fn timeline_detail_handler(request: Request) -> Result) -> Result, ApiError> {
+async fn get_lsn_by_timestamp_handler(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
@@ -424,7 +441,10 @@ async fn get_lsn_by_timestamp_handler(request: Request) -> Result) -> Result, ApiError> {
+async fn tenant_attach_handler(
+ mut request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
@@ -460,7 +480,10 @@ async fn tenant_attach_handler(mut request: Request) -> Result) -> Result, ApiError> {
+async fn timeline_delete_handler(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
@@ -474,7 +497,10 @@ async fn timeline_delete_handler(request: Request) -> Result) -> Result, ApiError> {
+async fn tenant_detach_handler(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let detach_ignored: Option = parse_query_param(&request, "detach_ignored")?;
@@ -488,7 +514,10 @@ async fn tenant_detach_handler(request: Request) -> Result,
json_response(StatusCode::OK, ())
}
-async fn tenant_load_handler(request: Request) -> Result, ApiError> {
+async fn tenant_load_handler(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
@@ -508,7 +537,10 @@ async fn tenant_load_handler(request: Request) -> Result, A
json_response(StatusCode::ACCEPTED, ())
}
-async fn tenant_ignore_handler(request: Request) -> Result, ApiError> {
+async fn tenant_ignore_handler(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
@@ -521,7 +553,10 @@ async fn tenant_ignore_handler(request: Request) -> Result,
json_response(StatusCode::OK, ())
}
-async fn tenant_list_handler(request: Request) -> Result, ApiError> {
+async fn tenant_list_handler(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
check_permission(&request, None)?;
let response_data = mgr::list_tenants()
@@ -541,7 +576,10 @@ async fn tenant_list_handler(request: Request) -> Result, A
json_response(StatusCode::OK, response_data)
}
-async fn tenant_status(request: Request) -> Result, ApiError> {
+async fn tenant_status(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
@@ -581,7 +619,10 @@ async fn tenant_status(request: Request) -> Result, ApiErro
/// Note: we don't update the cached size and prometheus metric here.
/// The retention period might be different, and it's nice to have a method to just calculate it
/// without modifying anything anyway.
-async fn tenant_size_handler(request: Request) -> Result, ApiError> {
+async fn tenant_size_handler(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let inputs_only: Option = parse_query_param(&request, "inputs_only")?;
@@ -646,7 +687,10 @@ async fn tenant_size_handler(request: Request) -> Result, A
)
}
-async fn layer_map_info_handler(request: Request) -> Result, ApiError> {
+async fn layer_map_info_handler(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let reset: LayerAccessStatsReset =
@@ -660,7 +704,10 @@ async fn layer_map_info_handler(request: Request) -> Result
json_response(StatusCode::OK, layer_map_info)
}
-async fn layer_download_handler(request: Request) -> Result, ApiError> {
+async fn layer_download_handler(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
@@ -683,7 +730,10 @@ async fn layer_download_handler(request: Request) -> Result
}
}
-async fn evict_timeline_layer_handler(request: Request) -> Result, ApiError> {
+async fn evict_timeline_layer_handler(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
@@ -761,7 +811,10 @@ pub fn html_response(status: StatusCode, data: String) -> Result,
Ok(response)
}
-async fn tenant_create_handler(mut request: Request) -> Result, ApiError> {
+async fn tenant_create_handler(
+ mut request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let request_data: TenantCreateRequest = json_request(&mut request).await?;
let target_tenant_id = request_data.new_tenant_id;
check_permission(&request, None)?;
@@ -808,7 +861,10 @@ async fn tenant_create_handler(mut request: Request) -> Result) -> Result, ApiError> {
+async fn get_tenant_config_handler(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
@@ -834,6 +890,7 @@ async fn get_tenant_config_handler(request: Request) -> Result,
+ _cancel: CancellationToken,
) -> Result, ApiError> {
let request_data: TenantConfigRequest = json_request(&mut request).await?;
let tenant_id = request_data.tenant_id;
@@ -851,8 +908,10 @@ async fn update_tenant_config_handler(
}
/// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`].
-#[cfg(feature = "testing")]
-async fn handle_tenant_break(r: Request) -> Result, ApiError> {
+async fn handle_tenant_break(
+ r: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&r, "tenant_id")?;
let tenant = crate::tenant::mgr::get_tenant(tenant_id, true)
@@ -864,8 +923,10 @@ async fn handle_tenant_break(r: Request) -> Result, ApiErro
json_response(StatusCode::OK, ())
}
-#[cfg(feature = "testing")]
-async fn failpoints_handler(mut request: Request) -> Result, ApiError> {
+async fn failpoints_handler(
+ mut request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
if !fail::has_failpoints() {
return Err(ApiError::BadRequest(anyhow!(
"Cannot manage failpoints because pageserver was compiled without failpoints support"
@@ -898,7 +959,10 @@ async fn failpoints_handler(mut request: Request) -> Result
}
// Run GC immediately on given timeline.
-async fn timeline_gc_handler(mut request: Request) -> Result, ApiError> {
+async fn timeline_gc_handler(
+ mut request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
@@ -917,8 +981,10 @@ async fn timeline_gc_handler(mut request: Request) -> Result) -> Result, ApiError> {
+async fn timeline_compact_handler(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
@@ -939,8 +1005,10 @@ async fn timeline_compact_handler(request: Request) -> Result) -> Result, ApiError> {
+async fn timeline_checkpoint_handler(
+ request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
@@ -964,6 +1032,7 @@ async fn timeline_checkpoint_handler(request: Request) -> Result,
+ _cancel: CancellationToken,
) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
@@ -979,6 +1048,7 @@ async fn timeline_download_remote_layers_handler_post(
async fn timeline_download_remote_layers_handler_get(
request: Request,
+ _cancel: CancellationToken,
) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
@@ -1002,7 +1072,10 @@ async fn active_timeline_of_active_tenant(
.map_err(ApiError::NotFound)
}
-async fn always_panic_handler(req: Request) -> Result, ApiError> {
+async fn always_panic_handler(
+ req: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
// Deliberately cause a panic to exercise the panic hook registered via std::panic::set_hook().
// For pageserver, the relevant panic hook is `tracing_panic_hook` , and the `sentry` crate's wrapper around it.
// Use catch_unwind to ensure that tokio nor hyper are distracted by our panic.
@@ -1013,7 +1086,10 @@ async fn always_panic_handler(req: Request) -> Result, ApiE
json_response(StatusCode::NO_CONTENT, ())
}
-async fn disk_usage_eviction_run(mut r: Request) -> Result, ApiError> {
+async fn disk_usage_eviction_run(
+ mut r: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
check_permission(&r, None)?;
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
@@ -1103,8 +1179,10 @@ async fn handler_404(_: Request) -> Result, ApiError> {
)
}
-#[cfg(feature = "testing")]
-async fn post_tracing_event_handler(mut r: Request) -> Result, ApiError> {
+async fn post_tracing_event_handler(
+ mut r: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
enum Level {
@@ -1134,6 +1212,85 @@ async fn post_tracing_event_handler(mut r: Request) -> Result(request: Request, handler: H) -> Result, ApiError>
+where
+ R: std::future::Future