Skip to content

Commit

Permalink
Plumbing remaining autoscaling metrics (#30070)
Browse files Browse the repository at this point in the history
OutstandingBytes and MaximumOutstandingBytes
  • Loading branch information
edman124 authored Jan 23, 2024
1 parent 5e7edc4 commit 0a813b9
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20230812-$google_clients_version",
// Keep version consistent with the version in google_cloud_resourcemanager, managed by google_cloud_platform_libraries_bom
google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20230806-$google_clients_version",
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20231203-$google_clients_version",
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20240113-$google_clients_version",
google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20240110-$google_clients_version",
google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
// Keep version consistent with the version in google_cloud_nio, managed by google_cloud_platform_libraries_bom
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1782,8 +1782,10 @@ private void sendWorkerMessage() throws IOException {
new StreamingScalingReport()
.setActiveThreadCount(workUnitExecutor.activeCount())
.setActiveBundleCount(workUnitExecutor.elementsOutstanding())
.setOutstandingBytes(workUnitExecutor.bytesOutstanding())
.setMaximumThreadCount(chooseMaximumNumberOfThreads())
.setMaximumBundleCount(workUnitExecutor.maximumElementsOutstanding());
.setMaximumBundleCount(workUnitExecutor.maximumElementsOutstanding())
.setMaximumBytes(workUnitExecutor.maximumBytesOutstanding());
workUnitClient.reportWorkerMessage(
workUnitClient.createWorkerMessageFromStreamingScalingReport(activeThreadsReport));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,13 @@ public void testReportWorkerMessage() throws Exception {
response.setContent(workerMessage.toPrettyString());
when(request.execute()).thenReturn(response);
StreamingScalingReport activeThreadsReport =
new StreamingScalingReport().setActiveThreadCount(1);
new StreamingScalingReport()
.setActiveThreadCount(1)
.setActiveBundleCount(2)
.setOutstandingBytes(3L)
.setMaximumThreadCount(4)
.setMaximumBundleCount(5)
.setMaximumBytes(6L);
WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
WorkerMessage msg = client.createWorkerMessageFromStreamingScalingReport(activeThreadsReport);
client.reportWorkerMessage(msg);
Expand Down

0 comments on commit 0a813b9

Please sign in to comment.