Skip to content

Commit

Permalink
support _rw_timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Jan 10, 2023
1 parent b0c36c1 commit 3340b25
Show file tree
Hide file tree
Showing 14 changed files with 65 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ pub struct SourceMessage {
pub payload: Option<Bytes>,
pub offset: String,
pub split_id: SplitId,
pub timestamp: Option<i64>,
}

/// The message pumped from the external source service.
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/cdc/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ impl From<CdcMessage> for SourceMessage {
payload: Some(Bytes::from(message.payload)),
offset: message.offset,
split_id: message.partition.into(),
timestamp: None,
}
}
}
1 change: 1 addition & 0 deletions src/connector/src/source/datagen/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl DatagenEventGenerator {
payload: Some(Bytes::from(value.to_string())),
offset: self.offset.to_string(),
split_id: self.split_id.clone(),
timestamp: None,
});
self.offset += 1;
rows_generated_this_second += 1;
Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/source/google_pubsub/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,9 @@ impl From<TaggedReceivedMessage> for SourceMessage {
_ => Some(Bytes::from(payload)),
}
},

offset: timestamp.timestamp_nanos().to_string(),

split_id,
timestamp: None,
}
}
}
1 change: 1 addition & 0 deletions src/connector/src/source/kafka/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ impl<'a> From<BorrowedMessage<'a>> for SourceMessage {
payload: message.payload().map(Bytes::copy_from_slice),
offset: message.offset().to_string(),
split_id: message.partition().to_string().into(),
timestamp: message.timestamp().to_millis(),
}
}
}
1 change: 1 addition & 0 deletions src/connector/src/source/kinesis/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl From<KinesisMessage> for SourceMessage {
payload: Some(msg.payload),
offset: msg.sequence_number.clone(),
split_id: msg.shard_id,
timestamp: None,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/nexmark/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl From<NexmarkMessage> for SourceMessage {
payload: Some(msg.payload),
offset: msg.sequence_number.clone(),
split_id: msg.split_id,
timestamp: None,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/pulsar/source/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl From<Message<Vec<u8>>> for SourceMessage {
message_id.batch_index.unwrap_or(-1)
),
split_id: msg.topic.into(),
timestamp: None,
}
}
}
9 changes: 9 additions & 0 deletions src/source/benches/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,62 +58,71 @@ fn get_descs() -> Vec<SourceColumnDesc> {
data_type: DataType::Int32,
column_id: ColumnId::from(0),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "bool".to_string(),
data_type: DataType::Boolean,
column_id: ColumnId::from(2),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "i16".to_string(),
data_type: DataType::Int16,
column_id: ColumnId::from(3),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "i64".to_string(),
data_type: DataType::Int64,
column_id: ColumnId::from(4),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "f32".to_string(),
data_type: DataType::Float32,
column_id: ColumnId::from(5),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "f64".to_string(),
data_type: DataType::Float64,
column_id: ColumnId::from(6),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "varchar".to_string(),
data_type: DataType::Varchar,
column_id: ColumnId::from(7),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "date".to_string(),
data_type: DataType::Date,
column_id: ColumnId::from(8),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "timestamp".to_string(),
data_type: DataType::Timestamp,
column_id: ColumnId::from(9),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
]
Expand Down
9 changes: 9 additions & 0 deletions src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use risingwave_connector::source::{
SplitReaderImpl,
};
use risingwave_connector::ConnectorParams;
use risingwave_expr::vector_op::cast::i64_to_timestamptz;
use risingwave_pb::catalog::{
ColumnIndex as ProstColumnIndex, StreamSourceInfo as ProstStreamSourceInfo,
};
Expand Down Expand Up @@ -179,6 +180,14 @@ impl ConnectorSourceReader {
tracing::warn!("message parsing failed {}, skipping", e.to_string());
continue;
}
// fulfill the timestamp column if it exists
if let Some(ts) = msg.timestamp {
builder
.row_writer()
.insert_timestamp(Some(i64_to_timestamptz(ts).unwrap().into()));
} else {
builder.row_writer().insert_timestamp(None);
}
}
}
yield StreamChunkWithState {
Expand Down
4 changes: 4 additions & 0 deletions src/source/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct SourceColumnDesc {
pub fields: Vec<ColumnDesc>,
/// Now `skip_parse` is used to indicate whether the column is a row id column.
pub skip_parse: bool,
pub is_timestamp: bool,
}

impl SourceColumnDesc {
Expand All @@ -43,18 +44,21 @@ impl SourceColumnDesc {
column_id,
fields: vec![],
skip_parse: false,
is_timestamp: false,
}
}
}

