Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUST-995 Add tokio-sync feature flag #578

Merged
merged 12 commits into from
Mar 9, 2022
1 change: 1 addition & 0 deletions .evergreen/check-clippy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ cargo clippy --all-targets -p mongodb -- -D warnings
cargo clippy --all-targets -p mongodb --features zstd-compression,snappy-compression,zlib-compression -- -D warnings
cargo clippy --all-targets --no-default-features --features async-std-runtime -p mongodb -- -D warnings
cargo clippy --all-targets --no-default-features --features sync -p mongodb -- -D warnings
cargo clippy --all-targets --features tokio-sync -p mongodb -- -D warnings
1 change: 1 addition & 0 deletions .evergreen/check-rustdoc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ set -o errexit
cargo +nightly rustdoc -- -D warnings --cfg docsrs
cargo +nightly rustdoc --no-default-features --features async-std-runtime -- -D warnings --cfg docsrs
cargo +nightly rustdoc --no-default-features --features sync -- -D warnings --cfg docsrs
cargo +nightly rustdoc --features tokio-sync -- -D warnings --cfg docsrs
1 change: 1 addition & 0 deletions .evergreen/compile-only-tokio.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ if [[ $RUST_VERSION == "nightly" ]]; then
fi

rustup run $RUST_VERSION cargo build --features $FEATURE_FLAGS
rustup run $RUST_VERSION cargo build --features tokio-sync,$FEATURE_FLAGS
8 changes: 7 additions & 1 deletion .evergreen/run-tokio-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,10 @@ FEATURE_FLAGS="zstd-compression,snappy-compression,zlib-compression"
echo "cargo test options: --features $FEATURE_FLAGS ${OPTIONS}"

RUST_BACKTRACE=1 cargo test --features $FEATURE_FLAGS $OPTIONS | tee results.json
cat results.json | cargo2junit > results.xml
cat results.json | cargo2junit > async-tests.xml
RUST_BACKTRACE=1 cargo test sync --features tokio-sync,$FEATURE_FLAGS $OPTIONS | tee sync-tests.json
cat sync-tests.json | cargo2junit > sync-tests.xml
RUST_BACKTRACE=1 cargo test --doc sync --features tokio-sync,$FEATURE_FLAGS $OPTIONS | tee sync-doc-tests.json
cat sync-doc-tests.json | cargo2junit > sync-doc-tests.xml

junit-report-merger results.xml async-tests.xml sync-tests.xml sync-doc-tests.xml
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ default = ["tokio-runtime"]
tokio-runtime = ["tokio/macros", "tokio/net", "tokio/rt", "tokio/time", "serde_bytes"]
async-std-runtime = ["async-std", "async-std/attributes", "async-std-resolver", "tokio-util/compat"]
sync = ["async-std-runtime"]
tokio-sync = ["tokio-runtime"]

# Enable support for v0.4 of the chrono crate in the public API of the BSON library.
bson-chrono-0_4 = ["bson/chrono-0_4"]
Expand Down
2 changes: 1 addition & 1 deletion src/change_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::{
/// A `ChangeStream` can be iterated like any other [`Stream`]:
///
/// ```
/// # #[cfg(not(feature = "sync"))]
/// # #[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))]
/// # use futures::stream::StreamExt;
/// # use mongodb::{Client, error::Result, bson::doc,
/// # change_stream::event::ChangeStreamEvent};
Expand Down
2 changes: 1 addition & 1 deletion src/client/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ pub struct Credential {
}

impl Credential {
#[cfg(all(test, not(feature = "sync")))]
#[cfg(all(test, not(feature = "sync"), not(feature = "tokio-sync")))]
pub(crate) fn into_document(mut self) -> Document {
use crate::bson::Bson;

Expand Down
6 changes: 3 additions & 3 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30);
/// so it can safely be shared across threads or async tasks. For example:
///
/// ```rust
/// # #[cfg(not(feature = "sync"))]
/// # #[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))]
/// # use mongodb::{bson::Document, Client, error::Result};
/// # #[cfg(feature = "async-std-runtime")]
/// # use async_std::task;
/// # #[cfg(feature = "tokio-runtime")]
/// # use tokio::task;
/// #
/// # #[cfg(not(feature = "sync"))]
/// # #[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))]
/// # async fn start_workers() -> Result<()> {
/// let client = Client::with_uri_str("mongodb://example.com").await?;
///
Expand Down Expand Up @@ -431,7 +431,7 @@ impl Client {
}
}

#[cfg(all(test, not(feature = "sync")))]
#[cfg(all(test, not(feature = "sync"), not(feature = "tokio-sync")))]
pub(crate) async fn get_hosts(&self) -> Vec<String> {
let servers = self.inner.topology.servers().await;
servers
Expand Down
25 changes: 14 additions & 11 deletions src/client/options/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[cfg(all(test, not(feature = "sync")))]
#[cfg(all(test, not(feature = "sync"), not(feature = "tokio-sync")))]
mod test;

mod resolver_config;
Expand Down Expand Up @@ -54,6 +54,9 @@ use crate::{
srv::{OriginalSrvInfo, SrvResolver},
};

#[cfg(any(feature = "sync", feature = "tokio-sync"))]
use crate::runtime;

pub use resolver_config::ResolverConfig;

const DEFAULT_PORT: u16 = 27017;
Expand Down Expand Up @@ -244,7 +247,7 @@ impl ServerAddress {
})
}

