Skip to content

Commit

Permalink
default to 3 min for throttlingReportTargetMs
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn committed Jul 24, 2024
1 parent 83bbe21 commit 8e77ef0
Showing 1 changed file with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

Expand Down Expand Up @@ -1147,9 +1148,13 @@ public Write withThrottlingTargetMs(int throttlingTargetMs) {
}

/**
* Returns a new {@link BigtableIO.Write} with throttling time reporting enabled. When latency
* exceeded the set value, the amount greater than the target will be considered as throttling
* time and report back to runner.
* Returns a new {@link BigtableIO.Write} with throttling time reporting enabled. When write
* request latency exceeded the set value, the amount greater than the target will be considered
* as throttling time and report back to runner.
*
* <p>If not set, defaults to 3 min for completed batch request. Client side flowing control
* configurations (e.g. {@link #withFlowControl}, {@link #withThrottlingTargetMs} will adjust
* the default value accordingly. Set to 0 to disable throttling time reporting.
*/
public Write withThrottlingReportTargetMs(int throttlingReportTargetMs) {
BigtableWriteOptions options = getBigtableWriteOptions();
Expand Down Expand Up @@ -1324,6 +1329,11 @@ private static class BigtableWriterFn
private final Coder<KV<ByteString, Iterable<Mutation>>> inputCoder;
private final BadRecordRouter badRecordRouter;

private final Counter throttlingMsecs =
Metrics.counter(Metrics.THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME);

private final int throttleReportThresMsecs;

private transient Set<KV<BigtableWriteException, BoundedWindow>> badRecords = null;
// Due to callback thread not supporting Beam metrics, Record pending metrics and report later.
private transient long pendingThrottlingMsecs;
Expand All @@ -1344,6 +1354,8 @@ private static class BigtableWriterFn
this.badRecordRouter = badRecordRouter;
this.failures = new ConcurrentLinkedQueue<>();
this.id = factory.newId();
// a request completed more than this time will be considered throttled. Disabled if set to 0
throttleReportThresMsecs = firstNonNull(writeOptions.getThrottlingReportTargetMs(), 180_000);
LOG.debug("Created Bigtable Write Fn with writeOptions {} ", writeOptions);
}

Expand All @@ -1361,9 +1373,6 @@ public void startBundle(StartBundleContext c) throws IOException {
badRecords = new HashSet<>();
}

private final Counter throttlingMsecs =
Metrics.counter(Metrics.THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME);

@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
checkForFailures();
Expand Down Expand Up @@ -1407,11 +1416,9 @@ private BiConsumer<MutateRowResponse, Throwable> handleMutationException(
}
} else {
// add the excessive amount to throttling metrics if elapsed time > target latency
if (writeOptions.getThrottlingReportTargetMs() != null
&& writeOptions.getThrottlingReportTargetMs() > 0) {
if (throttleReportThresMsecs > 0) {
long excessTime =
new Duration(writeStart, Instant.now()).getMillis()
- writeOptions.getThrottlingReportTargetMs();
new Duration(writeStart, Instant.now()).getMillis() - throttleReportThresMsecs;
if (excessTime > 0) {
pendingThrottlingMsecs = excessTime;
}
Expand Down Expand Up @@ -1466,11 +1473,9 @@ public void finishBundle(FinishBundleContext c) throws Exception {
}
}
// add the excessive amount to throttling metrics if elapsed time > target latency
if (writeOptions.getThrottlingReportTargetMs() != null
&& writeOptions.getThrottlingReportTargetMs() > 0) {
if (throttleReportThresMsecs > 0) {
long excessTime =
new Duration(closeStart, Instant.now()).getMillis()
- writeOptions.getThrottlingReportTargetMs();
new Duration(closeStart, Instant.now()).getMillis() - throttleReportThresMsecs;
if (excessTime > 0) {
throttlingMsecs.inc(excessTime);
}
Expand Down

0 comments on commit 8e77ef0

Please sign in to comment.