impl From<&ColumnDesc> for SourceColumnDesc {
    /// Builds a [`SourceColumnDesc`] from a catalog [`ColumnDesc`], detecting
    /// whether the column is the hidden `_rw_timestamp` column that should be
    /// filled from the source message's timestamp instead of being parsed from
    /// the payload.
    fn from(c: &ColumnDesc) -> Self {
        // Match the hidden timestamp column by its exact name. The previous
        // check (`ends_with("timestamp") && starts_with("_rw")`) was a loose
        // substring match that would also flag unrelated user columns such as
        // `_rware_timestamp` or `_rw_foo_timestamp`.
        let is_timestamp = c.name == "_rw_timestamp";
        Self {
            name: c.name.clone(),
            data_type: c.data_type.clone(),
            column_id: c.column_id,
            fields: c.field_descs.clone(),
            // `skip_parse` marks row-id columns; a converted catalog column is
            // never one, so it defaults to `false` here.
            skip_parse: false,
            is_timestamp,
        }
    }
}
Expand Down
10 changes: 10 additions & 0 deletions src/source/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,69 +491,79 @@ mod test {
data_type: DataType::Int32,
column_id: ColumnId::from(0),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "sequence_id".to_string(),
data_type: DataType::Int64,
column_id: ColumnId::from(1),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "name".to_string(),
data_type: DataType::Varchar,
column_id: ColumnId::from(2),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "score".to_string(),
data_type: DataType::Float32,
column_id: ColumnId::from(3),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "avg_score".to_string(),
data_type: DataType::Float64,
column_id: ColumnId::from(4),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "is_lasted".to_string(),
data_type: DataType::Boolean,
column_id: ColumnId::from(5),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "entrance_date".to_string(),
data_type: DataType::Date,
column_id: ColumnId::from(6),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "birthday".to_string(),
data_type: DataType::Timestamp,
column_id: ColumnId::from(7),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "anniversary".to_string(),
data_type: DataType::Timestamp,
column_id: ColumnId::from(8),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "passed".to_string(),
data_type: DataType::Interval,
column_id: ColumnId::from(9),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
]
Expand Down
4 changes: 4 additions & 0 deletions src/source/src/parser/debezium/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,31 @@ mod test {
data_type: DataType::Int32,
column_id: ColumnId::from(0),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "name".to_string(),
data_type: DataType::Varchar,
column_id: ColumnId::from(1),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "description".to_string(),
data_type: DataType::Varchar,
column_id: ColumnId::from(2),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
SourceColumnDesc {
name: "weight".to_string(),
data_type: DataType::Float64,
column_id: ColumnId::from(3),
skip_parse: false,
is_timestamp: false,
fields: vec![],
},
];
Expand Down
29 changes: 21 additions & 8 deletions src/source/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,31 +189,31 @@ impl SourceStreamChunkRowWriter<'_> {
&mut self,
mut f: impl FnMut(&SourceColumnDesc) -> Result<A::Output>,
) -> Result<WriteGuard> {
// The closure `f` may fail so that a part of builders were appended incompletely.
// Loop invariant: `builders[0..appended_idx)` has been appended on every iter ended or loop
// exited.
let mut appended_idx = 0;
let mut modify_col = vec![];

self.descs
.iter()
.zip_eq(self.builders.iter_mut())
.enumerate()
.try_for_each(|(idx, (desc, builder))| -> Result<()> {
if desc.is_timestamp {
return Ok(());
}
let output = if desc.skip_parse {
A::DEFAULT_OUTPUT
} else {
f(desc)?
};
A::apply(builder, output);
appended_idx = idx + 1;
modify_col.push(idx);

Ok(())
})
.inspect_err(|e| {
tracing::warn!("failed to parse source data: {}", e);
self.builders[..appended_idx]
.iter_mut()
.for_each(A::rollback);
modify_col.iter().for_each(|idx| {
A::rollback(&mut self.builders[*idx]);
});
})?;

A::finish(self);
Expand All @@ -234,6 +234,19 @@ impl SourceStreamChunkRowWriter<'_> {
self.do_action::<OpActionInsert>(f)
}

/// Fills the hidden `_rw_timestamp` column of the row that was just written.
///
/// Only takes effect when the most recently pushed op is an `Insert`; for any
/// other op (or when no op has been written yet) this is a no-op.
/// NOTE(review): on the non-`Insert` path the timestamp builder is left
/// unappended while the op builder advances — confirm callers never finish a
/// chunk in that state, or the column lengths would diverge.
pub fn insert_timestamp(&mut self, ts: Datum) {
    // Guard clause: nothing to do unless the last written op is an insert.
    if self.op_builder.last() != Some(&Op::Insert) {
        return;
    }
    // Locate the (single) timestamp column, if declared, and append `ts`.
    for (col_desc, col_builder) in self.descs.iter().zip_eq(self.builders.iter_mut()) {
        if col_desc.is_timestamp {
            col_builder.append_datum(&ts);
            break;
        }
    }
}

/// Write a `Delete` record to the [`StreamChunk`].
///
/// # Arguments
Expand Down

0 comments on commit 3340b25

Please sign in to comment.