Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Apr 25, 2024
1 parent 7219e56 commit 6e62e3f
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 36 deletions.
8 changes: 6 additions & 2 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,14 @@ pub enum Error {
InvalidRole { role: i32, location: Location },

#[snafu(display("Delimiter not found, key: {}", key))]
DelimiterNotFound { key: String },
DelimiterNotFound { key: String, location: Location },

#[snafu(display("Invalid prefix: {}, key: {}", prefix, key))]
MismatchPrefix { prefix: String, key: String },
MismatchPrefix {
prefix: String,
key: String,
location: Location,
},

#[snafu(display("Failed to move values: {err_msg}"))]
MoveValues { err_msg: String, location: Location },
Expand Down
35 changes: 30 additions & 5 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,34 @@
//! All keys have related managers. The managers take care of the serialization and deserialization
//! of keys and values, and the interaction with the underlying KV store backend.
//!
//! To simplify the managers used in struct fields and function parameters, we define a "unify"
//! table metadata manager: [TableMetadataManager]. It contains all the managers defined above.
//! It's recommended to just use this manager only.
//! To simplify the managers used in struct fields and function parameters, we define "unify"
//! table metadata manager: [TableMetadataManager]
//! and flow task metadata manager: [FlowTaskMetadataManager](crate::key::flow_task::FlowTaskMetadataManager).
//! It contains all the managers defined above. It's recommended to just use this manager only.
//!
//! The whole picture of flow task keys will be like this:
//!
//! __flow_task/
//! {catalog}/
//! info/
//! {tsak_id}
//!
//! name/
//! {task_name}
//!
//! flownode/
//! flownode_id/
//! {flownode_id}/
//! {task_id}/
//! {partition_id}
//!
//! source_table/
//! flow_task/
//! {table_id}/
//! {flownode_id}/
//! {task_id}/
//! {partition_id}


pub mod catalog_name;
pub mod datanode_table;
Expand Down Expand Up @@ -138,11 +163,11 @@ pub type RegionDistribution = BTreeMap<DatanodeId, Vec<RegionNumber>>;
/// The id of flow task.
pub type FlowTaskId = u32;
/// The partition of flow task.
pub type PartitionId = u32;
pub type FlowTaskPartitionId = u32;

lazy_static! {
static ref DATANODE_TABLE_KEY_PATTERN: Regex =
Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]*)/([0-9]*)$")).unwrap();
Regex::new(&format!("^{DATANODE_TABLE_KEY_PREFIX}/([0-9]+)/([0-9]+)$")).unwrap();
}

lazy_static! {
Expand Down
35 changes: 27 additions & 8 deletions src/common/meta/src/key/flow_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub(crate) mod table_task;

use std::ops::Deref;

use common_telemetry::info;
use snafu::{ensure, OptionExt};

use self::flow_task_info::FlowTaskInfoValue;
Expand Down Expand Up @@ -159,36 +160,54 @@ impl FlowTaskMetadataManager {
create_flownode_task_txn,
create_table_task_txn,
]);
info!(
"Creating flow task {}.{}({}), with {} txn operations",
flow_task_value.catalog_name,
flow_task_value.task_name,
flow_task_id,
txn.max_operations()
);

let mut resp = self.kv_backend.txn(txn).await?;
if !resp.succeeded {
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
let remote_flow_task_name = on_create_flow_task_name_failure(&mut set)?
.context(error::UnexpectedSnafu {
.with_context(||error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow task name during the creating flow task, flow_task_id: {flow_task_id}"
),
})?;
ensure!(
remote_flow_task_name.flow_task_id() == flow_task_id,
error::TaskAlreadyExistsSnafu {

if remote_flow_task_name.flow_task_id() != flow_task_id {
info!(
"Trying to create flow task {}.{}({}), but flow task({}) already exists",
flow_task_value.catalog_name,
flow_task_value.task_name,
flow_task_id,
remote_flow_task_name.flow_task_id()
);

return error::TaskAlreadyExistsSnafu {
task_name: format!(
"{}.{}",
flow_task_value.catalog_name, flow_task_value.task_name
),
flow_task_id,
}
);
.fail();
}

let remote_flow_task = on_create_flow_task_failure(&mut set)?
.context(error::UnexpectedSnafu {
let remote_flow_task = on_create_flow_task_failure(&mut set)?.with_context(|| {
error::UnexpectedSnafu {
err_msg: format!(
"Reads the empty flow task during the creating flow task, flow_task_id: {flow_task_id}"
),
})?;
}
})?;
let op_name = "creating flow task";
ensure_values!(*remote_flow_task, flow_task_value, op_name);
}

Ok(())
}
}
Expand Down
10 changes: 6 additions & 4 deletions src/common/meta/src/key/flow_task/flow_task_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use crate::error::{self, Result};
use crate::key::flow_task::FlowTaskScoped;
use crate::key::scope::{CatalogScoped, MetaKey};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{txn_helper, DeserializedValueWithBytes, FlowTaskId, PartitionId, TableMetaValue};
use crate::key::{
txn_helper, DeserializedValueWithBytes, FlowTaskId, FlowTaskPartitionId, TableMetaValue,
};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::FlownodeId;
Expand All @@ -33,7 +35,7 @@ const FLOW_TASK_INFO_KEY_PREFIX: &str = "info";

