Skip to content

Commit

Permalink
RUST-812 Reduce flakiness of in_window::load_balancing_test (#568)
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickfreed authored Feb 4, 2022
1 parent 2f6a9e3 commit 015c198
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 16 deletions.
57 changes: 44 additions & 13 deletions src/sdam/description/topology/server_selection/test/in_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ use crate::{
selection_criteria::ReadPreference,
test::{
run_spec_test,
EventClient,
Event,
EventHandler,
FailCommandOptions,
FailPoint,
FailPointMode,
SdamEvent,
TestClient,
CLIENT_OPTIONS,
LOCK,
},
ServerType,
RUNTIME,
};

Expand Down Expand Up @@ -114,14 +117,13 @@ async fn load_balancing_test() {

let mut setup_client_options = CLIENT_OPTIONS.clone();

// TODO: RUST-1004 unskip on auth variants
if setup_client_options.credential.is_some() {
println!("skipping load_balancing_test test due to auth being enabled");
if setup_client_options.load_balanced.unwrap_or(false) {
println!("skipping load_balancing_test test due to load-balanced topology");
return;
}

if setup_client_options.load_balanced.unwrap_or(false) {
println!("skipping load_balancing_test test due to load-balanced topology");
if setup_client_options.credential.is_some() {
println!("skipping load_balancing_test test due to auth being enabled");
return;
}

Expand Down Expand Up @@ -158,8 +160,14 @@ async fn load_balancing_test() {

/// min_share is the lower bound for the % of times the the less selected server
/// was selected. max_share is the upper bound.
async fn do_test(client: &mut EventClient, min_share: f64, max_share: f64, iterations: usize) {
client.clear_cached_events();
async fn do_test(
client: &TestClient,
handler: &mut EventHandler,
min_share: f64,
max_share: f64,
iterations: usize,
) {
handler.clear_cached_events();

let mut handles: Vec<AsyncJoinHandle<()>> = Vec::new();
for _ in 0..10 {
Expand All @@ -180,7 +188,7 @@ async fn load_balancing_test() {
futures::future::join_all(handles).await;

let mut tallies: HashMap<ServerAddress, u32> = HashMap::new();
for event in client.get_command_started_events(&["find"]) {
for event in handler.get_command_started_events(&["find"]) {
*tallies.entry(event.connection.address.clone()).or_insert(0) += 1;
}

Expand All @@ -203,10 +211,33 @@ async fn load_balancing_test() {
);
}

let mut client = EventClient::new().await;
let mut handler = EventHandler::new();
let mut subscriber = handler.subscribe();
let mut options = CLIENT_OPTIONS.clone();
options.local_threshold = Duration::from_secs(30).into();
let client = TestClient::with_handler(Some(Arc::new(handler.clone())), options).await;

// wait for both servers to be discovered.
subscriber
.wait_for_event(Duration::from_secs(30), |event| {
if let Event::Sdam(SdamEvent::TopologyDescriptionChanged(event)) = event {
event
.new_description
.servers()
.into_iter()
.filter(|s| matches!(s.1.server_type(), ServerType::Mongos))
.count()
== 2
} else {
false
}
})
.await
.expect("timed out waiting for both mongoses to be discovered");
drop(subscriber);

// saturate pools
do_test(&mut client, 0.0, 0.50, 100).await;
do_test(&client, &mut handler, 0.0, 0.50, 100).await;

// enable a failpoint on one of the mongoses to slow it down
let options = FailCommandOptions::builder()
Expand All @@ -220,9 +251,9 @@ async fn load_balancing_test() {
.expect("enabling failpoint should succeed");

// verify that the lesser picked server (slower one) was picked less than 25% of the time.
do_test(&mut client, 0.05, 0.25, 10).await;
do_test(&client, &mut handler, 0.05, 0.25, 10).await;

// disable failpoint and rerun, should be back to even split
drop(fp_guard);
do_test(&mut client, 0.40, 0.50, 100).await;
do_test(&client, &mut handler, 0.40, 0.50, 100).await;
}
9 changes: 7 additions & 2 deletions src/test/util/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ impl EventHandler {
pub fn connections_checked_out(&self) -> u32 {
*self.connections_checked_out.lock().unwrap()
}

pub fn clear_cached_events(&self) {
self.command_events.write().unwrap().clear();
self.cmap_events.write().unwrap().clear();
self.sdam_events.write().unwrap().clear();
}
}

impl CmapEventHandler for EventHandler {
Expand Down Expand Up @@ -538,8 +544,7 @@ impl EventClient {
}

pub fn clear_cached_events(&self) {
self.handler.command_events.write().unwrap().clear();
self.handler.cmap_events.write().unwrap().clear();
self.handler.clear_cached_events()
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/test/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl TestClient {
Self::with_handler(None, options).await
}

async fn with_handler(
pub async fn with_handler(
event_handler: Option<Arc<EventHandler>>,
options: impl Into<Option<ClientOptions>>,
) -> Self {
Expand Down

0 comments on commit 015c198

Please sign in to comment.