Allow large timestamp skew for at-least-once streaming (#29858)
* large skew

* test

* use AppendSerializationError everywhere
ahmedabu98 authored Dec 27, 2023
1 parent 32660e6 commit 989a219
Showing 5 changed files with 123 additions and 9 deletions.
@@ -670,10 +670,10 @@ long flush(
                 BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError());
 
             if (failedContext.getError() != null
-                && failedContext.getError() instanceof Exceptions.AppendSerializtionError) {
-              Exceptions.AppendSerializtionError error =
+                && failedContext.getError() instanceof Exceptions.AppendSerializationError) {
+              Exceptions.AppendSerializationError error =
                   Preconditions.checkStateNotNull(
-                      (Exceptions.AppendSerializtionError) failedContext.getError());
+                      (Exceptions.AppendSerializationError) failedContext.getError());
 
               Set<Integer> failedRowIndices = error.getRowIndexToErrorMessage().keySet();
               for (int failedIndex : failedRowIndices) {
@@ -1164,5 +1164,10 @@ public void teardown() {
         throw new RuntimeException(e);
       }
     }
+
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      return Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+    }
   }
 }
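Background, not part of the diff: by default a DoFn may not emit an element with a timestamp more than getAllowedTimestampSkew() earlier than the input element's timestamp, so re-emitting failed rows with their original event times can fail at runtime. The override added above returns the largest possible duration, effectively disabling that check for the at-least-once Storage API sink. A minimal sketch of the mechanism, with illustrative names (EmitWithOriginalTimestampFn is not a Beam or commit class):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Hypothetical DoFn: re-emits each value with a caller-supplied (often older) timestamp.
class EmitWithOriginalTimestampFn extends DoFn<KV<String, Instant>, String> {
  @ProcessElement
  public void process(ProcessContext c) {
    Instant originalTimestamp = c.element().getValue();
    // Without a sufficiently large allowed skew, emitting behind the current
    // element's timestamp throws IllegalArgumentException at runtime.
    c.outputWithTimestamp(c.element().getKey(), originalTimestamp);
  }

  @Override
  public Duration getAllowedTimestampSkew() {
    // Same expression as the override added in this commit: effectively unlimited skew.
    return Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
  }
}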
@@ -662,10 +662,10 @@ public void process(
             // failedInserts
             // PCollection, and retry with the remaining rows.
             if (failedContext.getError() != null
-                && failedContext.getError() instanceof Exceptions.AppendSerializtionError) {
-              Exceptions.AppendSerializtionError error =
+                && failedContext.getError() instanceof Exceptions.AppendSerializationError) {
+              Exceptions.AppendSerializationError error =
                   Preconditions.checkArgumentNotNull(
-                      (Exceptions.AppendSerializtionError) failedContext.getError());
+                      (Exceptions.AppendSerializationError) failedContext.getError());
 
               Set<Integer> failedRowIndices = error.getRowIndexToErrorMessage().keySet();
               for (int failedIndex : failedRowIndices) {
@@ -680,7 +680,7 @@ public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows)
       }
       if (!rowIndexToErrorMessage.isEmpty()) {
         return ApiFutures.immediateFailedFuture(
-            new Exceptions.AppendSerializtionError(
+            new Exceptions.AppendSerializationError(
                 Code.INVALID_ARGUMENT.getNumber(),
                 "Append serialization failed for writer: " + streamName,
                 stream.streamName,
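For reference, a hedged sketch of the library type exercised above: Exceptions.AppendSerializationError (from the google-cloud-bigquerystorage client) carries a gRPC code, a description, the stream name, and a per-row-index error map, which the sink code in the first two files reads back via getRowIndexToErrorMessage(). The constructor is used directly by the test code in this commit; all literal values below are illustrative:

import com.google.cloud.bigquery.storage.v1.Exceptions;
import java.util.HashMap;
import java.util.Map;

public class AppendSerializationErrorSketch {
  public static void main(String[] args) {
    // Shape of the error the fake service constructs above: a map from the
    // index of each failed row in the append batch to a human-readable message.
    Map<Integer, String> rowErrors = new HashMap<>();
    rowErrors.put(0, "row 0 could not be converted to a proto message");
    rowErrors.put(3, "row 3 exceeds the maximum allowed row size");

    Exceptions.AppendSerializationError error =
        new Exceptions.AppendSerializationError(
            3 /* INVALID_ARGUMENT */, "append serialization failed", "some-stream", rowErrors);

    // The sink iterates exactly this map to send only the offending rows to the
    // failed-rows output while the rest of the batch is retried.
    for (Map.Entry<Integer, String> failed : error.getRowIndexToErrorMessage().entrySet()) {
      System.out.println("row " + failed.getKey() + " failed: " + failed.getValue());
    }
  }
}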
@@ -35,6 +35,8 @@
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 
+import com.google.api.core.ApiFuture;
+import com.google.api.core.SettableApiFuture;
 import com.google.api.services.bigquery.model.Clustering;
 import com.google.api.services.bigquery.model.ErrorProto;
 import com.google.api.services.bigquery.model.Job;
@@ -48,7 +50,12 @@
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.auto.value.AutoValue;
+import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.Exceptions;
+import com.google.cloud.bigquery.storage.v1.ProtoRows;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -124,14 +131,18 @@
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
 import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -3104,6 +3115,104 @@ public void testStorageWriteWithMultipleAppendsPerStream() throws Exception {
         containsInAnyOrder(Iterables.toArray(rows, TableRow.class)));
   }
 
+  public static class ThrowingFakeDatasetServices extends FakeDatasetService {
+    @Override
+    public BigQueryServices.StreamAppendClient getStreamAppendClient(
+        String streamName,
+        DescriptorProtos.DescriptorProto descriptor,
+        boolean useConnectionPool,
+        AppendRowsRequest.MissingValueInterpretation missingValueInterpretation) {
+      return new BigQueryServices.StreamAppendClient() {
+        @Override
+        public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows) {
+          Map<Integer, String> errorMap = new HashMap<>();
+          for (int i = 0; i < rows.getSerializedRowsCount(); i++) {
+            errorMap.put(i, "some serialization error");
+          }
+          SettableApiFuture<AppendRowsResponse> appendResult = SettableApiFuture.create();
+          appendResult.setException(
+              new Exceptions.AppendSerializationError(
+                  404, "some description", "some stream", errorMap));
+          return appendResult;
+        }
+
+        @Override
+        public com.google.cloud.bigquery.storage.v1.@Nullable TableSchema getUpdatedSchema() {
+          return null;
+        }
+
+        @Override
+        public void pin() {}
+
+        @Override
+        public void unpin() {}
+
+        @Override
+        public void close() {}
+      };
+    }
+  }
+
+  @Test
+  public void testStorageWriteReturnsAppendSerializationError() throws Exception {
+    assumeTrue(useStorageApi);
+    assumeTrue(useStreaming);
+    p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdRecordCount(5);
+
+    TableSchema schema =
+        new TableSchema()
+            .setFields(Arrays.asList(new TableFieldSchema().setType("INTEGER").setName("long")));
+    Table fakeTable = new Table();
+    TableReference ref =
+        new TableReference()
+            .setProjectId("project-id")
+            .setDatasetId("dataset-id")
+            .setTableId("table-id");
+    fakeTable.setSchema(schema);
+    fakeTable.setTableReference(ref);
+
+    ThrowingFakeDatasetServices throwingService = new ThrowingFakeDatasetServices();
+    throwingService.createTable(fakeTable);
+
+    int numRows = 100;
+
+    WriteResult res =
+        p.apply(
+                PeriodicImpulse.create()
+                    .startAt(Instant.ofEpochMilli(0))
+                    .stopAfter(Duration.millis(numRows - 1))
+                    .withInterval(Duration.millis(1)))
+            .apply(
+                "Convert to longs",
+                MapElements.into(TypeDescriptor.of(TableRow.class))
+                    .via(instant -> new TableRow().set("long", instant.getMillis())))
+            .apply(
+                BigQueryIO.writeTableRows()
+                    .to(ref)
+                    .withSchema(schema)
+                    .withTestServices(
+                        new FakeBigQueryServices()
+                            .withDatasetService(throwingService)
+                            .withJobService(fakeJobService)));
+
+    PCollection<Integer> numErrors =
+        res.getFailedStorageApiInserts()
+            .apply(
+                "Count errors",
+                MapElements.into(TypeDescriptors.integers())
+                    .via(err -> err.getErrorMessage().equals("some serialization error") ? 1 : 0))
+            .apply(
+                Window.<Integer>into(new GlobalWindows())
+                    .triggering(AfterWatermark.pastEndOfWindow())
+                    .discardingFiredPanes()
+                    .withAllowedLateness(Duration.ZERO))
+            .apply(Sum.integersGlobally());
+
+    PAssert.that(numErrors).containsInAnyOrder(numRows);
+
+    p.run().waitUntilFinish();
+  }
+
   @Test
   public void testWriteProtos() throws Exception {
     BigQueryIO.Write.Method method =
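Outside this test, a pipeline would consume WriteResult.getFailedStorageApiInserts() itself, for example by routing rejected rows to a dead-letter table. A hedged sketch of such a consumer; the destination table and the error_message column are illustrative and not part of this commit:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

class FailedInsertsDlq {
  // Routes rows rejected by the Storage API sink to a separate table,
  // annotated with the per-row error message reported by the service.
  static void routeToDeadLetterTable(WriteResult result) {
    result
        .getFailedStorageApiInserts()
        .apply(
            "AttachErrorMessage",
            MapElements.into(TypeDescriptor.of(TableRow.class))
                // Copy the failed row rather than mutating it, then attach the message.
                .via(err -> err.getRow().clone().set("error_message", err.getErrorMessage())))
        .apply(
            "WriteDeadLetterTable",
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.failed_rows") // illustrative destination
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
  }
}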
@@ -144,7 +144,7 @@ public void testThrowableToGRPCCodeString() throws Exception {
 
     int notFoundVal = Status.Code.NOT_FOUND.value();
     Throwable grpcError =
-        new Exceptions.AppendSerializtionError(notFoundVal, "Test Error", "Stream name", null);
+        new Exceptions.AppendSerializationError(notFoundVal, "Test Error", "Stream name", null);
     assertThat(BigQuerySinkMetrics.throwableToGRPCCodeString(grpcError), equalTo("NOT_FOUND"));
   }
 
@@ -220,7 +220,7 @@ public void testReportFailedRPCMetrics_KnownGrpcError() throws Exception {
     c.setOperationEndTime(t1.plusMillis(5));
     int notFoundVal = Status.Code.NOT_FOUND.value();
     Throwable grpcError =
-        new Exceptions.AppendSerializtionError(notFoundVal, "Test Error", "Stream name", null);
+        new Exceptions.AppendSerializationError(notFoundVal, "Test Error", "Stream name", null);
     c.setError(grpcError);
 
     // Test disabled SupportMetricsDeletion
