diff --git a/.evergreen/check-clippy.sh b/.evergreen/check-clippy.sh
index 9ee3cfef5..8b39d98eb 100755
--- a/.evergreen/check-clippy.sh
+++ b/.evergreen/check-clippy.sh
@@ -10,3 +10,4 @@ cargo clippy --all-targets -p mongodb -- -D warnings
 cargo clippy --all-targets -p mongodb --features zstd-compression,snappy-compression,zlib-compression -- -D warnings
 cargo clippy --all-targets --no-default-features --features async-std-runtime -p mongodb -- -D warnings
 cargo clippy --all-targets --no-default-features --features sync -p mongodb -- -D warnings
+cargo clippy --all-targets --features tokio-sync -p mongodb -- -D warnings
diff --git a/.evergreen/check-rustdoc.sh b/.evergreen/check-rustdoc.sh
index 17ca1d3e2..23a587278 100755
--- a/.evergreen/check-rustdoc.sh
+++ b/.evergreen/check-rustdoc.sh
@@ -6,3 +6,4 @@ set -o errexit
 cargo +nightly rustdoc -- -D warnings --cfg docsrs
 cargo +nightly rustdoc --no-default-features --features async-std-runtime -- -D warnings --cfg docsrs
 cargo +nightly rustdoc --no-default-features --features sync -- -D warnings --cfg docsrs
+cargo +nightly rustdoc --features tokio-sync -- -D warnings --cfg docsrs
diff --git a/.evergreen/compile-only-tokio.sh b/.evergreen/compile-only-tokio.sh
index bfd0d0def..0c039e2e0 100755
--- a/.evergreen/compile-only-tokio.sh
+++ b/.evergreen/compile-only-tokio.sh
@@ -13,3 +13,4 @@ if [[ $RUST_VERSION == "nightly" ]]; then
 fi

 rustup run $RUST_VERSION cargo build --features $FEATURE_FLAGS
+rustup run $RUST_VERSION cargo build --features tokio-sync,$FEATURE_FLAGS
diff --git a/.evergreen/run-tokio-tests.sh b/.evergreen/run-tokio-tests.sh
index 42572f4e8..ea69f3677 100755
--- a/.evergreen/run-tokio-tests.sh
+++ b/.evergreen/run-tokio-tests.sh
@@ -15,4 +15,10 @@
 FEATURE_FLAGS="zstd-compression,snappy-compression,zlib-compression"
 echo "cargo test options: --features $FEATURE_FLAGS ${OPTIONS}"
 RUST_BACKTRACE=1 cargo test --features $FEATURE_FLAGS $OPTIONS | tee results.json
-cat results.json | cargo2junit > results.xml
+cat results.json | cargo2junit > async-tests.xml
+RUST_BACKTRACE=1 cargo test sync --features tokio-sync,$FEATURE_FLAGS $OPTIONS | tee sync-tests.json
+cat sync-tests.json | cargo2junit > sync-tests.xml
+RUST_BACKTRACE=1 cargo test --doc sync --features tokio-sync,$FEATURE_FLAGS $OPTIONS | tee sync-doc-tests.json
+cat sync-doc-tests.json | cargo2junit > sync-doc-tests.xml
+
+junit-report-merger results.xml async-tests.xml sync-tests.xml sync-doc-tests.xml
diff --git a/Cargo.toml b/Cargo.toml
index d95ac2b96..fc299f06b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,6 +24,7 @@ default = ["tokio-runtime"]
 tokio-runtime = ["tokio/macros", "tokio/net", "tokio/rt", "tokio/time", "serde_bytes"]
 async-std-runtime = ["async-std", "async-std/attributes", "async-std-resolver", "tokio-util/compat"]
 sync = ["async-std-runtime"]
+tokio-sync = ["tokio-runtime"]
 # Enable support for v0.4 of the chrono crate in the public API of the BSON library.
bson-chrono-0_4 = ["bson/chrono-0_4"] diff --git a/src/change_stream/mod.rs b/src/change_stream/mod.rs index edbb27595..86eab6334 100644 --- a/src/change_stream/mod.rs +++ b/src/change_stream/mod.rs @@ -45,7 +45,7 @@ use crate::{ /// A `ChangeStream` can be iterated like any other [`Stream`]: /// /// ``` -/// # #[cfg(not(feature = "sync"))] +/// # #[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] /// # use futures::stream::StreamExt; /// # use mongodb::{Client, error::Result, bson::doc, /// # change_stream::event::ChangeStreamEvent}; diff --git a/src/client/auth/mod.rs b/src/client/auth/mod.rs index d977b4b1b..f2b436da5 100644 --- a/src/client/auth/mod.rs +++ b/src/client/auth/mod.rs @@ -352,7 +352,7 @@ pub struct Credential { } impl Credential { - #[cfg(all(test, not(feature = "sync")))] + #[cfg(all(test, not(feature = "sync"), not(feature = "tokio-sync")))] pub(crate) fn into_document(mut self) -> Document { use crate::bson::Bson; diff --git a/src/client/mod.rs b/src/client/mod.rs index 73469bcf3..05c85f277 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -52,14 +52,14 @@ const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30); /// so it can safely be shared across threads or async tasks. For example: /// /// ```rust -/// # #[cfg(not(feature = "sync"))] +/// # #[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] /// # use mongodb::{bson::Document, Client, error::Result}; /// # #[cfg(feature = "async-std-runtime")] /// # use async_std::task; /// # #[cfg(feature = "tokio-runtime")] /// # use tokio::task; /// # -/// # #[cfg(not(feature = "sync"))] +/// # #[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] /// # async fn start_workers() -> Result<()> { /// let client = Client::with_uri_str("mongodb://example.com").await?; /// @@ -431,7 +431,7 @@ impl Client { } } - #[cfg(all(test, not(feature = "sync")))] + #[cfg(all(test, not(feature = "sync"), not(feature = "tokio-sync")))] pub(crate) async fn get_hosts(&self) -> Vec { let servers = self.inner.topology.servers().await; servers diff --git a/src/client/options/mod.rs b/src/client/options/mod.rs index cace3ed38..c876cd821 100644 --- a/src/client/options/mod.rs +++ b/src/client/options/mod.rs @@ -1,4 +1,4 @@ -#[cfg(all(test, not(feature = "sync")))] +#[cfg(all(test, not(feature = "sync"), not(feature = "tokio-sync")))] mod test; mod resolver_config; @@ -54,6 +54,9 @@ use crate::{ srv::{OriginalSrvInfo, SrvResolver}, }; +#[cfg(any(feature = "sync", feature = "tokio-sync"))] +use crate::runtime; + pub use resolver_config::ResolverConfig; const DEFAULT_PORT: u16 = 27017; @@ -244,7 +247,7 @@ impl ServerAddress { }) } - #[cfg(all(test, not(feature = "sync")))] + #[cfg(all(test, not(feature = "sync"), not(feature = "tokio-sync")))] pub(crate) fn into_document(self) -> Document { match self { Self::Tcp { host, port } => { @@ -1038,17 +1041,17 @@ impl ClientOptions { /// /// Note: if the `sync` feature is enabled, then this method will be replaced with [the sync /// version](#method.parse-1). - #[cfg(not(feature = "sync"))] + #[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] pub async fn parse(s: impl AsRef) -> Result { Self::parse_uri(s, None).await } /// This method will be present if the `sync` feature is enabled. 
It's otherwise identical to /// [the async version](#method.parse) - #[cfg(any(feature = "sync", docsrs))] - #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] + #[cfg(any(feature = "sync", feature = "tokio-sync", docsrs))] + #[cfg_attr(docsrs, doc(cfg(any(feature = "sync", feature = "tokio-sync"))))] pub fn parse(s: impl AsRef) -> Result { - crate::RUNTIME.block_on(Self::parse_uri(s.as_ref(), None)) + runtime::block_on(Self::parse_uri(s.as_ref(), None)) } /// Parses a MongoDB connection string into a `ClientOptions` struct. @@ -1065,7 +1068,7 @@ impl ClientOptions { /// /// Note: if the `sync` feature is enabled, then this method will be replaced with [the sync /// version](#method.parse_with_resolver_config-1). - #[cfg(not(feature = "sync"))] + #[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] pub async fn parse_with_resolver_config( uri: impl AsRef, resolver_config: ResolverConfig, @@ -1075,10 +1078,10 @@ impl ClientOptions { /// This method will be present if the `sync` feature is enabled. It's otherwise identical to /// [the async version](#method.parse_with_resolver_config) - #[cfg(any(feature = "sync", docsrs))] - #[cfg_attr(docsrs, doc(cfg(feature = "sync")))] + #[cfg(any(feature = "sync", feature = "tokio-sync", docsrs))] + #[cfg_attr(docsrs, doc(cfg(any(feature = "sync", feature = "tokio-sync"))))] pub fn parse_with_resolver_config(uri: &str, resolver_config: ResolverConfig) -> Result { - crate::RUNTIME.block_on(Self::parse_uri(uri, Some(resolver_config))) + runtime::block_on(Self::parse_uri(uri, Some(resolver_config))) } /// Populate this `ClientOptions` from the given URI, optionally using the resolver config for @@ -2015,7 +2018,7 @@ impl ClientOptionsParser { } } -#[cfg(all(test, not(feature = "sync")))] +#[cfg(all(test, not(feature = "sync"), not(feature = "tokio-sync")))] mod tests { use std::time::Duration; diff --git a/src/client/session/mod.rs b/src/client/session/mod.rs index 76eb37074..e873a49cf 100644 --- a/src/client/session/mod.rs +++ b/src/client/session/mod.rs @@ -18,10 +18,10 @@ use crate::{ error::{ErrorKind, Result}, operation::{AbortTransaction, CommitTransaction, Operation}, options::{SessionOptions, TransactionOptions}, + runtime, sdam::{ServerInfo, TransactionSupportStatus}, selection_criteria::SelectionCriteria, Client, - RUNTIME, }; pub use cluster_time::ClusterTime; pub(super) use pool::ServerSessionPool; @@ -607,14 +607,14 @@ impl Drop for ClientSession { snapshot_time: self.snapshot_time, operation_time: self.operation_time, }; - RUNTIME.execute(async move { + runtime::execute(async move { let mut session: ClientSession = dropped_session.into(); let _result = session.abort_transaction().await; }); } else { let client = self.client.clone(); let server_session = self.server_session.clone(); - RUNTIME.execute(async move { + runtime::execute(async move { client.check_in_server_session(server_session).await; }); } diff --git a/src/client/session/test/mod.rs b/src/client/session/test/mod.rs index 4278d3241..ec4a6fb8b 100644 --- a/src/client/session/test/mod.rs +++ b/src/client/session/test/mod.rs @@ -11,11 +11,11 @@ use crate::{ coll::options::{CountOptions, InsertManyOptions}, error::Result, options::{Acknowledgment, FindOptions, ReadConcern, ReadPreference, WriteConcern}, + runtime, sdam::ServerInfo, selection_criteria::SelectionCriteria, test::{log_uncaptured, EventClient, TestClient, CLIENT_OPTIONS, LOCK}, Collection, - RUNTIME, }; /// Macro defining a closure that returns a future populated by an operation on the @@ -219,10 +219,10 @@ async 
fn pool_is_lifo() { // End both sessions, waiting after each to ensure the background task got scheduled // in the Drop impls. drop(a); - RUNTIME.delay_for(Duration::from_millis(250)).await; + runtime::delay_for(Duration::from_millis(250)).await; drop(b); - RUNTIME.delay_for(Duration::from_millis(250)).await; + runtime::delay_for(Duration::from_millis(250)).await; let s1 = client.start_session(None).await.unwrap(); assert_eq!(s1.id(), &b_id); @@ -368,7 +368,7 @@ async fn implicit_session_returned_after_immediate_exhaust() { .expect("insert should succeed"); // wait for sessions to be returned to the pool and clear them out. - RUNTIME.delay_for(Duration::from_millis(250)).await; + runtime::delay_for(Duration::from_millis(250)).await; client.clear_session_pool().await; let mut cursor = coll.find(doc! {}, None).await.expect("find should succeed"); @@ -382,7 +382,7 @@ async fn implicit_session_returned_after_immediate_exhaust() { .as_document() .expect("session id should be a document"); - RUNTIME.delay_for(Duration::from_millis(250)).await; + runtime::delay_for(Duration::from_millis(250)).await; assert!( client.is_session_checked_in(session_id).await, "session not checked back in" @@ -413,7 +413,7 @@ async fn implicit_session_returned_after_exhaust_by_get_more() { } // wait for sessions to be returned to the pool and clear them out. - RUNTIME.delay_for(Duration::from_millis(250)).await; + runtime::delay_for(Duration::from_millis(250)).await; client.clear_session_pool().await; let options = FindOptions::builder().batch_size(3).build(); @@ -434,7 +434,7 @@ async fn implicit_session_returned_after_exhaust_by_get_more() { .as_document() .expect("session id should be a document"); - RUNTIME.delay_for(Duration::from_millis(250)).await; + runtime::delay_for(Duration::from_millis(250)).await; assert!( client.is_session_checked_in(session_id).await, "session not checked back in" diff --git a/src/cmap/establish/handshake/mod.rs b/src/cmap/establish/handshake/mod.rs index 6400d41cc..857364eda 100644 --- a/src/cmap/establish/handshake/mod.rs +++ b/src/cmap/establish/handshake/mod.rs @@ -18,7 +18,7 @@ use crate::{ sdam::Topology, }; -#[cfg(feature = "tokio-runtime")] +#[cfg(all(feature = "tokio-runtime", not(feature = "tokio-sync")))] const RUNTIME_NAME: &str = "tokio"; #[cfg(all(feature = "async-std-runtime", not(feature = "sync")))] @@ -27,6 +27,9 @@ const RUNTIME_NAME: &str = "async-std"; #[cfg(feature = "sync")] const RUNTIME_NAME: &str = "sync (with async-std)"; +#[cfg(feature = "tokio-sync")] +const RUNTIME_NAME: &str = "sync (with tokio)"; + #[derive(Clone, Debug)] struct ClientMetadata { application: Option, diff --git a/src/cmap/test/event.rs b/src/cmap/test/event.rs index 5cb1ead81..037eb1657 100644 --- a/src/cmap/test/event.rs +++ b/src/cmap/test/event.rs @@ -5,7 +5,7 @@ use std::{ use serde::{de::Unexpected, Deserialize, Deserializer}; -use crate::{event::cmap::*, options::ServerAddress, RUNTIME}; +use crate::{event::cmap::*, options::ServerAddress, runtime}; use tokio::sync::broadcast::error::{RecvError, SendError}; #[derive(Clone, Debug)] @@ -98,21 +98,20 @@ impl EventSubscriber<'_> { where F: Fn(&Event) -> bool, { - RUNTIME - .timeout(timeout, async { - loop { - match self.receiver.recv().await { - Ok(event) if filter(&event) => return event.into(), - // the channel hit capacity and the channnel will skip a few to catch up. 
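Note on the `ClientSession` `Drop` impl earlier in this patch: cleanup (aborting an open transaction or checking the server session back in) is handed to the free function `runtime::execute`, which spawns it onto whichever runtime the enabled feature selects. A minimal, self-contained sketch of that "async cleanup from Drop" pattern, assuming a Tokio context and using made-up types rather than driver API:

use std::time::Duration;
use tokio::runtime::Handle;

struct Checkout {
    id: u64,
}

impl Drop for Checkout {
    fn drop(&mut self) {
        let id = self.id;
        // Handle::current() panics outside a Tokio context, which is why the
        // driver routes this through its runtime::execute helper instead of
        // calling tokio directly.
        Handle::current().spawn(async move {
            println!("returning checkout {} to the pool", id);
        });
    }
}

#[tokio::main]
async fn main() {
    drop(Checkout { id: 1 });
    // Give the background cleanup task a moment to run before exiting.
    tokio::time::sleep(Duration::from_millis(50)).await;
}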
- Err(RecvError::Lagged(_)) => continue, - Err(_) => return None, - _ => continue, - } + runtime::timeout(timeout, async { + loop { + match self.receiver.recv().await { + Ok(event) if filter(&event) => return event.into(), + // the channel hit capacity and the channnel will skip a few to catch up. + Err(RecvError::Lagged(_)) => continue, + Err(_) => return None, + _ => continue, } - }) - .await - .ok() - .flatten() + } + }) + .await + .ok() + .flatten() } /// Returns the received events without waiting for any more. diff --git a/src/cmap/test/integration.rs b/src/cmap/test/integration.rs index bb78d2a69..62fc38bf4 100644 --- a/src/cmap/test/integration.rs +++ b/src/cmap/test/integration.rs @@ -10,6 +10,7 @@ use crate::{ cmap::{options::ConnectionPoolOptions, Command, ConnectionPool}, event::cmap::{CmapEventHandler, ConnectionClosedReason}, operation::CommandResponse, + runtime, sdam::ServerUpdateSender, selection_criteria::ReadPreference, test::{ @@ -21,7 +22,6 @@ use crate::{ CLIENT_OPTIONS, LOCK, }, - RUNTIME, }; use semver::VersionReq; use std::{sync::Arc, time::Duration}; @@ -130,11 +130,9 @@ async fn concurrent_connections() { let tasks = (0..2).map(|_| { let pool_clone = pool.clone(); - RUNTIME - .spawn(async move { - pool_clone.check_out().await.unwrap(); - }) - .unwrap() + runtime::spawn(async move { + pool_clone.check_out().await.unwrap(); + }) }); futures::future::join_all(tasks).await; diff --git a/src/cmap/test/mod.rs b/src/cmap/test/mod.rs index b9bbc687a..fd359ab72 100644 --- a/src/cmap/test/mod.rs +++ b/src/cmap/test/mod.rs @@ -16,6 +16,7 @@ use crate::{ error::{Error, ErrorKind, Result}, event::cmap::ConnectionPoolOptions as EventOptions, options::TlsOptions, + runtime, runtime::AsyncJoinHandle, sdam::{ServerUpdate, ServerUpdateSender}, test::{ @@ -30,7 +31,6 @@ use crate::{ LOCK, SERVER_API, }, - RUNTIME, }; use bson::doc; @@ -101,14 +101,12 @@ struct CmapThread { impl CmapThread { fn start(state: Arc) -> Self { let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::(); - let handle = RUNTIME - .spawn(async move { - while let Some(operation) = receiver.recv().await { - operation.execute(state.clone()).await?; - } - Ok(()) - }) - .unwrap(); + let handle = runtime::spawn(async move { + while let Some(operation) = receiver.recv().await { + operation.execute(state.clone()).await?; + } + Ok(()) + }); Self { dispatcher: sender, @@ -166,7 +164,7 @@ impl Executor { // Mock a monitoring task responding to errors reported by the pool. let manager = pool.manager.clone(); - RUNTIME.execute(async move { + runtime::execute(async move { while let Some(update) = update_receiver.recv().await { match update.message() { ServerUpdate::Error { error, .. } => { @@ -226,7 +224,7 @@ impl Operation { /// Execute this operation. 
async fn execute(self, state: Arc) -> Result<()> { match self { - Operation::Wait { ms } => RUNTIME.delay_for(Duration::from_millis(ms)).await, + Operation::Wait { ms } => runtime::delay_for(Duration::from_millis(ms)).await, Operation::WaitForThread { target } => { state .threads @@ -245,11 +243,10 @@ impl Operation { let event_name = event.clone(); let task = async move { while state.count_events(&event) < count { - RUNTIME.delay_for(Duration::from_millis(100)).await; + runtime::delay_for(Duration::from_millis(100)).await; } }; - RUNTIME - .timeout(timeout.unwrap_or(EVENT_TIMEOUT), task) + runtime::timeout(timeout.unwrap_or(EVENT_TIMEOUT), task) .await .unwrap_or_else(|_| { panic!("waiting for {} {} event(s) timed out", count, event_name) diff --git a/src/cmap/worker.rs b/src/cmap/worker.rs index 4b2fa7657..a6861628d 100644 --- a/src/cmap/worker.rs +++ b/src/cmap/worker.rs @@ -32,9 +32,8 @@ use crate::{ PoolReadyEvent, }, options::ServerAddress, - runtime::HttpClient, + runtime, sdam::ServerUpdateSender, - RUNTIME, }; use std::{ @@ -143,7 +142,7 @@ impl ConnectionPoolWorker { /// and close the pool. pub(super) fn start( address: ServerAddress, - http_client: HttpClient, + http_client: runtime::HttpClient, server_updater: ServerUpdateSender, options: Option, ) -> (PoolManager, ConnectionRequester, PoolGenerationSubscriber) { @@ -241,7 +240,7 @@ impl ConnectionPoolWorker { server_updater, }; - RUNTIME.execute(async move { + runtime::execute(async move { worker.execute().await; }); @@ -252,7 +251,7 @@ impl ConnectionPoolWorker { /// dropped. Once all handles are dropped, the pool will close any available connections and /// emit a pool closed event. async fn execute(mut self) { - let mut maintenance_interval = RUNTIME.interval(self.maintenance_frequency); + let mut maintenance_interval = runtime::interval(self.maintenance_frequency); loop { let task = tokio::select! { @@ -391,7 +390,7 @@ impl ConnectionPoolWorker { let manager = self.manager.clone(); let mut server_updater = self.server_updater.clone(); - let handle = RUNTIME.spawn(async move { + let handle = runtime::spawn(async move { let mut establish_result = establish_connection( &establisher, pending_connection, @@ -411,16 +410,6 @@ impl ConnectionPoolWorker { establish_result }); - let handle = match handle { - Some(h) => h, - - // The async runtime was dropped which means nothing will be waiting - // on the request, so we can just exit. - None => { - return; - } - }; - // this only fails if the other end stopped listening (e.g. due to timeout), in // which case we just let the connection establish in the background. 
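The event-subscriber change a little earlier (src/cmap/test/event.rs) swaps `RUNTIME.timeout(...)` for the free `runtime::timeout` around a `tokio::sync::broadcast` receive loop. A self-contained sketch of that pattern, with generic names that are not driver API: wait up to a deadline for an event matching a filter, skipping `Lagged` notifications when the channel overflows.

use std::time::Duration;
use tokio::sync::broadcast::{error::RecvError, Receiver};

async fn wait_for_event<T, F>(rx: &mut Receiver<T>, deadline: Duration, filter: F) -> Option<T>
where
    T: Clone,
    F: Fn(&T) -> bool,
{
    tokio::time::timeout(deadline, async {
        loop {
            match rx.recv().await {
                Ok(event) if filter(&event) => return Some(event),
                // The receiver lagged; skip the missed events and keep waiting.
                Err(RecvError::Lagged(_)) => continue,
                Err(RecvError::Closed) => return None,
                _ => continue,
            }
        }
    })
    .await
    .ok()
    .flatten()
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::broadcast::channel::<u32>(16);
    tx.send(1).unwrap();
    tx.send(7).unwrap();
    let hit = wait_for_event(&mut rx, Duration::from_millis(100), |e| *e == 7).await;
    assert_eq!(hit, Some(7));
}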
let _: std::result::Result<_, _> = @@ -613,7 +602,7 @@ impl ConnectionPoolWorker { let manager = self.manager.clone(); let establisher = self.establisher.clone(); let mut updater = self.server_updater.clone(); - RUNTIME.execute(async move { + runtime::execute(async move { let connection = establish_connection( &establisher, pending_connection, diff --git a/src/coll/mod.rs b/src/coll/mod.rs index cf237281b..a4b217ba1 100644 --- a/src/coll/mod.rs +++ b/src/coll/mod.rs @@ -88,7 +88,7 @@ use crate::{ /// # #[cfg(feature = "tokio-runtime")] /// # use tokio::task; /// # -/// # #[cfg(not(feature = "sync"))] +/// # #[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] /// # async fn start_workers() -> Result<()> { /// # use mongodb::Client; /// # diff --git a/src/cursor/common.rs b/src/cursor/common.rs index 81353d2d9..f31f4f7a8 100644 --- a/src/cursor/common.rs +++ b/src/cursor/common.rs @@ -21,9 +21,9 @@ use crate::{ operation, options::ServerAddress, results::GetMoreResult, + runtime, Client, Namespace, - RUNTIME, }; /// An internal cursor that can be used in a variety of contexts depending on its `GetMoreProvider`. @@ -472,7 +472,7 @@ pub(super) fn kill_cursor( let coll = client .database(ns.db.as_str()) .collection::(ns.coll.as_str()); - RUNTIME.execute(async move { + runtime::execute(async move { if !pinned_conn.is_invalid() { let _ = coll .kill_cursor(cursor_id, pinned_conn.handle(), drop_address) diff --git a/src/db/mod.rs b/src/db/mod.rs index 99180d548..3a2429f59 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -46,7 +46,7 @@ use crate::{ /// /// ```rust /// -/// # #[cfg(not(feature = "sync"))] +/// # #[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] /// # use mongodb::{bson::Document, Client, error::Result}; /// # #[cfg(feature = "async-std-runtime")] /// # use async_std::task; @@ -54,7 +54,7 @@ use crate::{ /// # use tokio::task; /// # /// # -/// # #[cfg(not(feature = "sync"))] +/// # #[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] /// # async fn start_workers() -> Result<()> { /// # let client = Client::with_uri_str("mongodb://example.com").await?; /// let db = client.database("items"); diff --git a/src/event/cmap.rs b/src/event/cmap.rs index ea11f1947..149a08420 100644 --- a/src/event/cmap.rs +++ b/src/event/cmap.rs @@ -255,9 +255,9 @@ fn default_connection_id() -> u32 { /// # }, /// # options::ClientOptions, /// # }; -/// # #[cfg(feature = "sync")] +/// # #[cfg(any(feature = "sync", feature = "tokio-sync"))] /// # use mongodb::sync::Client; -/// # #[cfg(not(feature = "sync"))] +/// # #[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] /// # use mongodb::Client; /// # /// struct FailedCheckoutLogger; diff --git a/src/event/command.rs b/src/event/command.rs index 25ceb5cf9..93cb77038 100644 --- a/src/event/command.rs +++ b/src/event/command.rs @@ -99,9 +99,9 @@ pub struct CommandFailedEvent { /// # }, /// # options::ClientOptions, /// # }; -/// # #[cfg(feature = "sync")] +/// # #[cfg(any(feature = "sync", feature = "tokio-sync"))] /// # use mongodb::sync::Client; -/// # #[cfg(not(feature = "sync"))] +/// # #[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] /// # use mongodb::Client; /// # /// struct FailedCommandLogger; diff --git a/src/event/sdam/mod.rs b/src/event/sdam/mod.rs index 7c33e18ad..583c65e20 100644 --- a/src/event/sdam/mod.rs +++ b/src/event/sdam/mod.rs @@ -146,9 +146,9 @@ pub struct ServerHeartbeatFailedEvent { /// # }, /// # options::ClientOptions, /// # }; -/// # #[cfg(feature = "sync")] +/// # 
#[cfg(any(feature = "sync", feature = "tokio-sync"))] /// # use mongodb::sync::Client; -/// # #[cfg(not(feature = "sync"))] +/// # #[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] /// # use mongodb::Client; /// # /// struct FailedHeartbeatLogger; diff --git a/src/lib.rs b/src/lib.rs index bd4731902..99d7f8f6e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -201,7 +201,7 @@ //! crate's top level like in the async API. The sync API calls through to the async API internally //! though, so it looks and behaves similarly to it. //! ```no_run -//! # #[cfg(feature = "sync")] +//! # #[cfg(any(feature = "sync", feature = "tokio-sync"))] //! # { //! use mongodb::{ //! bson::doc, @@ -266,7 +266,7 @@ //! # bson::doc, //! # }; //! # -//! # #[cfg(all(not(feature = "sync"), feature = "tokio-runtime"))] +//! # #[cfg(all(not(feature = "sync"), not(feature = "tokio-sync"), feature = "tokio-runtime"))] //! # async fn foo() -> std::result::Result<(), Box> { //! # //! # let client = Client::with_uri_str("mongodb://example.com").await?; @@ -330,8 +330,8 @@ pub(crate) mod runtime; mod sdam; mod selection_criteria; mod srv; -#[cfg(any(feature = "sync", docsrs))] -#[cfg_attr(docsrs, doc(cfg(feature = "sync")))] +#[cfg(any(feature = "sync", feature = "tokio-sync", docsrs))] +#[cfg_attr(docsrs, doc(cfg(any(feature = "sync", feature = "tokio-sync"))))] pub mod sync; #[cfg(test)] mod test; @@ -340,7 +340,7 @@ mod test; #[macro_use] extern crate derive_more; -#[cfg(not(feature = "sync"))] +#[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] pub use crate::{ client::{Client, session::ClientSession}, coll::Collection, @@ -348,7 +348,7 @@ pub use crate::{ db::Database, }; -#[cfg(feature = "sync")] +#[cfg(any(feature = "sync", feature = "tokio-sync"))] pub(crate) use crate::{ client::{Client, session::ClientSession}, coll::Collection, @@ -361,7 +361,8 @@ pub use {coll::Namespace, index::IndexModel, client::session::ClusterTime, sdam: #[cfg(all( feature = "tokio-runtime", feature = "async-std-runtime", - not(feature = "sync") + not(feature = "sync"), + not(feature = "tokio-sync"), ))] compile_error!( "`tokio-runtime` and `async-std-runtime` can't both be enabled; either disable \ @@ -370,18 +371,21 @@ compile_error!( #[cfg(all(feature = "tokio-runtime", feature = "sync"))] compile_error!( - "`tokio-runtime` and `sync` can't both be enabled; either disable `sync` or set \ - `default-features = false` in your Cargo.toml" + "`tokio-runtime` and `sync` can't both be enabled; either disable `sync` and enable \ + `tokio-sync` to use the sync API with tokio, or set `default-features = false` in \ + your Cargo.toml to use the sync API with async-std" ); -#[cfg(all(not(feature = "tokio-runtime"), not(feature = "async-std-runtime")))] +#[cfg(all(feature = "async-std-runtime", feature = "tokio-sync"))] compile_error!( - "one of `tokio-runtime`, `async-std-runtime`, or `sync` must be enabled; either enable \ - `default-features`, or enable one of those features specifically in your Cargo.toml" + "`async-std-runtime` and `tokio-sync` can't both be enabled; either disable `tokio-sync` \ + and enable `sync` to use the sync API with async-std, or disable `async-std-runtime` to \ + use the sync API with tokio" ); -#[cfg(all(feature = "tokio-runtime", not(feature = "async-std-runtime")))] -pub(crate) static RUNTIME: runtime::AsyncRuntime = runtime::AsyncRuntime::Tokio; - -#[cfg(all(not(feature = "tokio-runtime"), feature = "async-std-runtime"))] -pub(crate) static RUNTIME: runtime::AsyncRuntime = 
runtime::AsyncRuntime::AsyncStd; +#[cfg(all(not(feature = "tokio-runtime"), not(feature = "async-std-runtime")))] +compile_error!( + "one of `tokio-runtime`, `async-std-runtime`, `sync`, or `tokio-sync` must be enabled; \ + either enable `default-features`, or enable one of those features specifically in your \ + Cargo.toml" +); diff --git a/src/runtime/interval.rs b/src/runtime/interval.rs index 3db0108d9..34b8543fc 100644 --- a/src/runtime/interval.rs +++ b/src/runtime/interval.rs @@ -1,6 +1,6 @@ use std::time::{Duration, Instant}; -use crate::RUNTIME; +use crate::runtime; /// Interval implementation using async-std. /// For tokio, we just use tokio::time::Interval. @@ -21,7 +21,7 @@ impl Interval { pub(crate) async fn tick(&mut self) -> Instant { match self.interval.checked_sub(self.last_time.elapsed()) { Some(duration) => { - RUNTIME.delay_for(duration).await; + runtime::delay_for(duration).await; self.last_time = Instant::now(); } None => self.last_time = Instant::now(), diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 3a22ce7e7..b6c1dcd94 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -25,177 +25,135 @@ use interval::Interval; #[cfg(feature = "tokio-runtime")] use tokio::time::Interval; -/// An abstract handle to the async runtime. -#[derive(Clone, Copy, Debug)] -pub(crate) enum AsyncRuntime { - /// Represents the `tokio` runtime. - #[cfg(feature = "tokio-runtime")] - Tokio, +/// Spawn a task in the background to run a future. +/// +/// If the runtime is still running, this will return a handle to the background task. +/// Otherwise, it will return `None`. As a result, this must be called from an async block +/// or function running on a runtime. +#[allow(clippy::unnecessary_wraps)] +pub(crate) fn spawn(fut: F) -> AsyncJoinHandle +where + F: Future + Send + 'static, + O: Send + 'static, +{ + #[cfg(all(feature = "tokio-runtime", not(feature = "tokio-sync")))] + { + let handle = tokio::runtime::Handle::current(); + AsyncJoinHandle::Tokio(handle.spawn(fut)) + } + + #[cfg(feature = "tokio-sync")] + { + let handle = crate::sync::TOKIO_RUNTIME.handle(); + AsyncJoinHandle::Tokio(handle.spawn(fut)) + } - /// Represents the `async-std` runtime. #[cfg(feature = "async-std-runtime")] - AsyncStd, + { + AsyncJoinHandle::AsyncStd(async_std::task::spawn(fut)) + } +} + +/// Spawn a task in the background to run a future. +/// +/// Note: this must only be called from an async block or function running on a runtime. +pub(crate) fn execute(fut: F) +where + F: Future + Send + 'static, + O: Send + 'static, +{ + spawn(fut); } -impl AsyncRuntime { - /// Spawn a task in the background to run a future. - /// - /// If the runtime is still running, this will return a handle to the background task. - /// Otherwise, it will return `None`. As a result, this must be called from an async block - /// or function running on a runtime. 
- #[allow(clippy::unnecessary_wraps)] - pub(crate) fn spawn(self, fut: F) -> Option> - where - F: Future + Send + 'static, - O: Send + 'static, +#[cfg(any(test, feature = "sync", feature = "tokio-sync"))] +pub(crate) fn block_on(fut: F) -> T +where + F: Future, +{ + #[cfg(all(feature = "tokio-runtime", not(feature = "tokio-sync")))] { - match self { - #[cfg(feature = "tokio-runtime")] - Self::Tokio => match TokioCallingContext::current() { - TokioCallingContext::Async(handle) => { - Some(AsyncJoinHandle::Tokio(handle.spawn(fut))) - } - TokioCallingContext::Sync => None, - }, - - #[cfg(feature = "async-std-runtime")] - Self::AsyncStd => Some(AsyncJoinHandle::AsyncStd(async_std::task::spawn(fut))), - } + tokio::task::block_in_place(|| futures::executor::block_on(fut)) } - /// Spawn a task in the background to run a future. - /// - /// Note: this must only be called from an async block or function running on a runtime. - pub(crate) fn execute(self, fut: F) - where - F: Future + Send + 'static, - O: Send + 'static, + #[cfg(feature = "tokio-sync")] { - self.spawn(fut); + crate::sync::TOKIO_RUNTIME.block_on(fut) } - /// Run a future in the foreground, blocking on it completing. - /// - /// This will panic if called from a sychronous context when tokio is being used. - #[cfg(any(feature = "sync", test))] - pub(crate) fn block_on(self, fut: F) -> T - where - F: Future, + #[cfg(feature = "async-std-runtime")] { - #[cfg(all(feature = "tokio-runtime", not(feature = "async-std-runtime")))] - { - match TokioCallingContext::current() { - TokioCallingContext::Async(_handle) => { - tokio::task::block_in_place(|| futures::executor::block_on(fut)) - } - TokioCallingContext::Sync => { - panic!("block_on called from tokio outside of async context") - } - } - } - - #[cfg(feature = "async-std-runtime")] - { - async_std::task::block_on(fut) - } + async_std::task::block_on(fut) } +} - /// Run a future in the foreground, blocking on it completing. - /// This does not notify the runtime that it will be blocking and should only be used for - /// operations that will immediately (or quickly) succeed. - pub(crate) fn block_in_place(self, fut: F) -> T - where - F: Future + Send, - T: Send, +/// Run a future in the foreground, blocking on it completing. +/// This does not notify the runtime that it will be blocking and should only be used for +/// operations that will immediately (or quickly) succeed. +pub(crate) fn block_in_place(fut: F) -> T +where + F: Future + Send, + T: Send, +{ + futures_executor::block_on(fut) +} + +/// Delay for the specified duration. +pub(crate) async fn delay_for(delay: Duration) { + #[cfg(feature = "tokio-runtime")] { - futures_executor::block_on(fut) + tokio::time::sleep(delay).await } - /// Delay for the specified duration. - pub(crate) async fn delay_for(self, delay: Duration) { - #[cfg(feature = "tokio-runtime")] - { - tokio::time::sleep(delay).await - } - - #[cfg(feature = "async-std-runtime")] - { - async_std::task::sleep(delay).await - } + #[cfg(feature = "async-std-runtime")] + { + async_std::task::sleep(delay).await } +} - /// Await on a future for a maximum amount of time before returning an error. 
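In the rewritten src/runtime/mod.rs above, the old `AsyncRuntime` enum becomes a set of free functions, and the `tokio-sync` branches of `spawn` and `block_on` rely on a crate-owned runtime, `crate::sync::TOKIO_RUNTIME`, whose definition is not part of this excerpt. One plausible shape for such a global, sketched with `once_cell` purely as an assumption (not the actual driver code):

use once_cell::sync::Lazy;
use tokio::runtime::Runtime;

// A lazily built, process-wide Tokio runtime that the blocking facade owns.
static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("failed to build global Tokio runtime")
});

// With an owned runtime, the two tokio-sync branches above reduce to:
fn block_on<F: std::future::Future>(fut: F) -> F::Output {
    TOKIO_RUNTIME.block_on(fut)
}

fn spawn<F>(fut: F) -> tokio::task::JoinHandle<F::Output>
where
    F: std::future::Future + Send + 'static,
    F::Output: Send + 'static,
{
    TOKIO_RUNTIME.handle().spawn(fut)
}

fn main() {
    assert_eq!(block_on(async { 40 + 2 }), 42);
    block_on(async {
        spawn(async { println!("spawned on the global runtime") })
            .await
            .unwrap();
    });
}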
- pub(crate) async fn timeout( - self, - timeout: Duration, - future: F, - ) -> Result { - #[cfg(feature = "tokio-runtime")] - { - tokio::time::timeout(timeout, future) - .await - .map_err(|_| std::io::ErrorKind::TimedOut.into()) - } - - #[cfg(feature = "async-std-runtime")] - { - async_std::future::timeout(timeout, future) - .await - .map_err(|_| std::io::ErrorKind::TimedOut.into()) - } +/// Await on a future for a maximum amount of time before returning an error. +pub(crate) async fn timeout(timeout: Duration, future: F) -> Result { + #[cfg(feature = "tokio-runtime")] + { + tokio::time::timeout(timeout, future) + .await + .map_err(|_| std::io::ErrorKind::TimedOut.into()) } - /// Create a new `Interval` that yields with interval of `duration`. - /// See: - pub(crate) fn interval(self, duration: Duration) -> Interval { - match self { - #[cfg(feature = "tokio-runtime")] - Self::Tokio => tokio::time::interval(duration), + #[cfg(feature = "async-std-runtime")] + { + async_std::future::timeout(timeout, future) + .await + .map_err(|_| std::io::ErrorKind::TimedOut.into()) + } +} - #[cfg(feature = "async-std-runtime")] - Self::AsyncStd => Interval::new(duration), - } +/// Create a new `Interval` that yields with interval of `duration`. +/// See: +pub(crate) fn interval(duration: Duration) -> Interval { + #[cfg(feature = "tokio-runtime")] + { + tokio::time::interval(duration) } - pub(crate) async fn resolve_address( - self, - address: &ServerAddress, - ) -> Result> { - match self { - #[cfg(feature = "tokio-runtime")] - Self::Tokio => { - let socket_addrs = tokio::net::lookup_host(format!("{}", address)).await?; - Ok(socket_addrs) - } - - #[cfg(feature = "async-std-runtime")] - Self::AsyncStd => { - let host = (address.host(), address.port().unwrap_or(27017)); - let socket_addrs = async_std::net::ToSocketAddrs::to_socket_addrs(&host).await?; - Ok(socket_addrs) - } - } + #[cfg(feature = "async-std-runtime")] + { + Interval::new(duration) } } -/// Represents the context in which a given runtime method is being called from. -#[cfg(feature = "tokio-runtime")] -enum TokioCallingContext { - /// From a syncronous setting (i.e. not from a runtime thread). - Sync, - - /// From an asyncronous setting (i.e. from an async block or function being run on a runtime). - /// Includes a handle to the current runtime. - Async(tokio::runtime::Handle), -} +pub(crate) async fn resolve_address( + address: &ServerAddress, +) -> Result> { + #[cfg(feature = "tokio-runtime")] + { + let socket_addrs = tokio::net::lookup_host(format!("{}", address)).await?; + Ok(socket_addrs) + } -#[cfg(feature = "tokio-runtime")] -impl TokioCallingContext { - /// Get the current calling context. - fn current() -> Self { - match tokio::runtime::Handle::try_current() { - Ok(handle) => TokioCallingContext::Async(handle), - Err(_) => TokioCallingContext::Sync, - } + #[cfg(feature = "async-std-runtime")] + { + let host = (address.host(), address.port().unwrap_or(27017)); + let socket_addrs = async_std::net::ToSocketAddrs::to_socket_addrs(&host).await?; + Ok(socket_addrs) } } diff --git a/src/runtime/stream.rs b/src/runtime/stream.rs index 668da3401..de214747c 100644 --- a/src/runtime/stream.rs +++ b/src/runtime/stream.rs @@ -16,7 +16,7 @@ use crate::{ cmap::options::StreamOptions, error::{ErrorKind, Result}, options::ServerAddress, - RUNTIME, + runtime, }; const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); @@ -71,7 +71,7 @@ impl AsyncTcpStream { let stream = if connect_timeout == Duration::from_secs(0) { stream_future.await? 
} else { - RUNTIME.timeout(connect_timeout, stream_future).await?? + runtime::timeout(connect_timeout, stream_future).await?? }; stream.set_nodelay(true)?; @@ -111,7 +111,7 @@ impl AsyncTcpStream { async fn connect(address: &ServerAddress, connect_timeout: Option) -> Result { let timeout = connect_timeout.unwrap_or(DEFAULT_CONNECT_TIMEOUT); - let mut socket_addrs: Vec<_> = RUNTIME.resolve_address(address).await?.collect(); + let mut socket_addrs: Vec<_> = runtime::resolve_address(address).await?.collect(); if socket_addrs.is_empty() { return Err(ErrorKind::DnsResolve { diff --git a/src/sdam/description/topology/server_selection/test/in_window.rs b/src/sdam/description/topology/server_selection/test/in_window.rs index e8d60cef9..ac61232e6 100644 --- a/src/sdam/description/topology/server_selection/test/in_window.rs +++ b/src/sdam/description/topology/server_selection/test/in_window.rs @@ -8,6 +8,7 @@ use tokio::sync::RwLockWriteGuard; use crate::{ options::ServerAddress, + runtime, runtime::AsyncJoinHandle, sdam::{description::topology::server_selection, Server}, selection_criteria::ReadPreference, @@ -25,7 +26,6 @@ use crate::{ LOCK, }, ServerType, - RUNTIME, }; use super::TestTopologyDescription; @@ -175,15 +175,11 @@ async fn load_balancing_test() { let collection = client .database("load_balancing_test") .collection::("load_balancing_test"); - handles.push( - RUNTIME - .spawn(async move { - for _ in 0..iterations { - let _ = collection.find_one(None, None).await; - } - }) - .unwrap(), - ) + handles.push(runtime::spawn(async move { + for _ in 0..iterations { + let _ = collection.find_one(None, None).await; + } + })) } futures::future::join_all(handles).await; diff --git a/src/sdam/message_manager.rs b/src/sdam/message_manager.rs index 23fe1fa3c..d0c20df22 100644 --- a/src/sdam/message_manager.rs +++ b/src/sdam/message_manager.rs @@ -2,7 +2,7 @@ use std::time::Duration; use tokio::sync::broadcast::{self, Receiver, Sender}; -use crate::RUNTIME; +use crate::runtime; /// Provides functionality for message passing between server selection operations and SDAM /// background tasks. @@ -57,6 +57,8 @@ impl TopologyMessageSubscriber { /// Waits for either `timeout` to elapse or a message to be received. /// Returns true if a message was received, false for a timeout. 
pub(crate) async fn wait_for_message(&mut self, timeout: Duration) -> bool { - RUNTIME.timeout(timeout, self.receiver.recv()).await.is_ok() + runtime::timeout(timeout, self.receiver.recv()) + .await + .is_ok() } } diff --git a/src/sdam/monitor.rs b/src/sdam/monitor.rs index a1f31669d..9dc1088c6 100644 --- a/src/sdam/monitor.rs +++ b/src/sdam/monitor.rs @@ -14,7 +14,7 @@ use crate::{ error::{Error, Result}, is_master::{is_master_command, run_is_master, IsMasterReply}, options::{ClientOptions, ServerAddress}, - RUNTIME, + runtime, }; pub(super) const DEFAULT_HEARTBEAT_FREQUENCY: Duration = Duration::from_secs(10); @@ -61,7 +61,7 @@ impl Monitor { self.topology.clone(), self.client_options, ); - RUNTIME.execute(async move { + runtime::execute(async move { heartbeat_monitor.execute().await; }); } @@ -71,7 +71,7 @@ impl Monitor { topology: self.topology, update_receiver: self.update_receiver, }; - RUNTIME.execute(async move { + runtime::execute(async move { update_monitor.execute().await; }); } @@ -147,7 +147,7 @@ impl HeartbeatMonitor { #[cfg(not(test))] let min_frequency = MIN_HEARTBEAT_FREQUENCY; - RUNTIME.delay_for(min_frequency).await; + runtime::delay_for(min_frequency).await; topology_check_requests_subscriber .wait_for_message(heartbeat_frequency - min_frequency) .await; diff --git a/src/sdam/srv_polling/mod.rs b/src/sdam/srv_polling/mod.rs index 3a8137c0e..7115032dd 100644 --- a/src/sdam/srv_polling/mod.rs +++ b/src/sdam/srv_polling/mod.rs @@ -11,8 +11,8 @@ use super::{ use crate::{ error::{Error, Result}, options::ClientOptions, + runtime, srv::{LookupHosts, SrvResolver}, - RUNTIME, }; const MIN_RESCAN_SRV_INTERVAL: Duration = Duration::from_secs(60); @@ -47,7 +47,7 @@ impl SrvPollingMonitor { /// set of mongos in the cluster have changed. A weak reference is used to ensure that the /// monitoring task doesn't keep the topology alive after the client has been dropped. 
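The SRV-polling monitor above holds only a `WeakTopology` and sleeps between scans via `runtime::delay_for`, so the background task winds down once the client (and with it the topology) is dropped. A self-contained sketch of that weak-reference polling loop, with invented names and plain tokio calls standing in for the driver's runtime helpers:

use std::sync::{Arc, Weak};
use std::time::Duration;

struct Topology {
    hosts: Vec<String>,
}

fn start_polling(topology: Weak<Topology>, interval: Duration) {
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(interval).await;
            match topology.upgrade() {
                // Owner still alive: do one rescan, then release the strong ref.
                Some(strong) => println!("rescanning {} host(s)", strong.hosts.len()),
                // Owner dropped: stop instead of keeping the task alive forever.
                None => break,
            }
        }
    });
}

#[tokio::main]
async fn main() {
    let topology = Arc::new(Topology { hosts: vec!["localhost:27017".into()] });
    start_polling(Arc::downgrade(&topology), Duration::from_millis(50));
    tokio::time::sleep(Duration::from_millis(120)).await;
    drop(topology); // the polling task observes this and exits
    tokio::time::sleep(Duration::from_millis(120)).await;
}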
pub(super) fn start(topology: WeakTopology) { - RUNTIME.execute(async move { + runtime::execute(async move { if let Some(mut monitor) = Self::new(topology) { monitor.execute().await; } @@ -64,7 +64,7 @@ impl SrvPollingMonitor { } while self.topology.is_alive() { - RUNTIME.delay_for(self.rescan_interval()).await; + runtime::delay_for(self.rescan_interval()).await; let topology = match self.topology.upgrade() { Some(topology) => topology, diff --git a/src/sdam/srv_polling/test.rs b/src/sdam/srv_polling/test.rs index 6235deb0e..8e0067f9c 100644 --- a/src/sdam/srv_polling/test.rs +++ b/src/sdam/srv_polling/test.rs @@ -6,8 +6,8 @@ use super::{LookupHosts, SrvPollingMonitor}; use crate::{ error::Result, options::{ClientOptions, ServerAddress}, + runtime, sdam::Topology, - RUNTIME, }; fn localhost_test_build_10gen(port: u16) -> ServerAddress { @@ -120,7 +120,7 @@ async fn load_balanced_no_srv_polling() { localhost_test_build_10gen(27018), ])); let topology = Topology::new(options).unwrap(); - RUNTIME.delay_for(rescan_interval * 2).await; + runtime::delay_for(rescan_interval * 2).await; assert_eq!( hosts.into_iter().collect::>(), topology.servers().await diff --git a/src/sdam/state/mod.rs b/src/sdam/state/mod.rs index d2f3484ce..fe42ec36b 100644 --- a/src/sdam/state/mod.rs +++ b/src/sdam/state/mod.rs @@ -36,6 +36,7 @@ use crate::{ TopologyOpeningEvent, }, options::{ClientOptions, SelectionCriteria, ServerAddress}, + runtime, runtime::HttpClient, sdam::{ description::{ @@ -45,7 +46,6 @@ use crate::{ srv_polling::SrvPollingMonitor, TopologyMessageManager, }, - RUNTIME, }; /// A strong reference to the topology, which includes the current state as well as the client @@ -119,7 +119,7 @@ impl Topology { // we can block in place here because we're the only ones with access to the lock, so it // should be acquired immediately. - let mut topology_state = RUNTIME.block_in_place(topology.state.write()); + let mut topology_state = runtime::block_in_place(topology.state.write()); for address in &options.hosts { topology_state.add_new_server(address.clone(), options.clone(), &topology.downgrade()); } diff --git a/src/sdam/test.rs b/src/sdam/test.rs index ac88d6810..810b5571d 100644 --- a/src/sdam/test.rs +++ b/src/sdam/test.rs @@ -9,6 +9,7 @@ use tokio::sync::RwLockWriteGuard; use crate::{ error::ErrorKind, + runtime, sdam::ServerType, test::{ log_uncaptured, @@ -25,7 +26,6 @@ use crate::{ LOCK, }, Client, - RUNTIME, }; #[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))] @@ -245,7 +245,7 @@ async fn sdam_min_pool_size_error() { // Wait a little while for the server to be marked as Unknown. // Once we have SDAM monitoring, this wait can be removed and be replaced // with another event waiting. 
- RUNTIME.delay_for(Duration::from_millis(750)).await; + runtime::delay_for(Duration::from_millis(750)).await; let ping_err = client .database("admin") diff --git a/src/sync/change_stream.rs b/src/sync/change_stream.rs index fd15f68fe..cbfd3125c 100644 --- a/src/sync/change_stream.rs +++ b/src/sync/change_stream.rs @@ -8,7 +8,7 @@ use crate::{ ChangeStream as AsyncChangeStream, }, error::Result, - RUNTIME, + runtime, }; use super::ClientSession; @@ -112,7 +112,7 @@ where /// # } /// ``` pub fn next_if_any(&mut self) -> Result> { - RUNTIME.block_on(self.async_stream.next_if_any()) + runtime::block_on(self.async_stream.next_if_any()) } } @@ -123,7 +123,7 @@ where type Item = Result; fn next(&mut self) -> Option { - RUNTIME.block_on(self.async_stream.next()) + runtime::block_on(self.async_stream.next()) } } @@ -200,7 +200,7 @@ where /// # } /// ``` pub fn next(&mut self, session: &mut ClientSession) -> Result> { - RUNTIME.block_on(self.async_stream.next(&mut session.async_client_session)) + runtime::block_on(self.async_stream.next(&mut session.async_client_session)) } /// Returns whether the change stream will continue to receive events. @@ -234,7 +234,7 @@ where /// # } /// ``` pub fn next_if_any(&mut self, session: &mut ClientSession) -> Result> { - RUNTIME.block_on( + runtime::block_on( self.async_stream .next_if_any(&mut session.async_client_session), ) diff --git a/src/sync/client/mod.rs b/src/sync/client/mod.rs index 9b6ffb994..7b43ad747 100644 --- a/src/sync/client/mod.rs +++ b/src/sync/client/mod.rs @@ -14,8 +14,8 @@ use crate::{ SessionOptions, }, results::DatabaseSpecification, + runtime, Client as AsyncClient, - RUNTIME, }; /// This is the main entry point for the synchronous API. A `Client` is used to connect to a MongoDB @@ -75,7 +75,7 @@ impl Client { /// [`ClientOptions::parse`](../options/struct.ClientOptions.html#method.parse) for more /// details. pub fn with_uri_str(uri: impl AsRef) -> Result { - let async_client = RUNTIME.block_on(AsyncClient::with_uri_str(uri.as_ref()))?; + let async_client = runtime::block_on(AsyncClient::with_uri_str(uri.as_ref()))?; Ok(Self { async_client }) } @@ -134,7 +134,7 @@ impl Client { filter: impl Into>, options: impl Into>, ) -> Result> { - RUNTIME.block_on( + runtime::block_on( self.async_client .list_databases(filter.into(), options.into()), ) @@ -146,7 +146,7 @@ impl Client { filter: impl Into>, options: impl Into>, ) -> Result> { - RUNTIME.block_on( + runtime::block_on( self.async_client .list_database_names(filter.into(), options.into()), ) @@ -154,9 +154,7 @@ impl Client { /// Starts a new `ClientSession`. pub fn start_session(&self, options: Option) -> Result { - RUNTIME - .block_on(self.async_client.start_session(options)) - .map(Into::into) + runtime::block_on(self.async_client.start_session(options)).map(Into::into) } /// Starts a new [`ChangeStream`] that receives events for all changes in the cluster. 
The @@ -182,9 +180,7 @@ impl Client { pipeline: impl IntoIterator, options: impl Into>, ) -> Result>> { - RUNTIME - .block_on(self.async_client.watch(pipeline, options)) - .map(ChangeStream::new) + runtime::block_on(self.async_client.watch(pipeline, options)).map(ChangeStream::new) } /// Starts a new [`SessionChangeStream`] that receives events for all changes in the cluster @@ -195,12 +191,11 @@ impl Client { options: impl Into>, session: &mut ClientSession, ) -> Result>> { - RUNTIME - .block_on(self.async_client.watch_with_session( - pipeline, - options, - &mut session.async_client_session, - )) - .map(SessionChangeStream::new) + runtime::block_on(self.async_client.watch_with_session( + pipeline, + options, + &mut session.async_client_session, + )) + .map(SessionChangeStream::new) } } diff --git a/src/sync/client/session.rs b/src/sync/client/session.rs index 57602b6a5..2247c49f8 100644 --- a/src/sync/client/session.rs +++ b/src/sync/client/session.rs @@ -4,8 +4,8 @@ use crate::{ client::session::ClusterTime, error::Result, options::{SessionOptions, TransactionOptions}, + runtime, ClientSession as AsyncClientSession, - RUNTIME, }; /// A MongoDB client session. This struct represents a logical session used for ordering sequential @@ -76,7 +76,7 @@ impl ClientSession { &mut self, options: impl Into>, ) -> Result<()> { - RUNTIME.block_on(self.async_client_session.start_transaction(options)) + runtime::block_on(self.async_client_session.start_transaction(options)) } /// Commits the transaction that is currently active on this session. @@ -100,7 +100,7 @@ impl ClientSession { /// [here](https://docs.mongodb.com/manual/core/retryable-writes/) for more information on /// retryable writes. pub fn commit_transaction(&mut self) -> Result<()> { - RUNTIME.block_on(self.async_client_session.commit_transaction()) + runtime::block_on(self.async_client_session.commit_transaction()) } /// Aborts the transaction that is currently active on this session. Any open transaction will @@ -133,6 +133,6 @@ impl ClientSession { /// [here](https://docs.mongodb.com/manual/core/retryable-writes/) for more information on /// retryable writes. pub fn abort_transaction(&mut self) -> Result<()> { - RUNTIME.block_on(self.async_client_session.abort_transaction()) + runtime::block_on(self.async_client_session.abort_transaction()) } } diff --git a/src/sync/coll.rs b/src/sync/coll.rs index 39b901238..d0e8cf0ac 100644 --- a/src/sync/coll.rs +++ b/src/sync/coll.rs @@ -40,9 +40,9 @@ use crate::{ InsertOneResult, UpdateResult, }, + runtime, Collection as AsyncCollection, Namespace, - RUNTIME, }; /// `Collection` is the client-side abstraction of a MongoDB Collection. It can be used to @@ -128,7 +128,7 @@ impl Collection { /// Drops the collection, deleting all data, users, and indexes stored in it. 
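From here on, everything under src/sync follows one mechanical pattern: the blocking type owns its async counterpart and each method is a one-line delegation through `runtime::block_on` (previously `RUNTIME.block_on`). A toy, self-contained version of that shape, with all names invented rather than taken from the driver:

use tokio::runtime::Runtime;

struct AsyncCounter(u64);

impl AsyncCounter {
    async fn add(&mut self, n: u64) -> u64 {
        self.0 += n;
        self.0
    }
}

// Blocking facade over AsyncCounter, shaped like sync::Client wrapping the
// async Client: hold a runtime, drive each async call to completion.
struct SyncCounter {
    rt: Runtime,
    inner: AsyncCounter,
}

impl SyncCounter {
    fn new() -> Self {
        Self {
            rt: Runtime::new().expect("failed to build Tokio runtime"),
            inner: AsyncCounter(0),
        }
    }

    fn add(&mut self, n: u64) -> u64 {
        self.rt.block_on(self.inner.add(n))
    }
}

fn main() {
    let mut counter = SyncCounter::new();
    assert_eq!(counter.add(2), 2);
    assert_eq!(counter.add(3), 5);
}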
pub fn drop(&self, options: impl Into>) -> Result<()> { - RUNTIME.block_on(self.async_collection.drop(options.into())) + runtime::block_on(self.async_collection.drop(options.into())) } /// Drops the collection, deleting all data, users, and indexes stored in it using the provided @@ -138,7 +138,7 @@ impl Collection { options: impl Into>, session: &mut ClientSession, ) -> Result<()> { - RUNTIME.block_on( + runtime::block_on( self.async_collection .drop_with_session(options.into(), &mut session.async_client_session), ) @@ -154,8 +154,7 @@ impl Collection { options: impl Into>, ) -> Result> { let pipeline: Vec = pipeline.into_iter().collect(); - RUNTIME - .block_on(self.async_collection.aggregate(pipeline, options.into())) + runtime::block_on(self.async_collection.aggregate(pipeline, options.into())) .map(Cursor::new) } @@ -170,13 +169,12 @@ impl Collection { session: &mut ClientSession, ) -> Result> { let pipeline: Vec = pipeline.into_iter().collect(); - RUNTIME - .block_on(self.async_collection.aggregate_with_session( - pipeline, - options.into(), - &mut session.async_client_session, - )) - .map(SessionCursor::new) + runtime::block_on(self.async_collection.aggregate_with_session( + pipeline, + options.into(), + &mut session.async_client_session, + )) + .map(SessionCursor::new) } /// Estimates the number of documents in the collection using collection metadata. @@ -184,7 +182,7 @@ impl Collection { &self, options: impl Into>, ) -> Result { - RUNTIME.block_on( + runtime::block_on( self.async_collection .estimated_document_count(options.into()), ) @@ -199,7 +197,7 @@ impl Collection { filter: impl Into>, options: impl Into>, ) -> Result { - RUNTIME.block_on( + runtime::block_on( self.async_collection .count_documents(filter.into(), options.into()), ) @@ -215,7 +213,7 @@ impl Collection { options: impl Into>, session: &mut ClientSession, ) -> Result { - RUNTIME.block_on(self.async_collection.count_documents_with_session( + runtime::block_on(self.async_collection.count_documents_with_session( filter.into(), options.into(), &mut session.async_client_session, @@ -228,7 +226,7 @@ impl Collection { index: IndexModel, options: impl Into>, ) -> Result { - RUNTIME.block_on(self.async_collection.create_index(index, options)) + runtime::block_on(self.async_collection.create_index(index, options)) } /// Creates the given index on this collection using the provided `ClientSession`. @@ -238,7 +236,7 @@ impl Collection { options: impl Into>, session: &mut ClientSession, ) -> Result { - RUNTIME.block_on(self.async_collection.create_index_with_session( + runtime::block_on(self.async_collection.create_index_with_session( index, options, &mut session.async_client_session, @@ -251,7 +249,7 @@ impl Collection { indexes: impl IntoIterator, options: impl Into>, ) -> Result { - RUNTIME.block_on(self.async_collection.create_indexes(indexes, options)) + runtime::block_on(self.async_collection.create_indexes(indexes, options)) } /// Creates the given indexes on this collection using the provided `ClientSession`. 
@@ -261,7 +259,7 @@ impl Collection { options: impl Into>, session: &mut ClientSession, ) -> Result { - RUNTIME.block_on(self.async_collection.create_indexes_with_session( + runtime::block_on(self.async_collection.create_indexes_with_session( indexes, options, &mut session.async_client_session, @@ -274,7 +272,7 @@ impl Collection { query: Document, options: impl Into>, ) -> Result { - RUNTIME.block_on(self.async_collection.delete_many(query, options.into())) + runtime::block_on(self.async_collection.delete_many(query, options.into())) } /// Deletes all documents stored in the collection matching `query` using the provided @@ -285,7 +283,7 @@ impl Collection { options: impl Into>, session: &mut ClientSession, ) -> Result { - RUNTIME.block_on(self.async_collection.delete_many_with_session( + runtime::block_on(self.async_collection.delete_many_with_session( query, options.into(), &mut session.async_client_session, @@ -303,7 +301,7 @@ impl Collection { query: Document, options: impl Into>, ) -> Result { - RUNTIME.block_on(self.async_collection.delete_one(query, options.into())) + runtime::block_on(self.async_collection.delete_one(query, options.into())) } /// Deletes up to one document found matching `query` using the provided `ClientSession`. @@ -318,7 +316,7 @@ impl Collection { options: impl Into>, session: &mut ClientSession, ) -> Result { - RUNTIME.block_on(self.async_collection.delete_one_with_session( + runtime::block_on(self.async_collection.delete_one_with_session( query, options.into(), &mut session.async_client_session, @@ -332,7 +330,7 @@ impl Collection { filter: impl Into>, options: impl Into>, ) -> Result> { - RUNTIME.block_on(self.async_collection.distinct( + runtime::block_on(self.async_collection.distinct( field_name.as_ref(), filter.into(), options.into(), @@ -348,7 +346,7 @@ impl Collection { options: impl Into>, session: &mut ClientSession, ) -> Result> { - RUNTIME.block_on(self.async_collection.distinct_with_session( + runtime::block_on(self.async_collection.distinct_with_session( field_name.as_ref(), filter.into(), options.into(), @@ -368,7 +366,7 @@ impl Collection { update: impl Into, options: impl Into>, ) -> Result { - RUNTIME.block_on( + runtime::block_on( self.async_collection .update_many(query, update.into(), options.into()), ) @@ -380,7 +378,7 @@ impl Collection { name: impl AsRef, options: impl Into>, ) -> Result<()> { - RUNTIME.block_on(self.async_collection.drop_index(name, options)) + runtime::block_on(self.async_collection.drop_index(name, options)) } /// Drops the index specified by `name` from this collection using the provided `ClientSession`. @@ -390,7 +388,7 @@ impl Collection { options: impl Into>, session: &mut ClientSession, ) -> Result<()> { - RUNTIME.block_on(self.async_collection.drop_index_with_session( + runtime::block_on(self.async_collection.drop_index_with_session( name, options, &mut session.async_client_session, @@ -399,7 +397,7 @@ impl Collection { /// Drops all indexes associated with this collection. pub fn drop_indexes(&self, options: impl Into>) -> Result<()> { - RUNTIME.block_on(self.async_collection.drop_indexes(options)) + runtime::block_on(self.async_collection.drop_indexes(options)) } /// Drops all indexes associated with this collection using the provided `ClientSession`. 
@@ -408,7 +406,7 @@ impl Collection { options: impl Into>, session: &mut ClientSession, ) -> Result<()> { - RUNTIME.block_on( + runtime::block_on( self.async_collection .drop_indexes_with_session(options, &mut session.async_client_session), ) @@ -419,9 +417,7 @@ impl Collection { &self, options: impl Into>, ) -> Result> { - RUNTIME - .block_on(self.async_collection.list_indexes(options)) - .map(Cursor::new) + runtime::block_on(self.async_collection.list_indexes(options)).map(Cursor::new) } /// Lists all indexes on this collection using the provided `ClientSession`. @@ -430,17 +426,16 @@ impl Collection { options: impl Into>, session: &mut ClientSession, ) -> Result> { - RUNTIME - .block_on( - self.async_collection - .list_indexes_with_session(options, &mut session.async_client_session), - ) - .map(SessionCursor::new) + runtime::block_on( + self.async_collection + .list_indexes_with_session(options, &mut session.async_client_session), + ) + .map(SessionCursor::new) } /// Gets the names of all indexes on the collection. pub fn list_index_names(&self) -> Result> { - RUNTIME.block_on(self.async_collection.list_index_names()) + runtime::block_on(self.async_collection.list_index_names()) } /// Gets the names of all indexes on the collection using the provided `ClientSession`. @@ -448,7 +443,7 @@ impl Collection { &self, session: &mut ClientSession, ) -> Result> { - RUNTIME.block_on( + runtime::block_on( self.async_collection .list_index_names_with_session(&mut session.async_client_session), ) @@ -467,7 +462,7 @@ impl Collection { options: impl Into>, session: &mut ClientSession, ) -> Result { - RUNTIME.block_on(self.async_collection.update_many_with_session( + runtime::block_on(self.async_collection.update_many_with_session( query, update.into(), options.into(), @@ -492,7 +487,7 @@ impl Collection { update: impl Into, options: impl Into>, ) -> Result { - RUNTIME.block_on( + runtime::block_on( self.async_collection .update_one(query, update.into(), options.into()), ) @@ -517,7 +512,7 @@ impl Collection { options: impl Into>, session: &mut ClientSession, ) -> Result { - RUNTIME.block_on(self.async_collection.update_one_with_session( + runtime::block_on(self.async_collection.update_one_with_session( query, update.into(), options.into(), @@ -551,9 +546,7 @@ impl Collection { where T: DeserializeOwned + Unpin + Send + Sync, { - RUNTIME - .block_on(self.async_collection.watch(pipeline, options)) - .map(ChangeStream::new) + runtime::block_on(self.async_collection.watch(pipeline, options)).map(ChangeStream::new) } /// Starts a new [`SessionChangeStream`] that receives events for all changes in this collection @@ -568,13 +561,12 @@ impl Collection { where T: DeserializeOwned + Unpin + Send + Sync, { - RUNTIME - .block_on(self.async_collection.watch_with_session( - pipeline, - options, - &mut session.async_client_session, - )) - .map(SessionChangeStream::new) + runtime::block_on(self.async_collection.watch_with_session( + pipeline, + options, + &mut session.async_client_session, + )) + .map(SessionChangeStream::new) } /// Finds the documents in the collection matching `filter`. 
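The typed `find_one` / `find_one_and_*` wrappers above mean the blocking API keeps the async API's serde-based mapping, so with `tokio-sync` enabled a caller can read and write domain structs without writing any async code. A hedged usage sketch (the connection string, database, collection, and struct are placeholders, and `serde` with its `derive` feature is assumed as a dependency):

use mongodb::{bson::doc, sync::Client};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct Item {
    name: String,
    qty: i32,
}

fn main() -> mongodb::error::Result<()> {
    let client = Client::with_uri_str("mongodb://localhost:27017")?;
    let items = client.database("demo").collection::<Item>("items");

    items.insert_one(Item { name: "spam".into(), qty: 1 }, None)?;

    let found = items.find_one(doc! { "name": "spam" }, None)?;
    println!("{:?}", found);
    Ok(())
}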
@@ -583,8 +575,7 @@ impl Collection { filter: impl Into>, options: impl Into>, ) -> Result> { - RUNTIME - .block_on(self.async_collection.find(filter.into(), options.into())) + runtime::block_on(self.async_collection.find(filter.into(), options.into())) .map(Cursor::new) } @@ -595,13 +586,12 @@ impl Collection { options: impl Into>, session: &mut ClientSession, ) -> Result> { - RUNTIME - .block_on(self.async_collection.find_with_session( - filter.into(), - options.into(), - &mut session.async_client_session, - )) - .map(SessionCursor::new) + runtime::block_on(self.async_collection.find_with_session( + filter.into(), + options.into(), + &mut session.async_client_session, + )) + .map(SessionCursor::new) } } @@ -615,7 +605,7 @@ where filter: impl Into>, options: impl Into>, ) -> Result> { - RUNTIME.block_on( + runtime::block_on( self.async_collection .find_one(filter.into(), options.into()), ) @@ -629,7 +619,7 @@ where options: impl Into>, session: &mut ClientSession, ) -> Result> { - RUNTIME.block_on(self.async_collection.find_one_with_session( + runtime::block_on(self.async_collection.find_one_with_session( filter.into(), options.into(), &mut session.async_client_session, @@ -652,7 +642,7 @@ where filter: Document, options: impl Into>, ) -> Result> { - RUNTIME.block_on( + runtime::block_on( self.async_collection .find_one_and_delete(filter, options.into()), ) @@ -671,7 +661,7 @@ where options: impl Into>, session: &mut ClientSession, ) -> Result> { - RUNTIME.block_on(self.async_collection.find_one_and_delete_with_session( + runtime::block_on(self.async_collection.find_one_and_delete_with_session( filter, options.into(), &mut session.async_client_session, @@ -693,7 +683,7 @@ where update: impl Into, options: impl Into>, ) -> Result> { - RUNTIME.block_on(self.async_collection.find_one_and_update( + runtime::block_on(self.async_collection.find_one_and_update( filter, update.into(), options.into(), @@ -716,7 +706,7 @@ where options: impl Into>, session: &mut ClientSession, ) -> Result> { - RUNTIME.block_on(self.async_collection.find_one_and_update_with_session( + runtime::block_on(self.async_collection.find_one_and_update_with_session( filter, update.into(), options.into(), @@ -742,7 +732,7 @@ where replacement: T, options: impl Into>, ) -> Result> { - RUNTIME.block_on(self.async_collection.find_one_and_replace( + runtime::block_on(self.async_collection.find_one_and_replace( filter, replacement, options.into(), @@ -763,7 +753,7 @@ where options: impl Into>, session: &mut ClientSession, ) -> Result> { - RUNTIME.block_on(self.async_collection.find_one_and_replace_with_session( + runtime::block_on(self.async_collection.find_one_and_replace_with_session( filter, replacement, options.into(), @@ -787,7 +777,7 @@ where docs: impl IntoIterator>, options: impl Into>, ) -> Result { - RUNTIME.block_on(self.async_collection.insert_many(docs, options.into())) + runtime::block_on(self.async_collection.insert_many(docs, options.into())) } /// Inserts the documents in `docs` into the collection using the provided `ClientSession`. 
@@ -802,7 +792,7 @@ where options: impl Into>, session: &mut ClientSession, ) -> Result { - RUNTIME.block_on(self.async_collection.insert_many_with_session( + runtime::block_on(self.async_collection.insert_many_with_session( docs, options.into(), &mut session.async_client_session, @@ -820,7 +810,7 @@ where doc: impl Borrow, options: impl Into>, ) -> Result { - RUNTIME.block_on( + runtime::block_on( self.async_collection .insert_one(doc.borrow(), options.into()), ) @@ -838,7 +828,7 @@ where options: impl Into>, session: &mut ClientSession, ) -> Result { - RUNTIME.block_on(self.async_collection.insert_one_with_session( + runtime::block_on(self.async_collection.insert_one_with_session( doc.borrow(), options.into(), &mut session.async_client_session, @@ -857,7 +847,7 @@ where replacement: impl Borrow, options: impl Into>, ) -> Result { - RUNTIME.block_on(self.async_collection.replace_one( + runtime::block_on(self.async_collection.replace_one( query, replacement.borrow(), options.into(), @@ -878,7 +868,7 @@ where options: impl Into>, session: &mut ClientSession, ) -> Result { - RUNTIME.block_on(self.async_collection.replace_one_with_session( + runtime::block_on(self.async_collection.replace_one_with_session( query, replacement.borrow(), options.into(), diff --git a/src/sync/cursor.rs b/src/sync/cursor.rs index 92ca91631..37fe85e8e 100644 --- a/src/sync/cursor.rs +++ b/src/sync/cursor.rs @@ -5,10 +5,10 @@ use super::ClientSession; use crate::{ bson::{Document, RawDocument}, error::Result, + runtime, Cursor as AsyncCursor, SessionCursor as AsyncSessionCursor, SessionCursorStream, - RUNTIME, }; /// A `Cursor` streams the result of a query. When a query is made, a `Cursor` will be returned with @@ -105,7 +105,7 @@ impl Cursor { /// # } /// ``` pub fn advance(&mut self) -> Result { - RUNTIME.block_on(self.async_cursor.advance()) + runtime::block_on(self.async_cursor.advance()) } /// Returns a reference to the current result in the cursor. @@ -174,7 +174,7 @@ where type Item = Result; fn next(&mut self) -> Option { - RUNTIME.block_on(self.async_cursor.next()) + runtime::block_on(self.async_cursor.next()) } } @@ -234,7 +234,7 @@ impl SessionCursor { /// # } /// ``` pub fn advance(&mut self, session: &mut ClientSession) -> Result { - RUNTIME.block_on(self.async_cursor.advance(&mut session.async_client_session)) + runtime::block_on(self.async_cursor.advance(&mut session.async_client_session)) } /// Returns a reference to the current result in the cursor. @@ -357,6 +357,6 @@ where type Item = Result; fn next(&mut self) -> Option { - RUNTIME.block_on(self.async_stream.next()) + runtime::block_on(self.async_stream.next()) } } diff --git a/src/sync/db.rs b/src/sync/db.rs index 4bb0cef96..2a2bbbf53 100644 --- a/src/sync/db.rs +++ b/src/sync/db.rs @@ -16,8 +16,8 @@ use crate::{ WriteConcern, }, results::CollectionSpecification, + runtime, Database as AsyncDatabase, - RUNTIME, }; /// `Database` is the client-side abstraction of a MongoDB database. It can be used to perform @@ -108,7 +108,7 @@ impl Database { /// Drops the database, deleting all data, collections, users, and indexes stored in it. 
pub fn drop(&self, options: impl Into>) -> Result<()> { - RUNTIME.block_on(self.async_database.drop(options.into())) + runtime::block_on(self.async_database.drop(options.into())) } /// Drops the database, deleting all data, collections, users, and indexes stored in it using @@ -118,7 +118,7 @@ impl Database { options: impl Into>, session: &mut ClientSession, ) -> Result<()> { - RUNTIME.block_on( + runtime::block_on( self.async_database .drop_with_session(options.into(), &mut session.async_client_session), ) @@ -131,12 +131,11 @@ impl Database { filter: impl Into>, options: impl Into>, ) -> Result> { - RUNTIME - .block_on( - self.async_database - .list_collections(filter.into(), options.into()), - ) - .map(Cursor::new) + runtime::block_on( + self.async_database + .list_collections(filter.into(), options.into()), + ) + .map(Cursor::new) } /// Gets information about each of the collections in the database using the provided @@ -148,13 +147,12 @@ impl Database { options: impl Into>, session: &mut ClientSession, ) -> Result> { - RUNTIME - .block_on(self.async_database.list_collections_with_session( - filter.into(), - options.into(), - &mut session.async_client_session, - )) - .map(SessionCursor::new) + runtime::block_on(self.async_database.list_collections_with_session( + filter.into(), + options.into(), + &mut session.async_client_session, + )) + .map(SessionCursor::new) } /// Gets the names of the collections in the database. @@ -162,7 +160,7 @@ impl Database { &self, filter: impl Into>, ) -> Result> { - RUNTIME.block_on(self.async_database.list_collection_names(filter.into())) + runtime::block_on(self.async_database.list_collection_names(filter.into())) } /// Gets the names of the collections in the database using the provided `ClientSession`. @@ -171,7 +169,7 @@ impl Database { filter: impl Into>, session: &mut ClientSession, ) -> Result> { - RUNTIME.block_on( + runtime::block_on( self.async_database.list_collection_names_with_session( filter.into(), &mut session.async_client_session, @@ -188,7 +186,7 @@ impl Database { name: impl AsRef, options: impl Into>, ) -> Result<()> { - RUNTIME.block_on( + runtime::block_on( self.async_database .create_collection(name.as_ref(), options.into()), ) @@ -205,7 +203,7 @@ impl Database { options: impl Into>, session: &mut ClientSession, ) -> Result<()> { - RUNTIME.block_on(self.async_database.create_collection_with_session( + runtime::block_on(self.async_database.create_collection_with_session( name.as_ref(), options.into(), &mut session.async_client_session, @@ -222,7 +220,7 @@ impl Database { command: Document, selection_criteria: impl Into>, ) -> Result { - RUNTIME.block_on( + runtime::block_on( self.async_database .run_command(command, selection_criteria.into()), ) @@ -239,7 +237,7 @@ impl Database { selection_criteria: impl Into>, session: &mut ClientSession, ) -> Result { - RUNTIME.block_on(self.async_database.run_command_with_session( + runtime::block_on(self.async_database.run_command_with_session( command, selection_criteria.into(), &mut session.async_client_session, @@ -256,9 +254,7 @@ impl Database { options: impl Into>, ) -> Result> { let pipeline: Vec = pipeline.into_iter().collect(); - RUNTIME - .block_on(self.async_database.aggregate(pipeline, options.into())) - .map(Cursor::new) + runtime::block_on(self.async_database.aggregate(pipeline, options.into())).map(Cursor::new) } /// Runs an aggregation operation using the provided `ClientSession`. 
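// The blocking wrappers above all follow the same pattern: each sync method forwards to its
// async counterpart via `runtime::block_on`. A minimal usage sketch of the resulting sync API
// (with either the `sync` or the `tokio-sync` feature enabled) is shown below; the URI,
// database name, and collection name are placeholders rather than values from this patch.
use mongodb::{
    bson::{doc, Document},
    error::Result,
    sync::Client,
};

fn print_active_documents() -> Result<()> {
    // Everything here blocks the current thread; the caller never touches an async runtime.
    let client = Client::with_uri_str("mongodb://localhost:27017")?;
    let coll = client
        .database("example_db")
        .collection::<Document>("example_coll");
    // The sync `Cursor` returned by `find` implements `Iterator<Item = Result<Document>>`.
    for result in coll.find(doc! { "status": "active" }, None)? {
        println!("{}", result?);
    }
    Ok(())
}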
@@ -272,13 +268,12 @@ impl Database { session: &mut ClientSession, ) -> Result> { let pipeline: Vec = pipeline.into_iter().collect(); - RUNTIME - .block_on(self.async_database.aggregate_with_session( - pipeline, - options.into(), - &mut session.async_client_session, - )) - .map(SessionCursor::new) + runtime::block_on(self.async_database.aggregate_with_session( + pipeline, + options.into(), + &mut session.async_client_session, + )) + .map(SessionCursor::new) } /// Starts a new [`ChangeStream`](change_stream/struct.ChangeStream.html) that receives events @@ -303,9 +298,7 @@ impl Database { pipeline: impl IntoIterator, options: impl Into>, ) -> Result>> { - RUNTIME - .block_on(self.async_database.watch(pipeline, options)) - .map(ChangeStream::new) + runtime::block_on(self.async_database.watch(pipeline, options)).map(ChangeStream::new) } /// Starts a new [`SessionChangeStream`] that receives events for all changes in this database @@ -316,12 +309,11 @@ impl Database { options: impl Into>, session: &mut ClientSession, ) -> Result>> { - RUNTIME - .block_on(self.async_database.watch_with_session( - pipeline, - options, - &mut session.async_client_session, - )) - .map(SessionChangeStream::new) + runtime::block_on(self.async_database.watch_with_session( + pipeline, + options, + &mut session.async_client_session, + )) + .map(SessionChangeStream::new) } } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 7dd5bce99..1c897f4a7 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -14,3 +14,12 @@ pub use client::{session::ClientSession, Client}; pub use coll::Collection; pub use cursor::{Cursor, SessionCursor, SessionCursorIter}; pub use db::Database; + +lazy_static::lazy_static! { + pub(crate) static ref TOKIO_RUNTIME: tokio::runtime::Runtime = { + match tokio::runtime::Runtime::new() { + Ok(runtime) => runtime, + Err(err) => panic!("Error occurred when starting the underlying async runtime: {}", err) + } + }; +} diff --git a/src/sync/test.rs b/src/sync/test.rs index 00ac82a98..08f2d01a3 100644 --- a/src/sync/test.rs +++ b/src/sync/test.rs @@ -16,9 +16,9 @@ use crate::{ ServerAddress, WriteConcern, }, + runtime, sync::{Client, Collection}, test::{TestClient as AsyncTestClient, CLIENT_OPTIONS, LOCK}, - RUNTIME, }; fn init_db_and_coll(client: &Client, db_name: &str, coll_name: &str) -> Collection { @@ -35,7 +35,7 @@ fn init_db_and_typed_coll(client: &Client, db_name: &str, coll_name: &str) -> #[test] fn client_options() { - let _guard: RwLockReadGuard<()> = RUNTIME.block_on(async { LOCK.run_concurrently().await }); + let _guard: RwLockReadGuard<()> = runtime::block_on(async { LOCK.run_concurrently().await }); let mut options = ClientOptions::parse("mongodb://localhost:27017/").unwrap(); @@ -55,7 +55,7 @@ fn client_options() { #[test] #[function_name::named] fn client() { - let _guard: RwLockReadGuard<()> = RUNTIME.block_on(async { LOCK.run_concurrently().await }); + let _guard: RwLockReadGuard<()> = runtime::block_on(async { LOCK.run_concurrently().await }); let options = CLIENT_OPTIONS.clone(); let client = Client::with_options(options).expect("client creation should succeed"); @@ -76,7 +76,7 @@ fn client() { fn default_database() { // here we just test default database name matched, the database interactive logic // is tested in `database`. 
- let _guard: RwLockReadGuard<()> = RUNTIME.block_on(async { LOCK.run_concurrently().await }); + let _guard: RwLockReadGuard<()> = runtime::block_on(async { LOCK.run_concurrently().await }); let options = CLIENT_OPTIONS.clone(); let client = Client::with_options(options).expect("client creation should succeed"); @@ -104,7 +104,7 @@ fn default_database() { #[test] #[function_name::named] fn database() { - let _guard: RwLockReadGuard<()> = RUNTIME.block_on(async { LOCK.run_concurrently().await }); + let _guard: RwLockReadGuard<()> = runtime::block_on(async { LOCK.run_concurrently().await }); let options = CLIENT_OPTIONS.clone(); let client = Client::with_options(options).expect("client creation should succeed"); @@ -149,7 +149,7 @@ fn database() { #[test] #[function_name::named] fn collection() { - let _guard: RwLockReadGuard<()> = RUNTIME.block_on(async { LOCK.run_concurrently().await }); + let _guard: RwLockReadGuard<()> = runtime::block_on(async { LOCK.run_concurrently().await }); let options = CLIENT_OPTIONS.clone(); let client = Client::with_options(options).expect("client creation should succeed"); @@ -202,7 +202,7 @@ fn collection() { #[test] #[function_name::named] fn typed_collection() { - let _guard: RwLockReadGuard<()> = RUNTIME.block_on(async { LOCK.run_concurrently().await }); + let _guard: RwLockReadGuard<()> = runtime::block_on(async { LOCK.run_concurrently().await }); let options = CLIENT_OPTIONS.clone(); let client = Client::with_options(options).expect("client creation should succeed"); @@ -224,9 +224,9 @@ fn typed_collection() { #[test] #[function_name::named] fn transactions() { - let _guard: RwLockReadGuard<()> = RUNTIME.block_on(async { LOCK.run_concurrently().await }); + let _guard: RwLockReadGuard<()> = runtime::block_on(async { LOCK.run_concurrently().await }); - let should_skip = RUNTIME.block_on(async { + let should_skip = runtime::block_on(async { let test_client = AsyncTestClient::new().await; !test_client.supports_transactions() }); @@ -268,13 +268,13 @@ #[test] #[function_name::named] fn collection_generic_bounds() { - let _guard: RwLockReadGuard<()> = RUNTIME.block_on(async { LOCK.run_concurrently().await }); + let _guard: RwLockReadGuard<()> = runtime::block_on(async { LOCK.run_concurrently().await }); #[derive(Deserialize)] struct Foo; let options = CLIENT_OPTIONS.clone(); let client = Client::with_options(options).expect("client creation should succeed"); // ensure this code successfully compiles let coll: Collection<Foo> = client @@ -294,7 +294,7 @@ fn collection_generic_bounds() { #[test] fn borrowed_deserialization() { - let _guard: RwLockReadGuard<()> = RUNTIME.block_on(async { LOCK.run_concurrently().await }); + let _guard: RwLockReadGuard<()> = runtime::block_on(async { LOCK.run_concurrently().await }); let client = Client::with_options(CLIENT_OPTIONS.clone()).expect("client creation should succeed"); diff --git a/src/test/client.rs b/src/test/client.rs index cb38d9680..e99725e59 100644 --- a/src/test/client.rs +++ b/src/test/client.rs @@ -8,10 +8,10 @@ use crate::{ bson::{doc, Bson}, error::{CommandError, Error, ErrorKind}, options::{AuthMechanism, ClientOptions, Credential, ListDatabasesOptions, ServerAddress}, + runtime, selection_criteria::{ReadPreference, ReadPreferenceOptions, SelectionCriteria}, test::{log_uncaptured, util::TestClient, CLIENT_OPTIONS, LOCK}, Client, - RUNTIME, }; #[derive(Debug, Deserialize)] @@ -83,22 +83,21 @@ async fn connection_drop_during_read() { .await
.unwrap(); - let _: Result<_, _> = RUNTIME - .timeout( - Duration::from_millis(50), - db.run_command( - doc! { - "count": function_name!(), - "query": { - "$where": "sleep(100) && true" - } - }, - None, - ), - ) - .await; + let _: Result<_, _> = runtime::timeout( + Duration::from_millis(50), + db.run_command( + doc! { + "count": function_name!(), + "query": { + "$where": "sleep(100) && true" + } + }, + None, + ), + ) + .await; - RUNTIME.delay_for(Duration::from_millis(200)).await; + runtime::delay_for(Duration::from_millis(200)).await; let is_master_response = db.run_command(doc! { "isMaster": 1 }, None).await; diff --git a/src/test/coll.rs b/src/test/coll.rs index ce2ba8eba..068931984 100644 --- a/src/test/coll.rs +++ b/src/test/coll.rs @@ -27,6 +27,7 @@ use crate::{ WriteConcern, }, results::DeleteResult, + runtime, test::{ log_uncaptured, util::{drop_collection, EventClient, TestClient}, @@ -34,7 +35,6 @@ use crate::{ LOCK, }, Collection, - RUNTIME, }; #[cfg_attr(feature = "tokio-runtime", tokio::test)] @@ -305,7 +305,7 @@ async fn kill_cursors_on_drop() { // The `Drop` implementation for `Cursor' spawns a back tasks that emits certain events. If the // task hasn't been scheduled yet, we may not see the event here. To account for this, we wait // for a small amount of time before checking. - RUNTIME.delay_for(Duration::from_millis(250)).await; + runtime::delay_for(Duration::from_millis(250)).await; assert!(kill_cursors_sent(&event_client)); } @@ -341,7 +341,7 @@ async fn no_kill_cursors_on_exhausted() { std::mem::drop(cursor); // wait for any tasks to get spawned from `Cursor`'s `Drop`. - RUNTIME.delay_for(Duration::from_millis(250)).await; + runtime::delay_for(Duration::from_millis(250)).await; assert!(!kill_cursors_sent(&event_client)); } diff --git a/src/test/cursor.rs b/src/test/cursor.rs index c414c3441..4c39198ec 100644 --- a/src/test/cursor.rs +++ b/src/test/cursor.rs @@ -7,8 +7,8 @@ use tokio::sync::RwLockReadGuard; use crate::{ bson::doc, options::{CreateCollectionOptions, CursorType, FindOptions}, + runtime, test::{util::EventClient, TestClient, LOCK}, - RUNTIME, }; #[cfg_attr(feature = "tokio-runtime", tokio::test)] @@ -54,7 +54,7 @@ async fn tailable_cursor() { ); } - let delay = RUNTIME.delay_for(await_time); + let delay = runtime::delay_for(await_time); let next_doc = cursor.next(); let next_doc = match futures::future::select(Box::pin(delay), Box::pin(next_doc)).await { @@ -66,11 +66,11 @@ async fn tailable_cursor() { ), }; - RUNTIME.execute(async move { + runtime::execute(async move { coll.insert_one(doc! 
{ "_id": 5 }, None).await.unwrap(); }); - let delay = RUNTIME.delay_for(await_time); + let delay = runtime::delay_for(await_time); match futures::future::select(Box::pin(delay), Box::pin(next_doc)).await { Either::Left((..)) => panic!("should have gotten next document, but instead timed"), diff --git a/src/test/documentation_examples/mod.rs b/src/test/documentation_examples/mod.rs index 814baed2b..16ec85301 100644 --- a/src/test/documentation_examples/mod.rs +++ b/src/test/documentation_examples/mod.rs @@ -1372,7 +1372,7 @@ async fn delete_examples(collection: &Collection) -> Result<()> { type GenericResult = std::result::Result>; #[allow(unused_variables)] -#[cfg(not(feature = "sync"))] +#[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] async fn stable_api_examples() -> GenericResult<()> { let setup_client = TestClient::new().await; if setup_client.server_version_lt(4, 9) { @@ -1755,7 +1755,7 @@ async fn index_examples() -> Result<()> { } async fn change_streams_examples() -> Result<()> { - use crate::{change_stream::options::FullDocumentType, options::ChangeStreamOptions, RUNTIME}; + use crate::{change_stream::options::FullDocumentType, options::ChangeStreamOptions, runtime}; use std::time::Duration; let client = TestClient::new().await; @@ -1772,20 +1772,18 @@ async fn change_streams_examples() -> Result<()> { // Background writer thread so that the `stream.next()` calls return something. let (tx, mut rx) = tokio::sync::oneshot::channel(); let writer_inventory = inventory.clone(); - let handle = RUNTIME - .spawn(async move { - let mut interval = RUNTIME.interval(Duration::from_millis(100)); - loop { - tokio::select! { - _ = interval.tick() => { - writer_inventory.insert_one(doc! {}, None).await?; - } - _ = &mut rx => break, + let handle = runtime::spawn(async move { + let mut interval = runtime::interval(Duration::from_millis(100)); + loop { + tokio::select! { + _ = interval.tick() => { + writer_inventory.insert_one(doc! 
{}, None).await?; } + _ = &mut rx => break, } - Result::Ok(()) - }) - .unwrap(); + } + Result::Ok(()) + }); #[allow(unused_variables, unused_imports)] { diff --git a/src/test/mod.rs b/src/test/mod.rs index c320500ae..00150cb75 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -1,4 +1,4 @@ -#[cfg(not(feature = "sync"))] +#[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] mod atlas_connectivity; mod auth_aws; mod change_stream; @@ -6,7 +6,7 @@ mod client; mod coll; mod cursor; mod db; -#[cfg(not(feature = "sync"))] +#[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] mod documentation_examples; mod index_management; mod spec; diff --git a/src/test/spec/connection_stepdown.rs b/src/test/spec/connection_stepdown.rs index f0d8ef79f..7aeeb2451 100644 --- a/src/test/spec/connection_stepdown.rs +++ b/src/test/spec/connection_stepdown.rs @@ -15,10 +15,10 @@ use crate::{ InsertManyOptions, WriteConcern, }, + runtime, test::{log_uncaptured, util::EventClient, CLIENT_OPTIONS, LOCK}, Collection, Database, - RUNTIME, }; async fn run_test( @@ -112,7 +112,7 @@ async fn get_more() { .expect("cursor iteration should have succeeded"); } - RUNTIME.delay_for(Duration::from_millis(250)).await; + runtime::delay_for(Duration::from_millis(250)).await; assert_eq!(client.count_pool_cleared_events(), 0); } @@ -163,7 +163,7 @@ async fn not_master_keep_pool() { .await .expect("insert should have succeeded"); - RUNTIME.delay_for(Duration::from_millis(250)).await; + runtime::delay_for(Duration::from_millis(250)).await; assert_eq!(client.count_pool_cleared_events(), 0); } @@ -210,7 +210,7 @@ async fn not_master_reset_pool() { "insert should have failed" ); - RUNTIME.delay_for(Duration::from_millis(250)).await; + runtime::delay_for(Duration::from_millis(250)).await; assert_eq!(client.count_pool_cleared_events(), 1); coll.insert_one(doc! { "test": 1 }, None) @@ -260,7 +260,7 @@ async fn shutdown_in_progress() { "insert should have failed" ); - RUNTIME.delay_for(Duration::from_millis(250)).await; + runtime::delay_for(Duration::from_millis(250)).await; assert_eq!(client.count_pool_cleared_events(), 1); coll.insert_one(doc! { "test": 1 }, None) @@ -310,14 +310,14 @@ async fn interrupted_at_shutdown() { "insert should have failed" ); - RUNTIME.delay_for(Duration::from_millis(250)).await; + runtime::delay_for(Duration::from_millis(250)).await; assert_eq!(client.count_pool_cleared_events(), 1); coll.insert_one(doc! 
{ "test": 1 }, None) .await .expect("insert should have succeeded"); - RUNTIME.delay_for(Duration::from_millis(250)).await; + runtime::delay_for(Duration::from_millis(250)).await; } run_test(function_name!(), interrupted_at_shutdown_test).await; diff --git a/src/test/spec/initial_dns_seedlist_discovery.rs b/src/test/spec/initial_dns_seedlist_discovery.rs index cd3cfec01..ab25fb1c9 100644 --- a/src/test/spec/initial_dns_seedlist_discovery.rs +++ b/src/test/spec/initial_dns_seedlist_discovery.rs @@ -7,8 +7,8 @@ use crate::{ bson::doc, client::{auth::AuthMechanism, Client}, options::{ClientOptions, ResolverConfig}, + runtime, test::{log_uncaptured, run_spec_test, TestClient, CLIENT_OPTIONS, LOCK}, - RUNTIME, }; #[derive(Debug, Deserialize)] @@ -188,7 +188,7 @@ async fn run_test(mut test_file: TestFile) { ) } - RUNTIME.delay_for(Duration::from_millis(500)).await; + runtime::delay_for(Duration::from_millis(500)).await; } } diff --git a/src/test/spec/mod.rs b/src/test/spec/mod.rs index 0d42a5f29..614a524b0 100644 --- a/src/test/spec/mod.rs +++ b/src/test/spec/mod.rs @@ -1,4 +1,4 @@ -#[cfg(not(feature = "sync"))] +#[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] mod auth; mod change_streams; mod collection_management; @@ -6,11 +6,11 @@ mod command_monitoring; mod connection_stepdown; mod crud; mod crud_v1; -#[cfg(not(feature = "sync"))] +#[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] mod initial_dns_seedlist_discovery; mod load_balancers; mod ocsp; -#[cfg(not(feature = "sync"))] +#[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] mod read_write_concern; mod retryable_reads; mod retryable_writes; diff --git a/src/test/spec/retryable_reads.rs b/src/test/spec/retryable_reads.rs index 7fb1fe534..f0b9a9f46 100644 --- a/src/test/spec/retryable_reads.rs +++ b/src/test/spec/retryable_reads.rs @@ -9,6 +9,7 @@ use crate::{ cmap::{CmapEventHandler, ConnectionCheckoutFailedReason}, command::CommandEventHandler, }, + runtime, runtime::AsyncJoinHandle, test::{ log_uncaptured, @@ -23,7 +24,6 @@ use crate::{ CLIENT_OPTIONS, LOCK, }, - RUNTIME, }; use super::run_v2_test; @@ -62,8 +62,7 @@ async fn retry_releases_connection() { let failpoint = FailPoint::fail_command(&["find"], FailPointMode::Times(1), Some(options)); let _fp_guard = client.enable_failpoint(failpoint, None).await.unwrap(); - RUNTIME - .timeout(Duration::from_secs(1), collection.find_one(doc! {}, None)) + runtime::timeout(Duration::from_secs(1), collection.find_one(doc! {}, None)) .await .expect("operation should not time out") .expect("find should succeed"); @@ -116,9 +115,7 @@ async fn retry_read_pool_cleared() { let mut tasks: Vec> = Vec::new(); for _ in 0..2 { let coll = collection.clone(); - let task = RUNTIME - .spawn(async move { coll.find_one(doc! {}, None).await }) - .unwrap(); + let task = runtime::spawn(async move { coll.find_one(doc! 
{}, None).await }); tasks.push(task); } diff --git a/src/test/spec/retryable_writes/mod.rs b/src/test/spec/retryable_writes/mod.rs index f50ff6245..ae0c5c08c 100644 --- a/src/test/spec/retryable_writes/mod.rs +++ b/src/test/spec/retryable_writes/mod.rs @@ -16,6 +16,7 @@ use crate::{ command::CommandEventHandler, }, options::{ClientOptions, FindOptions, InsertManyOptions}, + runtime, runtime::AsyncJoinHandle, test::{ assert_matches, @@ -33,7 +34,6 @@ use crate::{ CLIENT_OPTIONS, LOCK, }, - RUNTIME, }; use super::run_unified_format_test; @@ -459,9 +459,7 @@ async fn retry_write_pool_cleared() { let mut tasks: Vec> = Vec::new(); for _ in 0..2 { let coll = collection.clone(); - let task = RUNTIME - .spawn(async move { coll.insert_one(doc! {}, None).await }) - .unwrap(); + let task = runtime::spawn(async move { coll.insert_one(doc! {}, None).await }); tasks.push(task); } diff --git a/src/test/spec/unified_runner/mod.rs b/src/test/spec/unified_runner/mod.rs index 8758bf103..57981dd4e 100644 --- a/src/test/spec/unified_runner/mod.rs +++ b/src/test/spec/unified_runner/mod.rs @@ -14,8 +14,8 @@ use tokio::sync::RwLockWriteGuard; use crate::{ bson::{doc, Document}, options::{CollectionOptions, FindOptions, ReadConcern, ReadPreference, SelectionCriteria}, + runtime, test::{log_uncaptured, run_single_test, run_spec_test, LOCK}, - RUNTIME, }; pub use self::{ @@ -217,7 +217,7 @@ pub async fn run_unified_format_test_filtered( // implicit session used in the first operation is returned to the pool before // the second operation is executed. if test_case.description == "Server supports implicit sessions" { - RUNTIME.delay_for(Duration::from_secs(1)).await; + runtime::delay_for(Duration::from_secs(1)).await; } } diff --git a/src/test/spec/unified_runner/operation.rs b/src/test/spec/unified_runner/operation.rs index 836041f69..b0d2bff90 100644 --- a/src/test/spec/unified_runner/operation.rs +++ b/src/test/spec/unified_runner/operation.rs @@ -42,12 +42,12 @@ use crate::{ UpdateModifications, UpdateOptions, }, + runtime, selection_criteria::ReadPreference, test::FailPoint, Collection, Database, IndexModel, - RUNTIME, }; pub trait TestOperation: Debug { @@ -1375,7 +1375,7 @@ impl TestOperation for EndSession { async move { let session = test_runner.get_mut_session(id).client_session.take(); drop(session); - RUNTIME.delay_for(Duration::from_secs(1)).await; + runtime::delay_for(Duration::from_secs(1)).await; Ok(None) } .boxed() diff --git a/src/test/spec/v2_runner/mod.rs b/src/test/spec/v2_runner/mod.rs index 91cfc2ed1..52bef507c 100644 --- a/src/test/spec/v2_runner/mod.rs +++ b/src/test/spec/v2_runner/mod.rs @@ -11,6 +11,7 @@ use crate::{ coll::options::{DistinctOptions, DropCollectionOptions}, concern::{Acknowledgment, WriteConcern}, options::{ClientOptions, CreateCollectionOptions, InsertManyOptions}, + runtime, sdam::ServerInfo, selection_criteria::SelectionCriteria, test::{ @@ -21,7 +22,6 @@ use crate::{ TestClient, SERVERLESS, }, - RUNTIME, }; use operation::{OperationObject, OperationResult}; @@ -196,7 +196,7 @@ pub async fn run_v2_test(test_file: TestFile) { // implicit session used in the first operation is returned to the pool before // the second operation is executed. 
if test.description == "Server supports implicit sessions" { - RUNTIME.delay_for(Duration::from_secs(1)).await; + runtime::delay_for(Duration::from_secs(1)).await; } result } @@ -208,7 +208,7 @@ pub async fn run_v2_test(test_file: TestFile) { if operation.name == "endSession" { let session = session0.take(); drop(session); - RUNTIME.delay_for(Duration::from_secs(1)).await; + runtime::delay_for(Duration::from_secs(1)).await; continue; } else { operation @@ -220,7 +220,7 @@ pub async fn run_v2_test(test_file: TestFile) { if operation.name == "endSession" { let session = session1.take(); drop(session); - RUNTIME.delay_for(Duration::from_secs(1)).await; + runtime::delay_for(Duration::from_secs(1)).await; continue; } else { operation @@ -336,7 +336,7 @@ pub async fn run_v2_test(test_file: TestFile) { // wait for the transaction in progress to be aborted implicitly when the session is dropped if test.description.as_str() == "implicit abort" { - RUNTIME.delay_for(Duration::from_secs(1)).await; + runtime::delay_for(Duration::from_secs(1)).await; } if let Some(expectations) = test.expectations { diff --git a/src/test/util/event.rs b/src/test/util/event.rs index 5195cd564..cddfab8bf 100644 --- a/src/test/util/event.rs +++ b/src/test/util/event.rs @@ -47,8 +47,8 @@ use crate::{ }, }, options::ClientOptions, + runtime, test::{spec::ExpectedEventType, LOCK}, - RUNTIME, }; pub type EventQueue = Arc>>; @@ -393,21 +393,20 @@ impl EventSubscriber<'_> { where F: Fn(&Event) -> bool, { - RUNTIME - .timeout(timeout, async { - loop { - match self.receiver.recv().await { - Ok(event) if filter(&event) => return event.into(), - // the channel hit capacity and the channel will skip a few to catch up. - Err(RecvError::Lagged(_)) => continue, - Err(_) => return None, - _ => continue, - } + runtime::timeout(timeout, async { + loop { + match self.receiver.recv().await { + Ok(event) if filter(&event) => return event.into(), + // the channel hit capacity and the channel will skip a few to catch up. 
+ Err(RecvError::Lagged(_)) => continue, + Err(_) => return None, + _ => continue, } - }) - .await - .ok() - .flatten() + } + }) + .await + .ok() + .flatten() } } diff --git a/src/test/util/failpoint.rs b/src/test/util/failpoint.rs index f87dfb031..38017ad8e 100644 --- a/src/test/util/failpoint.rs +++ b/src/test/util/failpoint.rs @@ -6,9 +6,9 @@ use typed_builder::TypedBuilder; use crate::{ error::Result, operation::append_options, + runtime, selection_criteria::SelectionCriteria, Client, - RUNTIME, }; #[derive(Clone, Debug, Deserialize, Serialize)] @@ -72,7 +72,7 @@ impl Drop for FailPointGuard { let client = self.client.clone(); let name = self.failpoint_name.clone(); - let result = RUNTIME.block_on(async move { + let result = runtime::block_on(async move { client .database("admin") .run_command( diff --git a/tests/connection_snippets.rs b/tests/connection_snippets.rs index 61913c02b..d82e470de 100644 --- a/tests/connection_snippets.rs +++ b/tests/connection_snippets.rs @@ -4,7 +4,7 @@ extern crate mongodb; -#[cfg(feature = "tokio-runtime")] +#[cfg(all(feature = "tokio-runtime", not(feature = "tokio-sync")))] mod async_scram { // ASYNC SCRAM CONNECTION EXAMPLE STARTS HERE use mongodb::{options::ClientOptions, Client}; @@ -24,7 +24,7 @@ mod async_scram { // CONNECTION EXAMPLE ENDS HERE } -#[cfg(feature = "tokio-runtime")] +#[cfg(all(feature = "tokio-runtime", not(feature = "tokio-sync")))] mod async_x509 { // ASYNC X509 CONNECTION EXAMPLE STARTS HERE use mongodb::{ @@ -57,7 +57,7 @@ mod async_x509 { // CONNECTION EXAMPLE ENDS HERE } -#[cfg(feature = "sync")] +#[cfg(any(feature = "sync", feature = "tokio-sync"))] mod sync_scram { // SYNC SCRAM CONNECTION EXAMPLE STARTS HERE use mongodb::{options::ClientOptions, sync::Client}; @@ -75,7 +75,7 @@ mod sync_scram { // CONNECTION EXAMPLE ENDS HERE } -#[cfg(feature = "sync")] +#[cfg(any(feature = "sync", feature = "tokio-sync"))] mod sync_x509 { // SYNC X509 CONNECTION EXAMPLE STARTS HERE use mongodb::{ diff --git a/tests/readme_examples.rs b/tests/readme_examples.rs index c6dac5481..e44620d6b 100644 --- a/tests/readme_examples.rs +++ b/tests/readme_examples.rs @@ -11,7 +11,7 @@ impl From for Err { #[allow(dead_code)] type Result = std::result::Result; -#[cfg(not(feature = "sync"))] +#[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] async fn _connecting() -> Result<()> { use mongodb::{options::ClientOptions, Client}; @@ -32,7 +32,7 @@ async fn _connecting() -> Result<()> { Ok(()) } -#[cfg(not(feature = "sync"))] +#[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] async fn _getting_handle_to_database(client: mongodb::Client) -> Result<()> { // Get a handle to a database. let db = client.database("mydb"); @@ -45,7 +45,7 @@ async fn _getting_handle_to_database(client: mongodb::Client) -> Result<()> { Ok(()) } -#[cfg(not(feature = "sync"))] +#[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] async fn _inserting_documents_into_a_collection(db: mongodb::Database) -> Result<()> { use mongodb::bson::{doc, Document}; @@ -71,7 +71,7 @@ struct Book { author: String, } -#[cfg(not(feature = "sync"))] +#[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] async fn _inserting_documents_into_a_typed_collection(db: mongodb::Database) -> Result<()> { // Get a handle to a collection of `Book`. 
let typed_collection = db.collection::("books"); @@ -93,7 +93,7 @@ async fn _inserting_documents_into_a_typed_collection(db: mongodb::Database) -> Ok(()) } -#[cfg(not(feature = "sync"))] +#[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] async fn _finding_documents_into_a_collection( typed_collection: mongodb::Collection, ) -> Result<()> { @@ -114,7 +114,7 @@ async fn _finding_documents_into_a_collection( Ok(()) } -#[cfg(feature = "sync")] +#[cfg(any(feature = "sync", feature = "tokio-sync"))] async fn _using_the_sync_api() -> Result<()> { use mongodb::{bson::doc, sync::Client}; @@ -148,7 +148,7 @@ async fn _using_the_sync_api() -> Result<()> { Ok(()) } -#[cfg(not(feature = "sync"))] +#[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] async fn _windows_dns_note() -> Result<()> { use mongodb::{ options::{ClientOptions, ResolverConfig}, diff --git a/tests/transaction_examples.rs b/tests/transaction_examples.rs index aeef81192..aa37acca6 100644 --- a/tests/transaction_examples.rs +++ b/tests/transaction_examples.rs @@ -1,5 +1,5 @@ #![allow(dead_code)] -#![cfg(not(feature = "sync"))] +#![cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] // START TRANSACTIONS EXAMPLE use mongodb::{