Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce configurable DNS resolution timeout #1113

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scylla/src/transport/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ impl Cluster {

let mut metadata_reader = MetadataReader::new(
known_nodes,
pool_config.hostname_resolution_timeout,
control_connection_repair_sender,
pool_config.connection_config.clone(),
pool_config.keepalive_interval,
Expand Down
7 changes: 6 additions & 1 deletion scylla/src/transport/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub(crate) struct PoolConfig {
pub(crate) pool_size: PoolSize,
pub(crate) can_use_shard_aware_port: bool,
pub(crate) keepalive_interval: Option<Duration>,
pub(crate) hostname_resolution_timeout: Option<Duration>,
}

impl Default for PoolConfig {
Expand All @@ -69,6 +70,7 @@ impl Default for PoolConfig {
pool_size: Default::default(),
can_use_shard_aware_port: true,
keepalive_interval: None,
hostname_resolution_timeout: None,
}
}
}
Expand Down Expand Up @@ -865,6 +867,7 @@ impl PoolRefiller {
mut endpoint: UntranslatedEndpoint,
) -> impl Future<Output = UntranslatedEndpoint> {
let cloud_config = self.pool_config.connection_config.cloud_config.clone();
let hostname_resolution_timeout = self.pool_config.hostname_resolution_timeout;
async move {
if let Some(cloud_config) = cloud_config {
// If we operate in the serverless Cloud, then we substitute every node's address
Expand All @@ -879,7 +882,9 @@ impl PoolRefiller {
if let Some(dc) = datacenter.as_deref() {
if let Some(dc_config) = cloud_config.get_datacenters().get(dc) {
let hostname = dc_config.get_server();
if let Ok(resolved) = resolve_hostname(hostname).await {
if let Ok(resolved) =
resolve_hostname(hostname, hostname_resolution_timeout).await
{
*address = NodeAddr::Untranslatable(resolved)
} else {
warn!(
Expand Down
51 changes: 40 additions & 11 deletions scylla/src/transport/node.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use itertools::Itertools;
use tokio::net::lookup_host;
use thiserror::Error;
use tokio::net::{lookup_host, ToSocketAddrs};
use tracing::warn;
use uuid::Uuid;

Expand All @@ -13,6 +14,7 @@ use crate::transport::errors::{ConnectionPoolError, QueryError};
use std::fmt::Display;
use std::io;
use std::net::IpAddr;
use std::time::Duration;
use std::{
hash::{Hash, Hasher},
net::SocketAddr,
Expand Down Expand Up @@ -267,27 +269,53 @@ pub(crate) struct ResolvedContactPoint {
pub(crate) datacenter: Option<String>,
}

#[derive(Error, Debug)]
pub(crate) enum DnsLookupError {
#[error("Failed to perform DNS lookup within {0}ms")]
Timeout(u128),
#[error("Empty address list returned by DNS for {0}")]
EmptyAddressListForHost(String),
#[error(transparent)]
IoError(#[from] io::Error),
}

/// Performs a DNS lookup with provided optional timeout.
async fn lookup_host_with_timeout<T: ToSocketAddrs>(
host: T,
hostname_resolution_timeout: Option<Duration>,
) -> Result<impl Iterator<Item = SocketAddr>, DnsLookupError> {
Comment on lines +282 to +286
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use impl ToSocketAddrs, as it's more concise and we get nothing with explicit type parameter here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, I'm suprised that such a trait exists

if let Some(timeout) = hostname_resolution_timeout {
match tokio::time::timeout(timeout, lookup_host(host)).await {
Ok(res) => res.map_err(Into::into),
// Elapsed error from tokio library does not provide any context.
Err(_) => Err(DnsLookupError::Timeout(timeout.as_millis())),
}
} else {
lookup_host(host).await.map_err(Into::into)
}
}

// Resolve the given hostname using a DNS lookup if necessary.
// The resolution may return multiple IPs and the function returns one of them.
// It prefers to return IPv4s first, and only if there are none, IPv6s.
pub(crate) async fn resolve_hostname(hostname: &str) -> Result<SocketAddr, io::Error> {
let addrs = match lookup_host(hostname).await {
pub(crate) async fn resolve_hostname(
hostname: &str,
hostname_resolution_timeout: Option<Duration>,
) -> Result<SocketAddr, DnsLookupError> {
let addrs = match lookup_host_with_timeout(hostname, hostname_resolution_timeout).await {
Ok(addrs) => itertools::Either::Left(addrs),
// Use a default port in case of error, but propagate the original error on failure
Err(e) => {
let addrs = lookup_host((hostname, 9042)).await.or(Err(e))?;
let addrs = lookup_host_with_timeout((hostname, 9042), hostname_resolution_timeout)
.await
.or(Err(e))?;
itertools::Either::Right(addrs)
}
Comment on lines +305 to 313
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that the timeout is effectively twice the value provided in the config...
I'm not convinced it's correct. This must be at least noted in documentation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... you are right. I wonder whether we should leave it as is, or rather recompute the timeout for the second try in case first failed.

    let deadline = hostname_resolution_timeout.map(|dur| tokio::time::Instant::now() + dur);
    let addrs = match lookup_host_with_timeout(hostname, hostname_resolution_timeout).await {
        Ok(addrs) => itertools::Either::Left(addrs),
        // Use a default port in case of error, but propagate the original error on failure
        Err(e) => {
            let new_timeout =
                deadline.map(|deadline| deadline.duration_since(tokio::time::Instant::now()));
            let addrs = lookup_host_with_timeout((hostname, 9042), new_timeout)
                .await
                .or(Err(e))?;
            itertools::Either::Right(addrs)
        }
    };

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the first attempt failed from reasons different that a timeout, then such recomputation makes perfect sense. However, if it failed due to a timeout (because the wrong port somehow caused a timeout - idk how likely this scenario is), then we won't be able to do the second attempt.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any ideas @Lorak-mmk?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also execute resolve_hostname function with a given timeout (i.e. tokio::time::timeout(t, resolve_hostname)). It's much easier and cleaner than recomputing the timeout. However, in such case it's still true that if first attempt takes too long, we won't be able to do a second attempt.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When can it happen that the port is not specified and what is the expected behavior of lookup_host in this case?

If it always fails immediately with a specific error, then we can check this error and perform lookup with default port with the same timeout.

If it can timeout, or take some time in general, then I'd perform another lookup with the same timeout and document it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my investigation, it looks like port is not used in the actual DNS lookup (which makes sense).

What's the difference between String and (String, u16), then?
When String is provided, both tokio and std::net expect that it is of form "host:port". If it's not the case, the error is returned immidiately. Otherwise, host part is used in DNS lookup which may timeout.

When (String, u16) is provided, the DNS lookup is performed based on the string - port is ignored during lookup (same as above).

This means that:

  • if hostname is of form "addr:port", then the first lookup timeouts iff the second lookup (with default port) timeouts
  • if hostname is not of this form, then the first lookup will fail immidiately. We can try with default port.

In that case, I think that we can match the error returned from lookup_host_with_timeout If it's something other than DnsLookupError::Timeout(_), then we can try again with the same timeout.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Let's describe this behavior in the docs.

};

addrs
.find_or_last(|addr| matches!(addr, SocketAddr::V4(_)))
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
format!("Empty address list returned by DNS for {}", hostname),
)
})
.ok_or_else(|| DnsLookupError::EmptyAddressListForHost(hostname.to_owned()))
}

/// Transforms the given [`InternalKnownNode`]s into [`ContactPoint`]s.
Expand All @@ -296,6 +324,7 @@ pub(crate) async fn resolve_hostname(hostname: &str) -> Result<SocketAddr, io::E
/// In case of a plain IP address, parses it and uses straight.
pub(crate) async fn resolve_contact_points(
known_nodes: &[InternalKnownNode],
hostname_resolution_timeout: Option<Duration>,
) -> (Vec<ResolvedContactPoint>, Vec<String>) {
// Find IP addresses of all known nodes passed in the config
let mut initial_peers: Vec<ResolvedContactPoint> = Vec::with_capacity(known_nodes.len());
Expand Down Expand Up @@ -323,7 +352,7 @@ pub(crate) async fn resolve_contact_points(
let resolve_futures = to_resolve
.into_iter()
.map(|(hostname, datacenter)| async move {
match resolve_hostname(hostname).await {
match resolve_hostname(hostname, hostname_resolution_timeout).await {
Ok(address) => Some(ResolvedContactPoint {
address,
datacenter,
Expand Down
6 changes: 6 additions & 0 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ pub struct SessionConfig {
/// It is true by default but can be disabled if successive schema-altering statements should be performed.
pub refresh_metadata_on_auto_schema_agreement: bool,

/// DNS hostname resolution timeout.
/// If `None`, the driver will wait for hostname resolution indefinitely.
pub hostname_resolution_timeout: Option<Duration>,

/// The address translator is used to translate addresses received from ScyllaDB nodes
/// (either with cluster metadata or with an event) to addresses that can be used to
/// actually connect to those nodes. This may be needed e.g. when there is NAT
Expand Down Expand Up @@ -380,6 +384,7 @@ impl SessionConfig {
ssl_context: None,
authenticator: None,
connect_timeout: Duration::from_secs(5),
hostname_resolution_timeout: Some(Duration::from_secs(5)),
connection_pool_size: Default::default(),
disallow_shard_aware_port: false,
keyspaces_to_fetch: Vec::new(),
Expand Down Expand Up @@ -1095,6 +1100,7 @@ where
pool_size: config.connection_pool_size,
can_use_shard_aware_port: !config.disallow_shard_aware_port,
keepalive_interval: config.keepalive_interval,
hostname_resolution_timeout: config.hostname_resolution_timeout,
};

let cluster = Cluster::new(
Expand Down
21 changes: 21 additions & 0 deletions scylla/src/transport/session_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,27 @@ impl<K: SessionBuilderKind> GenericSessionBuilder<K> {
self
}

/// Changes DNS hostname resolution timeout.
/// The default is 5 seconds.
///
/// # Example
/// ```
/// # use scylla::{Session, SessionBuilder};
/// # use std::time::Duration;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let session: Session = SessionBuilder::new()
/// .known_node("127.0.0.1:9042")
/// .hostname_resolution_timeout(Duration::from_secs(10))
/// .build() // Turns SessionBuilder into Session
/// .await?;
/// # Ok(())
/// # }
/// ```
pub fn hostname_resolution_timeout(mut self, duration: Duration) -> Self {
self.config.hostname_resolution_timeout = Some(duration);
self
}

Comment on lines +809 to +829
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this should accept Option<Duration>, especially that the default is Some. Now it's impossible to set no timeout with SessionBuilder's API.

Copy link
Contributor Author

@muzarski muzarski Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to be consistent with other optional timeout options. See for example SessionBuilder::keepalive_interval() It accepts Duration, and sets the corresponding option to Some(duration) as well.

If user wants to set any of these to None, he can do so by direct access to the field (fields are public):

sb.config.hostname_resolution_timeout = None;

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I dislike the idea of SessionBuilder exposing only subset of possible configuration options using methods, with some being available only through fields.
WDYT @Lorak-mmk ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could expose another methods (disable_hostname_resolution_timeout, disable_keepalive) which would be a bit more descriptive.

/// Sets the host filter. The host filter decides whether any connections
/// should be opened to the node or not. The driver will also avoid
/// those nodes when re-establishing the control connection.
Expand Down
17 changes: 14 additions & 3 deletions scylla/src/transport/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use super::node::{InternalKnownNode, NodeAddr, ResolvedContactPoint};
pub(crate) struct MetadataReader {
connection_config: ConnectionConfig,
keepalive_interval: Option<Duration>,
hostname_resolution_timeout: Option<Duration>,

control_connection_endpoint: UntranslatedEndpoint,
control_connection: NodeConnectionPool,
Expand Down Expand Up @@ -470,6 +471,7 @@ impl MetadataReader {
#[allow(clippy::too_many_arguments)]
pub(crate) async fn new(
initial_known_nodes: Vec<InternalKnownNode>,
hostname_resolution_timeout: Option<Duration>,
control_connection_repair_requester: broadcast::Sender<()>,
mut connection_config: ConnectionConfig,
keepalive_interval: Option<Duration>,
Expand All @@ -479,7 +481,7 @@ impl MetadataReader {
host_filter: &Option<Arc<dyn HostFilter>>,
) -> Result<Self, NewSessionError> {
let (initial_peers, resolved_hostnames) =
resolve_contact_points(&initial_known_nodes).await;
resolve_contact_points(&initial_known_nodes, hostname_resolution_timeout).await;
// Ensure there is at least one resolved node
if initial_peers.is_empty() {
return Err(NewSessionError::FailedToResolveAnyHostname(
Expand All @@ -503,13 +505,15 @@ impl MetadataReader {
control_connection_endpoint.clone(),
connection_config.clone(),
keepalive_interval,
hostname_resolution_timeout,
control_connection_repair_requester.clone(),
);

Ok(MetadataReader {
control_connection_endpoint,
control_connection,
keepalive_interval,
hostname_resolution_timeout,
connection_config,
known_peers: initial_peers
.into_iter()
Expand Down Expand Up @@ -570,8 +574,11 @@ impl MetadataReader {
// If no known peer is reachable, try falling back to initial contact points, in hope that
// there are some hostnames there which will resolve to reachable new addresses.
warn!("Failed to establish control connection and fetch metadata on all known peers. Falling back to initial contact points.");
let (initial_peers, _hostnames) =
resolve_contact_points(&self.initial_known_nodes).await;
let (initial_peers, _hostnames) = resolve_contact_points(
&self.initial_known_nodes,
self.hostname_resolution_timeout,
)
.await;
result = self
.retry_fetch_metadata_on_nodes(
initial,
Expand Down Expand Up @@ -630,6 +637,7 @@ impl MetadataReader {
self.control_connection_endpoint.clone(),
self.connection_config.clone(),
self.keepalive_interval,
self.hostname_resolution_timeout,
self.control_connection_repair_requester.clone(),
);

Expand Down Expand Up @@ -730,6 +738,7 @@ impl MetadataReader {
self.control_connection_endpoint.clone(),
self.connection_config.clone(),
self.keepalive_interval,
self.hostname_resolution_timeout,
self.control_connection_repair_requester.clone(),
);
}
Expand All @@ -741,11 +750,13 @@ impl MetadataReader {
endpoint: UntranslatedEndpoint,
connection_config: ConnectionConfig,
keepalive_interval: Option<Duration>,
hostname_resolution_timeout: Option<Duration>,
refresh_requester: broadcast::Sender<()>,
) -> NodeConnectionPool {
let pool_config = PoolConfig {
connection_config,
keepalive_interval,
hostname_resolution_timeout,

// We want to have only one connection to receive events from
pool_size: PoolSize::PerHost(NonZeroUsize::new(1).unwrap()),
Expand Down
Loading