Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YAML] Allow windowing to be done in Java or Python. #30055

Merged
merged 4 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ protected String getKindString() {
* Pipeline authors should use {@link Window} directly instead.
*/
public static class Assign<T> extends PTransform<PCollection<T>, PCollection<T>> {
private final Window<T> original;
private final @Nullable Window<T> original;
private final WindowingStrategy<T, ?> updatedStrategy;

/**
Expand All @@ -463,7 +463,7 @@ public static class Assign<T> extends PTransform<PCollection<T>, PCollection<T>>
* #getWindowFn()}.
*/
@VisibleForTesting
Assign(Window<T> original, WindowingStrategy updatedStrategy) {
Assign(@Nullable Window<T> original, WindowingStrategy updatedStrategy) {
this.original = original;
this.updatedStrategy = updatedStrategy;
}
Expand All @@ -476,12 +476,18 @@ public PCollection<T> expand(PCollection<T> input) {

@Override
public void populateDisplayData(DisplayData.Builder builder) {
original.populateDisplayData(builder);
if (original != null) {
original.populateDisplayData(builder);
}
}

public @Nullable WindowFn<T, ?> getWindowFn() {
return updatedStrategy.getWindowFn();
}

public static <T> Assign<T> createInternal(WindowingStrategy finalStrategy) {
return new Assign<T>(null, finalStrategy);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.expansion.service;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.WindowingStrategy;

/**
* An implementation of {@link TypedSchemaTransformProvider} for WindowInto.
*
* <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
* provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
* repository.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
@AutoService(SchemaTransformProvider.class)
public class WindowIntoTransformProvider
extends TypedSchemaTransformProvider<WindowIntoTransformProvider.Configuration> {
private static final String INPUT_ROWS_TAG = "input";

protected static final String OUTPUT_ROWS_TAG = "output";

@Override
protected Class<Configuration> configurationClass() {
return Configuration.class;
}

@Override
protected SchemaTransform from(Configuration configuration) {
try {
return new WindowIntoStrategy(
(WindowingStrategy<Row, ?>)
WindowingStrategyTranslation.fromProto(
RunnerApi.WindowingStrategy.parseFrom(
configuration.getSerializedWindowingStrategy()),
null));
} catch (IOException exn) {
throw new RuntimeException(exn);
}
}

@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:yaml:window_into_strategy:v1";
}

@Override
public List<String> inputCollectionNames() {
return Collections.singletonList(INPUT_ROWS_TAG);
}

@Override
public List<String> outputCollectionNames() {
return Collections.singletonList(OUTPUT_ROWS_TAG);
}

@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract static class Configuration {

@SuppressWarnings({"AutoValueMutable", "mutable"})
public abstract byte[] getSerializedWindowingStrategy();

public static Builder builder() {
return new AutoValue_WindowIntoTransformProvider_Configuration.Builder();
}

@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setSerializedWindowingStrategy(byte[] serializedWindowingStrategy);

public abstract Configuration build();
}
}

private static class WindowIntoStrategy extends SchemaTransform {

private final WindowingStrategy<Row, ?> windowingStrategy;

WindowIntoStrategy(WindowingStrategy<Row, ?> windowingStrategy) {
this.windowingStrategy = windowingStrategy;
}

@Override
public PCollectionRowTuple expand(PCollectionRowTuple inputTuple) {
PCollection<Row> input = inputTuple.get(INPUT_ROWS_TAG);
return PCollectionRowTuple.of(
OUTPUT_ROWS_TAG,
input
.apply(Window.Assign.<Row>createInternal(windowingStrategy))
.setCoder(input.getCoder()));
}
}
}
77 changes: 77 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import apache_beam.io
import apache_beam.transforms.util
from apache_beam.portability.api import schema_pb2
from apache_beam.runners import pipeline_context
from apache_beam.transforms import external
from apache_beam.transforms import window
from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
Expand Down Expand Up @@ -755,6 +756,81 @@ def create_builtin_provider():
no_input_transforms=('Create', ))


class TranslatingProvider(Provider):
def __init__(
self,
transforms: Mapping[str, Callable[..., beam.PTransform]],
underlying_provider: Provider):
self._transforms = transforms
self._underlying_provider = underlying_provider

def provided_transforms(self):
return self._transforms.keys()

def available(self):
return self._underlying_provider.available()

def cache_artifacts(self):
return self._underlying_provider.cache_artifacts()

def underlying_provider(self):
return self._underlying_provider

def to_json(self):
return {'type': "TranslatingProvider"}

def create_transform(
self, typ: str, config: Mapping[str, Any],
yaml_create_transform: Any) -> beam.PTransform:
return self._transforms[typ](self._underlying_provider, **config)
Comment on lines +759 to +785
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does there need to be a new provider? Seems as though RenamingProvider has all the relevant functionality and would allow mappings and defaults should there be Java built-in transforms that expose those in the future

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does a deeper rewriting of the arguments than renaming, though I was thinking that the renaming transform could be defined as a special case of this one. (The SQL-based one possibly as well.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think they could be merged in either direction, but not a blocker. LGTM



def create_java_builtin_provider():
"""Exposes built-in transforms from Java as well as Python to maximize
opportunities for fusion.

This class holds those transforms that require pre-processing of the configs.
For those Java transforms that can consume the user-provided configs directly
(or only need a simple renaming of parameters) a direct or renaming provider
is the simpler choice.
"""

# An alternative could be examining the capabilities of various environments
# during (or as a pre-processing phase before) fusion to align environments
# where possible. This would also require extra care in skipping these
# common transforms when doing the provider affinity analysis.

def java_window_into(java_provider, **config):
"""Parses the config into a WindowingStrategy and invokes the Java class.

Though it would not be that difficult to implement this in Java as well,
we prefer to implement it exactly once for consistency (especially as
it evolves).
"""
windowing_strategy = YamlProviders.WindowInto._parse_window_spec(
config).get_windowing(None)
# No context needs to be preserved for the basic WindowFns.
empty_context = pipeline_context.PipelineContext()
return java_provider.create_transform(
'WindowIntoStrategy',
{
'serializedWindowingStrategy': windowing_strategy.to_runner_api(
empty_context).SerializeToString()
},
None)

return TranslatingProvider(
transforms={'WindowInto': java_window_into},
underlying_provider=beam_jar(
urns={
'WindowIntoStrategy': (
'beam:schematransform:'
'org.apache.beam:yaml:window_into_strategy:v1')
},
gradle_target=
'sdks:java:extensions:schemaio-expansion-service:shadowJar'))


class PypiExpansionService:
"""Expands transforms by fully qualified name in a virtual environment
with the given dependencies.
Expand Down Expand Up @@ -993,6 +1069,7 @@ def standard_providers():

return merge_providers(
YamlProviders.create_builtin_provider(),
create_java_builtin_provider(),
create_mapping_providers(),
create_combine_providers(),
io_providers(),
Expand Down
9 changes: 6 additions & 3 deletions sdks/python/apache_beam/yaml/yaml_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,9 @@ def followers(self, transform_name):
for transform in self._transforms:
if transform['type'] != 'composite':
for input in empty_if_explicitly_empty(transform['input']).values():
transform_id, _ = self.get_transform_id_and_output_name(input)
self._all_followers[transform_id].append(transform['__uuid__'])
if input not in self._inputs:
transform_id, _ = self.get_transform_id_and_output_name(input)
self._all_followers[transform_id].append(transform['__uuid__'])
return self._all_followers[self.get_transform_id(transform_name)]

def compute_all(self):
Expand Down Expand Up @@ -738,7 +739,9 @@ def preprocess_windowing(spec):
'type': 'WindowInto',
'name': f'WindowInto[{key}]',
'windowing': windowing,
'input': key,
'input': {
'input': key
},
'__line__': spec['__line__'],
'__uuid__': SafeLineLoader.create_uuid(),
} for key in original_inputs.keys()]
Expand Down
Loading