
feat(source):support _rw_timestamp #7275

Merged
merged 5 commits into from
Jan 14, 2023

Conversation

ZENOTME
Contributor

@ZENOTME ZENOTME commented Jan 9, 2023

I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.

What's changed and what's your intention?

a draft of #7211

The design of this PR:

  1. Specify a special 'timestamp' column, indicated by an 'is_timestamp' field in the SourceColumnDesc:
    pub is_timestamp: bool,
  2. Add a dedicated 'timestamp' field in the SourceMessage:
    pub timestamp: Option<i64>,

    We use it to store the timestamp if the source provides it, e.g. Kafka.
  3. When reading data from the source, we try to fill the timestamp column with the timestamp in the message.
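A minimal sketch of the three steps above, using hypothetical local types (the real `SourceColumnDesc` and `SourceMessage` in RisingWave carry more fields than shown here):

```rust
// Hypothetical, trimmed-down versions of the structs described above.
#[allow(dead_code)]
struct SourceColumnDesc {
    name: String,
    /// Marks the special timestamp column (step 1).
    is_timestamp: bool,
}

#[allow(dead_code)]
struct SourceMessage {
    payload: Option<Vec<u8>>,
    /// Millisecond timestamp, filled when the connector (e.g. Kafka)
    /// provides one (step 2).
    timestamp: Option<i64>,
}

/// Step 3: when reading, fill the timestamp column from the message;
/// every other column gets no timestamp value.
fn fill_timestamp_column(descs: &[SourceColumnDesc], msg: &SourceMessage) -> Vec<Option<i64>> {
    descs
        .iter()
        .map(|desc| if desc.is_timestamp { msg.timestamp } else { None })
        .collect()
}

fn main() {
    let descs = vec![
        SourceColumnDesc { name: "v1".into(), is_timestamp: false },
        SourceColumnDesc { name: "_rw_kafka_timestamp".into(), is_timestamp: true },
    ];
    let msg = SourceMessage { payload: None, timestamp: Some(1_641_038_400_000) };
    let filled = fill_timestamp_column(&descs, &msg);
    assert_eq!(filled, vec![None, Some(1_641_038_400_000)]);
}
```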

And the expected behaviour is as follows:

For Kafka, we have the timestamp column and messages with timestamps:

select _rw_kafka_timestamp from source_abc;
----
| _rw_kafka_timestamp  |
| '2022-01-01 12:00:00'|

For Pulsar, we only have the timestamp column (we can't support it right now; this is just an assumed example):

select _rw_pulsar_timestamp from source_abc;
----
| _rw_pulsar_timestamp  |
|                      |

We will just return an empty result if the source can't provide the timestamp of each record. As #7211 (comment) suggests, we should state this clearly somewhere.
BTW, maybe returning an error here would be better, e.g. 'selecting the timestamp is not supported for pulsar'; we could do this in the binder.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • All checks passed in ./risedev check (or alias, ./risedev c)

Documentation

If your pull request contains user-facing changes, please specify the types of the changes, and create a release note. Otherwise, please feel free to remove this section.

Types of user-facing changes

Please keep the types that apply to your changes, and remove those that do not apply.

  • Installation and deployment
  • Connector (sources & sinks)
  • SQL commands, functions, and operators
  • RisingWave cluster configuration changes
  • Other (please specify in the release note below)

Release note

Please create a release note for your changes. In the release note, focus on the impact on users, and mention the environment or conditions where the impact may occur.

Refer to a related PR or issue link (optional)

@ZENOTME ZENOTME changed the title support _rw_timestamp feat:support _rw_timestamp Jan 9, 2023
@ZENOTME ZENOTME changed the title feat:support _rw_timestamp (draft)feat:support _rw_timestamp Jan 9, 2023
@ZENOTME ZENOTME changed the title (draft)feat:support _rw_timestamp feat(source):support _rw_timestamp Jan 10, 2023
@@ -287,6 +287,7 @@ pub struct SourceMessage {
pub payload: Option<Bytes>,
pub offset: String,
pub split_id: SplitId,
pub timestamp: Option<i64>,
Contributor

Actually, this is the time when the message gets into the MQ. We may find a better name.

@@ -23,6 +23,7 @@ impl From<CdcMessage> for SourceMessage {
payload: Some(Bytes::from(message.payload)),
offset: message.offset,
split_id: message.partition.into(),
timestamp: None,
Contributor

There may be one. PTAL @StrikeW

Contributor

What is the expected semantic of the timestamp field?

Contributor

@StrikeW StrikeW Jan 10, 2023

A CDC source will always be materialized to a table, which means it can be read by batch queries. IMO we don't need this timestamp for the CDC source. What do you think? @liurenjie1024

Contributor Author

What is the expected semantic of the timestamp field?

This is the timestamp when the message gets into the MQ.

Contributor

cdc source will always be materialized to a table, which means it can be read by batch query.

That's true, we cannot run a batch query on a CDC source. So leaving None here is OK and we will not hit this branch.

Contributor

What is the expected semantic of the timestamp field?

This is the timestamp when the message gets into the MQ.

The CDC source loads data directly from the connector node via RPC, and there is no MQ in this process.

@@ -78,6 +78,7 @@ impl DatagenEventGenerator {
payload: Some(Bytes::from(value.to_string())),
offset: self.offset.to_string(),
split_id: self.split_id.clone(),
timestamp: None,
Contributor

We can always find out when the data is generated. Maybe we should use the current timestamp for the current chunk.
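A small sketch of this suggestion for datagen, assuming "generated now" simply means the current wall-clock time in milliseconds (a stdlib-only illustration, not the actual datagen code):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Current Unix time in milliseconds, usable as a generation timestamp
/// for a datagen chunk instead of `None`.
fn current_timestamp_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before Unix epoch")
        .as_millis() as i64
}

fn main() {
    let ts = current_timestamp_ms();
    // Any sane clock is well past 2020-01-01 (1_577_836_800_000 ms).
    assert!(ts > 1_577_836_800_000);
}
```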

split_id,
timestamp: None,
Contributor

The timestamp has the same meaning as the offset in Pub/Sub.

@@ -31,6 +31,7 @@ impl From<NexmarkMessage> for SourceMessage {
payload: Some(msg.payload),
offset: msg.sequence_number.clone(),
split_id: msg.split_id,
timestamp: None,
Contributor

Ditto. We use the timestamp when the data is generated.

@@ -58,62 +58,71 @@ fn get_descs() -> Vec<SourceColumnDesc> {
data_type: DataType::Int32,
column_id: ColumnId::from(0),
skip_parse: false,
is_timestamp: false,
Contributor

I don't think it is a good idea to put this property here. We may leave it empty for the MQs that carry no timestamp with their messages.

Contributor Author

I'm not sure what you mean. Do you mean we can have an interface to create SourceColumnDesc, like
SourceColumnDesc::new(data_type, column_id, skip_parse), so that we don't need to specify the is_timestamp field for the MQs that have no timestamp with their messages?

Contributor

So for one connector, there is one virtual column, such as _rw_kafka_timestamp, right? Timestamps can only be filled into that column.
So I suggest making it part of the parser logic; then this field is no longer needed.

Contributor Author

But that may cause the parser to have some 'hacky' code like:

if column.name == '_rw_kafka_timestamp' || 
   column.name == '_rw_**_timestamp' {
    ...
}

Is that acceptable?

Comment on lines 184 to 187
if let Some(ts) = msg.timestamp {
builder
.row_writer()
.insert_timestamp(Some(i64_to_timestamptz(ts).unwrap().into()));
Contributor

make it part of parser?

@tabVersion
Contributor

tabVersion commented Jan 10, 2023

Hold this PR for a while; waiting for the parser refactoring. @waruto210

@waruto210
Contributor

Hold this PR for a while; waiting for the parser refactoring. @waruto210

The first step of the parser refactoring should not conflict with this, and I suggest you merge this PR ASAP to prevent conflicts with subsequent parser refactoring work.

@ZENOTME ZENOTME marked this pull request as ready for review January 10, 2023 11:16
@codecov

codecov bot commented Jan 10, 2023

Codecov Report

Merging #7275 (fce2b5e) into main (092be23) will decrease coverage by 0.02%.
The diff coverage is 51.72%.

@@            Coverage Diff             @@
##             main    #7275      +/-   ##
==========================================
- Coverage   73.08%   73.05%   -0.03%     
==========================================
  Files        1069     1069              
  Lines      171777   171864      +87     
==========================================
+ Hits       125539   125561      +22     
- Misses      46238    46303      +65     
Flag Coverage Δ
rust 73.05% <51.72%> (-0.03%) ⬇️


Impacted Files Coverage Δ
src/batch/src/executor/source.rs 0.00% <0.00%> (ø)
src/connector/src/source/cdc/source/message.rs 0.00% <0.00%> (ø)
...nnector/src/source/google_pubsub/source/message.rs 0.00% <0.00%> (ø)
src/connector/src/source/kafka/source/message.rs 0.00% <0.00%> (ø)
src/connector/src/source/kinesis/source/message.rs 0.00% <0.00%> (ø)
src/connector/src/source/pulsar/source/message.rs 0.00% <0.00%> (ø)
src/source/src/connector_source.rs 67.02% <5.88%> (-3.43%) ⬇️
src/source/src/parser/mod.rs 63.21% <40.74%> (-5.36%) ⬇️
src/connector/src/source/base.rs 66.66% <85.71%> (+0.52%) ⬆️
src/connector/src/source/nexmark/source/message.rs 60.97% <88.88%> (+7.85%) ⬆️
... and 9 more



self.descs
.iter()
.zip_eq(self.builders.iter_mut())
.enumerate()
.try_for_each(|(idx, (desc, builder))| -> Result<()> {
if desc.is_timestamp {
Contributor

I mean we can check if the column name starts with _rw and fill the timestamp here.

@ZENOTME
Contributor Author

ZENOTME commented Jan 11, 2023

So there is a question here: what is the definition of _rw_*_timestamp?

Actually, this is the time when the message gets into the MQ. We may find a better name.

For the current implementation, our definition is the time when the message gets into the MQ.

The CDC source loads data directly from the connector node via RPC, and there is no MQ in this process.

For cdc, this timestamp is None.

For Kafka there are two kinds of timestamps:

CreateTime - the timestamp is assigned when the producer record is created, i.e. before sending. There can be retries, so there is no guarantee that ordering is preserved.
LogAppendTime - the timestamp is assigned when the record is appended to the log on the broker. In that case ordering per partition is preserved. Multiple messages might get the same timestamp assigned.

According to our definition, when the timestamp is a CreateTime, it should be considered as None, right? @tabVersion @liurenjie1024
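The rule proposed in this comment can be sketched as follows. The enum mirrors the shape of rdkafka's `Timestamp` but is defined locally for illustration, and only LogAppendTime counts as "time entering the MQ" (note that a later review comment argues CreateTime could be exposed as event time instead):

```rust
// Local stand-in for rdkafka's message timestamp kinds.
#[derive(Debug, Clone, Copy)]
enum KafkaTimestamp {
    NotAvailable,
    CreateTime(i64),
    LogAppendTime(i64),
}

/// Only a broker-assigned LogAppendTime tells us when the message
/// actually entered the MQ; producer-assigned or missing timestamps
/// map to None under this definition.
fn enter_mq_timestamp(ts: KafkaTimestamp) -> Option<i64> {
    match ts {
        KafkaTimestamp::LogAppendTime(ms) => Some(ms),
        KafkaTimestamp::CreateTime(_) | KafkaTimestamp::NotAvailable => None,
    }
}

fn main() {
    assert_eq!(enter_mq_timestamp(KafkaTimestamp::LogAppendTime(1_000)), Some(1_000));
    assert_eq!(enter_mq_timestamp(KafkaTimestamp::CreateTime(1_000)), None);
    assert_eq!(enter_mq_timestamp(KafkaTimestamp::NotAvailable), None);
}
```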

pub struct SourceMessage {
pub payload: Option<Bytes>,
pub offset: String,
pub split_id: SplitId,

/// milliseconds timestamp of message entering the mq
pub enter_mq_timestamp: Option<i64>,
Contributor

Different sources may have different meta columns, and it's inappropriate to simply add a timestamp field here. A better design would be to add another enum:

enum SourceMeta {
   KafkaMeta {
     timestamp: i64,
     /// We can add other fields later when necessary, for example offset, topic
   }
}

This way we don't need to add a timestamp for every source, but only add the necessary fields in the future.
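A compilable sketch of how a reader might consume such an enum (variant and field names are illustrative, not the final API; an `Empty` variant stands in for connectors without meta):

```rust
// Hypothetical per-connector metadata attached to a source message.
#[derive(Debug)]
enum SourceMeta {
    Kafka { timestamp: i64 },
    // Other connectors can add their own variants later.
    Empty,
}

/// Extract the Kafka ingestion timestamp, if this message carries one.
fn kafka_timestamp(meta: &SourceMeta) -> Option<i64> {
    match meta {
        SourceMeta::Kafka { timestamp } => Some(*timestamp),
        SourceMeta::Empty => None,
    }
}

fn main() {
    assert_eq!(kafka_timestamp(&SourceMeta::Kafka { timestamp: 42 }), Some(42));
    assert_eq!(kafka_timestamp(&SourceMeta::Empty), None);
}
```

The type-safety benefit is that a match on `SourceMeta` forces each new connector variant to be handled explicitly, instead of every source silently carrying an unused `Option<i64>`.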

Comment on lines 343 to 344
builder: &mut SourceStreamChunkBuilder,
msg_timestamp: Option<i64>,
Contributor

I think there are some problems with this design, since it can't be extended to other meta fields.

@@ -173,7 +173,7 @@ impl ConnectorSourceReader {
split_offset_mapping.insert(msg.split_id, msg.offset);
if let Err(e) = self
.parser
.parse(content.as_ref(), builder.row_writer())
.parse(content.as_ref(), &mut builder, msg.enter_mq_timestamp)
Contributor

I think handling meta columns should happen here rather than in the parser. Taking timestamp as an example: when we find a meta column required in the source desc, we should extract it from the source message and insert it into the result column.

@liurenjie1024
Contributor

liurenjie1024 commented Jan 11, 2023

Generally I think there are some problems with the current approach, and the main problem is that we can't extend it to other meta columns in the future, for example a topic/partition column. My suggestions are as follows:

  1. Instead of adding a simple timestamp field in SourceMessage, we should extend it to a SourceMeta enum to make it type safe for different sources.
  2. We should not change the current parser; the extracting logic should live in the reader.
  3. Add an is_meta field in SourceColumnDesc so that we can skip it in the parser.

@ZENOTME ZENOTME force-pushed the zj/support_timestamp branch 3 times, most recently from 7b2d8ed to 82d1f57 Compare January 11, 2023 11:16
timestamp: if let Timestamp::LogAppendTime(ts) = message.timestamp() {
Some(ts)
} else {
None
Contributor

According to Kafka's docs, CreateTime is in fact the event time, so I think it's OK to expose it to end users.

}
}
}

impl From<&ColumnDesc> for SourceColumnDesc {
fn from(c: &ColumnDesc) -> Self {
let is_meta = c.name.starts_with("_rw");
Contributor

Currently we only support _rw_kafka_timestamp as meta, so please check the column name explicitly. We may change the naming convention later when we decide to add more meta columns.
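This suggestion can be sketched as an exact-name check (the constant and helper are hypothetical names, not the actual RisingWave code):

```rust
// The only meta column supported so far; match it exactly instead of
// checking for the `_rw` prefix.
const KAFKA_TIMESTAMP_COLUMN: &str = "_rw_kafka_timestamp";

fn is_meta_column(name: &str) -> bool {
    name == KAFKA_TIMESTAMP_COLUMN
}

fn main() {
    assert!(is_meta_column("_rw_kafka_timestamp"));
    // A prefix check (`starts_with("_rw")`) would wrongly classify a
    // user-defined column like this one as meta.
    assert!(!is_meta_column("_rw_my_column"));
}
```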

Ok(())
})
.inspect_err(|e| {
tracing::warn!("failed to parse source data: {}", e);
Contributor

I'm wondering, do we really need a Result here? If we fail, the whole row should be reverted, not just this builder for the meta column.

/// This function is used to fill this hole in `meta_column_builder`.
/// e.g. after filling,
/// `data_builder` = [1], `meta_column_builder` = [1], `op` = [insert]
pub fn fulfill(
Contributor

Rename to fulfill_meta_column?

@wangrunji0408
Contributor

The failing build for simulation will be fixed by #7361.

mergify bot pushed a commit that referenced this pull request Jan 12, 2023
This PR bumps rdkafka version to v0.29.0. (madsim-rdkafka v0.2.13-alpha)
Also fixes the missing API in madsim used by #7275.

Notice a change from rdkafka v0.28 -> v0.29:
- The return type of `Producer::flush` changed to `KafkaResult<()>`.

Approved-By: BugenZhao
Approved-By: liurenjie1024
Contributor

@liurenjie1024 liurenjie1024 left a comment

Generally LGTM, please add e2e tests to this.

@ZENOTME
Contributor Author

ZENOTME commented Jan 13, 2023

Generally LGTM, please add e2e tests to this.

There is a problem: how do we add an e2e test for this... We can't infer the _rw_kafka_timestamp.

@liurenjie1024
Contributor

Generally LGTM, please add e2e tests to this.

There is a problem: how do we add an e2e test for this... We can't infer the _rw_kafka_timestamp.

Yes, it's a little difficult. We can attach a timestamp to each Kafka record, but as far as I know, no tool supports this. We need to use the Kafka producer API for it.

https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

Contributor

@liurenjie1024 liurenjie1024 left a comment

Let's merge it first.

@mergify
Contributor

mergify bot commented Jan 13, 2023

Hey @ZENOTME, this pull request failed to merge and has been dequeued from the merge train. If you believe your PR failed in the merge train because of a flaky test, requeue it by clicking "Update branch" or pushing an empty commit with git commit --allow-empty -m "rerun" && git push.

@liurenjie1024
Contributor

@mergify requeue

@mergify
Contributor

mergify bot commented Jan 13, 2023

requeue

✅ The queue state of this pull request has been cleaned. It can be re-embarked automatically

Labels
type/feature user-facing-changes Contains changes that are visible to users