From e993445744c9f51da1e898a08efd3f63788c9f69 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 22 Mar 2023 17:29:24 +0800 Subject: [PATCH] also check watermark type Signed-off-by: Bugen Zhao --- .../src/executor/wrapper/schema_check.rs | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/stream/src/executor/wrapper/schema_check.rs b/src/stream/src/executor/wrapper/schema_check.rs index 4bc99d3b3c8b..d23eca2b455c 100644 --- a/src/stream/src/executor/wrapper/schema_check.rs +++ b/src/stream/src/executor/wrapper/schema_check.rs @@ -27,13 +27,25 @@ pub async fn schema_check(info: Arc, input: impl MessageStream) { for message in input { let message = message?; - if let Message::Chunk(chunk) = &message { - risingwave_common::util::schema_check::schema_check( + match &message { + Message::Chunk(chunk) => risingwave_common::util::schema_check::schema_check( info.schema.fields().iter().map(|f| &f.data_type), chunk.columns(), - ) - .unwrap_or_else(|e| panic!("schema check failed on {}: {}", info.identity, e)); + ), + Message::Watermark(watermark) => { + let expected = info.schema.fields()[watermark.col_idx].data_type(); + let found = &watermark.data_type; + if &expected != found { + Err(format!( + "watermark type mismatched: expected {expected}, found {found}" + )) + } else { + Ok(()) + } + } + Message::Barrier(_) => Ok(()), } + .unwrap_or_else(|e| panic!("schema check failed on {}: {}", info.identity, e)); yield message; }