Summary of this patch (free text before the first "diff --git" header):

  * Window.Assign: the wrapped Window ("original") may now be null.
    populateDisplayData() skips delegation when it is null, and the new static
    factory Assign.createInternal(WindowingStrategy) builds an Assign with a
    null original.
  * New WindowIntoTransformProvider (Java expansion service): a schema transform
    registered as beam:schematransform:org.apache.beam:yaml:window_into_strategy:v1.
    Its configuration carries a serialized RunnerApi.WindowingStrategy; the
    transform applies Window.Assign.createInternal(...) to the "input" rows and
    keeps the input coder on the "output" rows.
  * yaml_provider.py: adds TranslatingProvider and create_java_builtin_provider(),
    which maps the YAML WindowInto transform onto the Java transform above, and
    registers it in standard_providers().
  * yaml_transform.py: followers() ignores inputs listed in self._inputs, and
    preprocess_windowing() emits the WindowInto input as a mapping.
  * yaml_transform_unit_test.py: expected YAML updated to the mapping form.

NOTE(review): line breaks, indentation of context lines, and Java generic type
arguments were reconstructed from a flattened copy of this patch; verify them
against the upstream commit before applying.

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index c22aa35d8f3aa..32e218d3cb111 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -454,7 +454,7 @@ protected String getKindString() {
    * Pipeline authors should use {@link Window} directly instead.
    */
   public static class Assign<T> extends PTransform<PCollection<T>, PCollection<T>> {
-    private final Window<T> original;
+    private final @Nullable Window<T> original;
     private final WindowingStrategy<T, ?> updatedStrategy;
 
     /**
@@ -463,7 +463,7 @@ public static class Assign<T> extends PTransform<PCollection<T>, PCollection<T>>
      * #getWindowFn()}.
      */
     @VisibleForTesting
-    Assign(Window<T> original, WindowingStrategy<T, ?> updatedStrategy) {
+    Assign(@Nullable Window<T> original, WindowingStrategy<T, ?> updatedStrategy) {
       this.original = original;
       this.updatedStrategy = updatedStrategy;
     }
@@ -476,12 +476,18 @@ public PCollection<T> expand(PCollection<T> input) {
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
-      original.populateDisplayData(builder);
+      if (original != null) {
+        original.populateDisplayData(builder);
+      }
     }
 
     public @Nullable WindowFn<T, ?> getWindowFn() {
       return updatedStrategy.getWindowFn();
     }
+
+    public static <T> Assign<T> createInternal(WindowingStrategy<T, ?> finalStrategy) {
+      return new Assign<T>(null, finalStrategy);
+    }
   }
 
   /**
diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java
new file mode 100644
index 0000000000000..6272b9445eb8d
--- /dev/null
+++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/WindowIntoTransformProvider.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.expansion.service;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for WindowInto.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class WindowIntoTransformProvider
+    extends TypedSchemaTransformProvider<WindowIntoTransformProvider.Configuration> {
+  private static final String INPUT_ROWS_TAG = "input";
+
+  protected static final String OUTPUT_ROWS_TAG = "output";
+
+  @Override
+  protected Class<Configuration> configurationClass() {
+    return Configuration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(Configuration configuration) {
+    try {
+      return new WindowIntoStrategy(
+          (WindowingStrategy<Row, ?>)
+              WindowingStrategyTranslation.fromProto(
+                  RunnerApi.WindowingStrategy.parseFrom(
+                      configuration.getSerializedWindowingStrategy()),
+                  null));
+    } catch (IOException exn) {
+      throw new RuntimeException(exn);
+    }
+  }
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:yaml:window_into_strategy:v1";
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_ROWS_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_ROWS_TAG);
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Configuration {
+
+    @SuppressWarnings({"AutoValueMutable", "mutable"})
+    public abstract byte[] getSerializedWindowingStrategy();
+
+    public static Builder builder() {
+      return new AutoValue_WindowIntoTransformProvider_Configuration.Builder();
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+
+      public abstract Builder setSerializedWindowingStrategy(byte[] serializedWindowingStrategy);
+
+      public abstract Configuration build();
+    }
+  }
+
+  private static class WindowIntoStrategy extends SchemaTransform {
+
+    private final WindowingStrategy<Row, ?> windowingStrategy;
+
+    WindowIntoStrategy(WindowingStrategy<Row, ?> windowingStrategy) {
+      this.windowingStrategy = windowingStrategy;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple inputTuple) {
+      PCollection<Row> input = inputTuple.get(INPUT_ROWS_TAG);
+      return PCollectionRowTuple.of(
+          OUTPUT_ROWS_TAG,
+          input
+              .apply(Window.Assign.createInternal(windowingStrategy))
+              .setCoder(input.getCoder()));
+    }
+  }
+}
diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py
index 99c98f891de43..5e9da3b0b05ae 100755
--- a/sdks/python/apache_beam/yaml/yaml_provider.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider.py
@@ -46,6 +46,7 @@
 import apache_beam.io
 import apache_beam.transforms.util
 from apache_beam.portability.api import schema_pb2
+from apache_beam.runners import pipeline_context
 from apache_beam.transforms import external
 from apache_beam.transforms import window
 from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
@@ -755,6 +756,81 @@ def create_builtin_provider():
         no_input_transforms=('Create', ))
 
 
+class TranslatingProvider(Provider):
+  def __init__(
+      self,
+      transforms: Mapping[str, Callable[..., beam.PTransform]],
+      underlying_provider: Provider):
+    self._transforms = transforms
+    self._underlying_provider = underlying_provider
+
+  def provided_transforms(self):
+    return self._transforms.keys()
+
+  def available(self):
+    return self._underlying_provider.available()
+
+  def cache_artifacts(self):
+    return self._underlying_provider.cache_artifacts()
+
+  def underlying_provider(self):
+    return self._underlying_provider
+
+  def to_json(self):
+    return {'type': "TranslatingProvider"}
+
+  def create_transform(
+      self, typ: str, config: Mapping[str, Any],
+      yaml_create_transform: Any) -> beam.PTransform:
+    return self._transforms[typ](self._underlying_provider, **config)
+
+
+def create_java_builtin_provider():
+  """Exposes built-in transforms from Java as well as Python to maximize
+  opportunities for fusion.
+
+  This provider holds those transforms that require pre-processing of the configs.
+  For those Java transforms that can consume the user-provided configs directly
+  (or only need a simple renaming of parameters) a direct or renaming provider
+  is the simpler choice.
+  """
+
+  # An alternative could be examining the capabilities of various environments
+  # during (or as a pre-processing phase before) fusion to align environments
+  # where possible. This would also require extra care in skipping these
+  # common transforms when doing the provider affinity analysis.
+
+  def java_window_into(java_provider, **config):
+    """Parses the config into a WindowingStrategy and invokes the Java class.
+
+    Though it would not be that difficult to implement this in Java as well,
+    we prefer to implement it exactly once for consistency (especially as
+    it evolves).
+    """
+    windowing_strategy = YamlProviders.WindowInto._parse_window_spec(
+        config).get_windowing(None)
+    # No context needs to be preserved for the basic WindowFns.
+    empty_context = pipeline_context.PipelineContext()
+    return java_provider.create_transform(
+        'WindowIntoStrategy',
+        {
+            'serializedWindowingStrategy': windowing_strategy.to_runner_api(
+                empty_context).SerializeToString()
+        },
+        None)
+
+  return TranslatingProvider(
+      transforms={'WindowInto': java_window_into},
+      underlying_provider=beam_jar(
+          urns={
+              'WindowIntoStrategy': (
+                  'beam:schematransform:'
+                  'org.apache.beam:yaml:window_into_strategy:v1')
+          },
+          gradle_target=
+          'sdks:java:extensions:schemaio-expansion-service:shadowJar'))
+
+
 class PypiExpansionService:
   """Expands transforms by fully qualified name in a virtual environment
   with the given dependencies.
@@ -993,6 +1069,7 @@ def standard_providers():
 
   return merge_providers(
       YamlProviders.create_builtin_provider(),
+      create_java_builtin_provider(),
       create_mapping_providers(),
       create_combine_providers(),
       io_providers(),
diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py
index ff20685489c2f..03574b5f98ffe 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform.py
@@ -237,8 +237,9 @@ def followers(self, transform_name):
       for transform in self._transforms:
         if transform['type'] != 'composite':
           for input in empty_if_explicitly_empty(transform['input']).values():
-            transform_id, _ = self.get_transform_id_and_output_name(input)
-            self._all_followers[transform_id].append(transform['__uuid__'])
+            if input not in self._inputs:
+              transform_id, _ = self.get_transform_id_and_output_name(input)
+              self._all_followers[transform_id].append(transform['__uuid__'])
     return self._all_followers[self.get_transform_id(transform_name)]
 
   def compute_all(self):
@@ -738,7 +739,9 @@ def preprocess_windowing(spec):
         'type': 'WindowInto',
         'name': f'WindowInto[{key}]',
         'windowing': windowing,
-        'input': key,
+        'input': {
+            'input': key
+        },
         '__line__': spec['__line__'],
         '__uuid__': SafeLineLoader.create_uuid(),
     } for key in original_inputs.keys()]
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
index 00e509310d6f4..a9e4db29d19cd 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
@@ -653,7 +653,7 @@ def test_preprocess_windowing_custom_type(self):
           windowing:
             type: fixed
             size: 4
-          input: input
+          input: {{input: input}}
       output: {result['transforms'][0]['__uuid__']}
     '''
     self.assertYaml(expected, result)