-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow large timestamp skew for at-least-once streaming #29858
Merged
Merged
Changes from 3 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
57516c1
large skew
ahmedabu98 419b6be
Merge branch 'master' of https://github.com/ahmedabu98/beam into skew…
ahmedabu98 d9d42fc
test
ahmedabu98 22e97d1
consistent skew; use AppendSerializationError (bq client fixed the typo)
ahmedabu98 d3a70c0
use AppendSerializationError everywhere
ahmedabu98 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,8 @@ | |
import static org.junit.Assert.fail; | ||
import static org.junit.Assume.assumeTrue; | ||
|
||
import com.google.api.core.ApiFuture; | ||
import com.google.api.core.SettableApiFuture; | ||
import com.google.api.services.bigquery.model.Clustering; | ||
import com.google.api.services.bigquery.model.ErrorProto; | ||
import com.google.api.services.bigquery.model.Job; | ||
|
@@ -48,7 +50,12 @@ | |
import com.google.api.services.bigquery.model.TableSchema; | ||
import com.google.api.services.bigquery.model.TimePartitioning; | ||
import com.google.auto.value.AutoValue; | ||
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest; | ||
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; | ||
import com.google.cloud.bigquery.storage.v1.Exceptions; | ||
import com.google.cloud.bigquery.storage.v1.ProtoRows; | ||
import com.google.protobuf.ByteString; | ||
import com.google.protobuf.DescriptorProtos; | ||
import java.io.File; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
|
@@ -124,14 +131,18 @@ | |
import org.apache.beam.sdk.transforms.MapElements; | ||
import org.apache.beam.sdk.transforms.PTransform; | ||
import org.apache.beam.sdk.transforms.ParDo; | ||
import org.apache.beam.sdk.transforms.PeriodicImpulse; | ||
import org.apache.beam.sdk.transforms.SerializableFunction; | ||
import org.apache.beam.sdk.transforms.SerializableFunctions; | ||
import org.apache.beam.sdk.transforms.SimpleFunction; | ||
import org.apache.beam.sdk.transforms.Sum; | ||
import org.apache.beam.sdk.transforms.View; | ||
import org.apache.beam.sdk.transforms.WithKeys; | ||
import org.apache.beam.sdk.transforms.display.DisplayData; | ||
import org.apache.beam.sdk.transforms.windowing.AfterWatermark; | ||
import org.apache.beam.sdk.transforms.windowing.BoundedWindow; | ||
import org.apache.beam.sdk.transforms.windowing.GlobalWindow; | ||
import org.apache.beam.sdk.transforms.windowing.GlobalWindows; | ||
import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; | ||
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; | ||
import org.apache.beam.sdk.transforms.windowing.Window; | ||
|
@@ -3104,6 +3115,104 @@ public void testStorageWriteWithMultipleAppendsPerStream() throws Exception { | |
containsInAnyOrder(Iterables.toArray(rows, TableRow.class))); | ||
} | ||
|
||
  /**
   * A {@link FakeDatasetService} whose stream append client fails every appended row with an
   * {@link Exceptions.AppendSerializationError}. Used to verify that the Storage API sink routes
   * per-row serialization errors to the failed-inserts output rather than failing the pipeline.
   */
  public static class ThrowingFakeDatasetServices extends FakeDatasetService {
    @Override
    public BigQueryServices.StreamAppendClient getStreamAppendClient(
        String streamName,
        DescriptorProtos.DescriptorProto descriptor,
        boolean useConnectionPool,
        AppendRowsRequest.MissingValueInterpretation missingValueInterpretation) {
      return new BigQueryServices.StreamAppendClient() {
        // This is the call made to send rows to BQ. It returns an ApiFuture that may contain an
        // exception. Here we force it to always return the exception and expect the connector to
        // handle this properly.
        @Override
        public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows) {
          // Mark every row in this batch as failed with the same error message; the test
          // downstream counts rows whose error message matches this string.
          Map<Integer, String> errorMap = new HashMap<>();
          for (int i = 0; i < rows.getSerializedRowsCount(); i++) {
            errorMap.put(i, "some serialization error");
          }
          SettableApiFuture<AppendRowsResponse> appendResult = SettableApiFuture.create();
          appendResult.setException(
              new Exceptions.AppendSerializationError(
                  404, "some description", "some stream", errorMap));
          return appendResult;
        }

        @Override
        public com.google.cloud.bigquery.storage.v1.@Nullable TableSchema getUpdatedSchema() {
          // No schema evolution is simulated by this fake.
          return null;
        }

        // Pinning/unpinning and closing are no-ops: this fake holds no real connection.
        @Override
        public void pin() {}

        @Override
        public void unpin() {}

        @Override
        public void close() {}
      };
    }
  }
