Skip to content

Commit

Permalink
Rollback Bigtable throttling counter (#32442)
Browse files Browse the repository at this point in the history
* Revert BigtableIO change in #31924

* Exclude fixes not related to throttling counter change

* issue warning for removed configs
  • Loading branch information
Abacn authored Sep 20, 2024
1 parent 678104c commit 7474e6a
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@
import static org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.gax.batching.BatchingException;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.gax.rpc.NotFoundException;
import com.google.api.gax.rpc.ResourceExhaustedException;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.Mutation;
Expand All @@ -41,7 +38,6 @@
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.protobuf.ByteString;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -78,8 +74,6 @@
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
Expand Down Expand Up @@ -1121,52 +1115,27 @@ public Write withMaxOutstandingBytes(long bytes) {
* always enabled on batch writes and limits the number of outstanding requests to the Bigtable
* server.
*
* <p>When enabled, will also set default {@link #withThrottlingReportTargetMs} to 1 minute.
* This enables runner react with increased latency in flush call due to flow control.
*
* <p>Does not modify this object.
*/
public Write withFlowControl(boolean enableFlowControl) {
BigtableWriteOptions options = getBigtableWriteOptions();
BigtableWriteOptions.Builder builder = options.toBuilder().setFlowControl(enableFlowControl);
if (enableFlowControl) {
builder = builder.setThrottlingReportTargetMs(60_000);
}
return toBuilder().setBigtableWriteOptions(builder.build()).build();
return toBuilder()
.setBigtableWriteOptions(options.toBuilder().setFlowControl(enableFlowControl).build())
.build();
}

/**
* Returns a new {@link BigtableIO.Write} with client side latency based throttling enabled.
*
* <p>Will also set {@link #withThrottlingReportTargetMs} to the same value.
*/
/** @deprecated This method has been deprecated in Beam 2.60.0. It does not have an effect. */
@Deprecated
public Write withThrottlingTargetMs(int throttlingTargetMs) {
BigtableWriteOptions options = getBigtableWriteOptions();
return toBuilder()
.setBigtableWriteOptions(
options
.toBuilder()
.setThrottlingTargetMs(throttlingTargetMs)
.setThrottlingReportTargetMs(throttlingTargetMs)
.build())
.build();
LOG.warn("withThrottlingTargetMs has been removed and does not have effect.");
return this;
}

/**
* Returns a new {@link BigtableIO.Write} with throttling time reporting enabled. When write
* request latency exceeded the set value, the amount greater than the target will be considered
* as throttling time and report back to runner.
*
* <p>If not set, defaults to 3 min for completed batch request. Client side flowing control
* configurations (e.g. {@link #withFlowControl}, {@link #withThrottlingTargetMs} will adjust
* the default value accordingly. Set to 0 to disable throttling time reporting.
*/
/** @deprecated This method has been deprecated in Beam 2.60.0. It does not have an effect. */
@Deprecated
public Write withThrottlingReportTargetMs(int throttlingReportTargetMs) {
BigtableWriteOptions options = getBigtableWriteOptions();
return toBuilder()
.setBigtableWriteOptions(
options.toBuilder().setThrottlingReportTargetMs(throttlingReportTargetMs).build())
.build();
LOG.warn("withThrottlingReportTargetMs has been removed and does not have an effect.");
return this;
}

public Write withErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler) {
Expand Down Expand Up @@ -1333,16 +1302,8 @@ private static class BigtableWriterFn
private final BigtableServiceFactory.ConfigId id;
private final Coder<KV<ByteString, Iterable<Mutation>>> inputCoder;
private final BadRecordRouter badRecordRouter;

private final Counter throttlingMsecs =
Metrics.counter(Metrics.THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME);

private final int throttleReportThresMsecs;

private transient ConcurrentLinkedQueue<KV<BigtableWriteException, BoundedWindow>> badRecords =
null;
// Due to callback thread not supporting Beam metrics, Record pending metrics and report later.
private transient long pendingThrottlingMsecs;
private transient boolean reportedLineage;

// Assign serviceEntry in startBundle and clear it in tearDown.
Expand All @@ -1363,8 +1324,6 @@ private static class BigtableWriterFn
this.badRecordRouter = badRecordRouter;
this.failures = new ConcurrentLinkedQueue<>();
this.id = factory.newId();
// a request completed more than this time will be considered throttled. Disabled if set to 0
throttleReportThresMsecs = firstNonNull(writeOptions.getThrottlingReportTargetMs(), 180_000);
LOG.debug("Created Bigtable Write Fn with writeOptions {} ", writeOptions);
}

Expand Down Expand Up @@ -1393,18 +1352,13 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Except
drainCompletedElementFutures();
checkForFailures();
KV<ByteString, Iterable<Mutation>> record = c.element();
Instant writeStart = Instant.now();
pendingThrottlingMsecs = 0;
CompletableFuture<Void> f =
bigtableWriter
.writeRecord(record)
// transform the next CompletionStage to have its own status
// this allows us to capture any unexpected errors in the handler
.handle(handleMutationException(record, window, writeStart));
.handle(handleMutationException(record, window));
outstandingWrites.add(f);
if (pendingThrottlingMsecs > 0) {
throttlingMsecs.inc(pendingThrottlingMsecs);
}
++recordsWritten;
seenWindows.compute(window, (key, count) -> (count != null ? count : 0) + 1);
}
Expand All @@ -1420,39 +1374,14 @@ private void drainCompletedElementFutures() throws ExecutionException, Interrupt
}

