Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify server pool #177

Merged
merged 3 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/linters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- uses: actions/checkout@v1
- uses: actions-rs/toolchain@v1
with:
toolchain: 1.65.0
toolchain: 1.68.0
profile: minimal
components: clippy, rustfmt
override: true
Expand All @@ -29,7 +29,7 @@ jobs:
- uses: actions/checkout@v1
- uses: actions-rs/toolchain@v1
with:
toolchain: 1.65.0
toolchain: 1.68.0
profile: minimal
components: clippy, rustfmt
override: true
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- uses: actions/checkout@v1
- uses: actions-rs/toolchain@v1
with:
toolchain: 1.65.0
toolchain: 1.68.0
profile: minimal
override: true
- name: Check
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ keywords = ["mock", "mocks", "http", "webmock", "webmocks"]
categories = ["development-tools::testing", "web-programming"]
exclude = ["/.appveyor.yml", "/.travis.yml", "/benchmarks.txt", "/docs/", "/slides.pdf"]
edition = "2021"
rust-version = "1.68"

[badges]
travis-ci = { repository = "lipanski/mockito", branch = "master" }
Expand All @@ -21,7 +22,6 @@ assert-json-diff = "2.0"
colored = { version = "2.0", optional = true }
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
lazy_static = "1.4"
log = "0.4"
rand = "0.8"
regex = "1.7"
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<p align="center">
<a href="https://docs.rs/mockito"><img src="https://docs.rs/mockito/badge.svg"></a>
<a href="https://crates.io/crates/mockito"><img src="https://img.shields.io/crates/v/mockito.svg"></a>
<img src="https://img.shields.io/badge/rust%20version-%3E%3D1.65.0-orange">
<img src="https://img.shields.io/badge/rust%20version-%3E%3D1.68.0-orange">
<a href="https://crates.io/crates/mockito"><img src="https://img.shields.io/crates/d/mockito"></a>
<a href="https://github.com/lipanski/mockito/actions/workflows/tests.yml/?branch=master"><img src="https://github.com/lipanski/mockito/actions/workflows/tests.yml/badge.svg?branch=master"></a>
</p>
Expand Down Expand Up @@ -122,7 +122,7 @@ async fn test_simple_route_mock_async() {

## Minimum supported Rust toolchain

The current minimum support Rust toolchain is **1.65.0**
The current minimum support Rust toolchain is **1.68.0**

## Contribution Guidelines

Expand All @@ -144,7 +144,7 @@ cargo test
...or run tests using a different toolchain:

```sh
rustup run --install 1.65.0 cargo test
rustup run --install 1.68.0 cargo test
```

...or run tests while disabling the default features (e.g. the colors):
Expand Down Expand Up @@ -184,7 +184,7 @@ rustup component add clippy
The linter is always run on the minimum supported Rust version:

```sh
rustup run --install 1.65.0 cargo clippy-mockito
rustup run --install 1.68.0 cargo clippy-mockito
```

### Release
Expand Down
68 changes: 18 additions & 50 deletions src/server_pool.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use crate::Server;
use crate::{Error, ErrorKind};
use lazy_static::lazy_static;
use std::collections::VecDeque;
use std::ops::{Deref, DerefMut, Drop};
use std::sync::{Arc, Mutex};
use std::sync::Mutex;
use tokio::sync::{Semaphore, SemaphorePermit};

const DEFAULT_POOL_SIZE: usize = 50;

lazy_static! {
pub(crate) static ref SERVER_POOL: ServerPool = ServerPool::new(DEFAULT_POOL_SIZE);
}
// macOS has small default ulimits. Sync it with test_server_pool()
const DEFAULT_POOL_SIZE: usize = if cfg!(target_os = "macos") { 20 } else { 50 };
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's test_server_pool_async and test_server_pool that might fail on a Mac now => please update the tests to use the values conditionally as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests happen to pass currently, because they have a bug — the servers vec is emptied on every iteration, so they never exhaust the pool.

But when these tests actually try to get the whole pool, they cause a deadlock whenever any other test runs in parallel that uses more than one server at a time.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, that vec was supposed to be outside the for loop 🤦

pub(crate) static SERVER_POOL: ServerPool = ServerPool::new(DEFAULT_POOL_SIZE);

///
/// A handle around a pooled `Server` object which dereferences to `Server`.
Expand Down Expand Up @@ -46,75 +43,46 @@ impl DerefMut for ServerGuard {
impl Drop for ServerGuard {
fn drop(&mut self) {
if let Some(server) = self.server.take() {
// the permit is still held when recycling,
// so the next acquire will already see the recycled server
SERVER_POOL.recycle(server);
}
}
}

pub(crate) struct ServerPool {
max_size: usize,
created: Arc<Mutex<usize>>,
semaphore: Semaphore,
state: Arc<Mutex<VecDeque<Server>>>,
free_list: Mutex<VecDeque<Server>>,
}

impl ServerPool {
fn new(max_size: usize) -> ServerPool {
let created = Arc::new(Mutex::new(0));
let semaphore = Semaphore::new(max_size);
let state = Arc::new(Mutex::new(VecDeque::new()));
const fn new(max_size: usize) -> ServerPool {
ServerPool {
max_size,
created,
semaphore,
state,
semaphore: Semaphore::const_new(max_size),
free_list: Mutex::new(VecDeque::new()),
}
}

pub(crate) async fn get_async(&'static self) -> Result<ServerGuard, Error> {
// number of active permits limits the number of servers created
let permit = self
.semaphore
.acquire()
.await
.map_err(|err| Error::new_with_context(ErrorKind::Deadlock, err))?;

let should_create = {
let created_mutex = self.created.clone();
let mut created = created_mutex.lock().unwrap();
if *created < self.max_size {
*created += 1;
true
} else {
false
}
};

let server = {
if should_create {
Some(Server::try_new_with_port_async(0).await?)
} else {
None
}
// be careful not to lock locks in match - it extends scope of temporaries
let recycled = self.free_list.lock().unwrap().pop_front();
let server = match recycled {
Some(server) => server,
None => Server::try_new_with_port_async(0).await?,
};

let state_mutex = self.state.clone();
let mut state = state_mutex.lock().unwrap();

if let Some(server) = server {
state.push_back(server);
}

if let Some(server) = state.pop_front() {
Ok(ServerGuard::new(server, permit))
} else {
Err(Error::new(ErrorKind::ServerBusy))
}
Ok(ServerGuard::new(server, permit))
}

fn recycle(&self, mut server: Server) {
server.reset();
let state_mutex = self.state.clone();
let mut state = state_mutex.lock().unwrap();
state.push_back(server);
self.free_list.lock().unwrap().push_back(server);
}
}
33 changes: 23 additions & 10 deletions tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1915,35 +1915,45 @@ fn test_running_multiple_servers() {
assert_eq!("s3", body3);
}

static SERIAL_POOL_TESTS: Mutex<()> = Mutex::new(());
const DEFAULT_POOL_SIZE: usize = if cfg!(target_os = "macos") { 20 } else { 50 };

#[test]
#[allow(clippy::vec_init_then_push)]
fn test_server_pool() {
// two tests can't monopolize the pool at the same time
let _lock = SERIAL_POOL_TESTS.lock().unwrap();

// If the pool is not working, this will hit the file descriptor limit (Too many open files)
for _ in 0..20 {
// The pool size is 50, anything beyond that will block
for _ in 0..50 {
let mut servers = vec![];
let mut servers = vec![];
// Anything beyond pool size will block.
for _ in 0..DEFAULT_POOL_SIZE {
servers.push(Server::new());

let s = servers.first_mut().unwrap();
let s = servers.last_mut().unwrap();
let m = s.mock("GET", "/pool").create();
let (_, _, _) = request_with_body(&s.host_with_port(), "GET /pool", "", "");
m.assert();
}
}
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
#[allow(clippy::vec_init_then_push)]
async fn test_server_pool_async() {
// two tests can't monopolize the pool at the same time
tokio::task::yield_now().await;
let _lock = tokio::task::block_in_place(|| SERIAL_POOL_TESTS.lock().unwrap());

// If the pool is not working, this will hit the file descriptor limit (Too many open files)
for _ in 0..20 {
// The pool size is 50, anything beyond that will block
for _ in 0..50 {
let mut servers = vec![];
let mut servers = vec![];
// Anything beyond pool size will block
for _ in 0..DEFAULT_POOL_SIZE {
servers.push(Server::new_async().await);

let s = servers.first_mut().unwrap();
let s = servers.last_mut().unwrap();
let m = s.mock("GET", "/pool").create_async().await;
let (_, _, _) = request_with_body(&s.host_with_port(), "GET /pool", "", "");
m.assert_async().await;
Expand Down Expand Up @@ -2052,8 +2062,11 @@ async fn test_match_body_asnyc() {
assert_eq!(200, response.status());
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn test_join_all_async() {
tokio::task::yield_now().await;
let _lock = tokio::task::block_in_place(|| SERIAL_POOL_TESTS.lock().unwrap());

let futures = (0..10).map(|_| async {
let mut s = Server::new_async().await;
let m = s.mock("POST", "/").create_async().await;
Expand Down