Skip to content

Commit

Permalink
Merge pull request #30193: Remove sdks/java/fn-execution
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles authored Feb 2, 2024
2 parents 30a778b + 00f0786 commit 8ec51f1
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 80 deletions.
1 change: 0 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ tasks.register("javaPreCommit") {
dependsOn(":sdks:java:extensions:sorter:build")
dependsOn(":sdks:java:extensions:timeseries:build")
dependsOn(":sdks:java:extensions:zetasketch:build")
dependsOn(":sdks:java:fn-execution:build")
dependsOn(":sdks:java:harness:build")
dependsOn(":sdks:java:harness:jmh:build")
dependsOn(":sdks:java:io:bigquery-io-perf-tests:build")
Expand Down
2 changes: 0 additions & 2 deletions runners/direct-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ def dependOnProjects = [":runners:core-construction-java",
":runners:core-java",
":runners:local-java",
":runners:java-fn-execution",
":sdks:java:fn-execution",
":sdks:java:extensions:avro"
]

Expand Down Expand Up @@ -95,7 +94,6 @@ dependencies {
validatesRunner project(path: project.path, configuration: "shadowTest")
permitUnusedDeclared library.java.vendored_grpc_1_60_1
permitUnusedDeclared project(":runners:java-fn-execution")
permitUnusedDeclared project(":sdks:java:fn-execution")
permitUnusedDeclared project(":sdks:java:extensions:avro")
examplesJavaIntegrationTest project(project.path)
examplesJavaIntegrationTest project(":examples:java")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@
package org.apache.beam.runners.dataflow.worker;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.when;

import com.google.api.client.http.LowLevelHttpResponse;
import com.google.api.client.json.Json;
import com.google.api.client.testing.http.MockHttpTransport;
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
Expand Down Expand Up @@ -53,16 +49,13 @@
import org.apache.beam.sdk.util.FastNanoClockAndSleeper;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -75,36 +68,34 @@ public class DataflowWorkUnitClientTest {
private static final String PROJECT_ID = "TEST_PROJECT_ID";
private static final String JOB_ID = "TEST_JOB_ID";
private static final String WORKER_ID = "TEST_WORKER_ID";

@Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
@Rule public TestRule restoreLogging = new RestoreDataflowLoggingMDC();
@Rule public ExpectedException expectedException = ExpectedException.none();
@Rule public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper();
@Mock private MockHttpTransport transport;
@Mock private MockLowLevelHttpRequest request;
private DataflowWorkerHarnessOptions pipelineOptions;

@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
when(transport.buildRequest(anyString(), anyString())).thenReturn(request);
doCallRealMethod().when(request).getContentAsString();

DataflowWorkerHarnessOptions createPipelineOptionsWithTransport(MockHttpTransport transport) {
Dataflow service = new Dataflow(transport, Transport.getJsonFactory(), null);
pipelineOptions = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
DataflowWorkerHarnessOptions pipelineOptions =
PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
pipelineOptions.setProject(PROJECT_ID);
pipelineOptions.setJobId(JOB_ID);
pipelineOptions.setWorkerId(WORKER_ID);
pipelineOptions.setGcpCredential(new TestCredential());
pipelineOptions.setDataflowClient(service);
pipelineOptions.setRegion("us-central1");
return pipelineOptions;
}

@Test
public void testCloudServiceCall() throws Exception {
WorkItem workItem = createWorkItem(PROJECT_ID, JOB_ID);

when(request.execute()).thenReturn(generateMockResponse(workItem));

MockLowLevelHttpResponse response = generateMockResponse(workItem);
MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response);
MockHttpTransport transport =
new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build();
DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport);
WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);

assertEquals(Optional.of(workItem), client.getWorkItem());
Expand All @@ -124,30 +115,40 @@ public void testCloudServiceCall() throws Exception {

@Test
public void testCloudServiceCallMapTaskStagePropagation() throws Exception {
WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);

// Publish and acquire a map task work item, and verify we're now processing that stage.
final String stageName = "test_stage_name";
MapTask mapTask = new MapTask();
mapTask.setStageName(stageName);
WorkItem workItem = createWorkItem(PROJECT_ID, JOB_ID);
workItem.setMapTask(mapTask);
when(request.execute()).thenReturn(generateMockResponse(workItem));

MockLowLevelHttpResponse response = generateMockResponse(workItem);
MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response);
MockHttpTransport transport =
new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build();
DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport);
WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);

