Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(file source): fix message offset of opendal source #19721

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,16 @@ impl<Src: OpendalSource> OpendalReader<Src> {
// note that the buffer contains the newline character
debug_assert_eq!(n_read, line_buf.len());

// FIXME(rc): Here we have to use `offset + n_read`, i.e. the offset of the next line,
// as the *message offset*, because we check whether a file is finished by comparing the
// message offset with the file size in `FsFetchExecutor::into_stream`. However, we must
// understand that this message offset is not semantically consistent with the offset of
// other source connectors.
let msg_offset = (offset + n_read).to_string();
batch.push(SourceMessage {
key: None,
payload: Some(std::mem::take(&mut line_buf).into_bytes()),
offset: offset.to_string(),
offset: msg_offset,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from my PoV, the meaning of "offset" for file source is too hard to understand and evaluate. (And my brain refuse to think about it.) Is it possible to change to sth like (start,end)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to change to sth like (start,end)?

Agree, will consider doing this later.

split_id: split.id(),
meta: SourceMeta::Empty,
});
Expand Down
5 changes: 5 additions & 0 deletions src/stream/src/executor/source/fetch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,11 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
}
_ => unreachable!(),
};
// FIXME(rc): Here we compare `offset` with `fs_split.size` to determine
// whether the file is finished, where the `offset` is the starting position
// of the NEXT message line in the file. However, In other source connectors,
// we use the word `offset` to represent the offset of the current message.
// We have to be careful about this semantical inconsistency.
if offset.parse::<usize>().unwrap() >= fs_split.size {
splits_on_fetch -= 1;
state_store_handler.delete(split_id).await?;
Expand Down
Loading