Tokio Might Need a Safe Method to Refresh Runtime #6760
I have seen issues in other frameworks reporting high memory usage; I guess it would help a lot if this issue were solved. |
Also, can you share the scripts you used when testing tokio and the other frameworks? |
I'm not sure which containers that would be. For example, with the multi-thread runtime, each worker thread has a fixed-size buffer whose size never changes from 256. And for the global queue, tasks are stored in linked lists, so the memory is freed when the task is done. Please verify that the memory actually stays high when measured using this utility:
use core::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::alloc::{GlobalAlloc, Layout, System};
struct TrackedAlloc {}
#[global_allocator]
static ALLOC: TrackedAlloc = TrackedAlloc;
static TOTAL_MEM: AtomicUsize = AtomicUsize::new(0);
unsafe impl GlobalAlloc for TrackedAlloc {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
let ret = System.alloc(layout);
if !ret.is_null() {
TOTAL_MEM.fetch_add(layout.size(), Relaxed);
}
ret
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
TOTAL_MEM.fetch_sub(layout.size(), Relaxed);
System.dealloc(ptr, layout);
}
unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
let ret = System.alloc_zeroed(layout);
if !ret.is_null() {
TOTAL_MEM.fetch_add(layout.size(), Relaxed);
}
ret
}
unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
let ret = System.realloc(ptr, layout, new_size);
if !ret.is_null() {
TOTAL_MEM.fetch_add(new_size.wrapping_sub(layout.size()), Relaxed);
}
ret
}
}
If it doesn't, then this is entirely up to the allocator and not something Tokio really has any power over. |
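If it helps anyone reproduce this, here is a minimal sketch (mine, not from the comment above) that samples the TOTAL_MEM counter once per second; it assumes the utility above is compiled into the same binary and that tokio is a dependency:

// Hypothetical sampler built on the TOTAL_MEM counter above.
#[tokio::main]
async fn main() {
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
    loop {
        interval.tick().await;
        let mb = TOTAL_MEM.load(Relaxed) as f64 / 1024.0 / 1024.0;
        println!("tracked allocations: {mb:.3} MB");
    }
}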
After tidying up the code, I tested memory usage under four conditions. For each condition, one million concurrent HTTP/2 requests were made 4 to 5 seconds after startup, and the charts reflect the first two rounds of testing.

In these charts, the top-left panel shows the default allocator without refreshing, the top-right shows mimalloc without refreshing, the bottom-left shows the default allocator with refreshing, and the bottom-right shows mimalloc with refreshing. The horizontal axis is server uptime in seconds. The left vertical axis is memory size in MB and applies to both the inner_mem and outer_mem series, which are the process memory as tracked internally and externally, respectively. The right vertical axis is the number of handled connections and concurrent tasks, with a maximum connection count of one million and a maximum task count of 1,000,008. Internal memory tracking was implemented by modifying the global memory allocator, while external monitoring of process memory used the third-party library sysinfo. The externally monitored values can be unstable, but they generally agree with the top command or the graphical task manager on Linux.

From these charts it is evident that during a million-connection test, memory usage spikes sharply as live tasks increase, peaking at around 2,500 MB. The internally tracked memory always drops back automatically as tasks complete, indicating there is no memory leak in the code. With the default allocator, however, the externally monitored memory keeps rising even as tasks decrease, and is only released once the connection count drops to zero; after release it stabilizes at around 2,000 MB. With the mi_malloc allocator, the externally monitored memory is released as tasks are freed, but it does not fall very far, leveling off at around 600 MB.

If the runtime is refreshed after concurrency ends, the externally monitored memory shrinks to a very low level. With the default allocator it contracts once or twice and then stabilizes, ending up between 10 and 100 MB. With the mi_malloc allocator it gradually contracts to around 15 MB and stays there. From these test results it is clear that refreshing Tokio's runtime does have a meaningful impact on the release of externally monitored memory.
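For what it's worth, the external reading can be reproduced with a few lines of sysinfo (a sketch of mine against the 0.30 API; the full server code below uses the same pattern in get_memory_usage):

use sysinfo::{Pid, System};

// Sketch: resident memory of the current process, in bytes, via sysinfo.
fn outer_memory_bytes() -> u64 {
    let sys = System::new_all();
    sys.process(Pid::from_u32(std::process::id()))
        .map(|p| p.memory())
        .unwrap_or(0)
}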
The client is a high-concurrency client implemented with reqwest, initialized with 8 HTTP/2 clients because the CPU has 8 threads; adjust this number when copying the code for testing. It can run multiple rounds of a million concurrent requests, with a default rest period of 10 seconds between rounds (to observe memory shrinkage on the server side). The client's working memory is about 7-8 GB, and its idle memory is around 4 GB. Although the client code is presented first, you would normally start the server before launching the client.
Cargo.toml:
[dependencies]
reqwest = "0.12.5"
tokio = { version = "1.39.2", features = ["full"] }
src/main.rs:
use std::time::{Duration, Instant};
/// Get the URL for the request; for multiple target servers, implement address rotation here.
fn get_url(_round: usize) -> &'static str {
"http://0.0.0.0:3000?server=axum&expire=20000"
}
/// Number of rounds
const TOTAL_ROUNDS: usize = 300;
/// Requests per round
const TOTAL_REQUESTS: usize = 1_000_000;
/// Waiting period between rounds, in seconds
const WAITING_SECS: u64 = 10;
/// Number of clients
const NUM_CLIENTS: usize = 8;
/// Countdown output during waiting periods
async fn countdown(seconds: u64) {
let mut interval = tokio::time::interval(Duration::from_secs(1));
for i in 0..=seconds {
interval.tick().await;
print!("\rwaiting for ({}/{}) seconds...", i, seconds);
use std::io::Write;
std::io::stdout().flush().unwrap();
}
println!("\r{}\r", " ".repeat(30));
}
/// Calculate percentile times for responses
fn calculate_percentiles(durations: Vec<Duration>) -> [Duration; 5] {
let mut sorted_durations = durations;
sorted_durations.sort_by_key(|d| d.as_millis());
let total_requests = sorted_durations.len();
[
sorted_durations[total_requests / 2],
sorted_durations[total_requests * 9 / 10],
sorted_durations[total_requests * 99 / 100],
sorted_durations[total_requests * 999 / 1000],
sorted_durations.last().cloned().unwrap_or_default(),
]
}
#[tokio::main]
async fn main() {
let start = Instant::now();
let clients: Vec<reqwest::Client> = (0..NUM_CLIENTS)
.map(|_| {
reqwest::ClientBuilder::new()
.pool_idle_timeout(Duration::from_secs(1))
.http2_prior_knowledge() // mark as an HTTP/2 client
.build()
.unwrap()
})
.collect();
println!("build-clients-time: {:.3?}", start.elapsed());
for round in 1..=TOTAL_ROUNDS {
multi_requests(round, &clients).await;
countdown(WAITING_SECS).await;
}
}
/// Concurrent request function per round
async fn multi_requests(round: usize, clients: &[reqwest::Client]) {
let runtime = tokio::runtime::Runtime::new().unwrap();
let mut results = std::collections::HashMap::new();
let mut successful_times = Vec::new();
let mut successful_count = 0u32;
let clients_len = clients.len();
let start = Instant::now();
let mut tasks = Vec::with_capacity(TOTAL_REQUESTS);
for i in 0..TOTAL_REQUESTS {
let client = clients[i % clients_len].clone();
tasks.push(runtime.spawn(async move {
let url = get_url(round);
let request_start = Instant::now();
let response = client
.get(url)
.timeout(Duration::from_secs(60))
.send()
.await;
let result = match response {
Ok(r) => r.text().await,
Err(e) => Err(e),
};
(result, request_start.elapsed())
}));
}
println!("{round} - prepared-time: {:.3?}", start.elapsed());
// Convert all responses and error types to strings for statistics
for task in tasks {
let (result, elapsed) = task.await.unwrap();
if result.is_ok() {
successful_times.push(elapsed);
successful_count += 1;
}
let result = result.unwrap_or_else(|e| e.to_string());
*results.entry(result).or_insert(0) += 1;
}
println!("{round} - completed-time: {:.3?}", start.elapsed());
// Output specific data if there are successful responses
if successful_count > 0 {
println!(
"{round} - successful-count: {successful_count}, average-time: {:.3?}",
successful_times.iter().sum::<Duration>() / (successful_count as u32)
);
let percentiles = calculate_percentiles(successful_times);
println!(
"{round} - percentiles: {{ 50%: {:.3?}, 90%: {:.3?}, 99%: {:.3?}, 99.9%: {:.3?}, 100%: {:.3?} }}",
percentiles[0], percentiles[1], percentiles[2], percentiles[3], percentiles[4]
);
}
// Calculate the success rate of responses
println!(
"{round} - results-len: {}, success-rate: {:.2}%",
results.len(),
100. * successful_count as f64 / TOTAL_REQUESTS as f64
);
// Do not output the results if there are too many.
if results.len() <= 100 {
println!("{round} - results: {:#?}", results);
}
runtime.shutdown_background();
}
Typical output data:
Correct responses include the server type identifier, the server process ID, the HTTP version, and the client's IP address. There are one million requests in total across eight clients, so each client handles 125,000 requests. If you press …
The server uses axum and hyper in low-level mode. The code is lengthy and includes memory monitoring (tracking both the allocator's internal memory and the external process memory) and global runtime monitoring and replacement (using …).

When investigating memory leaks in Rust, we usually monitor the memory state through the internal memory allocator (referred to below as INNER_MEMORY). During operations management, however, we typically monitor the external memory (referred to below as OUTER_MEMORY). These two are not the same: stable internal memory usually indicates no memory leak, while external memory significantly larger than internal memory often means the process memory is not promptly returned to the system.

mi_malloc is currently mainly compatible with Linux, so you may need to comment out the mi_malloc-related code on other systems, but this does not affect observing whether process memory is released. You can switch between memory allocators, and toggle whether Tokio's runtime is refreshed, by switching the comments below; after switching, recompile incrementally.
static REAL_ALLOCATOR: (std::alloc::System, &str, u64) = (std::alloc::System, "DefaultAlloc", 20);
// static REAL_ALLOCATOR: (mimalloc::MiMalloc, &str, u64) = (mimalloc::MiMalloc, "MiMalloc", 20);
// const AUTO_REFRESH: bool = false;
const AUTO_REFRESH: bool = true;
Cargo.toml:
[dependencies]
axum = { version = "0.7.5", features = ["http2"] }
tokio = { version = "1.39.2", features = ["full"] }
mimalloc = "0.1.42"
hyper = "1.4.1"
hyper-util = { version = "0.1.7", features = ["tokio", "server-auto", "http2"] }
tower = { version = "0.4.13", features = ["full"] }
sysinfo = "0.30.13"
parking_lot = "0.12.3" src/main.rs: use std::alloc::{GlobalAlloc, Layout};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, LazyLock};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use sysinfo::{Pid, System};
use tokio::runtime::{Handle, Runtime};
use hyper::body::Incoming;
use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::server;
use tower::Service;
use axum::extract::{ConnectInfo, Query, Request, State};
use axum::{Extension, Router, routing::get};
/// Define the global runtime
use parking_lot::RwLock;
static GLOBAL_RUNTIME: RwLock<Option<Runtime>> = RwLock::new(None);
/// Process ID
static PID: LazyLock<Pid> = LazyLock::new(|| Pid::from_u32(std::process::id()));
/// Monitor output frequency in the command line, default is once every second
const INTERVAL_SECS: u64 = 1;
/// Custom memory allocator tracker
struct TrackedAlloc;
#[global_allocator]
static ALLOC: TrackedAlloc = TrackedAlloc;
/// Internal memory tracking
static INNER_MEMORY: AtomicUsize = AtomicUsize::new(0);
/// The actual allocator, the first option is the default allocator, the second is MiMalloc. Only one allocator can be used in the same code, so they must be selectively compiled via comments.
/// The tuple contains three elements: the allocator, the allocator name, and the threshold for memory contraction (i.e., the memory value above which the Tokio runtime is refreshed, in MB).
static REAL_ALLOCATOR: (std::alloc::System, &str, u64) = (std::alloc::System, "DefaultAlloc", 20);
// static REAL_ALLOCATOR: (mimalloc::MiMalloc, &str, u64) = (mimalloc::MiMalloc, "MiMalloc", 20);
/// Whether to automatically refresh Tokio's runtime
// const AUTO_REFRESH: bool = false;
const AUTO_REFRESH: bool = true;
/// Implementation of the internal memory tracking
unsafe impl GlobalAlloc for TrackedAlloc {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
let ret = REAL_ALLOCATOR.0.alloc(layout);
if !ret.is_null() {
INNER_MEMORY.fetch_add(layout.size(), Ordering::Relaxed);
}
ret
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
REAL_ALLOCATOR.0.dealloc(ptr, layout);
INNER_MEMORY.fetch_sub(layout.size(), Ordering::Relaxed);
}
unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
let ret = REAL_ALLOCATOR.0.alloc_zeroed(layout);
if !ret.is_null() {
INNER_MEMORY.fetch_add(layout.size(), Ordering::Relaxed);
}
ret
}
unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
let ret = REAL_ALLOCATOR.0.realloc(ptr, layout, new_size);
if !ret.is_null() {
INNER_MEMORY.fetch_add(new_size.wrapping_sub(layout.size()), Ordering::Relaxed);
}
ret
}
}
/// Handle state tracking
#[derive(Debug, Default)]
struct MyState {
counter: AtomicUsize,
connection: AtomicUsize,
max_connection: AtomicUsize,
}
#[tokio::main]
async fn main() {
// Initialize or replace the global runtime with a write lock
GLOBAL_RUNTIME.write().replace(Runtime::new().unwrap());
println!("axum-demo pid is {}", *PID);
// Bind to the port
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
println!("listening on {}", listener.local_addr().unwrap());
// Initialize the handle state
let my_status: Arc<MyState> = Default::default();
// Start the asynchronous monitor using the main runtime
tokio::spawn(monitor(my_status.clone()));
// Create the axum service
let app = Router::new().route("/", get(index)).with_state(my_status);
// Loop to listen on the port in low-level form
while let Ok((stream, addr)) = listener.accept().await {
let app = app.clone();
// Spawn using a read lock to get the global runtime
GLOBAL_RUNTIME.read().as_ref().unwrap().spawn(async move {
let io = TokioIo::new(stream);
let hyper_service = hyper::service::service_fn(move |req: Request<Incoming>| {
// Inject the address information of each connection into the handle
app.clone().layer(Extension(ConnectInfo(addr))).call(req)
});
if let Err(err) = server::conn::auto::Builder::new(TokioExecutor::new())
.http2() // Set the HTTP/2 service flag
.max_concurrent_streams(1000000) // Maximum concurrent streams per HTTP/2 connection
.keep_alive_timeout(Duration::from_secs(1))
.serve_connection_with_upgrades(io, hyper_service)
.await
{
println!("error serving connection from {addr:?}: {err:#}");
}
});
}
}
/// Handle, sleeps based on the 'expire' value extracted from params, and updates the connection and counter state of `my_status`
async fn index(
version: axum::http::Version,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
State(my_status): State<Arc<MyState>>,
Query(params): Query<HashMap<String, String>>,
) -> String {
my_status.counter.fetch_add(1, Ordering::Relaxed);
let conn = 1 + my_status.connection.fetch_add(1, Ordering::Relaxed);
if conn > my_status.max_connection.load(Ordering::Relaxed) {
my_status.max_connection.store(conn, Ordering::Relaxed);
}
if let Some(Ok(expire)) = params.get("expire").map(|s| s.parse::<u64>()) {
tokio::time::sleep(Duration::from_millis(expire)).await;
}
my_status.connection.fetch_sub(1, Ordering::Relaxed);
format!("axum({}) {version:?} {addr:?}", *PID)
}
/// Obtain process memory from the outside using the sysinfo third-party library
fn get_memory_usage() -> u64 {
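// Note: System::new_all() gathers all system information and is relatively
// heavy; that is fine at this monitor's once-per-second cadence.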
let system_info = System::new_all();
let current_process = system_info.process(*PID).expect("Process not found");
current_process.memory()
}
/// Asynchronous monitor function
async fn monitor(my_status: Arc<MyState>) {
let mut interval = tokio::time::interval(Duration::from_secs(INTERVAL_SECS));
let mut tick = 0;
loop {
interval.tick().await;
let ct = my_status.counter.load(Ordering::Relaxed);
let conn = my_status.connection.load(Ordering::Relaxed);
let max_conn = my_status.max_connection.load(Ordering::Relaxed);
let outer_memory = get_memory_usage();
let global_spawn = GLOBAL_RUNTIME
.read()
.as_ref()
.unwrap()
.handle()
.metrics()
.num_alive_tasks();
let main_spawn = Handle::current().metrics().num_alive_tasks();
// Refresh the global runtime when there are no tasks in the global runtime and the external memory exceeds the threshold
let refresh_message =
if AUTO_REFRESH && global_spawn == 0 && outer_memory > REAL_ALLOCATOR.2 * 1024 * 1024 {
let start = std::time::Instant::now();
// Replace the old global runtime
let old_global_runtime = GLOBAL_RUNTIME.write().replace(Runtime::new().unwrap());
// Shutdown the old global runtime in the main runtime
old_global_runtime.unwrap().shutdown_background();
format!(
", refresh runtime on {} until {}M! cost {:?}",
REAL_ALLOCATOR.1,
REAL_ALLOCATOR.2,
start.elapsed()
)
} else {
"".to_string()
};
// Output the handle counter, real-time connection count, maximum connection count, internal memory, external memory, main runtime running tasks, global runtime running tasks, and refresh information
println!(
"axum {tick}, ct: {ct}, conn: {conn}, max_conn: {max_conn}, inner_mem: {:.3}M, outer_mem: {:.3}M, main_rt: {}, global_rt: {}{}",
INNER_MEMORY.load(Ordering::Relaxed) as f64 / 1024. / 1024.,
outer_memory as f64 / 1024. / 1024.,
main_spawn,
global_spawn,
refresh_message,
);
tick += INTERVAL_SECS;
}
}
This is not a good practice for refreshing runtimes. Refreshing the global runtime requires that the number of active tasks reach zero, which means the refresh must happen outside the runtime itself. Moreover, reaching zero active tasks requires waiting, for example for all long-lived connections to end, before entering the refresh process.

Even when the active task count reaches zero, it does not guarantee that the global runtime can be safely shut down. A zero count only indicates that the server's asynchronous state has ended; TCP connections may not be fully closed yet. Shutting down the global runtime at this point might close those TCP connections, potentially losing data still in the client's TCP buffer. Currently, this refresh method is suitable only for axum/hyper; other frameworks built on Tokio cannot use it safely.

I am unsure whether refreshing Tokio's runtime thread and its internal containers allows the system to fully reclaim memory; that would require further detailed testing by someone familiar with the Tokio runtime code. Overall, however, replacing the runtime does help the system reclaim memory.
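One way to reduce the truncated-connection risk described above, a sketch of mine rather than code from this thread, is to give the old runtime a bounded grace period instead of dropping it immediately. Runtime::shutdown_timeout blocks, and dropping a runtime inside an async context panics, so the sketch moves the shutdown onto a plain OS thread; it assumes the GLOBAL_RUNTIME static from the server code above:

use std::time::Duration;
use tokio::runtime::Runtime;

// Hypothetical helper: swap in a fresh runtime, then give the old one a
// bounded grace period to finish its I/O teardown before it is dropped.
fn refresh_runtime_with_grace(grace: Duration) {
    if let Some(old_rt) = GLOBAL_RUNTIME.write().replace(Runtime::new().unwrap()) {
        // shutdown_timeout blocks for up to `grace`, so run it off-runtime.
        std::thread::spawn(move || old_rt.shutdown_timeout(grace));
    }
}

This only bounds how long lingering connections may drain; it does not remove the risk, since data still in flight after the grace period is dropped all the same.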
The original data of the server logs in the charts is as follows:
1. default-without-refresh
2. mimalloc-without-refresh
3. default-with-refresh
4. mimalloc-with-refresh
|
Hi @lithbitren, not sure if it would help, but I came across libc's malloc_trim a few months ago when I was trying to reduce the memory usage of my server (using the default/system allocator) when idle. It gives free pages at the top of the heap back to the system, and it worked pretty well for my use case. |
I saw this issue this morning and ran a few tests. After free, the freed memory is merged into larger free blocks, but it is not necessarily returned to the operating system right away (for the details, see the implementation of free in the source); malloc_trim returns those free blocks to the system.
unsafe {
    libc::malloc_trim(0);
}
|
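For anyone copying this, a slightly fuller sketch (mine; it assumes the libc crate on glibc/Linux, e.g. libc = "0.2" in Cargo.toml) that could be called from the monitor loop once tasks have drained:

// Hypothetical helper: ask glibc to return free heap pages to the OS.
// Only meaningful on glibc targets; other platforms need another strategy.
#[cfg(target_os = "linux")]
fn trim_heap() {
    // SAFETY: malloc_trim has no safety preconditions on glibc; the argument
    // is the amount of slack (in bytes) to keep at the top of the heap.
    unsafe {
        libc::malloc_trim(0);
    }
}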
I think refreshing the runtime and manually … |
I only used C for practice problems back in school and haven't used it since, so I'm not very familiar with systems-level programming in C. I tried replacing the memory-refresh part of the server-side code with malloc_trim. After testing, it appears that … In summary, the best practice currently is to switch to mi_malloc as the memory allocator, but mi_malloc can only release around 70% of the peak memory, making it difficult to shrink the externally monitored process memory down close to the internal memory.
default-with-malloc_trim:
mi_malloc-with-malloc_trim:
|
Although it's reasonable for … |
@lixiang365 @TinusgragLin @Darksonn Thanks for your replies! Over the weekend, I retested the server using malloc_trim to release memory directly. The strategy was similar to refreshing the runtime, and I found that using malloc_trim and refreshing the runtime are not entirely in conflict. I ran one million concurrent requests every half-minute, with each test lasting approximately 4 hours over hundreds of rounds. Here are the final memory results:
@TinusgragLin You were right: this method of measurement doesn't capture the time it takes to destroy variables and release memory. In long-term testing, if I only used malloc_trim, each call took at most 160-200 ms; but if I refreshed the runtime before calling malloc_trim, the stable cost was only around 100 ms, indicating that refreshing the runtime saved malloc_trim about 100 ms of work.

malloc_trim feels like a supplement to memory-allocation handling. If you refresh the runtime first and then call malloc_trim, the memory is almost completely released. If you simply use malloc_trim alone, the advantage is that it is cross-platform and can release about 85% of the memory; the downside is that you need to design the strategy manually. Using mi_malloc is limited to specific platforms, but it is the least intrusive option, requiring only a single global declaration, and it releases about 70% of the memory.

Clearly, my method of refreshing the runtime is not good practice: it is too intrusive and not suitable for older projects. If Tokio could provide a convenient refresh method, it could be used together with malloc_trim to release as much memory as possible in the end.
default_allocator-malloc_trim (running for 3 hours):
default_allocator-refresh_runtime-malloc_trim (running for 3 hours):
|
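Under the same assumptions as the sketches above (the GLOBAL_RUNTIME static from the server code, plus the libc crate on Linux), the combined strategy described in the previous comment might look like this:

use tokio::runtime::Runtime;

// Hypothetical combination: replace the runtime, drop the old one in the
// background, then ask glibc to hand free heap pages back to the OS.
fn refresh_then_trim() {
    if let Some(old_rt) = GLOBAL_RUNTIME.write().replace(Runtime::new().unwrap()) {
        old_rt.shutdown_background();
    }
    // shutdown_background returns immediately, so some of the old runtime's
    // memory may be freed only after this trim; calling malloc_trim again on
    // the next monitor tick would catch the remainder.
    unsafe {
        libc::malloc_trim(0);
    }
}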
hi!
I've been following many web frameworks based on tokio, including actix, hyper, axum, warp, salvo, and others. Many of them have open issues about memory leaks, but after investigation I found that most of these frameworks don't actually leak: a process that doesn't immediately return memory to the system can't, in most cases, strictly be called a memory leak.
For example, during stress testing, I used reqwest on one Ubuntu machine to perform millions of concurrent HTTP/2 requests against another Ubuntu machine running a tokio-based web service. The server-side handlers were simply designed to asynchronously sleep for 20 seconds.
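For reference, a server of the kind described here can be as small as the following sketch (mine, in axum 0.7 style; not the author's actual test code, which appears earlier in the thread):

use std::time::Duration;
use axum::{routing::get, Router};

// Simulate a slow endpoint: each request parks its task for 20 seconds.
async fn slow_handler() -> &'static str {
    tokio::time::sleep(Duration::from_secs(20)).await;
    "ok"
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/", get(slow_handler));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}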
On the server side, the frameworks perform very similarly. They typically accept all requests within about 4 seconds, then sleep for 20 seconds before responding to the client. Clients finish processing all responses in around 30 seconds, with an average send-to-receive response time of about 22 seconds. All tokio-based web frameworks perform well.
However, after millions of concurrent requests, the process memory of actix and axum stays at 3.5-4.5 GB and never decreases on its own, unless the memory allocator is changed to mi_malloc, in which case memory drops to 0.7-1.5 GB after the concurrency ends. Many Rust web frameworks' "memory leak" issues are like this: without changing the memory allocator, the memory does not contract after high concurrency.
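For reference, switching to mimalloc as described here is a single global declaration with the mimalloc crate (a sketch; it assumes mimalloc = "0.1" in Cargo.toml):

use mimalloc::MiMalloc;

// Route all allocations through mimalloc instead of the system allocator.
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;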
In the issue trackers of tokio, actix, axum, and hyper, when memory "leak" issues come up, community developers and contributors often reply that not returning memory to the system improves the performance of future allocations. However, there is no evidence that releasing memory significantly impacts server performance. In my repeated tests, whether the server was freshly started or running with mi_malloc, and regardless of how much memory the process held, client response times in the next round of million-level concurrency were similar. Server-side handling performance stayed consistent, which makes it hard to substantiate the claim that holding on to memory significantly speeds up future allocations.
For the server side, if an attacker targets a slow web API and launches a massive concurrent attack, then even if the server process doesn't crash, the consumed memory never goes away on its own; only restarting the process brings memory back to normal levels. For example, if your normal QPS is a few hundred to a few thousand and regular memory usage is only 100 MB, after an attack the process might sit at 4.5 GB and never contract. That is bad for operations management, since monitoring data should be as accurate as possible. The behavior of tokio-based web frameworks in this regard is quite troubling.
However, there are some workarounds. For instance, when using axum or hyper, I tried declaring a global RUNTIME and then spawning coroutines to handle requests with RUNTIME.spawn(async move {...}) during loop-accept. When a refresh is needed, I use std::mem::replace in a lock-free manner to replace the global RUNTIME; the replaced runtime waits asynchronously for a period before shutting down with rt.shutdown_background(). Through this method, the process memory of axum, which was at 4.5 GB, can be reduced to 0.5 GB after several refreshes. With mi_malloc, the memory can shrink to as low as 20 MB, which is a good result with almost no loss in performance.

However, this approach has its drawbacks. If the service includes long-lived connections like HTTP/2, SSE, or WebSockets, safely shutting down the server becomes challenging. Even if rt.metrics().num_alive_tasks() shows zero active tasks, it doesn't necessarily mean the runtime can be safely shut down: there may still be TCP connections whose clients are reading data from the TCP buffer. If such a connection is terminated at that point, the client's buffered data is discarded and the client fails to receive it. Also, this method only works for axum and hyper, and is not suitable for many other web frameworks.

From these tests, it seems that tokio::runtime::Runtime does hold on to memory, though this cannot strictly be called a memory leak. It is similar to basic data structures like VecDeque, LinkedList, and HashMap: after inserting a large number of elements, even if all elements are removed, the process memory does not immediately return to the system unless the variable holding the data structure is destroyed. In most cases, destroying the container variable results in the process memory being released normally (see the Vec sketch below).

To address the title: I hope that tokio could provide a safe way to refresh the runtime. I am not familiar with all of the runtime's source code, but I suspect there may be containers that are expanded without their memory ever being returned to the system. If there were a method to manually refresh all containers in the runtime (create new containers, transfer the elements from the old containers into them, then safely destroy the old containers), it might mitigate the problem of memory not being released. Ideally, this process would be lock-free, so that it doesn't impact the spawning of new tasks during the refresh.
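To make the container analogy above concrete (my illustration, not from the issue): clearing a container keeps its capacity, and only shrinking or dropping it returns the buffer to the allocator:

fn main() {
    let mut v: Vec<u8> = vec![0u8; 256 * 1024 * 1024]; // ~256 MiB live
    v.clear(); // len is now 0, but the buffer is still owned by `v`
    println!("after clear: capacity = {} bytes", v.capacity());
    v.shrink_to_fit(); // hands the buffer back to the allocator
    println!("after shrink_to_fit: capacity = {} bytes", v.capacity());
    // Dropping `v` has the same effect; whether the allocator then returns
    // the pages to the OS is the allocator's decision, as discussed above.
}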
It's important to note that you shouldn't merely use tokio::spawn + tokio::time::sleep for concurrency testing. For simple memory structures the system can sometimes recover memory, but with the complex coroutines of hyper or axum you cannot count on memory contracting, whereas destroying and recreating the runtime wholesale always produces memory contraction. Additionally, the code for million-level concurrency over HTTP/2 is not complex; the only thing to note is to increase the max_concurrent_streams parameter in hyper (I set it to 1,000,000).

In summary, for the issue of memory release, perhaps only tokio can solve it.
Additionally, I wrote an article (in Chinese) about testing Rust web frameworks, which can be machine-translated for reading:
rust的web框架单机百万并发的性能与开销 (performance and overhead of Rust web frameworks at one million concurrent connections on a single machine)