Skip to content

Commit

Permalink
fix: frontend registration address is wrong, GreptimeTeam#4186
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Jun 24, 2024
1 parent 5566dd7 commit 00f163f
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 48 deletions.
49 changes: 4 additions & 45 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info, trace, warn};
use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder};
use meta_client::MetaClientOptions;
use servers::addrs;
use snafu::ResultExt;
use tokio::sync::{mpsc, Notify};
use tokio::time::Instant;
Expand All @@ -47,8 +48,7 @@ pub(crate) mod task_tracker;
pub struct HeartbeatTask {
node_id: u64,
node_epoch: u64,
server_addr: String,
server_hostname: Option<String>,
peer_addr: String,
running: Arc<AtomicBool>,
meta_client: Arc<MetaClient>,
region_server: RegionServer,
Expand Down Expand Up @@ -84,8 +84,7 @@ impl HeartbeatTask {
node_id: opts.node_id.unwrap_or(0),
// We use datanode's start time millis as the node's epoch.
node_epoch: common_time::util::current_time_millis() as u64,
server_addr: opts.grpc.addr.clone(),
server_hostname: Some(opts.grpc.hostname.clone()),
peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)),
running: Arc::new(AtomicBool::new(false)),
meta_client: Arc::new(meta_client),
region_server,
Expand Down Expand Up @@ -183,7 +182,7 @@ impl HeartbeatTask {
let interval = self.interval;
let node_id = self.node_id;
let node_epoch = self.node_epoch;
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
let addr = &self.peer_addr;
info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");

let meta_client = self.meta_client.clone();
Expand Down Expand Up @@ -350,25 +349,6 @@ impl HeartbeatTask {
}
}

/// Resolves hostname:port address for meta registration
///
fn resolve_addr(bind_addr: &str, hostname_addr: &Option<String>) -> String {
match hostname_addr {
Some(hostname_addr) => {
// it has port configured
if hostname_addr.contains(':') {
hostname_addr.clone()
} else {
// otherwise, resolve port from bind_addr
// should be safe to unwrap here because bind_addr is already validated
let port = bind_addr.split(':').nth(1).unwrap();
format!("{hostname_addr}:{port}")
}
}
None => bind_addr.to_owned(),
}
}

/// Create metasrv client instance and spawn heartbeat loop.
pub async fn new_metasrv_client(
node_id: u64,
Expand Down Expand Up @@ -404,24 +384,3 @@ pub async fn new_metasrv_client(
.context(MetaClientInitSnafu)?;
Ok(meta_client)
}

#[cfg(test)]
mod tests {
#[test]
fn test_resolve_addr() {
assert_eq!(
"tomcat:3001",
super::resolve_addr("127.0.0.1:3001", &Some("tomcat".to_owned()))
);

assert_eq!(
"tomcat:3002",
super::resolve_addr("127.0.0.1:3001", &Some("tomcat:3002".to_owned()))
);

assert_eq!(
"127.0.0.1:3001",
super::resolve_addr("127.0.0.1:3001", &None)
);
}
}
7 changes: 4 additions & 3 deletions src/frontend/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMess
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
use servers::heartbeat_options::HeartbeatOptions;
use snafu::ResultExt;
use tokio::sync::mpsc;
Expand All @@ -37,7 +38,7 @@ pub mod handler;
/// The frontend heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
#[derive(Clone)]
pub struct HeartbeatTask {
server_addr: String,
peer_addr: String,
meta_client: Arc<MetaClient>,
report_interval: u64,
retry_interval: u64,
Expand All @@ -53,7 +54,7 @@ impl HeartbeatTask {
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
) -> Self {
HeartbeatTask {
server_addr: opts.grpc.addr.clone(),
peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)),
meta_client,
report_interval: heartbeat_opts.interval.as_millis() as u64,
retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
Expand Down Expand Up @@ -129,7 +130,7 @@ impl HeartbeatTask {
let self_peer = Some(Peer {
// The peer id doesn't make sense for frontend, so we just set it 0.
id: 0,
addr: self.server_addr.clone(),
addr: self.peer_addr.clone(),
});

common_runtime::spawn_hb(async move {
Expand Down
40 changes: 40 additions & 0 deletions src/servers/src/addrs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/// Resolves hostname:port address for meta registration.
/// If `hostname_addr` is present, prefer to use it, `bind_addr` otherwise.
///
pub fn resolve_addr(bind_addr: &str, hostname_addr: Option<&str>) -> String {
match hostname_addr {
Some(hostname_addr) => {
// it has port configured
if hostname_addr.contains(':') {
hostname_addr.to_string()
} else {
// otherwise, resolve port from bind_addr
// should be safe to unwrap here because bind_addr is already validated
let port = bind_addr.split(':').nth(1).unwrap();
format!("{hostname_addr}:{port}")
}
}
None => bind_addr.to_string(),
}
}

#[cfg(test)]
mod tests {
#[test]
fn test_resolve_addr() {
assert_eq!(
"tomcat:3001",
super::resolve_addr("127.0.0.1:3001", Some("tomcat"))
);

assert_eq!(
"tomcat:3002",
super::resolve_addr("127.0.0.1:3001", Some("tomcat:3002"))
);

assert_eq!(
"127.0.0.1:3001",
super::resolve_addr("127.0.0.1:3001", None)
);
}
}
1 change: 1 addition & 0 deletions src/servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use datatypes::schema::Schema;
use query::plan::LogicalPlan;

pub mod addrs;
pub mod configurator;
pub mod error;
pub mod export_metrics;
Expand Down

0 comments on commit 00f163f

Please sign in to comment.