From c224c0bb9b017b099adc2a087534a4a793eb8d94 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 12 Sep 2024 13:54:42 -0400 Subject: [PATCH 1/5] Revert BigtableIO change in #31924 --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 104 +----------------- .../io/gcp/bigtable/BigtableServiceImpl.java | 4 - .../io/gcp/bigtable/BigtableWriteOptions.java | 5 - .../sdk/io/gcp/bigtable/BigtableIOTest.java | 15 --- 4 files changed, 4 insertions(+), 124 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 71b2b77fd847..96cb2d713959 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -21,16 +21,13 @@ import static org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import com.google.api.gax.batching.BatchingException; import com.google.api.gax.rpc.ApiException; -import com.google.api.gax.rpc.DeadlineExceededException; import com.google.api.gax.rpc.InvalidArgumentException; import com.google.api.gax.rpc.NotFoundException; -import com.google.api.gax.rpc.ResourceExhaustedException; import com.google.auto.value.AutoValue; import com.google.bigtable.v2.MutateRowResponse; import com.google.bigtable.v2.Mutation; @@ -41,7 +38,6 @@ import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.protobuf.ByteString; -import io.grpc.StatusRuntimeException; import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; @@ -78,8 +74,6 @@ import org.apache.beam.sdk.io.range.ByteKey; import org.apache.beam.sdk.io.range.ByteKeyRange; import org.apache.beam.sdk.io.range.ByteKeyRangeTracker; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; @@ -93,7 +87,6 @@ import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.StringUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -1121,51 +1114,12 @@ public Write withMaxOutstandingBytes(long bytes) { * always enabled on batch writes and limits the number of outstanding requests to the Bigtable * server. * - *

