diff --git a/Cargo.lock b/Cargo.lock index 9d5a9a6ab2a4..594cf82088bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3955,10 +3955,12 @@ dependencies = [ "catalog", "common-base", "common-catalog", + "common-config", "common-decimal", "common-error", "common-frontend", "common-function", + "common-grpc", "common-macro", "common-meta", "common-query", diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs index ac5a62a46546..d5a35c6837e8 100644 --- a/src/cmd/src/bin/greptime.rs +++ b/src/cmd/src/bin/greptime.rs @@ -17,7 +17,7 @@ use clap::{Parser, Subcommand}; use cmd::error::Result; use cmd::options::GlobalOptions; -use cmd::{cli, datanode, frontend, metasrv, standalone, App}; +use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App}; use common_version::version; #[derive(Parser)] @@ -37,6 +37,10 @@ enum SubCommand { #[clap(name = "datanode")] Datanode(datanode::Command), + /// Start flownode service. + #[clap(name = "flownode")] + Flownode(flownode::Command), + /// Start frontend service. #[clap(name = "frontend")] Frontend(frontend::Command), @@ -72,6 +76,12 @@ async fn start(cli: Command) -> Result<()> { .run() .await } + SubCommand::Flownode(cmd) => { + cmd.build(cmd.load_options(&cli.global_options)?) + .await? + .run() + .await + } SubCommand::Frontend(cmd) => { cmd.build(cmd.load_options(&cli.global_options)?) .await? 
diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index fa5371545fcb..d11e2421435c 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -87,6 +87,20 @@ pub enum Error { source: datanode::error::Error, }, + #[snafu(display("Failed to start flownode"))] + StartFlownode { + #[snafu(implicit)] + location: Location, + source: flow::Error, + }, + + #[snafu(display("Failed to shutdown flownode"))] + ShutdownFlownode { + #[snafu(implicit)] + location: Location, + source: flow::Error, + }, + #[snafu(display("Failed to start frontend"))] StartFrontend { #[snafu(implicit)] @@ -380,6 +394,9 @@ impl ErrorExt for Error { Error::BuildRuntime { source, .. } => source.status_code(), Error::CacheRequired { .. } | Error::BuildCacheRegistry { .. } => StatusCode::Internal, + Self::StartFlownode { source, .. } | Self::ShutdownFlownode { source, .. } => { + source.status_code() + } } } diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs new file mode 100644 index 000000000000..9c7cd6695d59 --- /dev/null +++ b/src/cmd/src/flownode.rs @@ -0,0 +1,301 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; +use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; +use clap::Parser; +use common_base::Plugins; +use common_config::Configurable; +use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; +use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; +use common_meta::heartbeat::handler::HandlerGroupExecutor; +use common_meta::key::TableMetadataManager; +use common_telemetry::info; +use common_telemetry::logging::TracingOptions; +use common_version::{short_version, version}; +use flow::{FlownodeBuilder, FlownodeInstance}; +use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; +use meta_client::MetaClientOptions; +use servers::Mode; +use snafu::{OptionExt, ResultExt}; +use tracing_appender::non_blocking::WorkerGuard; + +use crate::error::{ + BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MissingConfigSnafu, Result, + ShutdownFlownodeSnafu, StartFlownodeSnafu, +}; +use crate::options::{GlobalOptions, GreptimeOptions}; +use crate::{log_versions, App}; + +pub const APP_NAME: &str = "greptime-flownode"; + +type FlownodeOptions = GreptimeOptions; + +pub struct Instance { + flownode: FlownodeInstance, + + // Keep the logging guard to prevent the worker from being dropped. 
+ _guard: Vec, +} + +impl Instance { + pub fn new(flownode: FlownodeInstance, guard: Vec) -> Self { + Self { + flownode, + _guard: guard, + } + } + + pub fn flownode_mut(&mut self) -> &mut FlownodeInstance { + &mut self.flownode + } + + pub fn flownode(&self) -> &FlownodeInstance { + &self.flownode + } +} + +#[async_trait::async_trait] +impl App for Instance { + fn name(&self) -> &str { + APP_NAME + } + + async fn start(&mut self) -> Result<()> { + self.flownode.start().await.context(StartFlownodeSnafu) + } + + async fn stop(&self) -> Result<()> { + self.flownode + .shutdown() + .await + .context(ShutdownFlownodeSnafu) + } +} + +#[derive(Parser)] +pub struct Command { + #[clap(subcommand)] + subcmd: SubCommand, +} + +impl Command { + pub async fn build(&self, opts: FlownodeOptions) -> Result { + self.subcmd.build(opts).await + } + + pub fn load_options(&self, global_options: &GlobalOptions) -> Result { + match &self.subcmd { + SubCommand::Start(cmd) => cmd.load_options(global_options), + } + } +} + +#[derive(Parser)] +enum SubCommand { + Start(StartCommand), +} + +impl SubCommand { + async fn build(&self, opts: FlownodeOptions) -> Result { + match self { + SubCommand::Start(cmd) => cmd.build(opts).await, + } + } +} + +#[derive(Debug, Parser, Default)] +struct StartCommand { + #[clap(long)] + node_id: Option, + #[clap(long)] + rpc_addr: Option, + #[clap(long)] + rpc_hostname: Option, + #[clap(long, value_delimiter = ',', num_args = 1..)] + metasrv_addrs: Option>, + #[clap(short, long)] + config_file: Option, + #[clap(long, default_value = "GREPTIMEDB_FLOWNODE")] + env_prefix: String, +} + +impl StartCommand { + fn load_options(&self, global_options: &GlobalOptions) -> Result { + let mut opts = FlownodeOptions::load_layered_options( + self.config_file.as_deref(), + self.env_prefix.as_ref(), + ) + .context(LoadLayeredConfigSnafu)?; + + self.merge_with_cli_options(global_options, &mut opts)?; + + Ok(opts) + } + + // The precedence order is: cli > config file > 
environment variables > default values. + fn merge_with_cli_options( + &self, + global_options: &GlobalOptions, + opts: &mut FlownodeOptions, + ) -> Result<()> { + let opts = &mut opts.component; + + if let Some(dir) = &global_options.log_dir { + opts.logging.dir.clone_from(dir); + } + + if global_options.log_level.is_some() { + opts.logging.level.clone_from(&global_options.log_level); + } + + opts.tracing = TracingOptions { + #[cfg(feature = "tokio-console")] + tokio_console_addr: global_options.tokio_console_addr.clone(), + }; + + if let Some(addr) = &self.rpc_addr { + opts.grpc.addr.clone_from(addr); + } + + if let Some(hostname) = &self.rpc_hostname { + opts.grpc.hostname.clone_from(hostname); + } + + if let Some(node_id) = self.node_id { + opts.node_id = Some(node_id); + } + + if let Some(metasrv_addrs) = &self.metasrv_addrs { + opts.meta_client + .get_or_insert_with(MetaClientOptions::default) + .metasrv_addrs + .clone_from(metasrv_addrs); + opts.mode = Mode::Distributed; + } + + if let (Mode::Distributed, None) = (&opts.mode, &opts.node_id) { + return MissingConfigSnafu { + msg: "Missing node id option", + } + .fail(); + } + + Ok(()) + } + + async fn build(&self, opts: FlownodeOptions) -> Result { + common_runtime::init_global_runtimes(&opts.runtime); + + let guard = common_telemetry::init_global_logging( + APP_NAME, + &opts.component.logging, + &opts.component.tracing, + opts.component.node_id.map(|x| x.to_string()), + ); + log_versions(version!(), short_version!()); + + info!("Flownode start command: {:#?}", self); + info!("Flownode options: {:#?}", opts); + + let opts = opts.component; + + let cluster_id = opts.cluster_id.context(MissingConfigSnafu { + msg: "'cluster_id'", + })?; + + let node_id = opts + .node_id + .context(MissingConfigSnafu { msg: "'node_id'" })?; + + let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu { + msg: "'meta_client_options'", + })?; + + let meta_client = Arc::new( + 
flow::heartbeat::new_metasrv_client(cluster_id, node_id, meta_config) + .await + .context(StartFlownodeSnafu)?, + ); + + let cache_max_capacity = meta_config.metadata_cache_max_capacity; + let cache_ttl = meta_config.metadata_cache_ttl; + let cache_tti = meta_config.metadata_cache_tti; + + // TODO(discord9): add helper function to ease the creation of cache registry&such + let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone()) + .cache_max_capacity(cache_max_capacity) + .cache_ttl(cache_ttl) + .cache_tti(cache_tti) + .build(); + let cached_meta_backend = Arc::new(cached_meta_backend); + + // Builds cache registry + let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry( + CacheRegistryBuilder::default() + .add_cache(cached_meta_backend.clone()) + .build(), + ); + let fundamental_cache_registry = + build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone()))); + let layered_cache_registry = Arc::new( + with_default_composite_cache_registry( + layered_cache_builder.add_cache_registry(fundamental_cache_registry), + ) + .context(BuildCacheRegistrySnafu)? 
+ .build(), + ); + + let catalog_manager = KvBackendCatalogManager::new( + opts.mode, + Some(meta_client.clone()), + cached_meta_backend.clone(), + layered_cache_registry.clone(), + ); + + let table_metadata_manager = Arc::new(TableMetadataManager::new(cached_meta_backend)); + table_metadata_manager + .init() + .await + .context(InitMetadataSnafu)?; + + let executor = HandlerGroupExecutor::new(vec![ + Arc::new(ParseMailboxMessageHandler), + Arc::new(InvalidateTableCacheHandler::new( + layered_cache_registry.clone(), + )), + ]); + + let heartbeat_task = flow::heartbeat::HeartbeatTask::new( + &opts, + meta_client.clone(), + opts.heartbeat.clone(), + Arc::new(executor), + ); + + let flownode_builder = FlownodeBuilder::new( + opts, + Plugins::new(), + table_metadata_manager, + catalog_manager, + ) + .with_heartbeat_task(heartbeat_task); + + let flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?; + + Ok(Instance::new(flownode, guard)) + } +} diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 31f4a108481b..3081f4b75f77 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -283,6 +283,7 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; + // TODO(discord9): add helper function to ease the creation of cache registry&such let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone()) .cache_max_capacity(cache_max_capacity) .cache_ttl(cache_ttl) diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index f9f9e699650b..85b848c62835 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -22,6 +22,7 @@ use crate::error::Result; pub mod cli; pub mod datanode; pub mod error; +pub mod flownode; pub mod frontend; pub mod metasrv; pub mod options; diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 316aa6db7f25..75be49a6233d 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -21,6 +21,7 @@ use catalog::kvbackend::KvBackendCatalogManager; use clap::Parser; 
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_config::{metadata_store_dir, Configurable, KvBackendConfig}; +use common_error::ext::BoxedError; use common_meta::cache::LayeredCacheRegistryBuilder; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef}; @@ -65,9 +66,9 @@ use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ BuildCacheRegistrySnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, - InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, Result, ShutdownDatanodeSnafu, - ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, - StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu, + InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu, Result, + ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, + StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{log_versions, App}; @@ -448,13 +449,18 @@ impl StartCommand { Self::create_table_metadata_manager(kv_backend.clone()).await?; let flow_builder = FlownodeBuilder::new( - 1, Default::default(), fe_plugins.clone(), table_metadata_manager.clone(), catalog_manager.clone(), ); - let flownode = Arc::new(flow_builder.build().await); + let flownode = Arc::new( + flow_builder + .build() + .await + .map_err(BoxedError::new) + .context(OtherSnafu)?, + ); let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone()) .with_kv_backend(kv_backend.clone()) @@ -464,7 +470,7 @@ impl StartCommand { let node_manager = Arc::new(StandaloneDatanodeManager { region_server: datanode.region_server(), - flow_server: flownode.clone(), + flow_server: flownode.flow_worker_manager(), }); let table_id_sequence = Arc::new( @@ -516,11 +522,12 @@ impl StartCommand { .context(StartFrontendSnafu)?; // flow 
server need to be able to use frontend to write insert requests back - flownode + let flow_worker_manager = flownode.flow_worker_manager(); + flow_worker_manager .set_frontend_invoker(Box::new(frontend.clone())) .await; // TODO(discord9): unify with adding `start` and `shutdown` method to flownode too. - let _handle = flownode.clone().run_background(); + let _handle = flow_worker_manager.run_background(); let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins) .build() diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index ebf276069502..395f9cf07510 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -15,10 +15,12 @@ async-trait.workspace = true bytes.workspace = true catalog.workspace = true common-base.workspace = true +common-config.workspace = true common-decimal.workspace = true common-error.workspace = true common-frontend.workspace = true common-function.workspace = true +common-grpc.workspace = true common-macro.workspace = true common-meta.workspace = true common-query.workspace = true diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 373f1fb35558..c9d1ca570a76 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -21,40 +21,41 @@ use std::sync::Arc; use std::time::{Instant, SystemTime}; use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests}; -use catalog::CatalogManagerRef; -use common_base::Plugins; +use common_config::Configurable; use common_error::ext::BoxedError; use common_frontend::handler::FrontendInvoker; use common_meta::key::TableMetadataManagerRef; use common_runtime::JoinHandle; +use common_telemetry::logging::{LoggingOptions, TracingOptions}; use common_telemetry::{debug, info}; use datatypes::schema::ColumnSchema; use datatypes::value::Value; use greptime_proto::v1; use itertools::Itertools; -use query::{QueryEngine, QueryEngineFactory}; +use meta_client::MetaClientOptions; +use query::QueryEngine; use serde::{Deserialize, 
Serialize}; use servers::grpc::GrpcOptions; +use servers::heartbeat_options::HeartbeatOptions; +use servers::Mode; use session::context::QueryContext; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::{ConcreteDataType, RegionId}; use table::metadata::TableId; -use tokio::sync::{oneshot, watch, Mutex, RwLock}; +use tokio::sync::{watch, Mutex, RwLock}; -use crate::adapter::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu}; pub(crate) use crate::adapter::node_context::FlownodeContext; use crate::adapter::table_source::TableSource; use crate::adapter::util::column_schemas_to_proto; use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; use crate::compute::ErrCollector; +use crate::error::{ExternalSnafu, InternalSnafu, TableNotFoundSnafu, UnexpectedSnafu}; use crate::expr::GlobalId; use crate::repr::{self, DiffRow, Row}; -use crate::transform::{register_function_to_query_engine, sql_to_flow_plan}; +use crate::transform::sql_to_flow_plan; -pub(crate) mod error; mod flownode_impl; mod parse_expr; -mod server; #[cfg(test)] mod tests; mod util; @@ -63,7 +64,7 @@ mod worker; pub(crate) mod node_context; mod table_source; -use error::Error; +use crate::error::Error; // TODO(discord9): replace this with `GREPTIME_TIMESTAMP` before v0.9 pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder"; @@ -76,79 +77,43 @@ pub type FlowId = u64; pub type TableName = [String; 3]; /// Options for flow node -#[derive(Clone, Default, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct FlownodeOptions { + pub mode: Mode, + pub cluster_id: Option, pub node_id: Option, pub grpc: GrpcOptions, + pub meta_client: Option, + pub logging: LoggingOptions, + pub tracing: TracingOptions, + pub heartbeat: HeartbeatOptions, } -/// Flownode Builder -pub struct FlownodeBuilder { - flow_node_id: u32, - opts: FlownodeOptions, - plugins: Plugins, - table_meta: TableMetadataManagerRef, - 
catalog_manager: CatalogManagerRef, -} - -impl FlownodeBuilder { - /// init flownode builder - pub fn new( - flow_node_id: u32, - opts: FlownodeOptions, - plugins: Plugins, - table_meta: TableMetadataManagerRef, - catalog_manager: CatalogManagerRef, - ) -> Self { +impl Default for FlownodeOptions { + fn default() -> Self { Self { - flow_node_id, - opts, - plugins, - table_meta, - catalog_manager, + mode: servers::Mode::Standalone, + cluster_id: None, + node_id: None, + grpc: GrpcOptions::default().with_addr("127.0.0.1:3004"), + meta_client: None, + logging: LoggingOptions::default(), + tracing: TracingOptions::default(), + heartbeat: HeartbeatOptions::default(), } } - - /// TODO(discord9): error handling - pub async fn build(self) -> FlownodeManager { - let query_engine_factory = QueryEngineFactory::new_with_plugins( - // query engine in flownode only translate plan with resolved table source. - self.catalog_manager.clone(), - None, - None, - None, - false, - self.plugins.clone(), - ); - let query_engine = query_engine_factory.query_engine(); - - register_function_to_query_engine(&query_engine); - - let (tx, rx) = oneshot::channel(); - - let node_id = Some(self.flow_node_id); - - let _handle = std::thread::spawn(move || { - let (flow_node_manager, mut worker) = - FlownodeManager::new_with_worker(node_id, query_engine, self.table_meta.clone()); - let _ = tx.send(flow_node_manager); - info!("Flow Worker started in new thread"); - worker.run(); - }); - let man = rx.await.unwrap(); - info!("Flow Node Manager started"); - man - } } +impl Configurable for FlownodeOptions {} + /// Arc-ed FlowNodeManager, cheaper to clone -pub type FlownodeManagerRef = Arc; +pub type FlowWorkerManagerRef = Arc; /// FlowNodeManager manages the state of all tasks in the flow node, which should be run on the same thread /// /// The choice of timestamp is just using current system timestamp for now -pub struct FlownodeManager { +pub struct FlowWorkerManager { /// The handler to the worker that 
will run the dataflow /// which is `!Send` so a handle is used pub worker_handles: Vec>, @@ -166,7 +131,7 @@ pub struct FlownodeManager { } /// Building FlownodeManager -impl FlownodeManager { +impl FlowWorkerManager { /// set frontend invoker pub async fn set_frontend_invoker( self: &Arc, @@ -188,7 +153,7 @@ impl FlownodeManager { let node_context = FlownodeContext::default(); let tick_manager = FlowTickManager::new(); let worker_handles = Vec::new(); - FlownodeManager { + FlowWorkerManager { worker_handles, query_engine, table_info_source: srv_map, @@ -248,7 +213,7 @@ pub fn diff_row_to_request(rows: Vec) -> Vec { } /// This impl block contains methods to send writeback requests to frontend -impl FlownodeManager { +impl FlowWorkerManager { /// TODO(discord9): merge all same type of diff row into one requests /// /// Return the number of requests it made @@ -494,7 +459,7 @@ impl FlownodeManager { } /// Flow Runtime related methods -impl FlownodeManager { +impl FlowWorkerManager { /// run in common_runtime background runtime pub fn run_background(self: Arc) -> JoinHandle<()> { info!("Starting flownode manager's background task"); @@ -604,7 +569,7 @@ impl FlownodeManager { } /// Create&Remove flow -impl FlownodeManager { +impl FlowWorkerManager { /// remove a flow by it's id pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> { for handle in self.worker_handles.iter() { diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index e337e96a0ab3..f780745f621e 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -26,11 +26,11 @@ use itertools::Itertools; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; -use crate::adapter::error::InternalSnafu; -use crate::adapter::FlownodeManager; +use crate::adapter::FlowWorkerManager; +use crate::error::InternalSnafu; use crate::repr::{self, DiffRow}; -fn to_meta_err(err: crate::adapter::error::Error) -> 
common_meta::error::Error { +fn to_meta_err(err: crate::error::Error) -> common_meta::error::Error { // TODO(discord9): refactor this Err::<(), _>(BoxedError::new(err)) .with_context(|_| ExternalSnafu) @@ -38,7 +38,7 @@ fn to_meta_err(err: crate::adapter::error::Error) -> common_meta::error::Error { } #[async_trait::async_trait] -impl Flownode for FlownodeManager { +impl Flownode for FlowWorkerManager { async fn handle(&self, request: FlowRequest) -> Result { let query_ctx = request .header diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 40c5169f5ed8..e8defc7652a6 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -23,8 +23,8 @@ use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; use tokio::sync::{broadcast, mpsc, RwLock}; -use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu}; use crate::adapter::{FlowId, TableName, TableSource}; +use crate::error::{Error, EvalSnafu, TableNotFoundSnafu}; use crate::expr::error::InternalSnafu; use crate::expr::GlobalId; use crate::repr::{DiffRow, RelationDesc, BROADCAST_CAP}; @@ -317,7 +317,6 @@ impl FlownodeContext { /// Assign a schema to a table /// - /// TODO(discord9): error handling pub fn assign_table_schema( &mut self, table_name: &TableName, @@ -327,7 +326,10 @@ impl FlownodeContext { .table_repr .get_by_name(table_name) .map(|(_, gid)| gid) - .unwrap(); + .context(TableNotFoundSnafu { + name: format!("Table not found: {:?} in flownode cache", table_name), + })?; + self.schema.insert(gid, schema); Ok(()) } diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index 24cf05c4b649..0454ab16b1d3 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -20,10 +20,10 @@ use common_meta::key::table_name::{TableNameKey, TableNameManager}; use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; -use crate::adapter::error::{ +use 
crate::adapter::TableName; +use crate::error::{ Error, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu, }; -use crate::adapter::TableName; use crate::repr::{self, ColumnType, RelationDesc, RelationType}; /// mapping of table name <-> table id should be query from tableinfo manager diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs index 1946d4265d3f..0a23a86167aa 100644 --- a/src/flow/src/adapter/util.rs +++ b/src/flow/src/adapter/util.rs @@ -19,7 +19,7 @@ use datatypes::schema::ColumnSchema; use itertools::Itertools; use snafu::ResultExt; -use crate::adapter::error::{Error, ExternalSnafu}; +use crate::error::{Error, ExternalSnafu}; /// convert `ColumnSchema` lists to it's corresponding proto type pub fn column_schemas_to_proto( diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index f69a396cda27..e5819a7f0437 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -24,9 +24,9 @@ use hydroflow::scheduled::graph::Hydroflow; use snafu::ensure; use tokio::sync::{broadcast, mpsc, oneshot, Mutex}; -use crate::adapter::error::{Error, FlowAlreadyExistSnafu, InternalSnafu, UnexpectedSnafu}; use crate::adapter::FlowId; use crate::compute::{Context, DataflowState, ErrCollector}; +use crate::error::{Error, FlowAlreadyExistSnafu, InternalSnafu, UnexpectedSnafu}; use crate::expr::GlobalId; use crate::plan::TypedPlan; use crate::repr::{self, DiffRow}; diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs index 4a4704d28e57..618f9654257d 100644 --- a/src/flow/src/compute/render.rs +++ b/src/flow/src/compute/render.rs @@ -32,9 +32,9 @@ use itertools::Itertools; use snafu::{ensure, OptionExt, ResultExt}; use super::state::Scheduler; -use crate::adapter::error::{Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu}; use crate::compute::state::DataflowState; use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, 
Toff}; +use crate::error::{Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu}; use crate::expr::error::{DataTypeSnafu, InternalSnafu}; use crate::expr::{ self, EvalError, GlobalId, LocalId, MapFilterProject, MfpPlan, SafeMfpPlan, ScalarExpr, diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs index 2a6b49cb7c75..d2278dc3b358 100644 --- a/src/flow/src/compute/render/map.rs +++ b/src/flow/src/compute/render/map.rs @@ -19,10 +19,10 @@ use hydroflow::scheduled::port::{PortCtx, SEND}; use itertools::Itertools; use snafu::OptionExt; -use crate::adapter::error::{Error, PlanSnafu}; use crate::compute::render::Context; use crate::compute::state::Scheduler; use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff}; +use crate::error::{Error, PlanSnafu}; use crate::expr::{EvalError, MapFilterProject, MfpPlan, ScalarExpr}; use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, DiffRow, KeyValDiffRow, Row}; diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index b55ed58f900a..d44c290d9474 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -22,10 +22,10 @@ use hydroflow::scheduled::port::{PortCtx, SEND}; use itertools::Itertools; use snafu::{ensure, OptionExt, ResultExt}; -use crate::adapter::error::{Error, PlanSnafu}; use crate::compute::render::{Context, SubgraphArg}; use crate::compute::state::Scheduler; use crate::compute::types::{Arranged, Collection, CollectionBundle, ErrCollector, Toff}; +use crate::error::{Error, PlanSnafu}; use crate::expr::error::{DataAlreadyExpiredSnafu, DataTypeSnafu, InternalSnafu}; use crate::expr::{AggregateExpr, EvalError, ScalarExpr}; use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan}; diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs index 2b9fd5e601f0..fd757852ca70 100644 --- 
a/src/flow/src/compute/render/src_sink.rs +++ b/src/flow/src/compute/render/src_sink.rs @@ -23,9 +23,9 @@ use snafu::OptionExt; use tokio::sync::broadcast::error::TryRecvError; use tokio::sync::{broadcast, mpsc}; -use crate::adapter::error::{Error, PlanSnafu}; use crate::compute::render::Context; use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff}; +use crate::error::{Error, PlanSnafu}; use crate::expr::error::InternalSnafu; use crate::expr::{EvalError, GlobalId}; use crate::repr::{DiffRow, Row, BROADCAST_CAP}; diff --git a/src/flow/src/adapter/error.rs b/src/flow/src/error.rs similarity index 82% rename from src/flow/src/adapter/error.rs rename to src/flow/src/error.rs index 9d5692aa1ab4..6d84b6be05b1 100644 --- a/src/flow/src/adapter/error.rs +++ b/src/flow/src/error.rs @@ -162,6 +162,34 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to start server"))] + StartServer { + #[snafu(implicit)] + location: Location, + source: servers::error::Error, + }, + + #[snafu(display("Failed to shutdown server"))] + ShutdownServer { + #[snafu(implicit)] + location: Location, + source: servers::error::Error, + }, + + #[snafu(display("Failed to initialize meta client"))] + MetaClientInit { + #[snafu(implicit)] + location: Location, + source: meta_client::error::Error, + }, + + #[snafu(display("Failed to parse address {}", addr))] + ParseAddr { + addr: String, + #[snafu(source)] + error: std::net::AddrParseError, + }, } /// Result type for flow module @@ -184,11 +212,16 @@ impl ErrorExt for Error { | &Self::Plan { .. } | &Self::Datatypes { .. } => StatusCode::PlanQuery, Self::NoProtoType { .. } | Self::Unexpected { .. } => StatusCode::Unexpected, - &Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => { + Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => { StatusCode::Unsupported } - &Self::External { .. } => StatusCode::Unknown, + Self::External { source, .. 
} => source.status_code(), Self::Internal { .. } => StatusCode::Internal, + Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => { + source.status_code() + } + Self::MetaClientInit { source, .. } => source.status_code(), + Self::ParseAddr { .. } => StatusCode::InvalidArguments, } } diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index c30b67dbffa4..39b469207169 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -32,7 +32,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use strum::{EnumIter, IntoEnumIterator}; use substrait::df_logical_plan::consumer::name_to_op; -use crate::adapter::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu}; +use crate::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu}; use crate::expr::error::{ CastValueSnafu, DivisionByZeroSnafu, EvalError, InternalSnafu, OverflowSnafu, TryFromValueSnafu, TypeMismatchSnafu, diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index b0e32c94d87b..5eaf3ebd3547 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -21,7 +21,7 @@ use itertools::Itertools; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt}; -use crate::adapter::error::{Error, InvalidQuerySnafu}; +use crate::error::{Error, InvalidQuerySnafu}; use crate::expr::error::EvalError; use crate::expr::{Id, InvalidArgumentSnafu, LocalId, ScalarExpr}; use crate::repr::{self, value_to_internal_ts, Diff, Row}; diff --git a/src/flow/src/expr/relation/func.rs b/src/flow/src/expr/relation/func.rs index 6aa53c80ca9d..7923f8149353 100644 --- a/src/flow/src/expr/relation/func.rs +++ b/src/flow/src/expr/relation/func.rs @@ -24,7 +24,7 @@ use smallvec::smallvec; use snafu::{OptionExt, ResultExt}; use strum::{EnumIter, IntoEnumIterator}; -use crate::adapter::error::{DatafusionSnafu, Error, InvalidQuerySnafu}; +use crate::error::{DatafusionSnafu, Error, InvalidQuerySnafu}; use crate::expr::error::{EvalError, TryFromValueSnafu, 
TypeMismatchSnafu}; use crate::expr::relation::accum::{Accum, Accumulator}; use crate::expr::signature::{GenericFn, Signature}; diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index 591d2c246fc1..0bf7c4dea4f0 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -32,7 +32,7 @@ use substrait::error::{DecodeRelSnafu, EncodeRelSnafu}; use substrait::substrait_proto_df::proto::expression::{RexType, ScalarFunction}; use substrait::substrait_proto_df::proto::Expression; -use crate::adapter::error::{ +use crate::error::{ DatafusionSnafu, Error, InvalidQuerySnafu, UnexpectedSnafu, UnsupportedTemporalFilterSnafu, }; use crate::expr::error::{ @@ -284,7 +284,7 @@ impl RawDfScalarFn { f.encode(&mut buf) .context(EncodeRelSnafu) .map_err(BoxedError::new) - .context(crate::adapter::error::ExternalSnafu)?; + .context(crate::error::ExternalSnafu)?; Ok(Self { f: buf, input_schema, @@ -295,7 +295,7 @@ impl RawDfScalarFn { let f = ScalarFunction::decode(&mut self.f.as_ref()) .context(DecodeRelSnafu) .map_err(BoxedError::new) - .context(crate::adapter::error::ExternalSnafu)?; + .context(crate::error::ExternalSnafu)?; let input_schema = &self.input_schema; let extensions = &self.extensions; @@ -371,7 +371,7 @@ impl ScalarExpr { })?; let typ = ConcreteDataType::try_from(&arrow_typ) .map_err(BoxedError::new) - .context(crate::adapter::error::ExternalSnafu)?; + .context(crate::error::ExternalSnafu)?; Ok(ColumnType::new_nullable(typ)) } } diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs index ed3fe66a8651..339f53520d0d 100644 --- a/src/flow/src/heartbeat.rs +++ b/src/flow/src/heartbeat.rs @@ -14,25 +14,29 @@ //! 
Send heartbeat from flownode to metasrv +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, Peer}; use common_error::ext::BoxedError; +use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::{ - HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, + HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage}; use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; -use common_telemetry::{debug, error, info}; +use common_telemetry::{debug, error, info, warn}; use greptime_proto::v1::meta::NodeInfo; -use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient}; +use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient, MetaClientBuilder}; +use meta_client::MetaClientOptions; use servers::addrs; use servers::heartbeat_options::HeartbeatOptions; use snafu::ResultExt; use tokio::sync::mpsc; use tokio::time::{Duration, Instant}; -use crate::adapter::error::ExternalSnafu; +use crate::error::{ExternalSnafu, MetaClientInitSnafu}; use crate::{Error, FlownodeOptions}; /// The flownode heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background. 
@@ -45,6 +49,7 @@ pub struct HeartbeatTask { retry_interval: Duration, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, start_time_ms: u64, + running: Arc, } impl HeartbeatTask { @@ -62,10 +67,19 @@ impl HeartbeatTask { retry_interval: heartbeat_opts.retry_interval, resp_handler_executor, start_time_ms: common_time::util::current_time_millis() as u64, + running: Arc::new(AtomicBool::new(false)), } } pub async fn start(&self) -> Result<(), Error> { + if self + .running + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_err() + { + warn!("Heartbeat task started multiple times"); + return Ok(()); + } info!("Start to establish the heartbeat connection to metasrv."); let (req_sender, resp_stream) = self .meta_client @@ -86,6 +100,17 @@ impl HeartbeatTask { Ok(()) } + pub fn shutdown(&self) { + info!("Close heartbeat task for flownode"); + if self + .running + .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire) + .is_err() + { + warn!("Call close heartbeat task multiple times"); + } + } + fn create_heartbeat_request( message: Option, peer: Option, @@ -208,3 +233,38 @@ impl HeartbeatTask { } } } + +/// Create a metasrv client instance, start it with the configured metasrv addresses and ask for the leader.
+pub async fn new_metasrv_client( + cluster_id: u64, + node_id: u64, + meta_config: &MetaClientOptions, +) -> Result { + let member_id = node_id; + let config = ChannelConfig::new() + .timeout(meta_config.timeout) + .connect_timeout(meta_config.connect_timeout) + .tcp_nodelay(meta_config.tcp_nodelay); + let channel_manager = ChannelManager::with_config(config.clone()); + let heartbeat_channel_manager = ChannelManager::with_config( + config + .timeout(meta_config.timeout) + .connect_timeout(meta_config.connect_timeout), + ); + + let mut meta_client = MetaClientBuilder::flownode_default_options(cluster_id, member_id) + .channel_manager(channel_manager) + .heartbeat_channel_manager(heartbeat_channel_manager) + .build(); + meta_client + .start(&meta_config.metasrv_addrs) + .await + .context(MetaClientInitSnafu)?; + + // required only when the heartbeat_client is enabled + meta_client + .ask_leader() + .await + .context(MetaClientInitSnafu)?; + Ok(meta_client) +} diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index 606ae65144ed..636a722b04c2 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -25,12 +25,15 @@ // allow unused for now because it should be use later mod adapter; mod compute; +mod error; mod expr; -mod heartbeat; +pub mod heartbeat; mod plan; mod repr; +mod server; mod transform; mod utils; -pub use adapter::error::{Error, Result}; -pub use adapter::{FlownodeBuilder, FlownodeManager, FlownodeManagerRef, FlownodeOptions}; +pub use adapter::{FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions}; +pub use error::{Error, Result}; +pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer}; diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs index 95816b17cb03..c31ddb652e3b 100644 --- a/src/flow/src/plan.rs +++ b/src/flow/src/plan.rs @@ -23,7 +23,7 @@ use std::collections::BTreeSet; use datatypes::arrow::ipc::Map; use serde::{Deserialize, Serialize}; -use crate::adapter::error::Error; +use crate::error::Error; use 
crate::expr::{ AggregateExpr, EvalError, GlobalId, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr, TypedExpr, diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index 43947dc47236..382c7a63c3df 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -21,9 +21,7 @@ use itertools::Itertools; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; -use crate::adapter::error::{ - DatafusionSnafu, InternalSnafu, InvalidQuerySnafu, Result, UnexpectedSnafu, -}; +use crate::error::{DatafusionSnafu, InternalSnafu, InvalidQuerySnafu, Result, UnexpectedSnafu}; use crate::expr::{MapFilterProject, SafeMfpPlan, ScalarExpr}; /// a set of column indices that are "keys" for the collection. diff --git a/src/flow/src/adapter/server.rs b/src/flow/src/server.rs similarity index 50% rename from src/flow/src/adapter/server.rs rename to src/flow/src/server.rs index c0d0854572c7..166f6b5f5f63 100644 --- a/src/flow/src/adapter/server.rs +++ b/src/flow/src/server.rs @@ -15,26 +15,48 @@ //! 
Implementation of grpc service for flow node use std::net::SocketAddr; +use std::sync::Arc; +use catalog::CatalogManagerRef; +use common_base::Plugins; +use common_meta::ddl::table_meta; +use common_meta::heartbeat::handler::HandlerGroupExecutor; +use common_meta::key::TableMetadataManagerRef; +use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::Flownode; use common_telemetry::tracing::info; use futures::FutureExt; use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests}; use itertools::Itertools; +use meta_client::client::MetaClient; +use query::QueryEngineFactory; +use serde::de::Unexpected; use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu}; +use servers::heartbeat_options::HeartbeatOptions; +use servers::server::Server; use snafu::{ensure, ResultExt}; use tokio::net::TcpListener; use tokio::sync::{oneshot, Mutex}; use tonic::transport::server::TcpIncoming; use tonic::{Request, Response, Status}; -use crate::adapter::FlownodeManagerRef; +use crate::adapter::FlowWorkerManagerRef; +use crate::error::{ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu}; +use crate::heartbeat::HeartbeatTask; +use crate::transform::register_function_to_query_engine; +use crate::{Error, FlowWorkerManager, FlownodeOptions}; pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER"; /// wrapping flow node manager to avoid orphan rule with Arc<...> #[derive(Clone)] pub struct FlowService { - pub manager: FlownodeManagerRef, + pub manager: FlowWorkerManagerRef, +} + +impl FlowService { + pub fn new(manager: FlowWorkerManagerRef) -> Self { + Self { manager } + } } #[async_trait::async_trait] @@ -82,8 +104,17 @@ impl flow_server::Flow for FlowService { } pub struct FlownodeServer { - pub shutdown_tx: Mutex>>, - pub flow_service: FlowService, + shutdown_tx: Mutex>>, + flow_service: FlowService, +} + +impl FlownodeServer { + pub fn new(flow_service: FlowService) -> Self { + Self { + 
flow_service, + shutdown_tx: Mutex::new(None), + } + } } impl FlownodeServer { @@ -134,7 +165,6 @@ impl servers::server::Server for FlownodeServer { .context(StartGrpcSnafu); }); - // TODO(discord9): better place for dataflow to run per second let manager_ref = self.flow_service.manager.clone(); let _handle = manager_ref.clone().run_background(); @@ -145,3 +175,126 @@ impl servers::server::Server for FlownodeServer { FLOW_NODE_SERVER_NAME } } + +/// The flownode server instance. +pub struct FlownodeInstance { + server: FlownodeServer, + addr: SocketAddr, + heartbeat_task: Option, +} + +impl FlownodeInstance { + pub async fn start(&mut self) -> Result<(), crate::Error> { + if let Some(task) = &self.heartbeat_task { + task.start().await?; + } + + self.addr = self + .server + .start(self.addr) + .await + .context(StartServerSnafu)?; + Ok(()) + } + pub async fn shutdown(&self) -> Result<(), crate::Error> { + self.server.shutdown().await.context(ShutdownServerSnafu)?; + + if let Some(task) = &self.heartbeat_task { + task.shutdown(); + } + + Ok(()) + } + + pub fn flow_worker_manager(&self) -> FlowWorkerManagerRef { + self.server.flow_service.manager.clone() + } +} + +/// [`FlownodeInstance`] Builder +pub struct FlownodeBuilder { + opts: FlownodeOptions, + plugins: Plugins, + table_meta: TableMetadataManagerRef, + catalog_manager: CatalogManagerRef, + heartbeat_task: Option, +} + +impl FlownodeBuilder { + /// init flownode builder + pub fn new( + opts: FlownodeOptions, + plugins: Plugins, + table_meta: TableMetadataManagerRef, + catalog_manager: CatalogManagerRef, + ) -> Self { + Self { + opts, + plugins, + table_meta, + catalog_manager, + heartbeat_task: None, + } + } + + pub fn with_heartbeat_task(self, heartbeat_task: HeartbeatTask) -> Self { + Self { + heartbeat_task: Some(heartbeat_task), + ..self + } + } + + pub async fn build(self) -> Result { + let manager = Arc::new(self.build_manager().await?); + let server = 
FlownodeServer::new(FlowService::new(manager.clone())); + + let heartbeat_task = self.heartbeat_task; + + let addr = self.opts.grpc.addr; + let instance = FlownodeInstance { + server, + addr: addr.parse().context(ParseAddrSnafu { addr })?, + heartbeat_task, + }; + Ok(instance) + } + + /// Build a [`FlowWorkerManager`] without taking ownership of `self`. The worker is moved onto a + /// newly spawned thread and run there; the manager's background loop is not started here. + async fn build_manager(&self) -> Result { + let catalog_manager = self.catalog_manager.clone(); + let table_meta = self.table_meta.clone(); + + let query_engine_factory = QueryEngineFactory::new_with_plugins( + // The query engine in flownode is only used to translate plans with a resolved table source. + catalog_manager, + None, + None, + None, + false, + self.plugins.clone(), + ); + let query_engine = query_engine_factory.query_engine(); + + register_function_to_query_engine(&query_engine); + + let (tx, rx) = oneshot::channel(); + + let node_id = self.opts.node_id.map(|id| id as u32); + let _handle = std::thread::spawn(move || { + let (flow_node_manager, mut worker) = + FlowWorkerManager::new_with_worker(node_id, query_engine, table_meta); + let _ = tx.send(flow_node_manager); + info!("Flow Worker started in new thread"); + worker.run(); + }); + let man = rx.await.map_err(|_e| { + UnexpectedSnafu { + reason: "sender is dropped, failed to create flow node manager", + } + .build() + })?; + info!("Flow Node Manager started"); + Ok(man) + } +} diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index e86dac85fb7f..ab3fdd87c001 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -37,11 +37,11 @@ use substrait::{ use substrait_proto::proto::extensions::simple_extension_declaration::MappingType; use substrait_proto::proto::extensions::SimpleExtensionDeclaration; -use crate::adapter::error::{ +use crate::adapter::FlownodeContext; +use crate::error::{ Error, ExternalSnafu, InvalidQueryProstSnafu, NotImplementedSnafu,
TableNotFoundSnafu, UnexpectedSnafu, }; -use crate::adapter::FlownodeContext; use crate::expr::GlobalId; use crate::plan::TypedPlan; use crate::repr::RelationType; diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 6456f00a5c75..64ecc3eec506 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -45,7 +45,7 @@ use substrait_proto::proto::read_rel::ReadType; use substrait_proto::proto::rel::RelType; use substrait_proto::proto::{self, plan_rel, Expression, Plan as SubPlan, Rel}; -use crate::adapter::error::{ +use crate::error::{ DatatypesSnafu, Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, TableNotFoundSnafu, }; diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index a10e9b121f8c..524ab9b546cc 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -25,7 +25,7 @@ use substrait_proto::proto::expression::{IfThen, RexType, ScalarFunction}; use substrait_proto::proto::function_argument::ArgType; use substrait_proto::proto::Expression; -use crate::adapter::error::{ +use crate::error::{ DatafusionSnafu, DatatypesSnafu, Error, EvalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, }; diff --git a/src/flow/src/transform/literal.rs b/src/flow/src/transform/literal.rs index 1fa5bc86a81c..bd0f041dd825 100644 --- a/src/flow/src/transform/literal.rs +++ b/src/flow/src/transform/literal.rs @@ -26,7 +26,7 @@ use substrait_proto::proto::expression::literal::LiteralType; use substrait_proto::proto::expression::Literal; use substrait_proto::proto::r#type::Kind; -use crate::adapter::error::{Error, NotImplementedSnafu, PlanSnafu}; +use crate::error::{Error, NotImplementedSnafu, PlanSnafu}; use crate::transform::substrait_proto; /// Convert a Substrait literal into a Value and its ConcreteDataType (So that we can know type even if the value is null) diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index 
f1f6ba53dd35..200226fb352a 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -22,7 +22,7 @@ use substrait_proto::proto::read_rel::ReadType; use substrait_proto::proto::rel::RelType; use substrait_proto::proto::{plan_rel, Plan as SubPlan, ProjectRel, Rel}; -use crate::adapter::error::{ +use crate::error::{ Error, InternalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu, }; use crate::expr::{MapFilterProject, ScalarExpr, TypedExpr, UnaryFunc}; diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 93b999635d62..d43840e73ad2 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -100,6 +100,13 @@ impl MetaClientBuilder { .enable_heartbeat() } + /// Returns the role of Flownode's default options. + pub fn flownode_default_options(cluster_id: ClusterId, member_id: u64) -> Self { + Self::new(cluster_id, member_id, Role::Flownode) + .enable_store() + .enable_heartbeat() + } + pub fn enable_heartbeat(self) -> Self { Self { enable_heartbeat: true, diff --git a/src/meta-srv/src/cache_invalidator.rs b/src/meta-srv/src/cache_invalidator.rs index c14a20965a02..86b11ae007db 100644 --- a/src/meta-srv/src/cache_invalidator.rs +++ b/src/meta-srv/src/cache_invalidator.rs @@ -57,6 +57,21 @@ impl MetasrvCacheInvalidator { .broadcast(&BroadcastChannel::Frontend, msg) .await .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)?; + + let msg = &MailboxMessage::json_message( + subject, + &format!("Metasrv@{}", self.info.server_addr), + "Flownode broadcast", + common_time::util::current_time_millis(), + &instruction, + ) + .with_context(|_| meta_error::SerdeJsonSnafu)?; + + self.mailbox + .broadcast(&BroadcastChannel::Flownode, msg) + .await + .map_err(BoxedError::new) .context(meta_error::ExternalSnafu) } } diff --git a/src/meta-srv/src/service/mailbox.rs b/src/meta-srv/src/service/mailbox.rs index 97aad7427a29..6d34401d8da3 100644 --- 
a/src/meta-srv/src/service/mailbox.rs +++ b/src/meta-srv/src/service/mailbox.rs @@ -33,6 +33,7 @@ pub type MessageId = u64; pub enum Channel { Datanode(u64), Frontend(u64), + Flownode(u64), } impl Display for Channel { @@ -44,6 +45,9 @@ impl Display for Channel { Channel::Frontend(id) => { write!(f, "Frontend-{}", id) } + Channel::Flownode(id) => { + write!(f, "Flownode-{}", id) + } } } } @@ -53,12 +57,14 @@ impl Channel { match self { Channel::Datanode(id) => format!("{}-{}", Role::Datanode as i32, id), Channel::Frontend(id) => format!("{}-{}", Role::Frontend as i32, id), + Channel::Flownode(id) => format!("{}-{}", Role::Flownode as i32, id), } } } pub enum BroadcastChannel { Datanode, Frontend, + Flownode, } impl BroadcastChannel { @@ -70,7 +76,11 @@ impl BroadcastChannel { }, BroadcastChannel::Frontend => Range { start: format!("{}-", Role::Frontend as i32), - end: format!("{}-", Role::Frontend as i32 + 1), + end: format!("{}-", Role::Flownode as i32), + }, + BroadcastChannel::Flownode => Range { + start: format!("{}-", Role::Flownode as i32), + end: format!("{}-", Role::Flownode as i32 + 1), }, } } @@ -144,5 +154,9 @@ mod tests { BroadcastChannel::Frontend.pusher_range(), ("1-".to_string().."2-".to_string()) ); + assert_eq!( + BroadcastChannel::Flownode.pusher_range(), + ("2-".to_string().."3-".to_string()) + ); } } diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 458f51b0948a..77a31fb75fbf 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -151,17 +151,16 @@ impl GreptimeDbStandaloneBuilder { ); let flow_builder = FlownodeBuilder::new( - 1, // for standalone mode this value is default to one Default::default(), plugins.clone(), table_metadata_manager.clone(), catalog_manager.clone(), ); - let flownode = Arc::new(flow_builder.build().await); + let flownode = Arc::new(flow_builder.build().await.unwrap()); let node_manager = Arc::new(StandaloneDatanodeManager { region_server: 
datanode.region_server(), - flow_server: flownode.clone(), + flow_server: flownode.flow_worker_manager(), }); let table_id_sequence = Arc::new( @@ -219,10 +218,11 @@ impl GreptimeDbStandaloneBuilder { .await .unwrap(); - flownode + let flow_manager = flownode.flow_worker_manager(); + flow_manager .set_frontend_invoker(Box::new(instance.clone())) .await; - let _node_handle = flownode.run_background(); + let _node_handle = flow_manager.run_background(); procedure_manager.start().await.unwrap(); wal_options_allocator.start().await.unwrap();