From a985cdcf0b973d66e143b4c442d89d37d1141a9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Wed, 24 Apr 2024 17:40:15 +0800 Subject: [PATCH] Add ballista-mvp part4 blog --- .../2024-04-24-ballista-mvp-part4/index.md | 121 ++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 content/blog/2024-04-24-ballista-mvp-part4/index.md diff --git a/content/blog/2024-04-24-ballista-mvp-part4/index.md b/content/blog/2024-04-24-ballista-mvp-part4/index.md new file mode 100644 index 0000000..d84f91a --- /dev/null +++ b/content/blog/2024-04-24-ballista-mvp-part4/index.md @@ -0,0 +1,121 @@ ++++ +title = "Ballista 分布式查询引擎 - 集群共享状态" +date = 2024-04-24 ++++ + +集群共享状态会被存储在共享 kv 中,被所有 Scheduler 节点读写。 + +```rust +/// 集群节点状态 +#[tonic::async_trait] +pub trait ClusterState: Send + Sync + 'static { + /// 启动时初始化 + async fn init(&self) -> Result<()> { + Ok(()) + } + + /// 将可执行的 tasks 分配到对应的 executor 计算资源槽上 + async fn bind_schedulable_tasks( + &self, + distribution: TaskDistributionPolicy, + active_jobs: Arc>, + executors: Option>, + ) -> Result>; + + /// 解绑 tasks,释放 executor 计算资源 + async fn unbind_tasks(&self, executor_slots: Vec) -> Result<()>; + + /// 注册 executor + async fn register_executor(&self, metadata: ExecutorMetadata, spec: ExecutorData) + -> Result<()>; + + /// 保存 executor 元信息 + async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()>; + + /// 读取 executor 元信息 + async fn get_executor_metadata(&self, executor_id: &str) -> Result; + + /// 保存 executor 最新心跳信息 + async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) -> Result<()>; + + /// 从集群中移除 executor + async fn remove_executor(&self, executor_id: &str) -> Result<()>; + + /// 返回所有活跃 executor 的最新心跳信息 + fn executor_heartbeats(&self) -> HashMap; + + /// 读取 executor 最新心跳信息 + fn get_executor_heartbeat(&self, executor_id: &str) -> Option; +} + + +/// 集群作业状态 +#[tonic::async_trait] +pub trait JobState: Send + Sync { + /// 接收作业进本地队列(并没有存储在共享 kv 中) + fn accept_job(&self, job_id: &str, queued_at: u64) -> Result<()>; + + /// 提交作业 + async fn submit_job(&self, job_id: String, graph: &ExecutionGraph) -> Result<()>; + + /// 获取所有活跃 job id + async fn get_jobs(&self) -> Result>; + + /// 获取作业状态 + async fn get_job_status(&self, job_id: &str) -> Result>; + + /// 获取作业的执行图(分布式执行计划) + async fn get_execution_graph(&self, job_id: &str) -> Result>; + + /// 保存作业执行图状态 + async fn save_job(&self, job_id: &str, graph: &ExecutionGraph) -> Result<()>; + + /// 标记作业失败(planning阶段失败) + async fn fail_unscheduled_job(&self, job_id: &str, reason: String) -> Result<()>; + + /// 移除作业 + async fn remove_job(&self, job_id: &str) -> Result<()>; + + /// 读取会话 + async fn get_session(&self, session_id: &str) -> Result>; + + /// 创建一个会话 + async fn create_session(&self, config: &BallistaConfig) -> Result>; +} +``` + +**Executor 元信息** + +主要包括 Executor 的 id、ip 和 端口等信息。在 Executor 节点启动时,会往 Scheduler 注册,Scheduler 会将 Executor 元信息存储到共享 kv 中。 + +**Executor 心跳信息** + +主要包括 Executor 的 id、此次心跳时间戳 和 executor 状态。 +1. 在 Executor 启动注册时,会往共享 kv 中存入心跳信息 +2. 在 Executor 定时发送心跳时,会往共享 kv 中覆盖心跳信息 +3. 在检查 Executor 心跳是否过期的定时任务中,会移除共享 kv 中过期的 Executor 的心跳信息 + +**Executor 资源信息** + +主要包括 Executor 还有多少计算资源(一个任务执行占据一个资源槽位)。 +1. 在注册 executor 时,会增加资源槽位总量 +2. 在分配可执行 tasks 到 executor 时,会消耗资源槽位 +3. 在 tasks 执行完毕时,释放资源槽位 + +**会话信息** + +会话信息主要包括作业配置(如配置多少并行度),Client 连接 Scheduler 时,会创建一个会话并存储在共享 kv 中。 + +**作业执行图(分布式执行计划)** + +主要包括整个分布式执行计划,以及每个 stage 的执行状态。 +1. 在提交作业时,会将整个执行图保存到共享 kv 中 +2. 在作业执行成功/终止作业/作业执行有更新时,会更新整个执行图到共享 kv 中 +3. 在作业执行完毕后,会从共享 kv 中移除 + +**作业状态** + +作业状态因为读取频繁,被额外单独存储(作业状态可以从执行图中获取)。 +1. 在提交作业时,会将作业状态保存到共享 kv 中 +2. 在作业执行成功/终止作业/作业执行有更新时,会更新作业状态到共享 kv 中 +3. 在作业执行完毕后,会从共享 kv 中移除 \ No newline at end of file