diff --git a/Makefile.toml b/Makefile.toml index ed39ea0fc942..494db8903aec 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -86,9 +86,9 @@ if ${is_not_ci} no_rust_log = not ${rust_log} if ${no_rust_log} - set_env RUST_LOG "pgwire_query_log=info" + set_env RUST_LOG "pgwire_query_log=info,hyper::client::connect::http=info" else - set_env RUST_LOG "pgwire_query_log=info,${rust_log}" + set_env RUST_LOG "pgwire_query_log=info,hyper::client::connect::http=info,${rust_log}" end end ''', diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 83e8d8542960..83a3e83e492a 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -243,6 +243,10 @@ pub struct ConfigMap { #[parameter(default = false)] background_ddl: bool, + /// Run DDL statements in background + #[parameter(default = false)] + enable_reusable_source: bool, + /// Shows the server-side character set encoding. At present, this parameter can be shown but not set, because the encoding is determined at database creation time. #[parameter(default = SERVER_ENCODING)] server_encoding: String, diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 19c36baf09c6..b4cea42637ab 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -317,7 +317,7 @@ macro_rules! impl_set_system_param { $( key_of!($field) => { let v = if let Some(v) = value { - v.as_ref().parse().map_err(|_| format!("cannot parse parameter value"))? + v.as_ref().parse().map_err(|e| format!("cannot parse parameter value: {e}"))? } else { $default.ok_or_else(|| format!("{} does not have a default value", key))? }; diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs index 3eebacca09f9..c0e15bc7554d 100644 --- a/src/connector/src/source/kafka/private_link.rs +++ b/src/connector/src/source/kafka/private_link.rs @@ -71,9 +71,9 @@ impl BrokerAddrRewriter { role: PrivateLinkContextRole, broker_rewrite_map: Option>, ) -> ConnectorResult { - tracing::info!("[{}] rewrite map {:?}", role, broker_rewrite_map); let rewrite_map: ConnectorResult> = broker_rewrite_map .map_or(Ok(BTreeMap::new()), |addr_map| { + tracing::info!("[{}] rewrite map {:?}", role, addr_map); addr_map .into_iter() .map(|(old_addr, new_addr)| { diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 96eeb7c1c43e..b6c754031357 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -1417,7 +1417,7 @@ pub async fn handle_create_source( let catalog_writer = session.catalog_writer()?; - if has_streaming_job { + if has_streaming_job && session.config().enable_reusable_source() { let graph = { let context = OptimizerContext::from_handler_args(handler_args); let source_node = LogicalSource::with_catalog( diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index a80973cf6e7f..08bac3bfbb32 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -87,7 +87,7 @@ impl Planner { } pub(super) fn plan_source(&mut self, source: BoundSource) -> Result { - if source.can_backfill() { + if source.can_backfill() && self.ctx().session_ctx().config().enable_reusable_source() { Ok(LogicalSourceBackfill::new(Rc::new(source.catalog), self.ctx())?.into()) } else { Ok(LogicalSource::with_catalog(