diff --git a/src/bson_util/mod.rs b/src/bson_util/mod.rs index cc24e8022..c1057e297 100644 --- a/src/bson_util/mod.rs +++ b/src/bson_util/mod.rs @@ -191,7 +191,6 @@ where /// The size in bytes of the provided document's entry in a BSON array at the given index. pub(crate) fn array_entry_size_bytes(index: usize, doc_len: usize) -> u64 { - // // * type (1 byte) // * number of decimal digits in key // * null terminator for the key (1 byte) diff --git a/src/sdam/description/topology/server_selection/test/in_window.rs b/src/sdam/description/topology/server_selection/test/in_window.rs index 6e6a9d668..813f79d8f 100644 --- a/src/sdam/description/topology/server_selection/test/in_window.rs +++ b/src/sdam/description/topology/server_selection/test/in_window.rs @@ -13,14 +13,17 @@ use crate::{ selection_criteria::ReadPreference, test::{ run_spec_test, - EventClient, + Event, + EventHandler, FailCommandOptions, FailPoint, FailPointMode, + SdamEvent, TestClient, CLIENT_OPTIONS, LOCK, }, + ServerType, RUNTIME, }; @@ -114,14 +117,13 @@ async fn load_balancing_test() { let mut setup_client_options = CLIENT_OPTIONS.clone(); - // TODO: RUST-1004 unskip on auth variants - if setup_client_options.credential.is_some() { - println!("skipping load_balancing_test test due to auth being enabled"); + if setup_client_options.load_balanced.unwrap_or(false) { + println!("skipping load_balancing_test test due to load-balanced topology"); return; } - if setup_client_options.load_balanced.unwrap_or(false) { - println!("skipping load_balancing_test test due to load-balanced topology"); + if setup_client_options.credential.is_some() { + println!("skipping load_balancing_test test due to auth being enabled"); return; } @@ -158,8 +160,14 @@ async fn load_balancing_test() { /// min_share is the lower bound for the % of times the the less selected server /// was selected. max_share is the upper bound. - async fn do_test(client: &mut EventClient, min_share: f64, max_share: f64, iterations: usize) { - client.clear_cached_events(); + async fn do_test( + client: &TestClient, + handler: &mut EventHandler, + min_share: f64, + max_share: f64, + iterations: usize, + ) { + handler.clear_cached_events(); let mut handles: Vec> = Vec::new(); for _ in 0..10 { @@ -180,7 +188,7 @@ async fn load_balancing_test() { futures::future::join_all(handles).await; let mut tallies: HashMap = HashMap::new(); - for event in client.get_command_started_events(&["find"]) { + for event in handler.get_command_started_events(&["find"]) { *tallies.entry(event.connection.address.clone()).or_insert(0) += 1; } @@ -203,10 +211,33 @@ async fn load_balancing_test() { ); } - let mut client = EventClient::new().await; + let mut handler = EventHandler::new(); + let mut subscriber = handler.subscribe(); + let mut options = CLIENT_OPTIONS.clone(); + options.local_threshold = Duration::from_secs(30).into(); + let client = TestClient::with_handler(Some(Arc::new(handler.clone())), options).await; + + // wait for both servers to be discovered. + subscriber + .wait_for_event(Duration::from_secs(30), |event| { + if let Event::Sdam(SdamEvent::TopologyDescriptionChanged(event)) = event { + event + .new_description + .servers() + .into_iter() + .filter(|s| matches!(s.1.server_type(), ServerType::Mongos)) + .count() + == 2 + } else { + false + } + }) + .await + .expect("timed out waiting for both mongoses to be discovered"); + drop(subscriber); // saturate pools - do_test(&mut client, 0.0, 0.50, 100).await; + do_test(&client, &mut handler, 0.0, 0.50, 100).await; // enable a failpoint on one of the mongoses to slow it down let options = FailCommandOptions::builder() @@ -220,9 +251,9 @@ async fn load_balancing_test() { .expect("enabling failpoint should succeed"); // verify that the lesser picked server (slower one) was picked less than 25% of the time. - do_test(&mut client, 0.05, 0.25, 10).await; + do_test(&client, &mut handler, 0.05, 0.25, 10).await; // disable failpoint and rerun, should be back to even split drop(fp_guard); - do_test(&mut client, 0.40, 0.50, 100).await; + do_test(&client, &mut handler, 0.40, 0.50, 100).await; } diff --git a/src/test/util/event.rs b/src/test/util/event.rs index 3355b57a9..5195cd564 100644 --- a/src/test/util/event.rs +++ b/src/test/util/event.rs @@ -219,6 +219,12 @@ impl EventHandler { pub fn connections_checked_out(&self) -> u32 { *self.connections_checked_out.lock().unwrap() } + + pub fn clear_cached_events(&self) { + self.command_events.write().unwrap().clear(); + self.cmap_events.write().unwrap().clear(); + self.sdam_events.write().unwrap().clear(); + } } impl CmapEventHandler for EventHandler { @@ -538,8 +544,7 @@ impl EventClient { } pub fn clear_cached_events(&self) { - self.handler.command_events.write().unwrap().clear(); - self.handler.cmap_events.write().unwrap().clear(); + self.handler.clear_cached_events() } } diff --git a/src/test/util/mod.rs b/src/test/util/mod.rs index 7ba8ed82a..26a1baf70 100644 --- a/src/test/util/mod.rs +++ b/src/test/util/mod.rs @@ -61,7 +61,7 @@ impl TestClient { Self::with_handler(None, options).await } - async fn with_handler( + pub async fn with_handler( event_handler: Option>, options: impl Into>, ) -> Self {