lazy_static! {
static ref FLOW_TASK_INFO_KEY_PATTERN: Regex =
Regex::new(&format!("^{FLOW_TASK_INFO_KEY_PREFIX}/([0-9]*)$")).unwrap();
Regex::new(&format!("^{FLOW_TASK_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap();
}

/// The key stores the metadata of the task.
Expand Down Expand Up @@ -119,7 +121,7 @@ pub struct FlowTaskInfoValue {
/// The sink table used by the task.
pub(crate) sink_table: TableId,
/// Which flow nodes this task is running on.
pub(crate) flownode_ids: BTreeMap<PartitionId, FlownodeId>,
pub(crate) flownode_ids: BTreeMap<FlowTaskPartitionId, FlownodeId>,
/// The catalog name.
pub(crate) catalog_name: String,
/// The task name.
Expand All @@ -136,7 +138,7 @@ pub struct FlowTaskInfoValue {

impl FlowTaskInfoValue {
/// Returns the `flownode_id`.
pub fn flownode_ids(&self) -> &BTreeMap<PartitionId, FlownodeId> {
pub fn flownode_ids(&self) -> &BTreeMap<FlowTaskPartitionId, FlownodeId> {
&self.flownode_ids
}

Expand Down
18 changes: 9 additions & 9 deletions src/common/meta/src/key/flow_task/flownode_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow_task::FlowTaskScoped;
use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey};
use crate::key::{FlowTaskId, PartitionId};
use crate::key::{FlowTaskId, FlowTaskPartitionId};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
Expand All @@ -33,7 +33,7 @@ use crate::FlownodeId;

lazy_static! {
static ref FLOWNODE_TASK_KEY_PATTERN: Regex = Regex::new(&format!(
"^{FLOWNODE_TASK_KEY_PREFIX}/([0-9]*)/([0-9]*)/([0-9]*)$"
"^{FLOWNODE_TASK_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)$"
))
.unwrap();
}
Expand Down Expand Up @@ -63,7 +63,7 @@ impl FlownodeTaskKey {
catalog: String,
flownode_id: FlownodeId,
flow_task_id: FlowTaskId,
partition_id: PartitionId,
partition_id: FlowTaskPartitionId,
) -> FlownodeTaskKey {
let inner = FlownodeTaskKeyInner::new(flownode_id, flow_task_id, partition_id);
FlownodeTaskKey(FlowTaskScoped::new(CatalogScoped::new(catalog, inner)))
Expand Down Expand Up @@ -95,7 +95,7 @@ impl FlownodeTaskKey {
}

/// Returns the [PartitionId].
pub fn partition_id(&self) -> PartitionId {
pub fn partition_id(&self) -> FlowTaskPartitionId {
self.0.partition_id
}
}
Expand All @@ -104,15 +104,15 @@ impl FlownodeTaskKey {
pub struct FlownodeTaskKeyInner {
flownode_id: FlownodeId,
flow_task_id: FlowTaskId,
partition_id: PartitionId,
partition_id: FlowTaskPartitionId,
}

impl FlownodeTaskKeyInner {
/// Returns a [FlownodeTaskKey] with the specified `flownode_id`, `flow_task_id` and `partition_id`.
pub fn new(
flownode_id: FlownodeId,
flow_task_id: FlowTaskId,
partition_id: PartitionId,
partition_id: FlowTaskPartitionId,
) -> Self {
Self {
flownode_id,
Expand Down Expand Up @@ -159,7 +159,7 @@ impl MetaKey<FlownodeTaskKeyInner> for FlownodeTaskKeyInner {
// Safety: pass the regex check above
let flownode_id = captures[1].parse::<FlownodeId>().unwrap();
let flow_task_id = captures[2].parse::<FlowTaskId>().unwrap();
let partition_id = captures[3].parse::<PartitionId>().unwrap();
let partition_id = captures[3].parse::<FlowTaskPartitionId>().unwrap();

Ok(FlownodeTaskKeyInner {
flownode_id,
Expand Down Expand Up @@ -190,7 +190,7 @@ impl FlownodeTaskManager {
&self,
catalog: &str,
flownode_id: FlownodeId,
) -> BoxStream<'static, Result<(FlowTaskId, PartitionId)>> {
) -> BoxStream<'static, Result<(FlowTaskId, FlowTaskPartitionId)>> {
let start_key = FlownodeTaskKey::range_start_key(catalog.to_string(), flownode_id);
let req = RangeRequest::new().with_prefix(start_key);

Expand All @@ -207,7 +207,7 @@ impl FlownodeTaskManager {
/// Builds a create flownode task transaction.
///
/// Puts `__flownode_task/{flownode_id}/{flow_task_id}/{partition_id}` keys.
pub(crate) fn build_create_txn<I: IntoIterator<Item = (PartitionId, FlownodeId)>>(
pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowTaskPartitionId, FlownodeId)>>(
&self,
catalog: &str,
flow_task_id: FlowTaskId,
Expand Down
16 changes: 8 additions & 8 deletions src/common/meta/src/key/flow_task/table_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use table::metadata::TableId;
use crate::error::{self, Result};
use crate::key::flow_task::FlowTaskScoped;
use crate::key::scope::{BytesAdapter, CatalogScoped, MetaKey};
use crate::key::{FlowTaskId, PartitionId};
use crate::key::{FlowTaskId, FlowTaskPartitionId};
use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
Expand All @@ -35,7 +35,7 @@ const TABLE_TASK_KEY_PREFIX: &str = "source_table";

lazy_static! {
static ref TABLE_TASK_KEY_PATTERN: Regex = Regex::new(&format!(
"^{TABLE_TASK_KEY_PREFIX}/([0-9]*)/([0-9]*)/([0-9]*)/([0-9]*)$"
"^{TABLE_TASK_KEY_PREFIX}/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)$"
))
.unwrap();
}
Expand All @@ -46,7 +46,7 @@ struct TableTaskKeyInner {
table_id: TableId,
flownode_id: FlownodeId,
flow_task_id: FlowTaskId,
partition_id: PartitionId,
partition_id: FlowTaskPartitionId,
}

/// The key of mapping [TableId] to [FlownodeId] and [FlowTaskId].
Expand Down Expand Up @@ -74,7 +74,7 @@ impl TableTaskKey {
table_id: TableId,
flownode_id: FlownodeId,
flow_task_id: FlowTaskId,
partition_id: PartitionId,
partition_id: FlowTaskPartitionId,
) -> TableTaskKey {
let inner = TableTaskKeyInner::new(table_id, flownode_id, flow_task_id, partition_id);
TableTaskKey(FlowTaskScoped::new(CatalogScoped::new(catalog, inner)))
Expand Down Expand Up @@ -111,7 +111,7 @@ impl TableTaskKey {
}

/// Returns the [PartitionId].
pub fn partition_id(&self) -> PartitionId {
pub fn partition_id(&self) -> FlowTaskPartitionId {
self.0.partition_id
}
}
Expand All @@ -122,7 +122,7 @@ impl TableTaskKeyInner {
table_id: TableId,
flownode_id: FlownodeId,
flow_task_id: FlowTaskId,
partition_id: PartitionId,
partition_id: FlowTaskPartitionId,
) -> TableTaskKeyInner {
Self {
table_id,
Expand Down Expand Up @@ -171,7 +171,7 @@ impl MetaKey<TableTaskKeyInner> for TableTaskKeyInner {
let table_id = captures[1].parse::<TableId>().unwrap();
let flownode_id = captures[2].parse::<FlownodeId>().unwrap();
let flow_task_id = captures[3].parse::<FlowTaskId>().unwrap();
let partition_id = captures[4].parse::<PartitionId>().unwrap();
let partition_id = captures[4].parse::<FlowTaskPartitionId>().unwrap();
Ok(TableTaskKeyInner::new(
table_id,
flownode_id,
Expand Down Expand Up @@ -218,7 +218,7 @@ impl TableTaskManager {
/// Builds a create table task transaction.
///
/// Puts `__table_task/{table_id}/{node_id}/{partition_id}` keys.
pub fn build_create_txn<I: IntoIterator<Item = (PartitionId, FlownodeId)>>(
pub fn build_create_txn<I: IntoIterator<Item = (FlowTaskPartitionId, FlownodeId)>>(
&self,
catalog: &str,
flow_task_id: FlowTaskId,
Expand Down

0 comments on commit 6e62e3f

Please sign in to comment.