-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow large timestamp skew for at-least-once streaming #29858
Conversation
R: @bvolpato |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
@Override | ||
public Duration getAllowedTimestampSkew() { | ||
return Duration.millis(Long.MAX_VALUE); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StorageApiWritesShardedRecords has
@Override
public Duration getAllowedTimestampSkew() {
return Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
}
I think the semantics are the same, just to confirm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup that value is equal to Long.MAX_VALUE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we keep the consistency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks!
AppendRowsRequest.MissingValueInterpretation missingValueInterpretation) { | ||
return new BigQueryServices.StreamAppendClient() { | ||
@Override | ||
public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the call made to send rows to BQ. It returns an ApiFuture that may contain an exception. Here we force it to always return the exception and expect the connector to handle this properly
R: @bvolpato do you ming taking another look? I added a test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test looks awesome, thanks Ahmed!
We maintain a buffer of element values and their timestamps, then flush when necessary (e.g. after reaching some max num elements). By the time we're forced to flush, we arrive at an element E with a timestamp potentially greater than some/most of the elements we have in the buffer. When we flush and some elements fail, we attempt to output them with their original timestamps, which may be earlier than the timestamp of the current element we are processing. Doing so triggers the following error:
Failed to insert row and could not parse the result!
java.lang.IllegalArgumentException: Cannot output with timestamp 2023-12-18T20:26:27.376Z. Output timestamps must be no earlier than the timestamp of the current input or timer (2023-12-18T20:26:31.252Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
To mitigate this, we should allow a large timestamp skew.