#[cfg(all(test, not(feature = "sync")))]
#[cfg(all(test, not(feature = "sync"), not(feature = "tokio-sync")))]
pub(crate) fn into_document(self) -> Document {
match self {
Self::Tcp { host, port } => {
Expand Down Expand Up @@ -1038,17 +1041,17 @@ impl ClientOptions {
///
/// Note: if the `sync` feature is enabled, then this method will be replaced with [the sync
/// version](#method.parse-1).
#[cfg(not(feature = "sync"))]
#[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))]
pub async fn parse(s: impl AsRef<str>) -> Result<Self> {
Self::parse_uri(s, None).await
}

/// This method will be present if the `sync` feature is enabled. It's otherwise identical to
/// [the async version](#method.parse)
#[cfg(any(feature = "sync", docsrs))]
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
#[cfg(any(feature = "sync", feature = "tokio-sync", docsrs))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "sync", feature = "tokio-sync"))))]
pub fn parse(s: impl AsRef<str>) -> Result<Self> {
crate::RUNTIME.block_on(Self::parse_uri(s.as_ref(), None))
runtime::block_on(Self::parse_uri(s.as_ref(), None))
}

/// Parses a MongoDB connection string into a `ClientOptions` struct.
Expand All @@ -1065,7 +1068,7 @@ impl ClientOptions {
///
/// Note: if the `sync` feature is enabled, then this method will be replaced with [the sync
/// version](#method.parse_with_resolver_config-1).
#[cfg(not(feature = "sync"))]
#[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))]
pub async fn parse_with_resolver_config(
uri: impl AsRef<str>,
resolver_config: ResolverConfig,
Expand All @@ -1075,10 +1078,10 @@ impl ClientOptions {

/// This method will be present if the `sync` feature is enabled. It's otherwise identical to
/// [the async version](#method.parse_with_resolver_config)
#[cfg(any(feature = "sync", docsrs))]
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
#[cfg(any(feature = "sync", feature = "tokio-sync", docsrs))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "sync", feature = "tokio-sync"))))]
pub fn parse_with_resolver_config(uri: &str, resolver_config: ResolverConfig) -> Result<Self> {
crate::RUNTIME.block_on(Self::parse_uri(uri, Some(resolver_config)))
runtime::block_on(Self::parse_uri(uri, Some(resolver_config)))
}

/// Populate this `ClientOptions` from the given URI, optionally using the resolver config for
Expand Down Expand Up @@ -2015,7 +2018,7 @@ impl ClientOptionsParser {
}
}

#[cfg(all(test, not(feature = "sync")))]
#[cfg(all(test, not(feature = "sync"), not(feature = "tokio-sync")))]
mod tests {
use std::time::Duration;

Expand Down
6 changes: 3 additions & 3 deletions src/client/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use crate::{
error::{ErrorKind, Result},
operation::{AbortTransaction, CommitTransaction, Operation},
options::{SessionOptions, TransactionOptions},
runtime,
sdam::{ServerInfo, TransactionSupportStatus},
selection_criteria::SelectionCriteria,
Client,
RUNTIME,
};
pub use cluster_time::ClusterTime;
pub(super) use pool::ServerSessionPool;
Expand Down Expand Up @@ -607,14 +607,14 @@ impl Drop for ClientSession {
snapshot_time: self.snapshot_time,
operation_time: self.operation_time,
};
RUNTIME.execute(async move {
runtime::execute(async move {
let mut session: ClientSession = dropped_session.into();
let _result = session.abort_transaction().await;
});
} else {
let client = self.client.clone();
let server_session = self.server_session.clone();
RUNTIME.execute(async move {
runtime::execute(async move {
client.check_in_server_session(server_session).await;
});
}
Expand Down
14 changes: 7 additions & 7 deletions src/client/session/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use crate::{
coll::options::{CountOptions, InsertManyOptions},
error::Result,
options::{Acknowledgment, FindOptions, ReadConcern, ReadPreference, WriteConcern},
runtime,
sdam::ServerInfo,
selection_criteria::SelectionCriteria,
test::{log_uncaptured, EventClient, TestClient, CLIENT_OPTIONS, LOCK},
Collection,
RUNTIME,
};

/// Macro defining a closure that returns a future populated by an operation on the
Expand Down Expand Up @@ -219,10 +219,10 @@ async fn pool_is_lifo() {
// End both sessions, waiting after each to ensure the background task got scheduled
// in the Drop impls.
drop(a);
RUNTIME.delay_for(Duration::from_millis(250)).await;
runtime::delay_for(Duration::from_millis(250)).await;

drop(b);
RUNTIME.delay_for(Duration::from_millis(250)).await;
runtime::delay_for(Duration::from_millis(250)).await;

let s1 = client.start_session(None).await.unwrap();
assert_eq!(s1.id(), &b_id);
Expand Down Expand Up @@ -368,7 +368,7 @@ async fn implicit_session_returned_after_immediate_exhaust() {
.expect("insert should succeed");

// wait for sessions to be returned to the pool and clear them out.
RUNTIME.delay_for(Duration::from_millis(250)).await;
runtime::delay_for(Duration::from_millis(250)).await;
client.clear_session_pool().await;

let mut cursor = coll.find(doc! {}, None).await.expect("find should succeed");
Expand All @@ -382,7 +382,7 @@ async fn implicit_session_returned_after_immediate_exhaust() {
.as_document()
.expect("session id should be a document");

RUNTIME.delay_for(Duration::from_millis(250)).await;
runtime::delay_for(Duration::from_millis(250)).await;
assert!(
client.is_session_checked_in(session_id).await,
"session not checked back in"
Expand Down Expand Up @@ -413,7 +413,7 @@ async fn implicit_session_returned_after_exhaust_by_get_more() {
}

// wait for sessions to be returned to the pool and clear them out.
RUNTIME.delay_for(Duration::from_millis(250)).await;
runtime::delay_for(Duration::from_millis(250)).await;
client.clear_session_pool().await;

let options = FindOptions::builder().batch_size(3).build();
Expand All @@ -434,7 +434,7 @@ async fn implicit_session_returned_after_exhaust_by_get_more() {
.as_document()
.expect("session id should be a document");

RUNTIME.delay_for(Duration::from_millis(250)).await;
runtime::delay_for(Duration::from_millis(250)).await;
assert!(
client.is_session_checked_in(session_id).await,
"session not checked back in"
Expand Down
5 changes: 4 additions & 1 deletion src/cmap/establish/handshake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
sdam::Topology,
};

#[cfg(feature = "tokio-runtime")]
#[cfg(all(feature = "tokio-runtime", not(feature = "tokio-sync")))]
const RUNTIME_NAME: &str = "tokio";

#[cfg(all(feature = "async-std-runtime", not(feature = "sync")))]
Expand All @@ -27,6 +27,9 @@ const RUNTIME_NAME: &str = "async-std";
#[cfg(feature = "sync")]
const RUNTIME_NAME: &str = "sync (with async-std)";

#[cfg(feature = "tokio-sync")]
const RUNTIME_NAME: &str = "sync (with tokio)";

#[derive(Clone, Debug)]
struct ClientMetadata {
application: Option<AppMetadata>,
Expand Down
29 changes: 14 additions & 15 deletions src/cmap/test/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use serde::{de::Unexpected, Deserialize, Deserializer};

use crate::{event::cmap::*, options::ServerAddress, RUNTIME};
use crate::{event::cmap::*, options::ServerAddress, runtime};
use tokio::sync::broadcast::error::{RecvError, SendError};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -98,21 +98,20 @@ impl EventSubscriber<'_> {
where
F: Fn(&Event) -> bool,
{
RUNTIME
.timeout(timeout, async {
loop {
match self.receiver.recv().await {
Ok(event) if filter(&event) => return event.into(),
// the channel hit capacity and the channnel will skip a few to catch up.
Err(RecvError::Lagged(_)) => continue,
Err(_) => return None,
_ => continue,
}
runtime::timeout(timeout, async {
loop {
match self.receiver.recv().await {
Ok(event) if filter(&event) => return event.into(),
// the channel hit capacity and the channnel will skip a few to catch up.
Err(RecvError::Lagged(_)) => continue,
Err(_) => return None,
_ => continue,
}
})
.await
.ok()
.flatten()
}
})
.await
.ok()
.flatten()
}

/// Returns the received events without waiting for any more.
Expand Down
10 changes: 4 additions & 6 deletions src/cmap/test/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
cmap::{options::ConnectionPoolOptions, Command, ConnectionPool},
event::cmap::{CmapEventHandler, ConnectionClosedReason},
operation::CommandResponse,
runtime,
sdam::ServerUpdateSender,
selection_criteria::ReadPreference,
test::{
Expand All @@ -21,7 +22,6 @@ use crate::{
CLIENT_OPTIONS,
LOCK,
},
RUNTIME,
};
use semver::VersionReq;
use std::{sync::Arc, time::Duration};
Expand Down Expand Up @@ -130,11 +130,9 @@ async fn concurrent_connections() {

let tasks = (0..2).map(|_| {
let pool_clone = pool.clone();
RUNTIME
.spawn(async move {
pool_clone.check_out().await.unwrap();
})
.unwrap()
runtime::spawn(async move {
pool_clone.check_out().await.unwrap();
})
});
futures::future::join_all(tasks).await;

Expand Down
Loading