
refactor(meta): refactor how upstream fragment is handled when creating stream job #14510

Merged · 2 commits · Jan 12, 2024
9 changes: 8 additions & 1 deletion proto/ddl_service.proto
@@ -137,10 +137,17 @@ message DropViewResponse {
uint64 version = 2;
}

// An enum to distinguish different types of the Table streaming job.
// An enum to distinguish different types of the `Table` streaming job.
// - GENERAL: Table streaming jobs w/ or w/o a connector
// - SHARED_CDC_SOURCE: The table streaming job is created based on a shared CDC source job (risingwavelabs/rfcs#73).
//
// One may add other types to support Table jobs that are based on other backfill-able sources (risingwavelabs/rfcs#72).
//
// Currently, its usages include:
// - When creating the streaming actor graph, different table jobs may need different treatment.
// - Some ad-hoc validation when creating the streaming job, e.g., `validate_cdc_table`.
//
// It's not included in `catalog.Table`, and thus not persisted. It's only used in the `CreateTableRequest`.
enum TableJobType {
TABLE_JOB_TYPE_UNSPECIFIED = 0;
// table streaming jobs except the `SHARED_CDC_SOURCE` type
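As the comment above notes, `TableJobType` is only carried in `CreateTableRequest` and consulted on the meta side for per-type handling. A minimal, hypothetical sketch of such a branch (the helper function and error type are made up for illustration; the variant names match the generated `PbTableJobType` used elsewhere in this diff):

```rust
use risingwave_pb::ddl_service::TableJobType;

// Hypothetical helper, not part of this PR: dispatch ad-hoc validation by table job type.
fn validate_table_job(job_type: TableJobType) -> Result<(), String> {
    match job_type {
        // A CDC table is backed by a shared CDC source job and needs extra checks
        // (the real code base does this kind of thing in `validate_cdc_table`).
        TableJobType::SharedCdcSource => {
            // ... CDC-specific validation would go here ...
            Ok(())
        }
        // Plain tables, with or without a connector, need no special handling here.
        TableJobType::General | TableJobType::Unspecified => Ok(()),
    }
}
```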
3 changes: 2 additions & 1 deletion proto/stream_plan.proto
@@ -839,6 +839,7 @@ message StreamActor {
plan_common.ExprContext expr_context = 10;
}

// Indicates whether the fragment contains some special kinds of nodes.
enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_FRAGMENT_UNSPECIFIED = 0;
FRAGMENT_TYPE_FLAG_SOURCE = 1;
@@ -864,7 +865,7 @@ message StreamFragmentGraph {
uint32 fragment_id = 1;
// root stream node in this fragment.
StreamNode node = 2;
// Bitwise-OR of FragmentTypeFlags
// Bitwise-OR of `FragmentTypeFlag`s
uint32 fragment_type_mask = 3;
// Mark whether this fragment requires exactly one actor.
// Note: if this is `false`, the fragment may still be a singleton according to the scheduler.
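Because `fragment_type_mask` is a bitwise-OR of `FragmentTypeFlag`s, membership tests are plain bit operations; the meta-side changes below rely on exactly this. A small illustrative sketch (the helper is not from the PR), assuming only that the generated enum variants can be cast to integers as the existing code does:

```rust
use risingwave_pb::stream_plan::FragmentTypeFlag;

// Illustrative helper: check whether a fragment's type mask contains a given flag.
fn has_flag(fragment_type_mask: i32, flag: FragmentTypeFlag) -> bool {
    fragment_type_mask & flag as i32 != 0
}

fn example() {
    // A fragment containing both an Mview node and a Source node.
    let mask = FragmentTypeFlag::Mview as i32 | FragmentTypeFlag::Source as i32;
    assert!(has_flag(mask, FragmentTypeFlag::Mview));
    assert!(has_flag(mask, FragmentTypeFlag::Source));
    assert!(!has_flag(0, FragmentTypeFlag::Source));
}
```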
24 changes: 11 additions & 13 deletions src/meta/src/controller/fragment.rs
@@ -27,7 +27,6 @@ use risingwave_meta_model_v2::{
TableId, VnodeBitmap, WorkerId,
};
use risingwave_pb::common::PbParallelUnit;
use risingwave_pb::ddl_service::PbTableJobType;
use risingwave_pb::meta::subscribe_response::{
Info as NotificationInfo, Operation as NotificationOperation,
};
@@ -1034,31 +1033,30 @@ impl CatalogController {
Ok(actors)
}

/// Get and filter the upstream `Materialize` or `Source` fragments of the specified relations.
pub async fn get_upstream_root_fragments(
&self,
upstream_job_ids: Vec<ObjectId>,
job_type: Option<PbTableJobType>,
) -> MetaResult<HashMap<ObjectId, PbFragment>> {
let inner = self.inner.read().await;

let mut fragments = Fragment::find()
let all_upstream_fragments = Fragment::find()
.filter(fragment::Column::JobId.is_in(upstream_job_ids))
.all(&inner.db)
.await?;
fragments.retain(|f| match job_type {
Some(PbTableJobType::SharedCdcSource) => {
f.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0
}
// MV on MV, and other kinds of table job
None | Some(PbTableJobType::General) | Some(PbTableJobType::Unspecified) => {
f.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0
// job_id -> fragment
let mut fragments = HashMap::<ObjectId, fragment::Model>::new();
for fragment in all_upstream_fragments {
if fragment.fragment_type_mask & PbFragmentTypeFlag::Mview as i32 != 0 {
_ = fragments.insert(fragment.job_id, fragment);
} else if fragment.fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
// look for Source fragment if there's no MView fragment
_ = fragments.try_insert(fragment.job_id, fragment);
Comment on lines +1052 to +1053 — Member (Author): I thought I made a mistake, then I finally noticed it is `try_insert` 😅
}
});
}

let parallel_units_map = get_parallel_unit_mapping(&inner.db).await?;
let mut root_fragments = HashMap::new();
for fragment in fragments {
for (_, fragment) in fragments {
let actors = fragment.find_related(Actor).all(&inner.db).await?;
let actor_dispatchers = get_actor_dispatchers(
&inner.db,
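The review exchange above hinges on `HashMap::try_insert`, which inserts only when the key is absent (a nightly API that this repository builds with). Together with the plain `insert` for the `Mview` branch, it makes the loop prefer an `MView` fragment over a `Source` fragment regardless of scan order. A stable-Rust sketch of the same preference logic, with made-up types purely for illustration:

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq)]
enum FragmentKind {
    Mview,
    Source,
}

// Illustrative only: pick one "root" fragment per job, preferring MView over Source,
// independent of the order in which the fragments are scanned.
fn pick_root_fragments(scanned: &[(u32, FragmentKind)]) -> HashMap<u32, FragmentKind> {
    let mut picked = HashMap::new();
    for &(job_id, kind) in scanned {
        match kind {
            // An MView fragment always wins, even if a Source fragment was seen first.
            FragmentKind::Mview => {
                picked.insert(job_id, kind);
            }
            // A Source fragment is only used if nothing was picked for this job yet;
            // this is what `try_insert` expresses in the actual diff.
            FragmentKind::Source => {
                picked.entry(job_id).or_insert(kind);
            }
        }
    }
    picked
}

fn main() {
    let scanned = [
        (1, FragmentKind::Source),
        (1, FragmentKind::Mview),
        (2, FragmentKind::Source),
    ];
    let picked = pick_root_fragments(&scanned);
    assert_eq!(picked[&1], FragmentKind::Mview); // MView preferred even though Source came first
    assert_eq!(picked[&2], FragmentKind::Source); // Source used as the fallback
}
```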
21 changes: 6 additions & 15 deletions src/meta/src/manager/catalog/fragment.rs
@@ -24,7 +24,6 @@ use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping}
use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_cont};
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model_v2::SourceId;
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
@@ -1398,11 +1397,9 @@ impl FragmentManager {
.mview_actor_ids())
}

/// Get and filter the upstream `Materialize` or `Source` fragments of the specified relations.
pub async fn get_upstream_root_fragments(
&self,
upstream_table_ids: &HashSet<TableId>,
table_job_type: Option<TableJobType>,
) -> MetaResult<HashMap<TableId, Fragment>> {
let map = &self.core.read().await.table_fragments;
let mut fragments = HashMap::new();
@@ -1411,18 +1408,12 @@
let table_fragments = map
.get(&table_id)
.with_context(|| format!("table_fragment not exist: id={}", table_id))?;
match table_job_type.as_ref() {
Some(TableJobType::SharedCdcSource) => {
if let Some(fragment) = table_fragments.source_fragment() {
fragments.insert(table_id, fragment);
}
}
// MV on MV, and other kinds of table job
None | Some(TableJobType::General) | Some(TableJobType::Unspecified) => {
if let Some(fragment) = table_fragments.mview_fragment() {
fragments.insert(table_id, fragment);
}
}

if let Some(fragment) = table_fragments.mview_fragment() {
fragments.insert(table_id, fragment);
} else if let Some(fragment) = table_fragments.source_fragment() {
// look for Source fragment if there's no MView fragment
fragments.insert(table_id, fragment);
Comment on lines +1415 to +1416 — Member: This is quite counter-intuitive 😄. May add some comments to show that this is for Source job?
Member: Oh, it's already on `pub async fn get_upstream_root_fragments`.
}
}

19 changes: 15 additions & 4 deletions src/meta/src/manager/metadata.rs
@@ -18,7 +18,6 @@ use risingwave_common::catalog::{TableId, TableOption};
use risingwave_pb::catalog::PbSource;
use risingwave_pb::common::worker_node::{PbResource, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerType};
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
use risingwave_pb::meta::table_fragments::Fragment;
use risingwave_pb::stream_plan::PbStreamActor;
@@ -174,15 +173,28 @@ impl MetadataManager {
}
}

/// Get and filter the "**root**" fragments of the specified relations.
/// The root fragment is the bottom-most fragment of its fragment graph, and can be a `MView` or a `Source`.
///
/// ## What can be the root fragment
/// - For MV, it should have one `MView` fragment.
/// - For table, it should have one `MView` fragment and one or two `Source` fragments. `MView` should be the root.
/// - For source, it should have one `Source` fragment.
///
/// In other words, it's the `MView` fragment if it exists, otherwise it's the `Source` fragment.
///
/// ## What do we expect to get when creating different streaming jobs
/// - MV/Sink/Index should have MV upstream fragments for upstream MV/Tables, and Source upstream fragments for upstream backfill-able sources.
/// - CDC Table has a Source upstream fragment.
/// - Sources and other Tables shouldn't have an upstream fragment.
pub async fn get_upstream_root_fragments(
&self,
upstream_table_ids: &HashSet<TableId>,
table_job_type: Option<TableJobType>,
) -> MetaResult<HashMap<TableId, Fragment>> {
match self {
MetadataManager::V1(mgr) => {
mgr.fragment_manager
.get_upstream_root_fragments(upstream_table_ids, table_job_type)
.get_upstream_root_fragments(upstream_table_ids)
.await
}
MetadataManager::V2(mgr) => {
@@ -193,7 +205,6 @@
.iter()
.map(|id| id.table_id as _)
.collect(),
table_job_type,
)
.await?;
Ok(upstream_root_fragments
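For context on how the doc comment above is consumed, here is a simplified, non-authoritative caller-side outline (modelled on the call site in `DdlController::build_stream_job` shown further below; the function itself and its exact imports are assumptions): each upstream relation maps to its root fragment, and the new job's backfill executors only need the actor ids of that fragment.

```rust
use std::collections::{HashMap, HashSet};

use risingwave_common::catalog::TableId;

// Paths assumed: this sketch is imagined to live inside the meta crate, so
// `MetadataManager` and `MetaResult` are reachable from the crate root / manager module.
use crate::manager::MetadataManager;
use crate::MetaResult;

// Simplified outline of a caller: resolve the upstream root fragments, then keep only
// the actor ids of each root fragment (the MView fragment if present, otherwise Source).
async fn resolve_upstream_actors(
    metadata_manager: &MetadataManager,
    dependent_table_ids: &HashSet<TableId>,
) -> MetaResult<HashMap<TableId, Vec<u32>>> {
    let upstream_root_fragments = metadata_manager
        .get_upstream_root_fragments(dependent_table_ids)
        .await?;
    Ok(upstream_root_fragments
        .iter()
        .map(|(&table_id, fragment)| {
            let actor_ids: Vec<u32> = fragment.actors.iter().map(|a| a.actor_id).collect();
            (table_id, actor_ids)
        })
        .collect())
}
```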
33 changes: 22 additions & 11 deletions src/meta/src/manager/streaming_job.rs
@@ -26,8 +26,6 @@ use crate::model::FragmentId;
// This enum is used in order to re-use code in `DdlServiceImpl` for creating MaterializedView and
// Sink.
#[derive(Debug, Clone, EnumDiscriminants)]
#[strum_discriminants(name(DdlType))]
#[strum_discriminants(vis(pub))]
Comment on lines -29 to -30 — Member (Author): A step back... 🤣

pub enum StreamingJob {
MaterializedView(Table),
Sink(Sink, Option<(Table, Option<PbSource>)>),
@@ -36,13 +34,34 @@
Source(PbSource),
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DdlType {
MaterializedView,
Sink,
Table(TableJobType),
Index,
Source,
}

impl From<&StreamingJob> for DdlType {
fn from(job: &StreamingJob) -> Self {
match job {
StreamingJob::MaterializedView(_) => DdlType::MaterializedView,
StreamingJob::Sink(_, _) => DdlType::Sink,
StreamingJob::Table(_, _, ty) => DdlType::Table(*ty),
StreamingJob::Index(_, _) => DdlType::Index,
StreamingJob::Source(_) => DdlType::Source,
}
}
}

#[cfg(test)]
#[allow(clippy::derivable_impls)]
impl Default for DdlType {
fn default() -> Self {
// This should not be used by mock services,
// so we can just pick an arbitrary default variant.
DdlType::Table
DdlType::MaterializedView
}
}

@@ -259,14 +278,6 @@ impl StreamingJob {
}
}

pub fn table_job_type(&self) -> Option<TableJobType> {
if let Self::Table(.., sub_type) = self {
Some(*sub_type)
} else {
None
}
}

// TODO: record all objects instead.
pub fn dependent_relations(&self) -> Vec<u32> {
match self {
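The new `DdlType` replaces the deleted `table_job_type()` accessor: the DDL controller below converts the whole job via `stream_job.into()` and downstream code can branch on the discriminant. A small, hypothetical sketch of such a branch, phrased after the expectations listed in the `get_upstream_root_fragments` doc comment earlier (the helper itself is not part of the PR):

```rust
use risingwave_pb::ddl_service::TableJobType;

// Hypothetical helper: describe which upstream root fragments a job of this type
// is expected to connect to, per the doc comment on `get_upstream_root_fragments`.
fn expected_upstream_roots(ddl_type: DdlType) -> &'static str {
    match ddl_type {
        DdlType::MaterializedView | DdlType::Sink | DdlType::Index => {
            "MView fragments of upstream MVs/tables, plus Source fragments of backfill-able sources"
        }
        DdlType::Table(TableJobType::SharedCdcSource) => {
            "the Source fragment of the shared CDC source job"
        }
        DdlType::Table(_) | DdlType::Source => "no upstream root fragments",
    }
}

// At a call site, mirroring `stream_job.into()` in the DDL controller:
// let ddl_type: DdlType = (&stream_job).into();
// tracing::debug!("expecting: {}", expected_upstream_roots(ddl_type));
```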
12 changes: 6 additions & 6 deletions src/meta/src/rpc/ddl_controller.rs
@@ -1255,6 +1255,7 @@ impl DdlController {
}

/// Builds the actor graph:
/// - Add the upstream fragments to the fragment graph
/// - Schedule the fragments based on their distribution
/// - Expand each fragment into one or several actors
pub(crate) async fn build_stream_job(
@@ -1274,10 +1275,7 @@

let upstream_root_fragments = self
.metadata_manager
.get_upstream_root_fragments(
fragment_graph.dependent_table_ids(),
stream_job.table_job_type(),
)
.get_upstream_root_fragments(fragment_graph.dependent_table_ids())
.await?;

let upstream_actors: HashMap<_, _> = upstream_root_fragments
@@ -1293,7 +1291,7 @@
let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
fragment_graph,
upstream_root_fragments,
stream_job.table_job_type(),
stream_job.into(),
)?;

// 2. Build the actor graph.
@@ -1713,6 +1711,7 @@ impl DdlController {
fragment_graph,
original_table_fragment.fragment_id,
downstream_fragments,
stream_job.into(),
)?;

// 2. Build the actor graph.
@@ -1975,7 +1974,8 @@
}
}

/// Fill in necessary information for table stream graph.
/// Fill in necessary information for `Table` stream graph.
/// e.g., fill source id for table with connector, fill external table id for CDC table.
pub fn fill_table_stream_graph_info(
source: &mut PbSource,
table: &mut PbTable,