diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index b3d4baea765c..ff872b7959dd 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -28,6 +28,7 @@ use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; use common_telemetry::{debug, error, info, trace, warn}; use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder}; use meta_client::MetaClientOptions; +use servers::addrs; use snafu::ResultExt; use tokio::sync::{mpsc, Notify}; use tokio::time::Instant; @@ -47,8 +48,7 @@ pub(crate) mod task_tracker; pub struct HeartbeatTask { node_id: u64, node_epoch: u64, - server_addr: String, - server_hostname: Option, + peer_addr: String, running: Arc, meta_client: Arc, region_server: RegionServer, @@ -84,8 +84,7 @@ impl HeartbeatTask { node_id: opts.node_id.unwrap_or(0), // We use datanode's start time millis as the node's epoch. node_epoch: common_time::util::current_time_millis() as u64, - server_addr: opts.grpc.addr.clone(), - server_hostname: Some(opts.grpc.hostname.clone()), + peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)), running: Arc::new(AtomicBool::new(false)), meta_client: Arc::new(meta_client), region_server, @@ -183,7 +182,7 @@ impl HeartbeatTask { let interval = self.interval; let node_id = self.node_id; let node_epoch = self.node_epoch; - let addr = resolve_addr(&self.server_addr, &self.server_hostname); + let addr = &self.peer_addr; info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}."); let meta_client = self.meta_client.clone(); @@ -350,25 +349,6 @@ impl HeartbeatTask { } } -/// Resolves hostname:port address for meta registration -/// -fn resolve_addr(bind_addr: &str, hostname_addr: &Option) -> String { - match hostname_addr { - Some(hostname_addr) => { - // it has port configured - if hostname_addr.contains(':') { - hostname_addr.clone() - } else { - // otherwise, resolve port from bind_addr - // should be safe to unwrap here because bind_addr is already validated - let port = bind_addr.split(':').nth(1).unwrap(); - format!("{hostname_addr}:{port}") - } - } - None => bind_addr.to_owned(), - } -} - /// Create metasrv client instance and spawn heartbeat loop. pub async fn new_metasrv_client( node_id: u64, @@ -404,24 +384,3 @@ pub async fn new_metasrv_client( .context(MetaClientInitSnafu)?; Ok(meta_client) } - -#[cfg(test)] -mod tests { - #[test] - fn test_resolve_addr() { - assert_eq!( - "tomcat:3001", - super::resolve_addr("127.0.0.1:3001", &Some("tomcat".to_owned())) - ); - - assert_eq!( - "tomcat:3002", - super::resolve_addr("127.0.0.1:3001", &Some("tomcat:3002".to_owned())) - ); - - assert_eq!( - "127.0.0.1:3001", - super::resolve_addr("127.0.0.1:3001", &None) - ); - } -} diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 0ccbed35a6b3..48c09bacd5ce 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -22,6 +22,7 @@ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMess use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; use common_telemetry::{debug, error, info}; use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient}; +use servers::addrs; use servers::heartbeat_options::HeartbeatOptions; use snafu::ResultExt; use tokio::sync::mpsc; @@ -37,7 +38,7 @@ pub mod handler; /// The frontend heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background. #[derive(Clone)] pub struct HeartbeatTask { - server_addr: String, + peer_addr: String, meta_client: Arc, report_interval: u64, retry_interval: u64, @@ -53,7 +54,7 @@ impl HeartbeatTask { resp_handler_executor: HeartbeatResponseHandlerExecutorRef, ) -> Self { HeartbeatTask { - server_addr: opts.grpc.addr.clone(), + peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)), meta_client, report_interval: heartbeat_opts.interval.as_millis() as u64, retry_interval: heartbeat_opts.retry_interval.as_millis() as u64, @@ -129,7 +130,7 @@ impl HeartbeatTask { let self_peer = Some(Peer { // The peer id doesn't make sense for frontend, so we just set it 0. id: 0, - addr: self.server_addr.clone(), + addr: self.peer_addr.clone(), }); common_runtime::spawn_hb(async move { diff --git a/src/servers/src/addrs.rs b/src/servers/src/addrs.rs new file mode 100644 index 000000000000..8a0c478caf6b --- /dev/null +++ b/src/servers/src/addrs.rs @@ -0,0 +1,40 @@ +/// Resolves hostname:port address for meta registration. +/// If `hostname_addr` is present, prefer to use it, `bind_addr` otherwise. +/// +pub fn resolve_addr(bind_addr: &str, hostname_addr: Option<&str>) -> String { + match hostname_addr { + Some(hostname_addr) => { + // it has port configured + if hostname_addr.contains(':') { + hostname_addr.to_string() + } else { + // otherwise, resolve port from bind_addr + // should be safe to unwrap here because bind_addr is already validated + let port = bind_addr.split(':').nth(1).unwrap(); + format!("{hostname_addr}:{port}") + } + } + None => bind_addr.to_string(), + } +} + +#[cfg(test)] +mod tests { + #[test] + fn test_resolve_addr() { + assert_eq!( + "tomcat:3001", + super::resolve_addr("127.0.0.1:3001", Some("tomcat")) + ); + + assert_eq!( + "tomcat:3002", + super::resolve_addr("127.0.0.1:3001", Some("tomcat:3002")) + ); + + assert_eq!( + "127.0.0.1:3001", + super::resolve_addr("127.0.0.1:3001", None) + ); + } +} diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index da7490a16d82..85ebea4ca8a3 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -20,6 +20,7 @@ use datatypes::schema::Schema; use query::plan::LogicalPlan; +pub mod addrs; pub mod configurator; pub mod error; pub mod export_metrics;