Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: metasrv cannot be cloned #4834

Merged
merged 2 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use crate::{error, Result};

#[derive(Clone)]
pub struct MetasrvInstance {
metasrv: Metasrv,
metasrv: Arc<Metasrv>,

httpsrv: Arc<HttpServer>,

Expand All @@ -83,8 +83,9 @@ impl MetasrvInstance {
.with_greptime_config_options(opts.to_toml().context(TomlFormatSnafu)?)
.build(),
);
let metasrv = Arc::new(metasrv);
// put metasrv into plugins for later use
plugins.insert::<Arc<Metasrv>>(Arc::new(metasrv.clone()));
plugins.insert::<Arc<Metasrv>>(metasrv.clone());
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
.context(InitExportMetricsTaskSnafu)?;
Ok(MetasrvInstance {
Expand Down Expand Up @@ -178,13 +179,13 @@ pub async fn bootstrap_metasrv_with_router(
Ok(())
}

pub fn router(metasrv: Metasrv) -> Router {
pub fn router(metasrv: Arc<Metasrv>) -> Router {
tonic::transport::Server::builder()
.accept_http1(true) // for admin services
.add_service(HeartbeatServer::new(metasrv.clone()))
.add_service(StoreServer::new(metasrv.clone()))
.add_service(ClusterServer::new(metasrv.clone()))
.add_service(ProcedureServiceServer::new(metasrv.clone()))
.add_service(HeartbeatServer::from_arc(metasrv.clone()))
.add_service(StoreServer::from_arc(metasrv.clone()))
.add_service(ClusterServer::from_arc(metasrv.clone()))
.add_service(ProcedureServiceServer::from_arc(metasrv.clone()))
.add_service(admin::make_admin_service(metasrv))
}

Expand Down
1 change: 0 additions & 1 deletion src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,6 @@ impl Mailbox for HeartbeatMailbox {
}

/// The builder to build the group of heartbeat handlers.
#[derive(Clone)]
pub struct HeartbeatHandlerGroupBuilder {
/// The handler to handle region failure.
region_failure_handler: Option<RegionFailureHandler>,
Expand Down
1 change: 0 additions & 1 deletion src/meta-srv/src/handler/failure_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatAcceptor, RegionSupervisor};

#[derive(Clone)]
pub struct RegionFailureHandler {
heartbeat_acceptor: HeartbeatAcceptor,
}
Expand Down
1 change: 0 additions & 1 deletion src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use crate::metasrv::Context;
use crate::region::lease_keeper::{RegionLeaseKeeperRef, RenewRegionLeasesResponse};
use crate::region::RegionLeaseKeeper;

#[derive(Clone)]
pub struct RegionLeaseHandler {
region_lease_seconds: u64,
region_lease_keeper: RegionLeaseKeeperRef,
Expand Down
35 changes: 16 additions & 19 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub mod builder;

use std::fmt::Display;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use clap::ValueEnum;
Expand Down Expand Up @@ -337,7 +337,6 @@ impl MetaStateHandler {
}
}

#[derive(Clone)]
pub struct Metasrv {
state: StateRef,
started: Arc<AtomicBool>,
Expand All @@ -353,8 +352,8 @@ pub struct Metasrv {
selector: SelectorRef,
// The flow selector is used to select a target flownode.
flow_selector: SelectorRef,
handler_group: Option<HeartbeatHandlerGroupRef>,
handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
election: Option<ElectionRef>,
procedure_manager: ProcedureManagerRef,
mailbox: MailboxRef,
Expand All @@ -371,15 +370,7 @@ pub struct Metasrv {
}

impl Metasrv {
pub async fn try_start(&mut self) -> Result<()> {
let builder = self
.handler_group_builder
.take()
.context(error::UnexpectedSnafu {
violated: "expected heartbeat handler group builder",
})?;
self.handler_group = Some(Arc::new(builder.build()?));

pub async fn try_start(&self) -> Result<()> {
if self
.started
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
Expand All @@ -389,6 +380,16 @@ impl Metasrv {
return Ok(());
}

let handler_group_builder =
self.handler_group_builder
.lock()
.unwrap()
.take()
.context(error::UnexpectedSnafu {
violated: "expected heartbeat handler group builder",
})?;
*self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));

// Creates default schema if not exists
self.table_metadata_manager
.init()
Expand Down Expand Up @@ -567,12 +568,8 @@ impl Metasrv {
&self.flow_selector
}

pub fn handler_group(&self) -> &Option<HeartbeatHandlerGroupRef> {
&self.handler_group
}

pub fn handler_group_builder(&mut self) -> &mut Option<HeartbeatHandlerGroupBuilder> {
&mut self.handler_group_builder
pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
self.handler_group.read().unwrap().clone()
}

pub fn election(&self) -> Option<&ElectionRef> {
Expand Down
6 changes: 3 additions & 3 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use client::client_manager::NodeClients;
Expand Down Expand Up @@ -371,8 +371,8 @@ impl MetasrvBuilder {
selector,
// TODO(jeremy): We do not allow configuring the flow selector.
flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)),
handler_group: None,
handler_group_builder: Some(handler_group_builder),
handler_group: RwLock::new(None),
handler_group_builder: Mutex::new(Some(handler_group_builder)),
election,
procedure_manager,
mailbox,
Expand Down
11 changes: 6 additions & 5 deletions src/meta-srv/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
pub struct MockInfo {
pub server_addr: String,
pub channel_manager: ChannelManager,
pub metasrv: Metasrv,
pub metasrv: Arc<Metasrv>,
}

pub async fn mock_with_memstore() -> MockInfo {
Expand Down Expand Up @@ -74,16 +74,17 @@ pub async fn mock(
None => builder,
};

let mut metasrv = builder.build().await.unwrap();
let metasrv = builder.build().await.unwrap();
metasrv.try_start().await.unwrap();

let (client, server) = tokio::io::duplex(1024);
let metasrv = Arc::new(metasrv);
let service = metasrv.clone();
let _handle = tokio::spawn(async move {
tonic::transport::Server::builder()
.add_service(HeartbeatServer::new(service.clone()))
.add_service(StoreServer::new(service.clone()))
.add_service(ProcedureServiceServer::new(service.clone()))
.add_service(HeartbeatServer::from_arc(service.clone()))
.add_service(StoreServer::from_arc(service.clone()))
.add_service(ProcedureServiceServer::from_arc(service.clone()))
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/service/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tonic::server::NamedService;

use crate::metasrv::Metasrv;

pub fn make_admin_service(metasrv: Metasrv) -> Admin {
pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
let router = Router::new().route("/health", health::HealthHandler);

let router = router.route(
Expand Down
9 changes: 3 additions & 6 deletions src/meta-srv/src/service/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,9 @@ impl heartbeat_server::Heartbeat for Metasrv {
) -> GrpcResult<Self::HeartbeatStream> {
let mut in_stream = req.into_inner();
let (tx, rx) = mpsc::channel(128);
let handler_group = self
.handler_group()
.clone()
.context(error::UnexpectedSnafu {
violated: "expected heartbeat handlers",
})?;
let handler_group = self.handler_group().context(error::UnexpectedSnafu {
violated: "expected heartbeat handlers",
})?;

let ctx = self.new_ctx();
let _handle = common_runtime::spawn_global(async move {
Expand Down
2 changes: 1 addition & 1 deletion tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub struct GreptimeDbCluster {

pub datanode_instances: HashMap<DatanodeId, Datanode>,
pub kv_backend: KvBackendRef,
pub metasrv: Metasrv,
pub metasrv: Arc<Metasrv>,
pub frontend: Arc<FeInstance>,
}

Expand Down