Skip to content

Commit

Permalink
fix: wrong frontend registration address (#4199)
Browse files Browse the repository at this point in the history
* fix: frontend registration address is wrong, #4186

* fix: license header

* chore: adds hostname to frontend grpc

* fix: forgot run make config-docs

* chore: warn when using bind_addr

* fix: flow node heartbeat carrying address
  • Loading branch information
killme2008 authored Jun 26, 2024
1 parent 948c869 commit a779cb3
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 53 deletions.
3 changes: 2 additions & 1 deletion config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>Support the following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv,<br/>and used for connections from outside the host |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
Expand Down Expand Up @@ -314,7 +315,7 @@
| `rpc_max_send_message_size` | String | `None` | Deprecated, use `grpc.rpc_max_send_message_size` instead. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.addr` | String | `127.0.0.1:3001` | The address to bind the gRPC server. |
| `grpc.hostname` | String | `127.0.0.1` | The hostname to advertise to the metasrv. |
| `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv,<br/>and used for connections from outside the host |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
Expand Down
3 changes: 2 additions & 1 deletion config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ rpc_max_send_message_size = "512MB"
[grpc]
## The address to bind the gRPC server.
addr = "127.0.0.1:3001"
## The hostname to advertise to the metasrv.
## The hostname advertised to the metasrv,
## and used for connections from outside the host
hostname = "127.0.0.1"
## The number of server worker threads.
runtime_size = 8
Expand Down
3 changes: 3 additions & 0 deletions config/frontend.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ body_limit = "64MB"
[grpc]
## The address to bind the gRPC server.
addr = "127.0.0.1:4001"
## The hostname advertised to the metasrv,
## and used for connections from outside the host
hostname = "127.0.0.1"
## The number of server worker threads.
runtime_size = 8

Expand Down
49 changes: 4 additions & 45 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info, trace, warn};
use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder};
use meta_client::MetaClientOptions;
use servers::addrs;
use snafu::ResultExt;
use tokio::sync::{mpsc, Notify};
use tokio::time::Instant;
Expand All @@ -47,8 +48,7 @@ pub(crate) mod task_tracker;
pub struct HeartbeatTask {
node_id: u64,
node_epoch: u64,
server_addr: String,
server_hostname: Option<String>,
peer_addr: String,
running: Arc<AtomicBool>,
meta_client: Arc<MetaClient>,
region_server: RegionServer,
Expand Down Expand Up @@ -84,8 +84,7 @@ impl HeartbeatTask {
node_id: opts.node_id.unwrap_or(0),
// We use datanode's start time millis as the node's epoch.
node_epoch: common_time::util::current_time_millis() as u64,
server_addr: opts.grpc.addr.clone(),
server_hostname: Some(opts.grpc.hostname.clone()),
peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)),
running: Arc::new(AtomicBool::new(false)),
meta_client: Arc::new(meta_client),
region_server,
Expand Down Expand Up @@ -183,7 +182,7 @@ impl HeartbeatTask {
let interval = self.interval;
let node_id = self.node_id;
let node_epoch = self.node_epoch;
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
let addr = &self.peer_addr;
info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");

let meta_client = self.meta_client.clone();
Expand Down Expand Up @@ -350,25 +349,6 @@ impl HeartbeatTask {
}
}

/// Resolves hostname:port address for meta registration
///
fn resolve_addr(bind_addr: &str, hostname_addr: &Option<String>) -> String {
match hostname_addr {
Some(hostname_addr) => {
// it has port configured
if hostname_addr.contains(':') {
hostname_addr.clone()
} else {
// otherwise, resolve port from bind_addr
// should be safe to unwrap here because bind_addr is already validated
let port = bind_addr.split(':').nth(1).unwrap();
format!("{hostname_addr}:{port}")
}
}
None => bind_addr.to_owned(),
}
}

