Skip to content

Commit

Permalink
[YAML] Allow windowing to be done in Java or Python. (#30055)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Jan 23, 2024
1 parent 9dd3076 commit 7ff25d8
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ protected String getKindString() {
* Pipeline authors should use {@link Window} directly instead.
*/
public static class Assign<T> extends PTransform<PCollection<T>, PCollection<T>> {
private final Window<T> original;
private final @Nullable Window<T> original;
private final WindowingStrategy<T, ?> updatedStrategy;

/**
Expand All @@ -463,7 +463,7 @@ public static class Assign<T> extends PTransform<PCollection<T>, PCollection<T>>
* #getWindowFn()}.
*/
@VisibleForTesting
Assign(Window<T> original, WindowingStrategy updatedStrategy) {
Assign(@Nullable Window<T> original, WindowingStrategy updatedStrategy) {
this.original = original;
this.updatedStrategy = updatedStrategy;
}
Expand All @@ -476,12 +476,18 @@ public PCollection<T> expand(PCollection<T> input) {

@Override
public void populateDisplayData(DisplayData.Builder builder) {
original.populateDisplayData(builder);
if (original != null) {
original.populateDisplayData(builder);
}
}

public @Nullable WindowFn<T, ?> getWindowFn() {
return updatedStrategy.getWindowFn();
}

public static <T> Assign<T> createInternal(WindowingStrategy finalStrategy) {
return new Assign<T>(null, finalStrategy);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.expansion.service;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.WindowingStrategy;

/**
* An implementation of {@link TypedSchemaTransformProvider} for WindowInto.
*
* <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
* provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
* repository.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
@AutoService(SchemaTransformProvider.class)
public class WindowIntoTransformProvider
extends TypedSchemaTransformProvider<WindowIntoTransformProvider.Configuration> {
private static final String INPUT_ROWS_TAG = "input";

protected static final String OUTPUT_ROWS_TAG = "output";

@Override
protected Class<Configuration> configurationClass() {
return Configuration.class;
}

@Override
protected SchemaTransform from(Configuration configuration) {
try {
return new WindowIntoStrategy(
(WindowingStrategy<Row, ?>)
WindowingStrategyTranslation.fromProto(
RunnerApi.WindowingStrategy.parseFrom(
configuration.getSerializedWindowingStrategy()),
null));
} catch (IOException exn) {
throw new RuntimeException(exn);
}
}

@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:yaml:window_into_strategy:v1";
}

@Override
public List<String> inputCollectionNames() {
return Collections.singletonList(INPUT_ROWS_TAG);
}

@Override
public List<String> outputCollectionNames() {
return Collections.singletonList(OUTPUT_ROWS_TAG);
}

@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract static class Configuration {

@SuppressWarnings({"AutoValueMutable", "mutable"})
public abstract byte[] getSerializedWindowingStrategy();

public static Builder builder() {
return new AutoValue_WindowIntoTransformProvider_Configuration.Builder();
}

@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setSerializedWindowingStrategy(byte[] serializedWindowingStrategy);

public abstract Configuration build();
}
}

private static class WindowIntoStrategy extends SchemaTransform {

private final WindowingStrategy<Row, ?> windowingStrategy;

WindowIntoStrategy(WindowingStrategy<Row, ?> windowingStrategy) {
this.windowingStrategy = windowingStrategy;
}

@Override
public PCollectionRowTuple expand(PCollectionRowTuple inputTuple) {
PCollection<Row> input = inputTuple.get(INPUT_ROWS_TAG);
return PCollectionRowTuple.of(
OUTPUT_ROWS_TAG,
input
.apply(Window.Assign.<Row>createInternal(windowingStrategy))
.setCoder(input.getCoder()));
}
}
}
77 changes: 77 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import apache_beam.io
import apache_beam.transforms.util
from apache_beam.portability.api import schema_pb2
from apache_beam.runners import pipeline_context
from apache_beam.transforms import external
from apache_beam.transforms import window
from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
Expand Down Expand Up @@ -755,6 +756,81 @@ def create_builtin_provider():
no_input_transforms=('Create', ))


class TranslatingProvider(Provider):
def __init__(
self,
transforms: Mapping[str, Callable[..., beam.PTransform]],
underlying_provider: Provider):
self._transforms = transforms
self._underlying_provider = underlying_provider

def provided_transforms(self):
return self._transforms.keys()

def available(self):
return self._underlying_provider.available()

def cache_artifacts(self):
return self._underlying_provider.cache_artifacts()

def underlying_provider(self):
return self._underlying_provider

def to_json(self):
return {'type': "TranslatingProvider"}

def create_transform(
self, typ: str, config: Mapping[str, Any],
yaml_create_transform: Any) -> beam.PTransform:
return self._transforms[typ](self._underlying_provider, **config)


def create_java_builtin_provider():
"""Exposes built-in transforms from Java as well as Python to maximize
opportunities for fusion.
This class holds those transforms that require pre-processing of the configs.
For those Java transforms that can consume the user-provided configs directly
(or only need a simple renaming of parameters) a direct or renaming provider
is the simpler choice.
"""

# An alternative could be examining the capabilities of various environments
# during (or as a pre-processing phase before) fusion to align environments
# where possible. This would also require extra care in skipping these
# common transforms when doing the provider affinity analysis.

def java_window_into(java_provider, **config):
"""Parses the config into a WindowingStrategy and invokes the Java class.
Though it would not be that difficult to implement this in Java as well,
we prefer to implement it exactly once for consistency (especially as
it evolves).
"""
windowing_strategy = YamlProviders.WindowInto._parse_window_spec(
config).get_windowing(None)
# No context needs to be preserved for the basic WindowFns.
empty_context = pipeline_context.PipelineContext()
return java_provider.create_transform(
'WindowIntoStrategy',
{
'serializedWindowingStrategy': windowing_strategy.to_runner_api(
empty_context).SerializeToString()
},
None)

return TranslatingProvider(
transforms={'WindowInto': java_window_into},
underlying_provider=beam_jar(
urns={
'WindowIntoStrategy': (
'beam:schematransform:'
'org.apache.beam:yaml:window_into_strategy:v1')
},
gradle_target=
'sdks:java:extensions:schemaio-expansion-service:shadowJar'))


class PypiExpansionService:
"""Expands transforms by fully qualified name in a virtual environment
with the given dependencies.
Expand Down Expand Up @@ -993,6 +1069,7 @@ def standard_providers():

return merge_providers(
YamlProviders.create_builtin_provider(),
create_java_builtin_provider(),
create_mapping_providers(),
create_combine_providers(),
io_providers(),
Expand Down
9 changes: 6 additions & 3 deletions sdks/python/apache_beam/yaml/yaml_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,9 @@ def followers(self, transform_name):
for transform in self._transforms:
if transform['type'] != 'composite':
for input in empty_if_explicitly_empty(transform['input']).values():
transform_id, _ = self.get_transform_id_and_output_name(input)
self._all_followers[transform_id].append(transform['__uuid__'])
if input not in self._inputs:
transform_id, _ = self.get_transform_id_and_output_name(input)
self._all_followers[transform_id].append(transform['__uuid__'])
return self._all_followers[self.get_transform_id(transform_name)]

def compute_all(self):
Expand Down Expand Up @@ -738,7 +739,9 @@ def preprocess_windowing(spec):
'type': 'WindowInto',
'name': f'WindowInto[{key}]',
'windowing': windowing,
'input': key,
'input': {
'input': key
},
'__line__': spec['__line__'],
'__uuid__': SafeLineLoader.create_uuid(),
} for key in original_inputs.keys()]
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ def test_preprocess_windowing_custom_type(self):
windowing:
type: fixed
size: 4
input: input
input: {{input: input}}
output: {result['transforms'][0]['__uuid__']}
'''
self.assertYaml(expected, result)
Expand Down

0 comments on commit 7ff25d8

Please sign in to comment.