private BiFunction<MutateRowResponse, Throwable, Void> handleMutationException(
KV<ByteString, Iterable<Mutation>> record, BoundedWindow window, Instant writeStart) {
KV<ByteString, Iterable<Mutation>> record, BoundedWindow window) {
return (MutateRowResponse result, Throwable exception) -> {
if (exception != null) {
if (isDataException(exception)) {
retryIndividualRecord(record, window);
} else {
// Exception due to resource unavailable or rate limited,
// including DEADLINE_EXCEEDED and RESOURCE_EXHAUSTED.
boolean isResourceException = false;
if (exception instanceof StatusRuntimeException) {
StatusRuntimeException se = (StatusRuntimeException) exception;
if (io.grpc.Status.DEADLINE_EXCEEDED.equals(se.getStatus())
|| io.grpc.Status.RESOURCE_EXHAUSTED.equals(se.getStatus())) {
isResourceException = true;
}
} else if (exception instanceof DeadlineExceededException
|| exception instanceof ResourceExhaustedException) {
isResourceException = true;
}
if (isResourceException) {
pendingThrottlingMsecs = new Duration(writeStart, Instant.now()).getMillis();
}
failures.add(new BigtableWriteException(record, exception));
}
} else {
// add the excessive amount to throttling metrics if elapsed time > target latency
if (throttleReportThresMsecs > 0) {
long excessTime =
new Duration(writeStart, Instant.now()).getMillis() - throttleReportThresMsecs;
if (excessTime > 0) {
pendingThrottlingMsecs = excessTime;
}
}
}
return null;
};
Expand Down Expand Up @@ -1489,7 +1418,6 @@ private static boolean isDataException(Throwable e) {
@FinishBundle
public void finishBundle(FinishBundleContext c) throws Exception {
if (bigtableWriter != null) {
Instant closeStart = Instant.now();
try {
bigtableWriter.close();
} catch (IOException e) {
Expand All @@ -1498,7 +1426,6 @@ public void finishBundle(FinishBundleContext c) throws Exception {
// to the error queue. Bigtable will successfully write other failures in the batch,
// so this exception should be ignored
if (!(e.getCause() instanceof BatchingException)) {
throttlingMsecs.inc(new Duration(closeStart, Instant.now()).getMillis());
throw e;
}
}
Expand All @@ -1514,14 +1441,6 @@ public void finishBundle(FinishBundleContext c) throws Exception {
e);
}

// add the excessive amount to throttling metrics if elapsed time > target latency
if (throttleReportThresMsecs > 0) {
long excessTime =
new Duration(closeStart, Instant.now()).getMillis() - throttleReportThresMsecs;
if (excessTime > 0) {
throttlingMsecs.inc(excessTime);
}
}
if (!reportedLineage) {
bigtableWriter.reportLineage();
reportedLineage = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ abstract class BigtableWriteOptions implements Serializable {
/** Returns the target latency if latency based throttling is enabled. */
abstract @Nullable Integer getThrottlingTargetMs();

/** Returns the target latency if latency based throttling report to runner is enabled. */
abstract @Nullable Integer getThrottlingReportTargetMs();

/** Returns true if batch write flow control is enabled. Otherwise return false. */
abstract @Nullable Boolean getFlowControl();

Expand Down Expand Up @@ -91,8 +88,6 @@ abstract static class Builder {

abstract Builder setThrottlingTargetMs(int targetMs);

abstract Builder setThrottlingReportTargetMs(int targetMs);

abstract Builder setFlowControl(boolean enableFlowControl);

abstract Builder setCloseWaitTimeout(Duration timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,21 +436,6 @@ public void testWriteValidationFailsMissingOptionsAndInstanceAndProject() {
write.expand(null);
}

@Test
public void testWriteClientRateLimitingAlsoSetReportMsecs() {
// client side flow control
BigtableIO.Write write = BigtableIO.write().withTableId("table").withFlowControl(true);
assertEquals(
60_000, (int) checkNotNull(write.getBigtableWriteOptions().getThrottlingReportTargetMs()));

// client side latency based throttling
int targetMs = 30_000;
write = BigtableIO.write().withTableId("table").withThrottlingTargetMs(targetMs);
assertEquals(
targetMs,
(int) checkNotNull(write.getBigtableWriteOptions().getThrottlingReportTargetMs()));
}

/** Helper function to make a single row mutation to be written. */
private static KV<ByteString, Iterable<Mutation>> makeWrite(String key, String value) {
ByteString rowKey = ByteString.copyFromUtf8(key);
Expand Down

0 comments on commit 7474e6a

Please sign in to comment.