Skip to content

Commit

Permalink
feat: flownode use Inserter to write to database (#4323)
Browse files Browse the repository at this point in the history
* feat: use `Inserter` as Frontend

* fix: enable procedure in flownode

* docs: remove `frontend_addr` opts

* chore: rm fe addr in test runner

* refactor: int test also use inserter invoker

* feat: flow shutdown&refactor: remove `Frontendinvoker`

* refactor: rename `RemoteFrontendInvoker` to `FrontendInvoker`

* refactor: per review

* refactor: remove a layer of box

* fix: standalone use `node_manager`

* fix: remove an `Arc` cycle
  • Loading branch information
discord9 authored Jul 9, 2024
1 parent 185953e commit 1ddf19d
Show file tree
Hide file tree
Showing 17 changed files with 287 additions and 280 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,6 @@
| --- | -----| ------- | ----------- |
| `mode` | String | `distributed` | The running mode of the flownode. It can be `standalone` or `distributed`. |
| `node_id` | Integer | `None` | The flownode identifier and should be unique in the cluster. |
| `frontend_addr` | String | `http://127.0.0.1:4001` | Frontend grpc address. Used by flownode to write result back to frontend. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.addr` | String | `127.0.0.1:6800` | The address to bind the gRPC server. |
| `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv,<br/>and used for connections from outside the host |
Expand Down
3 changes: 0 additions & 3 deletions config/flownode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ mode = "distributed"
## +toml2docs:none-default
node_id = 14

## Frontend grpc address. Used by flownode to write result back to frontend.
frontend_addr = "http://127.0.0.1:4001"

## The gRPC server options.
[grpc]
## The address to bind the gRPC server.
Expand Down
55 changes: 26 additions & 29 deletions src/cmd/src/flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,27 @@ use std::sync::Arc;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::Configurable;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::TableMetadataManager;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use flow::{FlownodeBuilder, FlownodeInstance};
use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker};
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use meta_client::{MetaClientOptions, MetaClientType};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use tonic::transport::Endpoint;
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{
BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, TonicTransportSnafu,
MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
Expand Down Expand Up @@ -132,10 +133,6 @@ struct StartCommand {
/// Metasrv address list;
#[clap(long, value_delimiter = ',', num_args = 1..)]
metasrv_addrs: Option<Vec<String>>,
/// The gRPC address of the frontend server used for writing results back to the database.
/// Needs a scheme prefix, e.g. "http://"
#[clap(long)]
frontend_addr: Option<String>,
/// The configuration file for flownode
#[clap(short, long)]
config_file: Option<String>,
Expand Down Expand Up @@ -186,10 +183,6 @@ impl StartCommand {
opts.grpc.hostname.clone_from(hostname);
}

if let Some(fe_addr) = &self.frontend_addr {
opts.frontend_addr = Some(fe_addr.clone());
}

if let Some(node_id) = self.node_id {
opts.node_id = Some(node_id);
}
Expand Down Expand Up @@ -228,10 +221,6 @@ impl StartCommand {

let opts = opts.component;

let frontend_addr = opts.frontend_addr.clone().context(MissingConfigSnafu {
msg: "'frontend_addr'",
})?;

// TODO(discord9): make it non-optional once cluster id is required
let cluster_id = opts.cluster_id.unwrap_or(0);

Expand Down Expand Up @@ -286,7 +275,8 @@ impl StartCommand {
layered_cache_registry.clone(),
);

let table_metadata_manager = Arc::new(TableMetadataManager::new(cached_meta_backend));
let table_metadata_manager =
Arc::new(TableMetadataManager::new(cached_meta_backend.clone()));
table_metadata_manager
.init()
.await
Expand All @@ -310,26 +300,33 @@ impl StartCommand {
opts,
Plugins::new(),
table_metadata_manager,
catalog_manager,
catalog_manager.clone(),
)
.with_heartbeat_task(heartbeat_task);

let flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;

// set up the lazy connection to the frontend server
// TODO(discord9): consider moving this to start() or pre_start()?
let endpoint =
Endpoint::from_shared(frontend_addr.clone()).context(TonicTransportSnafu {
msg: Some(format!("Fail to create from addr={}", frontend_addr)),
})?;
let chnl = endpoint.connect().await.context(TonicTransportSnafu {
msg: Some("Fail to connect to frontend".to_string()),
})?;
info!("Connected to frontend server: {:?}", frontend_addr);
let client = flow::FrontendClient::new(chnl);
// The flownode's frontend-to-datanode channel must not have a timeout:
// some queries are expected to take a long time.
let channel_config = ChannelConfig {
timeout: None,
..Default::default()
};
let client = Arc::new(NodeClients::new(channel_config));

let invoker = FrontendInvoker::build_from(
flownode.flow_worker_manager().clone(),
catalog_manager.clone(),
cached_meta_backend.clone(),
layered_cache_registry.clone(),
meta_client.clone(),
client,
)
.await
.context(StartFlownodeSnafu)?;
flownode
.flow_worker_manager()
.set_frontend_invoker(Box::new(client))
.set_frontend_invoker(invoker)
.await;

Ok(Instance::new(flownode, guard))
Expand Down
67 changes: 47 additions & 20 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use common_wal::config::StandaloneWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::FlownodeBuilder;
use flow::{FlowWorkerManager, FlownodeBuilder, FrontendInvoker};
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
Expand All @@ -61,13 +61,15 @@ use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;
use tokio::sync::broadcast;
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{
BuildCacheRegistrySnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu,
InitMetadataSnafu, InitTimezoneSnafu, LoadLayeredConfigSnafu, OtherSnafu, Result,
ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
ShutdownDatanodeSnafu, ShutdownFlownodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu,
StartFlownodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
Expand Down Expand Up @@ -214,6 +216,9 @@ impl StandaloneOptions {
pub struct Instance {
datanode: Datanode,
frontend: FeInstance,
// TODO(discord9): wrap it in a flownode instance instead
flow_worker_manager: Arc<FlowWorkerManager>,
flow_shutdown: broadcast::Sender<()>,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,

Expand Down Expand Up @@ -245,6 +250,9 @@ impl App for Instance {
.context(StartFrontendSnafu)?;

self.frontend.start().await.context(StartFrontendSnafu)?;
self.flow_worker_manager
.clone()
.run_background(Some(self.flow_shutdown.subscribe()));
Ok(())
}

Expand All @@ -263,6 +271,15 @@ impl App for Instance {
.shutdown()
.await
.context(ShutdownDatanodeSnafu)?;
self.flow_shutdown
.send(())
.map_err(|_e| {
flow::error::InternalSnafu {
reason: "Failed to send shutdown signal to flow worker manager, all receiver end already closed".to_string(),
}
.build()
})
.context(ShutdownFlownodeSnafu)?;
info!("Datanode instance stopped.");

Ok(())
Expand Down Expand Up @@ -447,6 +464,12 @@ impl StartCommand {
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;

let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone())
.with_kv_backend(kv_backend.clone())
.build()
.await
.context(StartDatanodeSnafu)?;

let flow_builder = FlownodeBuilder::new(
Default::default(),
fe_plugins.clone(),
Expand All @@ -461,12 +484,6 @@ impl StartCommand {
.context(OtherSnafu)?,
);

let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone())
.with_kv_backend(kv_backend.clone())
.build()
.await
.context(StartDatanodeSnafu)?;

let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
flow_server: flownode.flow_worker_manager(),
Expand Down Expand Up @@ -510,24 +527,32 @@ impl StartCommand {

let mut frontend = FrontendBuilder::new(
fe_opts.clone(),
kv_backend,
layered_cache_registry,
catalog_manager,
node_manager,
ddl_task_executor,
kv_backend.clone(),
layered_cache_registry.clone(),
catalog_manager.clone(),
node_manager.clone(),
ddl_task_executor.clone(),
)
.with_plugin(fe_plugins.clone())
.try_build()
.await
.context(StartFrontendSnafu)?;

// The flow server needs to be able to use the frontend to write insert requests back
let flow_worker_manager = flownode.flow_worker_manager();
flow_worker_manager
.set_frontend_invoker(Box::new(frontend.clone()))
.await;
// TODO(discord9): unify this by adding `start` and `shutdown` methods to flownode too.
let _handle = flow_worker_manager.run_background();
// The flow server needs to be able to use the frontend to write insert requests back
let invoker = FrontendInvoker::build_from(
flow_worker_manager.clone(),
catalog_manager.clone(),
kv_backend.clone(),
layered_cache_registry.clone(),
ddl_task_executor.clone(),
node_manager,
)
.await
.context(StartFlownodeSnafu)?;
flow_worker_manager.set_frontend_invoker(invoker).await;

let (tx, _rx) = broadcast::channel(1);

let servers = Services::new(fe_opts, Arc::new(frontend.clone()), fe_plugins)
.build()
Expand All @@ -540,6 +565,8 @@ impl StartCommand {
Ok(Instance {
datanode,
frontend,
flow_worker_manager,
flow_shutdown: tx,
procedure_manager,
wal_options_allocator,
_guard: guard,
Expand Down
38 changes: 0 additions & 38 deletions src/common/frontend/src/handler.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/common/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@
// limitations under the License.

pub mod error;
pub mod handler;
3 changes: 3 additions & 0 deletions src/flow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ arrow-schema.workspace = true
async-recursion = "1.0"
async-trait.workspace = true
bytes.workspace = true
cache.workspace = true
catalog.workspace = true
client.workspace = true
common-base.workspace = true
Expand Down Expand Up @@ -47,6 +48,8 @@ meta-client.workspace = true
minstant = "0.1.7"
nom = "7.1.3"
num-traits = "0.2"
operator.workspace = true
partition.workspace = true
prost.workspace = true
query.workspace = true
serde.workspace = true
Expand Down
Loading

0 comments on commit 1ddf19d

Please sign in to comment.