Skip to content

Commit

Permalink
fix message offset of opendal source
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <stdrc@outlook.com>
  • Loading branch information
stdrc committed Dec 9, 2024
1 parent 430fbb9 commit 099c8da
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,16 @@ impl<Src: OpendalSource> OpendalReader<Src> {
// note that the buffer contains the newline character
debug_assert_eq!(n_read, line_buf.len());

// FIXME(rc): Here we have to use `offset + n_read`, i.e. the offset of the next line,
// as the *message offset*, because we check whether a file is finished by comparing the
// message offset with the file size in `FsFetchExecutor::into_stream`. However, we must
// understand that this message offset is not semantically consistent with the offset of
// other source connectors.
let msg_offset = (offset + n_read).to_string();
batch.push(SourceMessage {
key: None,
payload: Some(std::mem::take(&mut line_buf).into_bytes()),
offset: offset.to_string(),
offset: msg_offset,
split_id: split.id(),
meta: SourceMeta::Empty,
});
Expand Down
5 changes: 5 additions & 0 deletions src/stream/src/executor/source/fetch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,11 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
}
_ => unreachable!(),
};
// FIXME(rc): Here we compare `offset` with `fs_split.size` to determine
// whether the file is finished, where the `offset` is the starting position
// of the NEXT message line in the file. However, In other source connectors,
// we use the word `offset` to represent the offset of the current message.
// We have to be careful about this semantical inconsistency.
if offset.parse::<usize>().unwrap() >= fs_split.size {
splits_on_fetch -= 1;
state_store_handler.delete(split_id).await?;
Expand Down

0 comments on commit 099c8da

Please sign in to comment.