Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUST-812 Reduce flakiness of in_window::load_balancing_test #568

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/bson_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ where

/// The size in bytes of the provided document's entry in a BSON array at the given index.
pub(crate) fn array_entry_size_bytes(index: usize, doc_len: usize) -> u64 {
//
// * type (1 byte)
// * number of decimal digits in key
// * null terminator for the key (1 byte)
Expand Down
57 changes: 44 additions & 13 deletions src/sdam/description/topology/server_selection/test/in_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ use crate::{
selection_criteria::ReadPreference,
test::{
run_spec_test,
EventClient,
Event,
EventHandler,
FailCommandOptions,
FailPoint,
FailPointMode,
SdamEvent,
TestClient,
CLIENT_OPTIONS,
LOCK,
},
ServerType,
RUNTIME,
};

Expand Down Expand Up @@ -114,14 +117,13 @@ async fn load_balancing_test() {

let mut setup_client_options = CLIENT_OPTIONS.clone();

// TODO: RUST-1004 unskip on auth variants
if setup_client_options.credential.is_some() {
println!("skipping load_balancing_test test due to auth being enabled");
if setup_client_options.load_balanced.unwrap_or(false) {
println!("skipping load_balancing_test test due to load-balanced topology");
return;
}

if setup_client_options.load_balanced.unwrap_or(false) {
println!("skipping load_balancing_test test due to load-balanced topology");
if setup_client_options.credential.is_some() {
println!("skipping load_balancing_test test due to auth being enabled");
return;
}

Expand Down Expand Up @@ -158,8 +160,14 @@ async fn load_balancing_test() {

/// min_share is the lower bound for the % of times the the less selected server
/// was selected. max_share is the upper bound.
async fn do_test(client: &mut EventClient, min_share: f64, max_share: f64, iterations: usize) {
client.clear_cached_events();
async fn do_test(
client: &TestClient,
handler: &mut EventHandler,
min_share: f64,
max_share: f64,
iterations: usize,
) {
handler.clear_cached_events();

let mut handles: Vec<AsyncJoinHandle<()>> = Vec::new();
for _ in 0..10 {
Expand All @@ -180,7 +188,7 @@ async fn load_balancing_test() {
futures::future::join_all(handles).await;

let mut tallies: HashMap<ServerAddress, u32> = HashMap::new();
for event in client.get_command_started_events(&["find"]) {
for event in handler.get_command_started_events(&["find"]) {
*tallies.entry(event.connection.address.clone()).or_insert(0) += 1;
}

Expand All @@ -203,10 +211,33 @@ async fn load_balancing_test() {
);
}

let mut client = EventClient::new().await;
let mut handler = EventHandler::new();
let mut subscriber = handler.subscribe();
let mut options = CLIENT_OPTIONS.clone();
options.local_threshold = Duration::from_secs(30).into();
let client = TestClient::with_handler(Some(Arc::new(handler.clone())), options).await;

// wait for both servers to be discovered.
subscriber
.wait_for_event(Duration::from_secs(30), |event| {
if let Event::Sdam(SdamEvent::TopologyDescriptionChanged(event)) = event {
event
.new_description
.servers()
.into_iter()
.filter(|s| matches!(s.1.server_type(), ServerType::Mongos))
.count()
== 2
} else {
false
}
})
.await
.expect("timed out waiting for both mongoses to be discovered");
drop(subscriber);

// saturate pools
do_test(&mut client, 0.0, 0.50, 100).await;
do_test(&client, &mut handler, 0.0, 0.50, 100).await;

// enable a failpoint on one of the mongoses to slow it down
let options = FailCommandOptions::builder()
Expand All @@ -220,9 +251,9 @@ async fn load_balancing_test() {
.expect("enabling failpoint should succeed");

// verify that the lesser picked server (slower one) was picked less than 25% of the time.
do_test(&mut client, 0.05, 0.25, 10).await;
do_test(&client, &mut handler, 0.05, 0.25, 10).await;

// disable failpoint and rerun, should be back to even split
drop(fp_guard);
do_test(&mut client, 0.40, 0.50, 100).await;
do_test(&client, &mut handler, 0.40, 0.50, 100).await;
}
9 changes: 7 additions & 2 deletions src/test/util/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ impl EventHandler {
pub fn connections_checked_out(&self) -> u32 {
*self.connections_checked_out.lock().unwrap()
}

pub fn clear_cached_events(&self) {
self.command_events.write().unwrap().clear();
self.cmap_events.write().unwrap().clear();
self.sdam_events.write().unwrap().clear();
}
}

impl CmapEventHandler for EventHandler {
Expand Down Expand Up @@ -538,8 +544,7 @@ impl EventClient {
}

pub fn clear_cached_events(&self) {
self.handler.command_events.write().unwrap().clear();
self.handler.cmap_events.write().unwrap().clear();
self.handler.clear_cached_events()
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/test/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl TestClient {
Self::with_handler(None, options).await
}

async fn with_handler(
pub async fn with_handler(
event_handler: Option<Arc<EventHandler>>,
options: impl Into<Option<ClientOptions>>,
) -> Self {
Expand Down