diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 19e0d089947a..d78ae2cb6c57 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -21,6 +21,7 @@ import static org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; @@ -1147,9 +1148,13 @@ public Write withThrottlingTargetMs(int throttlingTargetMs) { } /** - * Returns a new {@link BigtableIO.Write} with throttling time reporting enabled. When latency - * exceeded the set value, the amount greater than the target will be considered as throttling - * time and report back to runner. + * Returns a new {@link BigtableIO.Write} with throttling time reporting enabled. When write + * request latency exceeded the set value, the amount greater than the target will be considered + * as throttling time and report back to runner. + * + *

If not set, defaults to 3 min for completed batch request. Client side flowing control + * configurations (e.g. {@link #withFlowControl}, {@link #withThrottlingTargetMs} will adjust + * the default value accordingly. Set to 0 to disable throttling time reporting. */ public Write withThrottlingReportTargetMs(int throttlingReportTargetMs) { BigtableWriteOptions options = getBigtableWriteOptions(); @@ -1324,6 +1329,11 @@ private static class BigtableWriterFn private final Coder>> inputCoder; private final BadRecordRouter badRecordRouter; + private final Counter throttlingMsecs = + Metrics.counter(Metrics.THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME); + + private final int throttleReportThresMsecs; + private transient Set> badRecords = null; // Due to callback thread not supporting Beam metrics, Record pending metrics and report later. private transient long pendingThrottlingMsecs; @@ -1344,6 +1354,8 @@ private static class BigtableWriterFn this.badRecordRouter = badRecordRouter; this.failures = new ConcurrentLinkedQueue<>(); this.id = factory.newId(); + // a request completed more than this time will be considered throttled. Disabled if set to 0 + throttleReportThresMsecs = firstNonNull(writeOptions.getThrottlingReportTargetMs(), 180_000); LOG.debug("Created Bigtable Write Fn with writeOptions {} ", writeOptions); } @@ -1361,9 +1373,6 @@ public void startBundle(StartBundleContext c) throws IOException { badRecords = new HashSet<>(); } - private final Counter throttlingMsecs = - Metrics.counter(Metrics.THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME); - @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) throws Exception { checkForFailures(); @@ -1407,11 +1416,9 @@ private BiConsumer handleMutationException( } } else { // add the excessive amount to throttling metrics if elapsed time > target latency - if (writeOptions.getThrottlingReportTargetMs() != null - && writeOptions.getThrottlingReportTargetMs() > 0) { + if (throttleReportThresMsecs > 0) { long excessTime = - new Duration(writeStart, Instant.now()).getMillis() - - writeOptions.getThrottlingReportTargetMs(); + new Duration(writeStart, Instant.now()).getMillis() - throttleReportThresMsecs; if (excessTime > 0) { pendingThrottlingMsecs = excessTime; } @@ -1466,11 +1473,9 @@ public void finishBundle(FinishBundleContext c) throws Exception { } } // add the excessive amount to throttling metrics if elapsed time > target latency - if (writeOptions.getThrottlingReportTargetMs() != null - && writeOptions.getThrottlingReportTargetMs() > 0) { + if (throttleReportThresMsecs > 0) { long excessTime = - new Duration(closeStart, Instant.now()).getMillis() - - writeOptions.getThrottlingReportTargetMs(); + new Duration(closeStart, Instant.now()).getMillis() - throttleReportThresMsecs; if (excessTime > 0) { throttlingMsecs.inc(excessTime); }