assertEquals(Optional.of(workItem), client.getWorkItem());
assertEquals(stageName, DataflowWorkerLoggingMDC.getStageName());
}

@Test
public void testCloudServiceCallSeqMapTaskStagePropagation() throws Exception {
WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);

// Publish and acquire a seq map task work item, and verify we're now processing that stage.
final String stageName = "test_stage_name";
SeqMapTask seqMapTask = new SeqMapTask();
seqMapTask.setStageName(stageName);
WorkItem workItem = createWorkItem(PROJECT_ID, JOB_ID);
workItem.setSeqMapTask(seqMapTask);
when(request.execute()).thenReturn(generateMockResponse(workItem));

MockLowLevelHttpResponse response = generateMockResponse(workItem);
MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response);
MockHttpTransport transport =
new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build();
DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport);
WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);

assertEquals(Optional.of(workItem), client.getWorkItem());
assertEquals(stageName, DataflowWorkerLoggingMDC.getStageName());
}
Expand All @@ -157,8 +158,11 @@ public void testCloudServiceCallNoWorkPresent() throws Exception {
// If there's no work the service should return an empty work item.
WorkItem workItem = new WorkItem();

when(request.execute()).thenReturn(generateMockResponse(workItem));

MockLowLevelHttpResponse response = generateMockResponse(workItem);
MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response);
MockHttpTransport transport =
new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build();
DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport);
WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);

assertEquals(Optional.empty(), client.getWorkItem());
Expand All @@ -181,8 +185,11 @@ public void testCloudServiceCallNoWorkId() throws Exception {
WorkItem workItem = createWorkItem(PROJECT_ID, JOB_ID);
workItem.setId(null);

when(request.execute()).thenReturn(generateMockResponse(workItem));

MockLowLevelHttpResponse response = generateMockResponse(workItem);
MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response);
MockHttpTransport transport =
new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build();
DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport);
WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);

assertEquals(Optional.empty(), client.getWorkItem());
Expand All @@ -201,8 +208,11 @@ public void testCloudServiceCallNoWorkId() throws Exception {

@Test
public void testCloudServiceCallNoWorkItem() throws Exception {
when(request.execute()).thenReturn(generateMockResponse());

MockLowLevelHttpResponse response = generateMockResponse();
MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response);
MockHttpTransport transport =
new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build();
DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport);
WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);

assertEquals(Optional.empty(), client.getWorkItem());
Expand All @@ -228,8 +238,11 @@ public void testCloudServiceCallMultipleWorkItems() throws Exception {
WorkItem workItem1 = createWorkItem(PROJECT_ID, JOB_ID);
WorkItem workItem2 = createWorkItem(PROJECT_ID, JOB_ID);

when(request.execute()).thenReturn(generateMockResponse(workItem1, workItem2));

MockLowLevelHttpResponse response = generateMockResponse(workItem1, workItem2);
MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response);
MockHttpTransport transport =
new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build();
DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport);
WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);

client.getWorkItem();
Expand All @@ -242,7 +255,13 @@ public void testReportWorkerMessage_streamingScalingReport() throws Exception {
SendWorkerMessagesResponse workerMessage = new SendWorkerMessagesResponse();
workerMessage.setFactory(Transport.getJsonFactory());
response.setContent(workerMessage.toPrettyString());
when(request.execute()).thenReturn(response);

MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response);
MockHttpTransport transport =
new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build();
DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport);
WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);