|
||
  /**
   * Verifies that when the Storage Write API reports an {@link Exceptions.AppendSerializationError}
   * for every row, each failed row surfaces on {@link WriteResult#getFailedStorageApiInserts()}
   * with its per-row error message, instead of failing the pipeline.
   */
  @Test
  public void testStorageWriteReturnsAppendSerializationError() throws Exception {
    // This scenario only applies to streaming writes through the Storage API.
    assumeTrue(useStorageApi);
    assumeTrue(useStreaming);
    // Flush appends every 5 records so the 100 input rows span multiple append calls.
    p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdRecordCount(5);

    TableSchema schema =
        new TableSchema()
            .setFields(Arrays.asList(new TableFieldSchema().setType("INTEGER").setName("long")));
    Table fakeTable = new Table();
    TableReference ref =
        new TableReference()
            .setProjectId("project-id")
            .setDatasetId("dataset-id")
            .setTableId("table-id");
    fakeTable.setSchema(schema);
    fakeTable.setTableReference(ref);

    // Dataset service whose append client rejects every row (see ThrowingFakeDatasetServices).
    ThrowingFakeDatasetServices throwingService = new ThrowingFakeDatasetServices();
    throwingService.createTable(fakeTable);

    int numRows = 100;

    // Generate one element per millisecond for numRows milliseconds; with a 1 ms interval and a
    // stop at numRows - 1, PeriodicImpulse emits exactly numRows elements.
    WriteResult res =
        p.apply(
                PeriodicImpulse.create()
                    .startAt(Instant.ofEpochMilli(0))
                    .stopAfter(Duration.millis(numRows - 1))
                    .withInterval(Duration.millis(1)))
            .apply(
                "Convert to longs",
                MapElements.into(TypeDescriptor.of(TableRow.class))
                    .via(instant -> new TableRow().set("long", instant.getMillis())))
            .apply(
                BigQueryIO.writeTableRows()
                    .to(ref)
                    .withSchema(schema)
                    .withTestServices(
                        new FakeBigQueryServices()
                            .withDatasetService(throwingService)
                            .withJobService(fakeJobService)));

    // Count failed inserts whose error message matches the one injected by the fake service.
    // Re-window into the global window with a watermark trigger so the sum fires once at the end.
    PCollection<Integer> numErrors =
        res.getFailedStorageApiInserts()
            .apply(
                "Count errors",
                MapElements.into(TypeDescriptors.integers())
                    .via(err -> err.getErrorMessage().equals("some serialization error") ? 1 : 0))
            .apply(
                Window.<Integer>into(new GlobalWindows())
                    .triggering(AfterWatermark.pastEndOfWindow())
                    .discardingFiredPanes()
                    .withAllowedLateness(Duration.ZERO))
            .apply(Sum.integersGlobally());

    // Every one of the numRows inputs must come back as a failed insert with the injected message.
    PAssert.that(numErrors).containsInAnyOrder(numRows);

    p.run().waitUntilFinish();
  }
|
||
@Test | ||
public void testWriteProtos() throws Exception { | ||
BigQueryIO.Write.Method method = | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StorageApiWritesShardedRecords has
I think the semantics are the same, just to confirm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup that value is equal to Long.MAX_VALUE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we keep the consistency?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done