diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java index a761d38de1ab..c2fad93516bb 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java @@ -125,10 +125,9 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions { void setWindmillMessagesBetweenIsReadyChecks(int value); @Description("If true, a most a single active rpc will be used per channel.") - @Default.Boolean(false) - boolean getUseWindmillIsolatedChannels(); + Boolean getUseWindmillIsolatedChannels(); - void setUseWindmillIsolatedChannels(boolean value); + void setUseWindmillIsolatedChannels(Boolean value); @Description( "If true, separate streaming rpcs will be used for heartbeats instead of sharing streams with state reads.") diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 0dedd4f34fd6..2d068ef093d3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.dataflow.worker; -import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import com.google.api.services.dataflow.model.CounterUpdate; @@ -63,7 +62,6 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; -import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; import org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool; @@ -79,10 +77,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; import org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler; import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker; @@ -100,7 +95,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.construction.CoderTranslation; -import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; @@ -430,7 +424,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o GrpcWindmillStreamFactory windmillStreamFactory; if (options.isEnableStreamingEngine()) { GrpcDispatcherClient dispatcherClient = - GrpcDispatcherClient.create(createStubFactory(options)); + GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); configFetcher = StreamingEngineComputationConfigFetcher.create( options.getGlobalConfigRefreshPeriod().getMillis(), dataflowServiceClient); @@ -456,7 +450,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o GrpcWindmillServer.create( options, windmillStreamFactory, - GrpcDispatcherClient.create(createStubFactory(options))); + GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options))); } else { windmillStreamFactory = windmillStreamFactoryBuilder.build(); windmillServer = new JniWindmillApplianceServer(options.getLocalWindmillHostport()); @@ -660,24 +654,6 @@ public static void main(String[] args) throws Exception { worker.start(); } - private static ChannelCachingStubFactory createStubFactory( - DataflowWorkerHarnessOptions workerOptions) { - Function channelFactory = - serviceAddress -> - remoteChannel( - serviceAddress, workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec()); - ChannelCache channelCache = - ChannelCache.create( - serviceAddress -> - // IsolationChannel will create and manage separate RPC channels to the same - // serviceAddress via calling the channelFactory, else just directly return the - // RPC channel. - workerOptions.getUseWindmillIsolatedChannels() - ? IsolationChannel.create(() -> channelFactory.apply(serviceAddress)) - : channelFactory.apply(serviceAddress)); - return ChannelCachingRemoteStubFactory.create(workerOptions.getGcpCredential(), channelCache); - } - private static int chooseMaxThreads(DataflowWorkerHarnessOptions options) { if (options.getNumberOfWorkerHarnessThreads() != 0) { return options.getNumberOfWorkerHarnessThreads(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java index 412608ea3981..f96464150d4a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java @@ -26,9 +26,12 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.DataflowRunner; +import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub; @@ -36,6 +39,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -49,7 +53,8 @@ public class GrpcDispatcherClient { private static final Logger LOG = LoggerFactory.getLogger(GrpcDispatcherClient.class); - private final WindmillStubFactory windmillStubFactory; + static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS = + "streaming_engine_use_job_settings_for_isolated_channels"; private final CountDownLatch onInitializedEndpoints; /** @@ -62,23 +67,49 @@ public class GrpcDispatcherClient { @GuardedBy("this") private final Random rand; + private final WindmillStubFactoryFactory windmillStubFactoryFactory; + + private final AtomicReference windmillStubFactory = new AtomicReference<>(); + + private final AtomicBoolean useIsolatedChannels = new AtomicBoolean(); + private final boolean reactToIsolatedChannelsJobSetting; + private GrpcDispatcherClient( - WindmillStubFactory windmillStubFactory, + DataflowWorkerHarnessOptions options, + WindmillStubFactoryFactory windmillStubFactoryFactory, DispatcherStubs initialDispatcherStubs, Random rand) { - this.windmillStubFactory = windmillStubFactory; + this.windmillStubFactoryFactory = windmillStubFactoryFactory; + if (DataflowRunner.hasExperiment( + options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS)) { + if (options.getUseWindmillIsolatedChannels() != null) { + this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels()); + this.reactToIsolatedChannelsJobSetting = false; + } else { + this.useIsolatedChannels.set(false); + this.reactToIsolatedChannelsJobSetting = true; + } + } else { + this.useIsolatedChannels.set(Boolean.TRUE.equals(options.getUseWindmillIsolatedChannels())); + this.reactToIsolatedChannelsJobSetting = false; + } + this.windmillStubFactory.set( + windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels.get())); this.rand = rand; this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs); this.onInitializedEndpoints = new CountDownLatch(1); } - public static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) { - return new GrpcDispatcherClient(windmillStubFactory, DispatcherStubs.empty(), new Random()); + public static GrpcDispatcherClient create( + DataflowWorkerHarnessOptions options, WindmillStubFactoryFactory windmillStubFactoryFactory) { + return new GrpcDispatcherClient( + options, windmillStubFactoryFactory, DispatcherStubs.empty(), new Random()); } @VisibleForTesting public static GrpcDispatcherClient forTesting( - WindmillStubFactory windmillGrpcStubFactory, + DataflowWorkerHarnessOptions options, + WindmillStubFactoryFactory windmillStubFactoryFactory, List windmillServiceStubs, List windmillMetadataServiceStubs, Set dispatcherEndpoints) { @@ -86,7 +117,8 @@ public static GrpcDispatcherClient forTesting( dispatcherEndpoints.size() == windmillServiceStubs.size() && windmillServiceStubs.size() == windmillMetadataServiceStubs.size()); return new GrpcDispatcherClient( - windmillGrpcStubFactory, + options, + windmillStubFactoryFactory, DispatcherStubs.create( dispatcherEndpoints, windmillServiceStubs, windmillMetadataServiceStubs), new Random()); @@ -153,17 +185,31 @@ public void onJobConfig(StreamingGlobalConfig config) { LOG.warn("Dispatcher client received empty windmill service endpoints from global config"); return; } - consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints()); + boolean forceRecreateStubs = false; + if (reactToIsolatedChannelsJobSetting) { + boolean useIsolatedChannels = config.userWorkerJobSettings().getUseWindmillIsolatedChannels(); + if (this.useIsolatedChannels.getAndSet(useIsolatedChannels) != useIsolatedChannels) { + windmillStubFactory.set( + windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels)); + forceRecreateStubs = true; + } + } + consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints(), forceRecreateStubs); } public synchronized void consumeWindmillDispatcherEndpoints( ImmutableSet dispatcherEndpoints) { + consumeWindmillDispatcherEndpoints(dispatcherEndpoints, /*forceRecreateStubs=*/ false); + } + + private synchronized void consumeWindmillDispatcherEndpoints( + ImmutableSet dispatcherEndpoints, boolean forceRecreateStubs) { ImmutableSet currentDispatcherEndpoints = dispatcherStubs.get().dispatcherEndpoints(); Preconditions.checkArgument( dispatcherEndpoints != null && !dispatcherEndpoints.isEmpty(), "Cannot set dispatcher endpoints to nothing."); - if (currentDispatcherEndpoints.equals(dispatcherEndpoints)) { + if (!forceRecreateStubs && currentDispatcherEndpoints.equals(dispatcherEndpoints)) { // The endpoints are equal don't recreate the stubs. return; } @@ -174,7 +220,7 @@ public synchronized void consumeWindmillDispatcherEndpoints( } LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", dispatcherEndpoints); - dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, windmillStubFactory)); + dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, windmillStubFactory.get())); onInitializedEndpoints.countDown(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java index 1fce4d238b2e..310495982679 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java @@ -53,7 +53,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.StreamingEngineThrottleTimers; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; @@ -154,7 +154,7 @@ static GrpcWindmillServer newTestInstance( String name, List experiments, long clientId, - WindmillStubFactory windmillStubFactory) { + WindmillStubFactoryFactory windmillStubFactoryFactory) { ManagedChannel inProcessChannel = inProcessChannel(name); CloudWindmillServiceV1Alpha1Stub stub = CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel); @@ -164,16 +164,18 @@ static GrpcWindmillServer newTestInstance( List windmillMetadataServiceStubs = Lists.newArrayList(metadataStub); + DataflowWorkerHarnessOptions testOptions = + testOptions(/* enableStreamingEngine= */ true, experiments); + Set dispatcherEndpoints = Sets.newHashSet(HostAndPort.fromHost(name)); GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.forTesting( - windmillStubFactory, + testOptions, + windmillStubFactoryFactory, windmillServiceStubs, windmillMetadataServiceStubs, dispatcherEndpoints); - DataflowWorkerHarnessOptions testOptions = - testOptions(/* enableStreamingEngine= */ true, experiments); boolean sendKeyedGetDataRequests = !testOptions.isEnableStreamingEngine() || DataflowRunner.hasExperiment( @@ -190,7 +192,7 @@ static GrpcWindmillServer newTestInstance( @VisibleForTesting static GrpcWindmillServer newApplianceTestInstance( - Channel channel, WindmillStubFactory windmillStubFactory) { + Channel channel, WindmillStubFactoryFactory windmillStubFactoryFactory) { DataflowWorkerHarnessOptions options = testOptions(/* enableStreamingEngine= */ false, new ArrayList<>()); GrpcWindmillServer testServer = @@ -198,7 +200,7 @@ static GrpcWindmillServer newApplianceTestInstance( options, GrpcWindmillStreamFactory.of(createJobHeader(options, 1)).build(), // No-op, Appliance does not use Dispatcher to call Streaming Engine. - GrpcDispatcherClient.create(windmillStubFactory)); + GrpcDispatcherClient.create(options, windmillStubFactoryFactory)); testServer.syncApplianceStub = createWindmillApplianceStubWithDeadlineInterceptor(channel); return testServer; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java new file mode 100644 index 000000000000..f7dd9a22b996 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs; + +import org.apache.beam.sdk.annotations.Internal; + +@Internal +public interface WindmillStubFactoryFactory { + WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels); +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java new file mode 100644 index 000000000000..f6ffb9c14519 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs; + +import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel; + +import com.google.auth.Credentials; +import java.util.function.Function; +import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; + +public class WindmillStubFactoryFactoryImpl implements WindmillStubFactoryFactory { + + private final int windmillServiceRpcChannelAliveTimeoutSec; + private final Credentials gcpCredential; + + public WindmillStubFactoryFactoryImpl(DataflowWorkerHarnessOptions workerOptions) { + this.gcpCredential = workerOptions.getGcpCredential(); + this.windmillServiceRpcChannelAliveTimeoutSec = + workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(); + } + + @Override + public WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels) { + Function channelFactory = + serviceAddress -> remoteChannel(serviceAddress, windmillServiceRpcChannelAliveTimeoutSec); + ChannelCache channelCache = + ChannelCache.create( + serviceAddress -> + // IsolationChannel will create and manage separate RPC channels to the same + // serviceAddress via calling the channelFactory, else just directly return the + // RPC channel. + useIsolatedChannels + ? IsolationChannel.create(() -> channelFactory.apply(serviceAddress)) + : channelFactory.apply(serviceAddress)); + return ChannelCachingRemoteStubFactory.create(gcpCredential, channelCache); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java index aaa71b6598ea..ed8815c48e76 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java @@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest; @@ -54,10 +55,12 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory; import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactoryFactory; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemScheduler; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetSpender; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessSocketAddress; @@ -111,7 +114,11 @@ public class FanOutStreamingEngineWorkerHarnessTest { WindmillChannelFactory.inProcessChannel("StreamingEngineClientTest"))); private final GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.forTesting( - stubFactory, new ArrayList<>(), new ArrayList<>(), new HashSet<>()); + PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class), + new FakeWindmillStubFactoryFactory(stubFactory), + new ArrayList<>(), + new ArrayList<>(), + new HashSet<>()); @Rule public transient Timeout globalTimeout = Timeout.seconds(600); private Server fakeStreamingEngineServer; private CountDownLatch getWorkerMetadataReady; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java new file mode 100644 index 000000000000..3f746d91a868 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; +import org.hamcrest.Matcher; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Enclosed.class) +public class GrpcDispatcherClientTest { + + @RunWith(JUnit4.class) + public static class RespectsJobSettingTest { + + @Test + public void createsNewStubWhenIsolatedChannelsConfigIsChanged() { + DataflowWorkerHarnessOptions options = + PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); + options.setExperiments( + Lists.newArrayList( + GrpcDispatcherClient.STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS)); + GrpcDispatcherClient dispatcherClient = + GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); + // Create first time with Isolated channels disabled + dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ false)); + CloudWindmillServiceV1Alpha1Stub stub1 = dispatcherClient.getWindmillServiceStub(); + CloudWindmillServiceV1Alpha1Stub stub2 = dispatcherClient.getWindmillServiceStub(); + assertSame(stub2, stub1); + assertThat(stub1.getChannel(), not(instanceOf(IsolationChannel.class))); + + // Enable Isolated channels + dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ true)); + CloudWindmillServiceV1Alpha1Stub stub3 = dispatcherClient.getWindmillServiceStub(); + assertNotSame(stub3, stub1); + + assertThat(stub3.getChannel(), instanceOf(IsolationChannel.class)); + CloudWindmillServiceV1Alpha1Stub stub4 = dispatcherClient.getWindmillServiceStub(); + assertSame(stub3, stub4); + + // Disable Isolated channels + dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ false)); + CloudWindmillServiceV1Alpha1Stub stub5 = dispatcherClient.getWindmillServiceStub(); + assertNotSame(stub4, stub5); + assertThat(stub5.getChannel(), not(instanceOf(IsolationChannel.class))); + } + } + + @RunWith(Parameterized.class) + public static class RespectsPipelineOptionsTest { + + @Parameters + public static Collection data() { + List list = new ArrayList<>(); + for (Boolean pipelineOption : new Boolean[] {true, false}) { + list.add(new Object[] {/*experimentEnabled=*/ false, pipelineOption}); + list.add(new Object[] {/*experimentEnabled=*/ true, pipelineOption}); + } + return list; + } + + @Parameter(0) + public Boolean experimentEnabled; + + @Parameter(1) + public Boolean pipelineOption; + + @Test + public void ignoresIsolatedChannelsConfigWithPipelineOption() { + DataflowWorkerHarnessOptions options = + PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); + if (experimentEnabled) { + options.setExperiments( + Lists.newArrayList( + GrpcDispatcherClient.STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS)); + } + options.setUseWindmillIsolatedChannels(pipelineOption); + GrpcDispatcherClient dispatcherClient = + GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); + Matcher classMatcher = + pipelineOption + ? instanceOf(IsolationChannel.class) + : not(instanceOf(IsolationChannel.class)); + + // Job setting disabled, PipelineOption enabled + dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ false)); + CloudWindmillServiceV1Alpha1Stub stub1 = dispatcherClient.getWindmillServiceStub(); + CloudWindmillServiceV1Alpha1Stub stub2 = dispatcherClient.getWindmillServiceStub(); + assertSame(stub2, stub1); + assertThat(stub1.getChannel(), classMatcher); + + // Job setting enabled + dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ true)); + CloudWindmillServiceV1Alpha1Stub stub3 = dispatcherClient.getWindmillServiceStub(); + assertSame(stub3, stub1); + + CloudWindmillServiceV1Alpha1Stub stub4 = dispatcherClient.getWindmillServiceStub(); + assertSame(stub3, stub4); + + // Job setting disabled + dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ false)); + CloudWindmillServiceV1Alpha1Stub stub5 = dispatcherClient.getWindmillServiceStub(); + assertSame(stub4, stub5); + } + } + + static StreamingGlobalConfig getGlobalConfig(boolean useWindmillIsolatedChannels) { + return StreamingGlobalConfig.builder() + .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromString("windmill:1234"))) + .setUserWorkerJobSettings( + UserWorkerRunnerV1Settings.newBuilder() + .setUseWindmillIsolatedChannels(useWindmillIsolatedChannels) + .build()) + .build(); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java index 7e5801b65de4..239e3979a3b7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java @@ -73,6 +73,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory; import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactoryFactory; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.CallOptions; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Channel; @@ -110,6 +111,7 @@ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) }) public class GrpcWindmillServerTest { + private static final Logger LOG = LoggerFactory.getLogger(GrpcWindmillServerTest.class); private static final int STREAM_CHUNK_SIZE = 2 << 20; private final long clientId = 10L; @@ -145,8 +147,9 @@ private void startServerAndClient(List experiments) throws Exception { name, experiments, clientId, - new FakeWindmillStubFactory( - () -> grpcCleanup.register(WindmillChannelFactory.inProcessChannel(name)))); + new FakeWindmillStubFactoryFactory( + new FakeWindmillStubFactory( + () -> grpcCleanup.register(WindmillChannelFactory.inProcessChannel(name))))); } private void maybeInjectError(Stream stream) { @@ -212,7 +215,9 @@ public ClientCall interceptCall( this.client = GrpcWindmillServer.newApplianceTestInstance( - inprocessChannel, new FakeWindmillStubFactory(() -> (ManagedChannel) inprocessChannel)); + inprocessChannel, + new FakeWindmillStubFactoryFactory( + new FakeWindmillStubFactory(() -> (ManagedChannel) inprocessChannel))); Windmill.GetWorkResponse response1 = client.getWork(GetWorkRequest.getDefaultInstance()); Windmill.GetWorkResponse response2 = client.getWork(GetWorkRequest.getDefaultInstance()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java new file mode 100644 index 000000000000..51f8b8e14320 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.testing; + +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory; + +public class FakeWindmillStubFactoryFactory implements WindmillStubFactoryFactory { + + private final WindmillStubFactory windmillStubFactory; + + public FakeWindmillStubFactoryFactory(WindmillStubFactory windmillStubFactory) { + this.windmillStubFactory = windmillStubFactory; + } + + @Override + public WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels) { + return windmillStubFactory; + } +}