Skip to content

Commit

Permalink
fix(pgwire): allow configuring tcp keepalive idle time (#18136)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao authored Aug 22, 2024
1 parent 7ebe0a2 commit fe56ce2
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ mod test {
frontend_opts: Some(
FrontendOpts {
listen_addr: "0.0.0.0:4566",
tcp_keepalive_idle_secs: 300,
advertise_addr: None,
meta_addr: List(
[
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ risingwave_expr_impl::enable!();
mod catalog;

use std::collections::HashSet;
use std::time::Duration;

pub use catalog::TableCatalog;
mod binder;
Expand All @@ -55,6 +56,7 @@ mod observer;
pub mod optimizer;
pub use optimizer::{Explain, OptimizerContext, OptimizerContextRef, PlanRef};
mod planner;
use pgwire::net::TcpKeepalive;
pub use planner::Planner;
mod scheduler;
pub mod session;
Expand Down Expand Up @@ -97,6 +99,11 @@ pub struct FrontendOpts {
#[clap(long, env = "RW_LISTEN_ADDR", default_value = "0.0.0.0:4566")]
pub listen_addr: String,

/// The amount of time with no network activity after which the server will send a
/// TCP keepalive message to the client.
#[clap(long, env = "RW_TCP_KEEPALIVE_IDLE_SECS", default_value = "300")]
pub tcp_keepalive_idle_secs: usize,

/// The address for contacting this instance of the service.
/// This would be synonymous with the service's "public address"
/// or "identifying address".
Expand Down Expand Up @@ -187,6 +194,9 @@ pub fn start(
// slow compile in release mode.
Box::pin(async move {
let listen_addr = opts.listen_addr.clone();
let tcp_keepalive =
TcpKeepalive::new().with_time(Duration::from_secs(opts.tcp_keepalive_idle_secs as _));

let session_mgr = Arc::new(SessionManagerImpl::new(opts).await.unwrap());
SESSION_MANAGER.get_or_init(|| session_mgr.clone());
let redact_sql_option_keywords = Arc::new(
Expand All @@ -201,6 +211,7 @@ pub fn start(

pg_serve(
&listen_addr,
tcp_keepalive,
session_mgr.clone(),
TlsConfig::new_default(),
Some(redact_sql_option_keywords),
Expand Down
1 change: 1 addition & 0 deletions src/utils/pgwire/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ risingwave_common = { workspace = true }
risingwave_sqlparser = { workspace = true }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
socket2 = "0.5"
thiserror = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = [
Expand Down
11 changes: 10 additions & 1 deletion src/utils/pgwire/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,18 @@ impl Listener {
/// Accepts a new incoming connection from this listener.
///
/// Returns a tuple of the stream and the string representation of the peer address.
pub async fn accept(&self) -> io::Result<(Stream, Address)> {
pub async fn accept(&self, tcp_keepalive: &TcpKeepalive) -> io::Result<(Stream, Address)> {
match self {
Self::Tcp(listener) => {
let (stream, addr) = listener.accept().await?;
stream.set_nodelay(true)?;
// Set TCP keepalive to 5 minutes, which is less than the connection idle timeout of 350 seconds in AWS ELB.
// https://docs.aws.amazon.com/elasticloadbalancing/latest/network/network-load-balancers.html#connection-idle-timeout
#[cfg(not(madsim))]
{
let r = socket2::SockRef::from(&stream);
r.set_tcp_keepalive(tcp_keepalive)?;
}
Ok((Stream::Tcp(stream), Address::Tcp(addr)))
}
Self::Unix(listener) => {
Expand All @@ -88,3 +95,5 @@ impl Listener {
}
}
}

pub use socket2::TcpKeepalive;
6 changes: 4 additions & 2 deletions src/utils/pgwire/src/pg_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use thiserror_ext::AsReport;
use tokio::io::{AsyncRead, AsyncWrite};

use crate::error::{PsqlError, PsqlResult};
use crate::net::{AddressRef, Listener};
use crate::net::{AddressRef, Listener, TcpKeepalive};
use crate::pg_field_descriptor::PgFieldDescriptor;
use crate::pg_message::TransactionStatus;
use crate::pg_protocol::{PgProtocol, TlsConfig};
Expand Down Expand Up @@ -265,6 +265,7 @@ impl UserAuthenticator {
/// Returns when the `shutdown` token is triggered.
pub async fn pg_serve(
addr: &str,
tcp_keepalive: TcpKeepalive,
session_mgr: Arc<impl SessionManager>,
tls_config: Option<TlsConfig>,
redact_sql_option_keywords: Option<RedactSqlOptionKeywordsRef>,
Expand All @@ -291,7 +292,7 @@ pub async fn pg_serve(
let session_mgr_clone = session_mgr.clone();
let f = async move {
loop {
let conn_ret = listener.accept().await;
let conn_ret = listener.accept(&tcp_keepalive).await;
match conn_ret {
Ok((stream, peer_addr)) => {
tracing::info!(%peer_addr, "accept connection");
Expand Down Expand Up @@ -534,6 +535,7 @@ mod tests {
tokio::spawn(async move {
pg_serve(
&bind_addr,
socket2::TcpKeepalive::new(),
Arc::new(session_mgr),
None,
None,
Expand Down

0 comments on commit fe56ce2

Please sign in to comment.