Skip to content

Commit

Permalink
Add options to control number of Storage API connections when using m…
Browse files Browse the repository at this point in the history
…ultiplexing (#31721)

* add options to set min and max connections to connection management pool; rename counter to be more accurate

* add multiplexing description

* add to CHANGES.md

* clarify documentation and address comments

* adjust description

* add details
  • Loading branch information
ahmedabu98 authored Jul 12, 2024
1 parent 4561fd1 commit 4d429dd
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
## New Features / Improvements

* Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)).
* Added options to control the number of Storage API multiplexing connections ([#31721](https://github.com/apache/beam/pull/31721))
* [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726))
* Removed a 3rd party LGPL dependency from the Go SDK ([#31765](https://github.com/apache/beam/issues/31765)).
* Support for MapState and SetState when using Dataflow Runner v1 with Streaming Engine (Java) ([[#18200](https://github.com/apache/beam/issues/18200)])
Expand All @@ -82,6 +83,7 @@

## Bugfixes

* Fixed a bug in BigQueryIO batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710))
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Security Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
*/
@AutoValue
abstract class AppendClientInfo {
private final Counter activeConnections =
Metrics.counter(AppendClientInfo.class, "activeConnections");
private final Counter activeStreamAppendClients =
Metrics.counter(AppendClientInfo.class, "activeStreamAppendClients");

abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient();

Expand Down Expand Up @@ -123,7 +123,7 @@ public AppendClientInfo withAppendClient(
writeStreamService.getStreamAppendClient(
streamName, getDescriptor(), useConnectionPool, missingValueInterpretation);

activeConnections.inc();
activeStreamAppendClients.inc();

return toBuilder().setStreamName(streamName).setStreamAppendClient(client).build();
}
Expand All @@ -133,7 +133,7 @@ public void close() {
BigQueryServices.StreamAppendClient client = getStreamAppendClient();
if (client != null) {
getCloseAppendClient().accept(client);
activeConnections.dec();
activeStreamAppendClients.dec();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,28 @@ public interface BigQueryOptions

void setNumStorageWriteApiStreamAppendClients(Integer value);

@Description(
"When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (ie. useStorageApiConnectionPool=true), "
+ "this option sets the minimum number of connections each pool creates before any connections are shared. This is "
+ "on a per worker, per region basis. Note that in practice, the minimum number of connections created is the minimum "
+ "of this value and (numStorageWriteApiStreamAppendClients x num destinations). BigQuery will create this many "
+ "connections at first and will only create more connections if the current ones are \"overwhelmed\". Consider "
+ "increasing this value if you are running into performance issues.")
@Default.Integer(2)
Integer getMinConnectionPoolConnections();

void setMinConnectionPoolConnections(Integer value);

@Description(
"When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (ie. useStorageApiConnectionPool=true), "
+ "this option sets the maximum number of connections each pool creates. This is on a per worker, per region basis. "
+ "If writing to many dynamic destinations (>20) and experiencing performance issues or seeing append operations competing"
+ "for streams, consider increasing this value.")
@Default.Integer(20)
Integer getMaxConnectionPoolConnections();

void setMaxConnectionPoolConnections(Integer value);

@Description("The max number of messages inflight that we expect each connection will retain.")
@Default.Long(1000)
Long getStorageWriteMaxInflightRequests();
Expand All @@ -122,6 +144,11 @@ public interface BigQueryOptions

void setStorageWriteMaxInflightBytes(Long value);

@Description(
"Enables multiplexing mode, where multiple tables can share the same connection. Only available when writing with STORAGE_API_AT_LEAST_ONCE"
+ " mode. This is recommended if your write operation is creating 20+ connections. When using multiplexing, consider tuning "
+ "the number of connections created by the connection pool with minConnectionPoolConnections and maxConnectionPoolConnections. "
+ "For more information, see https://cloud.google.com/bigquery/docs/write-api-best-practices#connection_pool_management")
@Default.Boolean(false)
Boolean getUseStorageApiConnectionPool();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest;
Expand Down Expand Up @@ -1423,6 +1424,14 @@ public StreamAppendClient getStreamAppendClient(
bqIOMetadata.getBeamJobId() == null ? "" : bqIOMetadata.getBeamJobId(),
bqIOMetadata.getBeamWorkerId() == null ? "" : bqIOMetadata.getBeamWorkerId());

ConnectionWorkerPool.setOptions(
ConnectionWorkerPool.Settings.builder()
.setMinConnectionsPerRegion(
options.as(BigQueryOptions.class).getMinConnectionPoolConnections())
.setMaxConnectionsPerRegion(
options.as(BigQueryOptions.class).getMaxConnectionPoolConnections())
.build());

StreamWriter streamWriter =
StreamWriter.newBuilder(streamName, newWriteClient)
.setExecutorProvider(
Expand Down

0 comments on commit 4d429dd

Please sign in to comment.