fix(meta): Meta node should be identified by its meta_endpoint parameter for cluster membership #7527

Merged · 5 commits · Jan 25, 2023
Changes from 1 commit
44 changes: 24 additions & 20 deletions src/meta/src/lib.rs
@@ -66,11 +66,10 @@ pub struct MetaNodeOpts {
     #[clap(long, default_value = "127.0.0.1:5690")]
     listen_addr: String,

+    /// The endpoint for this meta node, which also serves as its unique identifier in cluster
+    /// membership and leader election.
     #[clap(long)]
-    host: Option<String>,
-
-    #[clap(long)]
-    endpoint: Option<String>,
+    meta_endpoint: Option<String>,

     #[clap(long)]
     dashboard_host: Option<String>,
@@ -133,24 +132,30 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
     let config = load_config(&opts.config_path);
     tracing::info!("Starting meta node with config {:?}", config);
     tracing::info!("Starting meta node with options {:?}", opts);
-    let meta_addr = opts.host.unwrap_or_else(|| opts.listen_addr.clone());
-    let endpoint = opts.endpoint.unwrap_or_else(|| opts.listen_addr.clone());
     let listen_addr = opts.listen_addr.parse().unwrap();
     let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap());
     let prometheus_addr = opts.prometheus_host.map(|x| x.parse().unwrap());
-    let backend = match opts.backend {
-        Backend::Etcd => MetaStoreBackend::Etcd {
-            endpoints: opts
-                .etcd_endpoints
-                .split(',')
-                .map(|x| x.to_string())
-                .collect(),
-            credentials: match opts.etcd_auth {
-                true => Some((opts.etcd_username, opts.etcd_password)),
-                false => None,
-            },
-        },
-        Backend::Mem => MetaStoreBackend::Mem,
+    let (meta_endpoint, backend) = match opts.backend {
+        Backend::Etcd => (
+            opts.meta_endpoint
+                .expect("meta_endpoint must be specified when using etcd"),
+            MetaStoreBackend::Etcd {
+                endpoints: opts
+                    .etcd_endpoints
+                    .split(',')
+                    .map(|x| x.to_string())
+                    .collect(),
+                credentials: match opts.etcd_auth {
+                    true => Some((opts.etcd_username, opts.etcd_password)),
+                    false => None,
+                },
+            },
+        ),
+        Backend::Mem => (
+            opts.meta_endpoint
+                .unwrap_or_else(|| opts.listen_addr.clone()),
+            MetaStoreBackend::Mem,
+        ),
     };

     let max_heartbeat_interval =

Comment from the PR author, on the `Backend::Mem => (` arm:

Should we still allow defaulting to listen_addr when etcd is not used? Anyway, the leader service should be useless in this case, but...
@@ -162,8 +167,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {

tracing::info!("Meta server listening at {}", listen_addr);
let add_info = AddressInfo {
endpoint,
addr: meta_addr,
meta_endpoint,
listen_addr,
prometheus_addr,
dashboard_addr,
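The hunks above replace the old `host`/`endpoint` pair with a single `meta_endpoint`: it must be given explicitly when the etcd backend is used (where it identifies the node in cluster membership and leader election) and falls back to `listen_addr` for the in-memory backend. Below is a minimal, self-contained sketch of that resolution rule; `Opts` and `Backend` are simplified stand-ins for the real `MetaNodeOpts` and `MetaStoreBackend`, not the actual types.

```rust
// Sketch only: simplified stand-ins for MetaNodeOpts / MetaStoreBackend.
#[derive(Debug)]
enum Backend {
    Etcd { endpoints: Vec<String> },
    Mem,
}

struct Opts {
    listen_addr: String,
    meta_endpoint: Option<String>,
    use_etcd: bool,
    etcd_endpoints: String,
}

fn resolve(opts: Opts) -> (String, Backend) {
    if opts.use_etcd {
        (
            // With etcd, the endpoint is the node's stable identity in cluster
            // membership and leader election, so it must be given explicitly.
            opts.meta_endpoint
                .expect("meta_endpoint must be specified when using etcd"),
            Backend::Etcd {
                endpoints: opts
                    .etcd_endpoints
                    .split(',')
                    .map(|x| x.to_string())
                    .collect(),
            },
        )
    } else {
        (
            // Single-node in-memory backend: falling back to listen_addr is harmless.
            opts.meta_endpoint
                .unwrap_or_else(|| opts.listen_addr.clone()),
            Backend::Mem,
        )
    }
}

fn main() {
    let (meta_endpoint, backend) = resolve(Opts {
        listen_addr: "127.0.0.1:5690".into(),
        meta_endpoint: None,
        use_etcd: false,
        etcd_endpoints: String::new(),
    });
    println!("identified as {meta_endpoint}, backend: {backend:?}");
}
```

Presumably the hard requirement under etcd is there so a node never registers itself in cluster membership under a defaulted address that other nodes cannot reach.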
2 changes: 1 addition & 1 deletion src/meta/src/rpc/follower_svc.rs
@@ -37,7 +37,7 @@ pub async fn start_follower_srv(
     let leader_srv = LeaderServiceImpl::new(
         election_client,
         MetaLeaderInfo {
-            node_address: address_info.listen_addr.to_string(),
+            node_address: address_info.meta_endpoint.to_string(),
             lease_id: 0,
         },
     );
28 changes: 16 additions & 12 deletions src/meta/src/rpc/server.rs
@@ -41,8 +41,7 @@ pub enum MetaStoreBackend {

 #[derive(Clone)]
 pub struct AddressInfo {
-    pub endpoint: String,
-    pub addr: String,
+    pub meta_endpoint: String,
     pub listen_addr: SocketAddr,
     pub prometheus_addr: Option<SocketAddr>,
     pub dashboard_addr: Option<SocketAddr>,
@@ -52,8 +51,7 @@ pub struct AddressInfo {
 impl Default for AddressInfo {
     fn default() -> Self {
         Self {
-            endpoint: "".to_string(),
-            addr: "127.0.0.1:0000".to_string(),
+            meta_endpoint: "".to_string(),
             listen_addr: SocketAddr::V4("127.0.0.1:0000".parse().unwrap()),
             prometheus_addr: None,
             dashboard_addr: None,
@@ -91,8 +89,12 @@ pub async fn rpc_serve(
     let meta_store = Arc::new(EtcdMetaStore::new(client));

     let election_client = Arc::new(
-        EtcdElectionClient::new(endpoints, Some(options), address_info.endpoint.clone())
-            .await?,
+        EtcdElectionClient::new(
+            endpoints,
+            Some(options),
+            address_info.meta_endpoint.clone(),
+        )
+        .await?,
     );

     rpc_serve_with_store(
@@ -149,10 +151,12 @@ pub async fn rpc_serve_with_store<S: MetaStore>(

     let join_handle = tokio::spawn(async move {
         if let Some(election_client) = election_client.clone() {
-            let mut state_watcher = election_client.subscribe();
+            let mut is_leader_watcher = election_client.subscribe();
             let svc_shutdown_rx_clone = svc_shutdown_rx.clone();
             let (follower_shutdown_tx, follower_shutdown_rx) = OneChannel::<()>();
-            let follower_handle: Option<JoinHandle<()>> = if !*state_watcher.borrow() {
+
+            // If not the leader, spawn a follower.
+            let follower_handle: Option<JoinHandle<()>> = if !*is_leader_watcher.borrow() {
                 let address_info_clone = address_info.clone();

                 let election_client_ = election_client.clone();

-            while !*state_watcher.borrow_and_update() {
-                if let Err(e) = state_watcher.changed().await {
-                    tracing::error!("state watcher recv failed {}", e.to_string());
+            while !*is_leader_watcher.borrow_and_update() {
+                if let Err(e) = is_leader_watcher.changed().await {
+                    tracing::error!("leader watcher recv failed {}", e.to_string());
                 }
             }

@@ -186,7 +190,7 @@ pub async fn rpc_serve_with_store<S: MetaStore>(
             election_client.leader().await.unwrap().unwrap().into()
         } else {
             MetaLeaderInfo {
-                node_address: address_info.listen_addr.clone().to_string(),
+                node_address: address_info.meta_endpoint.clone().to_string(),
                 lease_id: 0,
             }
         };
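In `rpc_serve_with_store`, the renamed `is_leader_watcher` makes the intent of the subscription explicit: the spawned task checks the current value once to decide whether to run a follower service, then blocks on the watch channel until the node becomes leader. A rough sketch of that pattern follows, assuming only the `tokio` crate; a plain `tokio::sync::watch` channel stands in for `election_client.subscribe()`, and the follower/leader services are reduced to print statements.

```rust
// Sketch only: a watch channel stands in for election_client.subscribe().
use std::time::Duration;

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (is_leader_tx, mut is_leader_watcher) = watch::channel(false);

    // Stand-in for the election loop: this node "wins" after a short delay.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        let _ = is_leader_tx.send(true);
    });

    // If not the leader right now, a follower service would be spawned here.
    if !*is_leader_watcher.borrow() {
        println!("not the leader yet: starting follower service");
    }

    // Wait until the watcher reports leadership; errors are logged, not fatal.
    while !*is_leader_watcher.borrow_and_update() {
        if let Err(e) = is_leader_watcher.changed().await {
            eprintln!("leader watcher recv failed {e}");
        }
    }

    println!("became leader: starting leader services");
}
```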
18 changes: 12 additions & 6 deletions src/meta/src/rpc/service/leader_service.rs
@@ -28,14 +28,17 @@ use crate::rpc::server::ElectionClientRef;
 #[derive(Clone)]
 pub struct LeaderServiceImpl {
     election_client: Option<ElectionClientRef>,
-    current_leader: MetaLeaderInfo,
+    default_self_as_leader: MetaLeaderInfo,
 }

 impl LeaderServiceImpl {
-    pub fn new(election_client: Option<ElectionClientRef>, current_leader: MetaLeaderInfo) -> Self {
+    pub fn new(
+        election_client: Option<ElectionClientRef>,
+        default_self_as_leader: MetaLeaderInfo,
+    ) -> Self {
         LeaderServiceImpl {
             election_client,
-            current_leader,
+            default_self_as_leader,
         }
     }
 }
@@ -48,7 +51,7 @@ impl LeaderService for LeaderServiceImpl {
         _request: Request<LeaderRequest>,
     ) -> Result<Response<LeaderResponse>, Status> {
         let leader = match self.election_client.borrow() {
-            None => Ok(Some(self.current_leader.clone())),
+            None => Ok(Some(self.default_self_as_leader.clone())),
             Some(client) => client.leader().await.map(|member| member.map(Into::into)),
         }?;

@@ -83,13 +86,16 @@ impl LeaderService for LeaderServiceImpl {

         members
     } else {
-        let host_addr = self.current_leader.node_address.parse::<HostAddr>()?;
+        let host_addr = self
+            .default_self_as_leader
+            .node_address
+            .parse::<HostAddr>()?;
         vec![Member {
             member_addr: Some(HostAddress {
                 host: host_addr.host,
                 port: host_addr.port.into(),
             }),
-            lease_id: self.current_leader.lease_id as i64,
+            lease_id: self.default_self_as_leader.lease_id as i64,
         }]
     };

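Renaming `current_leader` to `default_self_as_leader` spells out that the stored `MetaLeaderInfo` is only an answer of last resort: it is returned when no election client is configured (the in-memory case), while an election client, if present, is always asked for the actual leader. A compact sketch of that fallback follows, using simplified synchronous stand-ins for the real tonic service and election client; none of these types match the real signatures.

```rust
// Sketch only: simplified stand-ins for the tonic LeaderService and election client.
#[derive(Clone, Debug)]
struct MetaLeaderInfo {
    node_address: String,
    lease_id: u64,
}

struct ElectionClient {
    current_leader: MetaLeaderInfo,
}

impl ElectionClient {
    fn leader(&self) -> Option<MetaLeaderInfo> {
        Some(self.current_leader.clone())
    }
}

struct LeaderServiceImpl {
    election_client: Option<ElectionClient>,
    default_self_as_leader: MetaLeaderInfo,
}

impl LeaderServiceImpl {
    fn leader(&self) -> Option<MetaLeaderInfo> {
        match &self.election_client {
            // No election (e.g. in-memory backend): this node answers for itself.
            None => Some(self.default_self_as_leader.clone()),
            // Otherwise the election client is the source of truth.
            Some(client) => client.leader(),
        }
    }
}

fn main() {
    let svc = LeaderServiceImpl {
        election_client: None,
        default_self_as_leader: MetaLeaderInfo {
            node_address: "127.0.0.1:5690".to_string(),
            lease_id: 0,
        },
    };
    println!("{:?}", svc.leader());
}
```

Together with the follower_svc.rs change above, this fallback now reports the node's meta_endpoint rather than its listen address.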
2 changes: 2 additions & 0 deletions src/tests/simulation/src/cluster.rs
@@ -152,6 +152,8 @@ impl Cluster {
             &conf.config_path,
             "--listen-addr",
             "0.0.0.0:5690",
+            "--meta_endpoint",
+            "192.168.1.1:5690",
             "--backend",
             "etcd",
             "--etcd-endpoints",