When enabled, will also set default {@link #withThrottlingReportTargetMs} to 1 minute. - * This enables runner react with increased latency in flush call due to flow control. - * *

Does not modify this object. */ public Write withFlowControl(boolean enableFlowControl) { - BigtableWriteOptions options = getBigtableWriteOptions(); - BigtableWriteOptions.Builder builder = options.toBuilder().setFlowControl(enableFlowControl); - if (enableFlowControl) { - builder = builder.setThrottlingReportTargetMs(60_000); - } - return toBuilder().setBigtableWriteOptions(builder.build()).build(); - } - - /** - * Returns a new {@link BigtableIO.Write} with client side latency based throttling enabled. - * - *

Will also set {@link #withThrottlingReportTargetMs} to the same value. - */ - public Write withThrottlingTargetMs(int throttlingTargetMs) { - BigtableWriteOptions options = getBigtableWriteOptions(); - return toBuilder() - .setBigtableWriteOptions( - options - .toBuilder() - .setThrottlingTargetMs(throttlingTargetMs) - .setThrottlingReportTargetMs(throttlingTargetMs) - .build()) - .build(); - } - - /** - * Returns a new {@link BigtableIO.Write} with throttling time reporting enabled. When write - * request latency exceeded the set value, the amount greater than the target will be considered - * as throttling time and report back to runner. - * - *

If not set, defaults to 3 min for completed batch request. Client side flowing control - * configurations (e.g. {@link #withFlowControl}, {@link #withThrottlingTargetMs} will adjust - * the default value accordingly. Set to 0 to disable throttling time reporting. - */ - public Write withThrottlingReportTargetMs(int throttlingReportTargetMs) { BigtableWriteOptions options = getBigtableWriteOptions(); return toBuilder() - .setBigtableWriteOptions( - options.toBuilder().setThrottlingReportTargetMs(throttlingReportTargetMs).build()) + .setBigtableWriteOptions(options.toBuilder().setFlowControl(enableFlowControl).build()) .build(); } @@ -1333,16 +1287,8 @@ private static class BigtableWriterFn private final BigtableServiceFactory.ConfigId id; private final Coder>> inputCoder; private final BadRecordRouter badRecordRouter; - - private final Counter throttlingMsecs = - Metrics.counter(Metrics.THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME); - - private final int throttleReportThresMsecs; - private transient ConcurrentLinkedQueue> badRecords = null; - // Due to callback thread not supporting Beam metrics, Record pending metrics and report later. - private transient long pendingThrottlingMsecs; private transient boolean reportedLineage; // Assign serviceEntry in startBundle and clear it in tearDown. @@ -1363,8 +1309,6 @@ private static class BigtableWriterFn this.badRecordRouter = badRecordRouter; this.failures = new ConcurrentLinkedQueue<>(); this.id = factory.newId(); - // a request completed more than this time will be considered throttled. Disabled if set to 0 - throttleReportThresMsecs = firstNonNull(writeOptions.getThrottlingReportTargetMs(), 180_000); LOG.debug("Created Bigtable Write Fn with writeOptions {} ", writeOptions); } @@ -1393,18 +1337,13 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except drainCompletedElementFutures(); checkForFailures(); KV> record = c.element(); - Instant writeStart = Instant.now(); - pendingThrottlingMsecs = 0; CompletableFuture f = bigtableWriter .writeRecord(record) // transform the next CompletionStage to have its own status // this allows us to capture any unexpected errors in the handler - .handle(handleMutationException(record, window, writeStart)); + .handle(handleMutationException(record, window)); outstandingWrites.add(f); - if (pendingThrottlingMsecs > 0) { - throttlingMsecs.inc(pendingThrottlingMsecs); - } ++recordsWritten; seenWindows.compute(window, (key, count) -> (count != null ? count : 0) + 1); } @@ -1420,39 +1359,14 @@ private void drainCompletedElementFutures() throws ExecutionException, Interrupt } private BiFunction handleMutationException( - KV> record, BoundedWindow window, Instant writeStart) { + KV> record, BoundedWindow window) { return (MutateRowResponse result, Throwable exception) -> { if (exception != null) { if (isDataException(exception)) { retryIndividualRecord(record, window); } else { - // Exception due to resource unavailable or rate limited, - // including DEADLINE_EXCEEDED and RESOURCE_EXHAUSTED. - boolean isResourceException = false; - if (exception instanceof StatusRuntimeException) { - StatusRuntimeException se = (StatusRuntimeException) exception; - if (io.grpc.Status.DEADLINE_EXCEEDED.equals(se.getStatus()) - || io.grpc.Status.RESOURCE_EXHAUSTED.equals(se.getStatus())) { - isResourceException = true; - } - } else if (exception instanceof DeadlineExceededException - || exception instanceof ResourceExhaustedException) { - isResourceException = true; - } - if (isResourceException) { - pendingThrottlingMsecs = new Duration(writeStart, Instant.now()).getMillis(); - } failures.add(new BigtableWriteException(record, exception)); } - } else { - // add the excessive amount to throttling metrics if elapsed time > target latency - if (throttleReportThresMsecs > 0) { - long excessTime = - new Duration(writeStart, Instant.now()).getMillis() - throttleReportThresMsecs; - if (excessTime > 0) { - pendingThrottlingMsecs = excessTime; - } - } } return null; }; @@ -1489,7 +1403,6 @@ private static boolean isDataException(Throwable e) { @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { if (bigtableWriter != null) { - Instant closeStart = Instant.now(); try { bigtableWriter.close(); } catch (IOException e) { @@ -1498,7 +1411,6 @@ public void finishBundle(FinishBundleContext c) throws Exception { // to the error queue. Bigtable will successfully write other failures in the batch, // so this exception should be ignored if (!(e.getCause() instanceof BatchingException)) { - throttlingMsecs.inc(new Duration(closeStart, Instant.now()).getMillis()); throw e; } } @@ -1514,14 +1426,6 @@ public void finishBundle(FinishBundleContext c) throws Exception { e); } - // add the excessive amount to throttling metrics if elapsed time > target latency - if (throttleReportThresMsecs > 0) { - long excessTime = - new Duration(closeStart, Instant.now()).getMillis() - throttleReportThresMsecs; - if (excessTime > 0) { - throttlingMsecs.inc(excessTime); - } - } if (!reportedLineage) { bigtableWriter.reportLineage(); reportedLineage = true; @@ -2160,7 +2064,7 @@ public BigtableWriteException(KV> record, Throwab super( String.format( "Error mutating row %s with mutations %s", - record.getKey().toStringUtf8(), StringUtils.leftTruncate(record.getValue(), 100)), + record.getKey().toStringUtf8(), record.getValue()), cause); this.record = record; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index 3451bbf450c7..ff8cf9ee6568 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -24,7 +24,6 @@ import com.google.api.gax.batching.BatchingException; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.rpc.ApiException; -import com.google.api.gax.rpc.DeadlineExceededException; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.StreamController; @@ -638,9 +637,6 @@ public void onFailure(Throwable throwable) { if (throwable instanceof StatusRuntimeException) { serviceCallMetric.call( ((StatusRuntimeException) throwable).getStatus().getCode().value()); - } else if (throwable instanceof DeadlineExceededException) { - // incoming throwable can be a StatusRuntimeException or a specific grpc ApiException - serviceCallMetric.call(504); } else { serviceCallMetric.call("unknown"); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java index 5963eb6be3ce..a63cc575809b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java @@ -57,9 +57,6 @@ abstract class BigtableWriteOptions implements Serializable { /** Returns the target latency if latency based throttling is enabled. */ abstract @Nullable Integer getThrottlingTargetMs(); - /** Returns the target latency if latency based throttling report to runner is enabled. */ - abstract @Nullable Integer getThrottlingReportTargetMs(); - /** Returns true if batch write flow control is enabled. Otherwise return false. */ abstract @Nullable Boolean getFlowControl(); @@ -91,8 +88,6 @@ abstract static class Builder { abstract Builder setThrottlingTargetMs(int targetMs); - abstract Builder setThrottlingReportTargetMs(int targetMs); - abstract Builder setFlowControl(boolean enableFlowControl); abstract Builder setCloseWaitTimeout(Duration timeout); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index 2fa62ebdd37e..71c648730bd2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -436,21 +436,6 @@ public void testWriteValidationFailsMissingOptionsAndInstanceAndProject() { write.expand(null); } - @Test - public void testWriteClientRateLimitingAlsoSetReportMsecs() { - // client side flow control - BigtableIO.Write write = BigtableIO.write().withTableId("table").withFlowControl(true); - assertEquals( - 60_000, (int) checkNotNull(write.getBigtableWriteOptions().getThrottlingReportTargetMs())); - - // client side latency based throttling - int targetMs = 30_000; - write = BigtableIO.write().withTableId("table").withThrottlingTargetMs(targetMs); - assertEquals( - targetMs, - (int) checkNotNull(write.getBigtableWriteOptions().getThrottlingReportTargetMs())); - } - /** Helper function to make a single row mutation to be written. */ private static KV> makeWrite(String key, String value) { ByteString rowKey = ByteString.copyFromUtf8(key); From a08b0fd92dd4040a7fbc265879dcedc74bd605d8 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 12 Sep 2024 13:57:16 -0400 Subject: [PATCH 2/5] Exclude fixes not related to throttling counter change * DEADLINE_EXCEEDED is a common error code and was reported as unknown * BigtableWriteException originally printed the whole raw record, cloudlog get truncated and does not see stacktrace after it --- .../java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java | 3 ++- .../apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 96cb2d713959..b4204d804453 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -87,6 +87,7 @@ import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.StringUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -2064,7 +2065,7 @@ public BigtableWriteException(KV> record, Throwab super( String.format( "Error mutating row %s with mutations %s", - record.getKey().toStringUtf8(), record.getValue()), + record.getKey().toStringUtf8(), StringUtils.leftTruncate(record.getValue(), 100)), cause); this.record = record; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index ff8cf9ee6568..3451bbf450c7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -24,6 +24,7 @@ import com.google.api.gax.batching.BatchingException; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.DeadlineExceededException; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.StreamController; @@ -637,6 +638,9 @@ public void onFailure(Throwable throwable) { if (throwable instanceof StatusRuntimeException) { serviceCallMetric.call( ((StatusRuntimeException) throwable).getStatus().getCode().value()); + } else if (throwable instanceof DeadlineExceededException) { + // incoming throwable can be a StatusRuntimeException or a specific grpc ApiException + serviceCallMetric.call(504); } else { serviceCallMetric.call("unknown"); } From 42e28b7cf305c1564d6599ce9fd2e557c13b53cc Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 13 Sep 2024 11:38:00 -0400 Subject: [PATCH 3/5] add back public methods --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index b4204d804453..5e52c63b4526 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -1124,6 +1124,20 @@ public Write withFlowControl(boolean enableFlowControl) { .build(); } + /** Returns a new {@link BigtableIO.Write} with client side latency based throttling enabled. */ + public Write withThrottlingTargetMs(int throttlingTargetMs) { + BigtableWriteOptions options = getBigtableWriteOptions(); + return toBuilder() + .setBigtableWriteOptions( + options.toBuilder().setThrottlingTargetMs(throttlingTargetMs).build()) + .build(); + } + + /** This configuration is removed in Beam 2.60.0, Do not use. */ + public Write withThrottlingReportTargetMs(int throttlingReportTargetMs) { + throw new UnsupportedOperationException("withThrottlingReportTargetMs is removed"); + } + public Write withErrorHandler(ErrorHandler badRecordErrorHandler) { return toBuilder() .setBadRecordErrorHandler(badRecordErrorHandler) From ba74be18bf42382e42dca57bf3e183fcb66bb67b Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 18 Sep 2024 18:34:09 -0400 Subject: [PATCH 4/5] issue warning for removed configs --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 5e52c63b4526..d821278a6fe5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -1124,18 +1124,18 @@ public Write withFlowControl(boolean enableFlowControl) { .build(); } - /** Returns a new {@link BigtableIO.Write} with client side latency based throttling enabled. */ + /** This method has been deprecated in Beam 2.60.0. It does not have an effect. */ + @Deprecated public Write withThrottlingTargetMs(int throttlingTargetMs) { - BigtableWriteOptions options = getBigtableWriteOptions(); - return toBuilder() - .setBigtableWriteOptions( - options.toBuilder().setThrottlingTargetMs(throttlingTargetMs).build()) - .build(); + LOG.warn("withThrottlingTargetMs has been removed and does not have effect."); + return this; } - /** This configuration is removed in Beam 2.60.0, Do not use. */ + /** This method has been deprecated in Beam 2.60.0. It does not have an effect. */ + @Deprecated public Write withThrottlingReportTargetMs(int throttlingReportTargetMs) { - throw new UnsupportedOperationException("withThrottlingReportTargetMs is removed"); + LOG.warn("withThrottlingReportTargetMs has been removed and does not have an effect."); + return this; } public Write withErrorHandler(ErrorHandler badRecordErrorHandler) { From ffe16794554917b4353f884a6022ba9e55ac544b Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 20 Sep 2024 17:58:26 -0400 Subject: [PATCH 5/5] Fix checkstyle --- .../java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index d821278a6fe5..389d2e43c74e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -1124,14 +1124,14 @@ public Write withFlowControl(boolean enableFlowControl) { .build(); } - /** This method has been deprecated in Beam 2.60.0. It does not have an effect. */ + /** @deprecated This method has been deprecated in Beam 2.60.0. It does not have an effect. */ @Deprecated public Write withThrottlingTargetMs(int throttlingTargetMs) { LOG.warn("withThrottlingTargetMs has been removed and does not have effect."); return this; } - /** This method has been deprecated in Beam 2.60.0. It does not have an effect. */ + /** @deprecated This method has been deprecated in Beam 2.60.0. It does not have an effect. */ @Deprecated public Write withThrottlingReportTargetMs(int throttlingReportTargetMs) { LOG.warn("withThrottlingReportTargetMs has been removed and does not have an effect.");