StreamingScalingReport activeThreadsReport =
new StreamingScalingReport()
.setActiveThreadCount(1)
Expand All @@ -251,7 +270,6 @@ public void testReportWorkerMessage_streamingScalingReport() throws Exception {
.setMaximumThreadCount(4)
.setMaximumBundleCount(5)
.setMaximumBytes(6L);
WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
WorkerMessage msg = client.createWorkerMessageFromStreamingScalingReport(activeThreadsReport);
client.reportWorkerMessage(Collections.singletonList(msg));

Expand All @@ -268,7 +286,13 @@ public void testReportWorkerMessage_perWorkerMetrics() throws Exception {
SendWorkerMessagesResponse workerMessage = new SendWorkerMessagesResponse();
workerMessage.setFactory(Transport.getJsonFactory());
response.setContent(workerMessage.toPrettyString());
when(request.execute()).thenReturn(response);

MockLowLevelHttpRequest request = new MockLowLevelHttpRequest().setResponse(response);
MockHttpTransport transport =
new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build();
DataflowWorkerHarnessOptions pipelineOptions = createPipelineOptionsWithTransport(transport);
WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);

PerStepNamespaceMetrics stepNamespaceMetrics =
new PerStepNamespaceMetrics()
.setOriginalStep("s1")
Expand All @@ -279,7 +303,6 @@ public void testReportWorkerMessage_perWorkerMetrics() throws Exception {
new PerWorkerMetrics()
.setPerStepNamespaceMetrics(Collections.singletonList(stepNamespaceMetrics));

WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
WorkerMessage perWorkerMetricsMsg =
client.createWorkerMessageFromPerWorkerMetrics(perWorkerMetrics);
client.reportWorkerMessage(Collections.singletonList(perWorkerMetricsMsg));
Expand All @@ -290,7 +313,7 @@ public void testReportWorkerMessage_perWorkerMetrics() throws Exception {
assertEquals(ImmutableList.of(perWorkerMetricsMsg), actualRequest.getWorkerMessages());
}

private LowLevelHttpResponse generateMockResponse(WorkItem... workItems) throws Exception {
private MockLowLevelHttpResponse generateMockResponse(WorkItem... workItems) throws Exception {
MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
response.setContentType(Json.MEDIA_TYPE);
LeaseWorkItemResponse lease = new LeaseWorkItemResponse();
Expand Down
2 changes: 0 additions & 2 deletions sdks/java/expansion-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":runners:core-construction-java")
implementation project(path: ":runners:java-fn-execution")
implementation project(path: ":sdks:java:fn-execution")
implementation project(path: ":sdks:java:harness")
permitUnusedDeclared project(path: ":model:fn-execution")
permitUnusedDeclared project(path: ":sdks:java:fn-execution")
implementation library.java.jackson_annotations
implementation library.java.jackson_databind
implementation library.java.jackson_dataformat_yaml
Expand Down
34 changes: 0 additions & 34 deletions sdks/java/fn-execution/build.gradle

This file was deleted.

3 changes: 1 addition & 2 deletions sdks/java/harness/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ dependencies {
permitUnusedDeclared project(path: ":sdks:java:transform-service:launcher")
testImplementation library.java.junit
testImplementation library.java.mockito_core
shadowTestRuntimeClasspath project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation project(path: ":sdks:java:fn-execution", configuration: "testRuntimeMigration")
shadowTest project(path: ":sdks:java:core", configuration: "shadowTest")
shadowTestRuntimeClasspath library.java.slf4j_jdk14
permitUnusedDeclared library.java.avro
}
1 change: 0 additions & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ include(":sdks:java:extensions:sql:udf")
include(":sdks:java:extensions:sql:udf-test-provider")
include(":sdks:java:extensions:timeseries")
include(":sdks:java:extensions:zetasketch")
include(":sdks:java:fn-execution")
include(":sdks:java:harness")
include(":sdks:java:harness:jmh")
include(":sdks:java:io:amazon-web-services")
Expand Down

0 comments on commit 8ec51f1

Please sign in to comment.