diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index 4bc35dfff6ce..a797dd2314cb 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -37,6 +37,7 @@ import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; import org.apache.beam.runners.fnexecution.artifact.ArtifactSource; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.CollectionEnvironment; import org.apache.flink.api.java.ExecutionEnvironment; @@ -138,7 +139,7 @@ public void translate(FlinkRunner flinkRunner, Pipeline pipeline) { throw new RuntimeException(e); } - if (options.getUsePortableRunner()) { + if (ExperimentalOptions.hasExperiment(options, "beam_fn_api")) { LOG.info("Using portability layer"); // NOTE: Because the pipeline fuser only operates on and returns protos, we do another round // trip here. diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index bcebadb5e905..b2cbefbc5b06 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -137,11 +137,6 @@ public interface FlinkPipelineOptions Long getMaxBundleTimeMills(); void setMaxBundleTimeMills(Long time); - @Description("Whether to use the portable runner layer") - @Default.Boolean(false) - Boolean getUsePortableRunner(); - void setUsePortableRunner(Boolean usePortableRunner); - /** * Whether to shutdown sources when their watermark reaches {@code +Inf}. For production use * cases you want this to be disabled because Flink will currently (versions {@literal <=} 1.5) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index a0fd99437687..14e55c35b33b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Strings.isNullOrEmpty; -import static org.apache.beam.runners.dataflow.DataflowRunner.hasExperiment; import static org.apache.beam.runners.dataflow.util.Structs.addBoolean; import static org.apache.beam.runners.dataflow.util.Structs.addDictionary; import static org.apache.beam.runners.dataflow.util.Structs.addList; @@ -29,6 +28,7 @@ import static org.apache.beam.runners.dataflow.util.Structs.addObject; import static org.apache.beam.runners.dataflow.util.Structs.addString; import static org.apache.beam.runners.dataflow.util.Structs.getString; +import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment; import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java index c3108b69a87e..e163fe8d6745 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.dataflow; +import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment; import static org.hamcrest.MatcherAssert.assertThat; import com.google.api.services.dataflow.model.JobMessage; @@ -209,7 +210,7 @@ private static String errorMessage( @VisibleForTesting void updatePAssertCount(Pipeline pipeline) { - if (DataflowRunner.hasExperiment(options, "beam_fn_api")) { + if (hasExperiment(options, "beam_fn_api")) { // TODO[BEAM-1866]: FnAPI does not support metrics, so expect 0 assertions. expectedNumberOfAssertions = 0; } else { diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 4520c057a3c0..a8b128dc3050 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -28,7 +28,6 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -1290,22 +1289,6 @@ public void testTemplateRunnerLoggedErrorForFile() throws Exception { p.run(); } - @Test - public void testHasExperiment() { - DataflowPipelineDebugOptions options = - PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class); - - options.setExperiments(null); - assertFalse(DataflowRunner.hasExperiment(options, "foo")); - - options.setExperiments(ImmutableList.of("foo", "bar")); - assertTrue(DataflowRunner.hasExperiment(options, "foo")); - assertTrue(DataflowRunner.hasExperiment(options, "bar")); - assertFalse(DataflowRunner.hasExperiment(options, "baz")); - assertFalse(DataflowRunner.hasExperiment(options, "ba")); - assertFalse(DataflowRunner.hasExperiment(options, "BAR")); - } - @Test public void testWorkerHarnessContainerImage() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java index cb5c41c11827..05de22cc99ba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java @@ -35,4 +35,18 @@ public interface ExperimentalOptions extends PipelineOptions { @Nullable List getExperiments(); void setExperiments(@Nullable List value); + + /** + * Returns true iff the provided pipeline options has the specified experiment + * enabled. + */ + static boolean hasExperiment(PipelineOptions options, String experiment) { + if (options == null) { + return false; + } + + List experiments = options.as(ExperimentalOptions.class).getExperiments(); + return experiments != null + && experiments.contains(experiment); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index e20addd36db1..7630fb3e1670 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -477,6 +477,8 @@ Class getProxyClass() { /** A predicate that checks if a method is synthetic via {@link Method#isSynthetic()}. */ private static final Predicate NOT_SYNTHETIC_PREDICATE = input -> !input.isSynthetic(); + private static final Predicate NOT_STATIC_PREDICATE = + input -> !Modifier.isStatic(input.getModifiers()); /** Ensure all classloader or volatile data are contained in a single reference. */ static final AtomicReference CACHE = new AtomicReference<>(); @@ -865,6 +867,7 @@ private static List validateClass(Class descriptors = getPropertyDescriptors(allInterfaceMethods, iface); @@ -1135,7 +1138,9 @@ private static void validateMethodsAreEitherBeanMethodOrKnownMethod( Sets.filter( Sets.difference(Sets.newHashSet(iface.getMethods()), knownMethods), Predicates.and( - NOT_SYNTHETIC_PREDICATE, input -> !knownMethodsNames.contains(input.getName())))); + NOT_SYNTHETIC_PREDICATE, + input -> !knownMethodsNames.contains(input.getName()), + NOT_STATIC_PREDICATE))); checkArgument(unknownMethods.isEmpty(), "Methods %s on [%s] do not conform to being bean properties.", FluentIterable.from(unknownMethods).transform(ReflectHelpers.METHOD_FORMATTER), diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ExperimentalOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ExperimentalOptionsTest.java new file mode 100644 index 000000000000..c60007aaf021 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ExperimentalOptionsTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.options; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ExperimentalOptions}. */ +@RunWith(JUnit4.class) +public class ExperimentalOptionsTest { + @Test + public void testExperimentIsSet() { + ExperimentalOptions options = + PipelineOptionsFactory.fromArgs("--experiments=experimentA,experimentB") + .as(ExperimentalOptions.class); + assertTrue(ExperimentalOptions.hasExperiment(options, "experimentA")); + assertTrue(ExperimentalOptions.hasExperiment(options, "experimentB")); + assertFalse(ExperimentalOptions.hasExperiment(options, "experimentC")); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index e4c4102d6b64..be92f2ac4d03 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -1859,5 +1859,20 @@ default Number getValue() { void setValue(Number value); } + @Test + public void testStaticMethodsAreAllowed() { + assertEquals("value", + OptionsWithStaticMethod.myStaticMethod( + PipelineOptionsFactory.fromArgs("--myMethod=value") + .as(OptionsWithStaticMethod.class))); + } + + private interface OptionsWithStaticMethod extends PipelineOptions { + String getMyMethod(); + void setMyMethod(String value); + static String myStaticMethod(OptionsWithStaticMethod o) { + return o.getMyMethod(); + } + } } diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py index 839b0b397090..e3d9bc24c5c2 100644 --- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py +++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py @@ -169,7 +169,7 @@ def run_pipeline(self, pipeline): # or remove them from being required during execution. options['beam:option:app_name:v1'] = 'ApacheBeamIsTheBest' options['beam:option:runner:v1'] = 'org.apache.beam.runners.flink.FlinkRunner' - options['beam:option:use_portable_runner:v1'] = 'true' + options['beam:option:experiments:v1'] = ['beam_fn_api'] job_service = self._get_job_service() prepare_response = job_service.Prepare(