Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: wrong frontend registration address #4199

Merged
merged 6 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>Support the following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv,<br/>and used for connections from outside the host |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
Expand Down Expand Up @@ -314,7 +315,7 @@
| `rpc_max_send_message_size` | String | `None` | Deprecated, use `grpc.rpc_max_send_message_size` instead. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.addr` | String | `127.0.0.1:3001` | The address to bind the gRPC server. |
| `grpc.hostname` | String | `127.0.0.1` | The hostname to advertise to the metasrv. |
| `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv,<br/>and used for connections from outside the host |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
Expand Down
3 changes: 2 additions & 1 deletion config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ rpc_max_send_message_size = "512MB"
[grpc]
## The address to bind the gRPC server.
addr = "127.0.0.1:3001"
## The hostname to advertise to the metasrv.
## The hostname advertised to the metasrv,
## and used for connections from outside the host
hostname = "127.0.0.1"
## The number of server worker threads.
runtime_size = 8
Expand Down
3 changes: 3 additions & 0 deletions config/frontend.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ body_limit = "64MB"
[grpc]
## The address to bind the gRPC server.
addr = "127.0.0.1:4001"
## The hostname advertised to the metasrv,
## and used for connections from outside the host
hostname = "127.0.0.1"
## The number of server worker threads.
runtime_size = 8

Expand Down
49 changes: 4 additions & 45 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info, trace, warn};
use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder};
use meta_client::MetaClientOptions;
use servers::addrs;
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
use snafu::ResultExt;
use tokio::sync::{mpsc, Notify};
use tokio::time::Instant;
Expand All @@ -47,8 +48,7 @@ pub(crate) mod task_tracker;
pub struct HeartbeatTask {
node_id: u64,
node_epoch: u64,
server_addr: String,
server_hostname: Option<String>,
peer_addr: String,
running: Arc<AtomicBool>,
meta_client: Arc<MetaClient>,
region_server: RegionServer,
Expand Down Expand Up @@ -84,8 +84,7 @@ impl HeartbeatTask {
node_id: opts.node_id.unwrap_or(0),
// We use datanode's start time millis as the node's epoch.
node_epoch: common_time::util::current_time_millis() as u64,
server_addr: opts.grpc.addr.clone(),
server_hostname: Some(opts.grpc.hostname.clone()),
peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)),
running: Arc::new(AtomicBool::new(false)),
meta_client: Arc::new(meta_client),
region_server,
Expand Down Expand Up @@ -183,7 +182,7 @@ impl HeartbeatTask {
let interval = self.interval;
let node_id = self.node_id;
let node_epoch = self.node_epoch;
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
let addr = &self.peer_addr;
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");

let meta_client = self.meta_client.clone();
Expand Down Expand Up @@ -350,25 +349,6 @@ impl HeartbeatTask {
}
}

/// Resolves hostname:port address for meta registration
///
fn resolve_addr(bind_addr: &str, hostname_addr: &Option<String>) -> String {
match hostname_addr {
Some(hostname_addr) => {
// it has port configured
if hostname_addr.contains(':') {
hostname_addr.clone()
} else {
// otherwise, resolve port from bind_addr
// should be safe to unwrap here because bind_addr is already validated
let port = bind_addr.split(':').nth(1).unwrap();
format!("{hostname_addr}:{port}")
}
}
None => bind_addr.to_owned(),
}
}

/// Create metasrv client instance and spawn heartbeat loop.
pub async fn new_metasrv_client(
node_id: u64,
Expand Down Expand Up @@ -404,24 +384,3 @@ pub async fn new_metasrv_client(
.context(MetaClientInitSnafu)?;
Ok(meta_client)
}

#[cfg(test)]
mod tests {
#[test]
fn test_resolve_addr() {
assert_eq!(
"tomcat:3001",
super::resolve_addr("127.0.0.1:3001", &Some("tomcat".to_owned()))
);

assert_eq!(
"tomcat:3002",
super::resolve_addr("127.0.0.1:3001", &Some("tomcat:3002".to_owned()))
);

assert_eq!(
"127.0.0.1:3001",
super::resolve_addr("127.0.0.1:3001", &None)
);
}
}
7 changes: 4 additions & 3 deletions src/frontend/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMess
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
use servers::heartbeat_options::HeartbeatOptions;
use snafu::ResultExt;
use tokio::sync::mpsc;
Expand All @@ -37,7 +38,7 @@ pub mod handler;
/// The frontend heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
#[derive(Clone)]
pub struct HeartbeatTask {
server_addr: String,
peer_addr: String,
meta_client: Arc<MetaClient>,
report_interval: u64,
retry_interval: u64,
Expand All @@ -53,7 +54,7 @@ impl HeartbeatTask {
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
) -> Self {
HeartbeatTask {
server_addr: opts.grpc.addr.clone(),
peer_addr: addrs::resolve_addr(&opts.grpc.addr, Some(&opts.grpc.hostname)),
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
meta_client,
report_interval: heartbeat_opts.interval.as_millis() as u64,
retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
Expand Down Expand Up @@ -129,7 +130,7 @@ impl HeartbeatTask {
let self_peer = Some(Peer {
// The peer id doesn't make sense for frontend, so we just set it 0.
id: 0,
addr: self.server_addr.clone(),
addr: self.peer_addr.clone(),
});

common_runtime::spawn_hb(async move {
Expand Down
54 changes: 54 additions & 0 deletions src/servers/src/addrs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Resolves hostname:port address for meta registration.
/// If `hostname_addr` is present, prefer to use it, `bind_addr` otherwise.
///
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
pub fn resolve_addr(bind_addr: &str, hostname_addr: Option<&str>) -> String {
match hostname_addr {
Some(hostname_addr) => {
// it has port configured
if hostname_addr.contains(':') {
hostname_addr.to_string()
} else {
// otherwise, resolve port from bind_addr
// should be safe to unwrap here because bind_addr is already validated
let port = bind_addr.split(':').nth(1).unwrap();
format!("{hostname_addr}:{port}")
}
}
None => bind_addr.to_string(),
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
}
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
}

#[cfg(test)]
mod tests {
#[test]
fn test_resolve_addr() {
assert_eq!(
"tomcat:3001",
super::resolve_addr("127.0.0.1:3001", Some("tomcat"))
);

assert_eq!(
"tomcat:3002",
super::resolve_addr("127.0.0.1:3001", Some("tomcat:3002"))
);

assert_eq!(
"127.0.0.1:3001",
super::resolve_addr("127.0.0.1:3001", None)
);
}
}
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
sunng87 marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions src/servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use datatypes::schema::Schema;
use query::plan::LogicalPlan;

pub mod addrs;
pub mod configurator;
pub mod error;
pub mod export_metrics;
Expand Down