diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 71b2b77fd847c..389d2e43c74e6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -21,16 +21,13 @@
import static org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
import com.google.api.gax.batching.BatchingException;
import com.google.api.gax.rpc.ApiException;
-import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.gax.rpc.NotFoundException;
-import com.google.api.gax.rpc.ResourceExhaustedException;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.Mutation;
@@ -41,7 +38,6 @@
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.protobuf.ByteString;
-import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -78,8 +74,6 @@
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
@@ -1121,52 +1115,27 @@ public Write withMaxOutstandingBytes(long bytes) {
* always enabled on batch writes and limits the number of outstanding requests to the Bigtable
* server.
*
- *
When enabled, will also set default {@link #withThrottlingReportTargetMs} to 1 minute.
- * This enables runner react with increased latency in flush call due to flow control.
- *
*
Does not modify this object.
*/
public Write withFlowControl(boolean enableFlowControl) {
BigtableWriteOptions options = getBigtableWriteOptions();
- BigtableWriteOptions.Builder builder = options.toBuilder().setFlowControl(enableFlowControl);
- if (enableFlowControl) {
- builder = builder.setThrottlingReportTargetMs(60_000);
- }
- return toBuilder().setBigtableWriteOptions(builder.build()).build();
+ return toBuilder()
+ .setBigtableWriteOptions(options.toBuilder().setFlowControl(enableFlowControl).build())
+ .build();
}
- /**
- * Returns a new {@link BigtableIO.Write} with client side latency based throttling enabled.
- *
- *
Will also set {@link #withThrottlingReportTargetMs} to the same value.
- */
+ /** @deprecated This method has been deprecated in Beam 2.60.0. It does not have an effect. */
+ @Deprecated
public Write withThrottlingTargetMs(int throttlingTargetMs) {
- BigtableWriteOptions options = getBigtableWriteOptions();
- return toBuilder()
- .setBigtableWriteOptions(
- options
- .toBuilder()
- .setThrottlingTargetMs(throttlingTargetMs)
- .setThrottlingReportTargetMs(throttlingTargetMs)
- .build())
- .build();
+ LOG.warn("withThrottlingTargetMs has been removed and does not have effect.");
+ return this;
}
- /**
- * Returns a new {@link BigtableIO.Write} with throttling time reporting enabled. When write
- * request latency exceeded the set value, the amount greater than the target will be considered
- * as throttling time and report back to runner.
- *
- *
If not set, defaults to 3 min for completed batch request. Client side flowing control
- * configurations (e.g. {@link #withFlowControl}, {@link #withThrottlingTargetMs} will adjust
- * the default value accordingly. Set to 0 to disable throttling time reporting.
- */
+ /** @deprecated This method has been deprecated in Beam 2.60.0. It does not have an effect. */
+ @Deprecated
public Write withThrottlingReportTargetMs(int throttlingReportTargetMs) {
- BigtableWriteOptions options = getBigtableWriteOptions();
- return toBuilder()
- .setBigtableWriteOptions(
- options.toBuilder().setThrottlingReportTargetMs(throttlingReportTargetMs).build())
- .build();
+ LOG.warn("withThrottlingReportTargetMs has been removed and does not have an effect.");
+ return this;
}
public Write withErrorHandler(ErrorHandler badRecordErrorHandler) {
@@ -1333,16 +1302,8 @@ private static class BigtableWriterFn
private final BigtableServiceFactory.ConfigId id;
private final Coder>> inputCoder;
private final BadRecordRouter badRecordRouter;
-
- private final Counter throttlingMsecs =
- Metrics.counter(Metrics.THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME);
-
- private final int throttleReportThresMsecs;
-
private transient ConcurrentLinkedQueue> badRecords =
null;
- // Due to callback thread not supporting Beam metrics, Record pending metrics and report later.
- private transient long pendingThrottlingMsecs;
private transient boolean reportedLineage;
// Assign serviceEntry in startBundle and clear it in tearDown.
@@ -1363,8 +1324,6 @@ private static class BigtableWriterFn
this.badRecordRouter = badRecordRouter;
this.failures = new ConcurrentLinkedQueue<>();
this.id = factory.newId();
- // a request completed more than this time will be considered throttled. Disabled if set to 0
- throttleReportThresMsecs = firstNonNull(writeOptions.getThrottlingReportTargetMs(), 180_000);
LOG.debug("Created Bigtable Write Fn with writeOptions {} ", writeOptions);
}
@@ -1393,18 +1352,13 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
drainCompletedElementFutures();
checkForFailures();
KV> record = c.element();
- Instant writeStart = Instant.now();
- pendingThrottlingMsecs = 0;
CompletableFuture f =
bigtableWriter
.writeRecord(record)
// transform the next CompletionStage to have its own status
// this allows us to capture any unexpected errors in the handler
- .handle(handleMutationException(record, window, writeStart));
+ .handle(handleMutationException(record, window));
outstandingWrites.add(f);
- if (pendingThrottlingMsecs > 0) {
- throttlingMsecs.inc(pendingThrottlingMsecs);
- }
++recordsWritten;
seenWindows.compute(window, (key, count) -> (count != null ? count : 0) + 1);
}
@@ -1420,39 +1374,14 @@ private void drainCompletedElementFutures() throws ExecutionException, Interrupt
}
private BiFunction handleMutationException(
- KV> record, BoundedWindow window, Instant writeStart) {
+ KV> record, BoundedWindow window) {
return (MutateRowResponse result, Throwable exception) -> {
if (exception != null) {
if (isDataException(exception)) {
retryIndividualRecord(record, window);
} else {
- // Exception due to resource unavailable or rate limited,
- // including DEADLINE_EXCEEDED and RESOURCE_EXHAUSTED.
- boolean isResourceException = false;
- if (exception instanceof StatusRuntimeException) {
- StatusRuntimeException se = (StatusRuntimeException) exception;
- if (io.grpc.Status.DEADLINE_EXCEEDED.equals(se.getStatus())
- || io.grpc.Status.RESOURCE_EXHAUSTED.equals(se.getStatus())) {
- isResourceException = true;
- }
- } else if (exception instanceof DeadlineExceededException
- || exception instanceof ResourceExhaustedException) {
- isResourceException = true;
- }
- if (isResourceException) {
- pendingThrottlingMsecs = new Duration(writeStart, Instant.now()).getMillis();
- }
failures.add(new BigtableWriteException(record, exception));
}
- } else {
- // add the excessive amount to throttling metrics if elapsed time > target latency
- if (throttleReportThresMsecs > 0) {
- long excessTime =
- new Duration(writeStart, Instant.now()).getMillis() - throttleReportThresMsecs;
- if (excessTime > 0) {
- pendingThrottlingMsecs = excessTime;
- }
- }
}
return null;
};
@@ -1489,7 +1418,6 @@ private static boolean isDataException(Throwable e) {
@FinishBundle
public void finishBundle(FinishBundleContext c) throws Exception {
if (bigtableWriter != null) {
- Instant closeStart = Instant.now();
try {
bigtableWriter.close();
} catch (IOException e) {
@@ -1498,7 +1426,6 @@ public void finishBundle(FinishBundleContext c) throws Exception {
// to the error queue. Bigtable will successfully write other failures in the batch,
// so this exception should be ignored
if (!(e.getCause() instanceof BatchingException)) {
- throttlingMsecs.inc(new Duration(closeStart, Instant.now()).getMillis());
throw e;
}
}
@@ -1514,14 +1441,6 @@ public void finishBundle(FinishBundleContext c) throws Exception {
e);
}
- // add the excessive amount to throttling metrics if elapsed time > target latency
- if (throttleReportThresMsecs > 0) {
- long excessTime =
- new Duration(closeStart, Instant.now()).getMillis() - throttleReportThresMsecs;
- if (excessTime > 0) {
- throttlingMsecs.inc(excessTime);
- }
- }
if (!reportedLineage) {
bigtableWriter.reportLineage();
reportedLineage = true;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
index 5963eb6be3ce0..a63cc575809b4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java
@@ -57,9 +57,6 @@ abstract class BigtableWriteOptions implements Serializable {
/** Returns the target latency if latency based throttling is enabled. */
abstract @Nullable Integer getThrottlingTargetMs();
- /** Returns the target latency if latency based throttling report to runner is enabled. */
- abstract @Nullable Integer getThrottlingReportTargetMs();
-
/** Returns true if batch write flow control is enabled. Otherwise return false. */
abstract @Nullable Boolean getFlowControl();
@@ -91,8 +88,6 @@ abstract static class Builder {
abstract Builder setThrottlingTargetMs(int targetMs);
- abstract Builder setThrottlingReportTargetMs(int targetMs);
-
abstract Builder setFlowControl(boolean enableFlowControl);
abstract Builder setCloseWaitTimeout(Duration timeout);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 2fa62ebdd37ef..71c648730bd2d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -436,21 +436,6 @@ public void testWriteValidationFailsMissingOptionsAndInstanceAndProject() {
write.expand(null);
}
- @Test
- public void testWriteClientRateLimitingAlsoSetReportMsecs() {
- // client side flow control
- BigtableIO.Write write = BigtableIO.write().withTableId("table").withFlowControl(true);
- assertEquals(
- 60_000, (int) checkNotNull(write.getBigtableWriteOptions().getThrottlingReportTargetMs()));
-
- // client side latency based throttling
- int targetMs = 30_000;
- write = BigtableIO.write().withTableId("table").withThrottlingTargetMs(targetMs);
- assertEquals(
- targetMs,
- (int) checkNotNull(write.getBigtableWriteOptions().getThrottlingReportTargetMs()));
- }
-
/** Helper function to make a single row mutation to be written. */
private static KV> makeWrite(String key, String value) {
ByteString rowKey = ByteString.copyFromUtf8(key);