/// Create metasrv client instance and spawn heartbeat loop.
pub async fn new_metasrv_client(
node_id: u64,
Expand Down Expand Up @@ -404,24 +384,3 @@ pub async fn new_metasrv_client(
.context(MetaClientInitSnafu)?;
Ok(meta_client)
}

#[cfg(test)]
mod tests {
#[test]
fn test_resolve_addr() {
assert_eq!(
"tomcat:3001",
super::resolve_addr("127.0.0.1:3001", &Some("tomcat".to_owned()))
);

assert_eq!(
"tomcat:3002",
super::resolve_addr("127.0.0.1:3001", &Some("tomcat:3002".to_owned()))
);

assert_eq!(
"127.0.0.1:3001",
super::resolve_addr("127.0.0.1:3001", &None)
);
}
}
7 changes: 4 additions & 3 deletions src/flow/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMess
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
use servers::heartbeat_options::HeartbeatOptions;
use snafu::ResultExt;
use tokio::sync::mpsc;
Expand All @@ -37,7 +38,7 @@ use crate::{Error, FlownodeOptions};
#[derive(Clone)]
pub struct HeartbeatTask {
node_id: u64,
server_addr: String,
peer_addr: String,
meta_client: Arc<MetaClient>,
report_interval: Duration,
retry_interval: Duration,
Expand All @@ -53,7 +54,7 @@ impl HeartbeatTask {
) -> Self {
Self {
node_id: opts.node_id.unwrap_or(0),
server_addr: opts.grpc.addr.clone(),
peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)),
meta_client,
report_interval: heartbeat_opts.interval,
retry_interval: heartbeat_opts.retry_interval,
Expand Down Expand Up @@ -110,7 +111,7 @@ impl HeartbeatTask {
let report_interval = self.report_interval;
let self_peer = Some(Peer {
id: self.node_id,
addr: self.server_addr.clone(),
addr: self.peer_addr.clone(),
});

common_runtime::spawn_hb(async move {
Expand Down
7 changes: 4 additions & 3 deletions src/frontend/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMess
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
use servers::heartbeat_options::HeartbeatOptions;
use snafu::ResultExt;
use tokio::sync::mpsc;
Expand All @@ -37,7 +38,7 @@ pub mod handler;
/// The frontend heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
#[derive(Clone)]
pub struct HeartbeatTask {
server_addr: String,
peer_addr: String,
meta_client: Arc<MetaClient>,
report_interval: u64,
retry_interval: u64,
Expand All @@ -53,7 +54,7 @@ impl HeartbeatTask {
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
) -> Self {
HeartbeatTask {
server_addr: opts.grpc.addr.clone(),
peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)),
meta_client,
report_interval: heartbeat_opts.interval.as_millis() as u64,
retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
Expand Down Expand Up @@ -129,7 +130,7 @@ impl HeartbeatTask {
let self_peer = Some(Peer {
// The peer id doesn't make sense for frontend, so we just set it 0.
id: 0,
addr: self.server_addr.clone(),
addr: self.peer_addr.clone(),
});

common_runtime::spawn_hb(async move {
Expand Down
58 changes: 58 additions & 0 deletions src/servers/src/addrs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_telemetry::warn;

/// Resolves hostname:port address for meta registration.
/// If `hostname_addr` is present, prefer to use it, `bind_addr` otherwise.
pub fn resolve_addr(bind_addr: &str, hostname_addr: Option<&str>) -> String {
match hostname_addr {
Some(hostname_addr) => {
// it has port configured
if hostname_addr.contains(':') {
hostname_addr.to_string()
} else {
// otherwise, resolve port from bind_addr
// should be safe to unwrap here because bind_addr is already validated
let port = bind_addr.split(':').nth(1).unwrap();
format!("{hostname_addr}:{port}")
}
}
None => {
warn!("hostname not set, using bind_addr: {bind_addr} instead.");
bind_addr.to_string()
}
}
}

#[cfg(test)]
mod tests {
#[test]
fn test_resolve_addr() {
assert_eq!(
"tomcat:3001",
super::resolve_addr("127.0.0.1:3001", Some("tomcat"))
);

assert_eq!(
"tomcat:3002",
super::resolve_addr("127.0.0.1:3001", Some("tomcat:3002"))
);

assert_eq!(
"127.0.0.1:3001",
super::resolve_addr("127.0.0.1:3001", None)
);
}
}
1 change: 1 addition & 0 deletions src/servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use datatypes::schema::Schema;
use query::plan::LogicalPlan;

pub mod addrs;
pub mod configurator;
pub mod error;
pub mod export_metrics;
Expand Down

0 comments on commit a779cb3

Please sign in to comment.