[#24789][prism] V0 Go Direct Runner Replacement #25391

Closed
wants to merge 26 commits into from
21 changes: 9 additions & 12 deletions sdks/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
// directory.
module github.com/apache/beam/sdks/v2

go 1.18
go 1.19

require (
cloud.google.com/go/bigquery v1.45.0
cloud.google.com/go/bigtable v1.18.1
cloud.google.com/go/datastore v1.10.0
cloud.google.com/go/profiler v0.3.1
cloud.google.com/go/pubsub v1.28.0
cloud.google.com/go/spanner v1.43.0
cloud.google.com/go/storage v1.29.0
github.com/aws/aws-sdk-go-v2 v1.17.3
github.com/aws/aws-sdk-go-v2/config v1.18.11
Expand All @@ -46,27 +48,23 @@ require (
github.com/proullon/ramsql v0.0.0-20211120092837-c8d0a408b939
github.com/spf13/cobra v1.6.1
github.com/testcontainers/testcontainers-go v0.15.0
github.com/tetratelabs/wazero v1.0.0-pre.9
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c
go.mongodb.org/mongo-driver v1.11.1
golang.org/x/exp v0.0.0-20230206171751-46f607a40771
golang.org/x/net v0.5.0
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783
golang.org/x/sync v0.1.0
golang.org/x/sys v0.4.0
golang.org/x/text v0.6.0
google.golang.org/api v0.108.0
google.golang.org/api v0.109.0
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f
google.golang.org/grpc v1.52.3
google.golang.org/protobuf v1.28.1
gopkg.in/retry.v1 v1.0.3
gopkg.in/yaml.v2 v2.4.0
)

require cloud.google.com/go/spanner v1.43.0

require (
cloud.google.com/go/bigtable v1.18.1
github.com/tetratelabs/wazero v1.0.0-pre.7
gopkg.in/yaml.v3 v3.0.1
)

require (
Expand Down Expand Up @@ -136,9 +134,8 @@ require (
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/tools v0.1.12 // indirect
golang.org/x/tools v0.2.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
)
6 changes: 4 additions & 2 deletions sdks/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg=
golang.org/x/exp v0.0.0-20230206171751-46f607a40771/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down Expand Up @@ -1137,8 +1139,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200916195026-c9a70fc28ce3/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE=
golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
190 changes: 190 additions & 0 deletions sdks/go/pkg/beam/runners/prism/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Apache Beam Go Prism Runner

Prism is a local portable Apache Beam runner authored in Go.

* Local, for fast startup and ease of testing on a single machine.
* Portable, in that it uses the Beam FnAPI to communicate with Beam SDKs of any language.
* Go's simple concurrency enables clear structures for testing batch through streaming jobs.

It's intended to replace the current Go Direct runner, and also to serve
general single-machine use.

For Go SDK users:
- Short term: set runner to "prism" to use it, or invoke directly.
- Medium term: switch the default from "direct" to "prism".
- Long term: alias "direct" to "prism", and delete legacy Go direct runner.

Prisms allow breaking apart and separating a beam of light into
its component wavelengths, as well as recombining them together.

The Prism Runner leans on this metaphor with the goal of making it
easier for users and Beam SDK developers alike to test and validate
aspects of Beam that are presently underrepresented.

## Configurability

Prism is configurable using YAML, which is eagerly validated on startup.
The configuration contains a set of variants to specify execution behavior,
either to support specific testing goals, or to emulate different runners.

Beam's implementation contains a number of details that are hidden from
users, and to date, no runner implements the same set of features. This
can make SDK or pipeline development difficult, since exactly what is
being tested will vary depending on the runner being used.

At the top level the configuration contains "variants", and the variants
configure the behaviors of different "handlers" in Prism.

Jobs will be able to provide a pipeline option to select which variant to
use. Multiple jobs on the same prism instance can use different variants.
Jobs which don't provide a variant will default to testing behavior.
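
As a rough illustration of the relationship described above (a configuration holds variants, variants configure handlers, and jobs without a variant fall back to a default), here is a minimal Go sketch. The names `Config`, `Variant`, `HandlerOptions`, `Validate`, `Resolve`, and the `pardo`/`fusion` option are hypothetical and are not Prism's actual configuration API. The sketch models an already-parsed configuration as plain structs instead of parsing YAML.

```go
package main

import (
	"errors"
	"fmt"
)

// HandlerOptions is a hypothetical bag of settings for a single handler.
type HandlerOptions map[string]string

// Variant maps handler names to the options that configure them.
type Variant struct {
	Handlers map[string]HandlerOptions
}

// Config is the top level: a set of named variants plus the fallback name.
type Config struct {
	Variants map[string]Variant
	Default  string
}

// Validate eagerly rejects configurations that could not be used later.
func (c Config) Validate() error {
	if len(c.Variants) == 0 {
		return errors.New("config has no variants")
	}
	if _, ok := c.Variants[c.Default]; !ok {
		return fmt.Errorf("default variant %q is not defined", c.Default)
	}
	return nil
}

// Resolve returns the variant a job requested, or the default variant
// when the job did not name one.
func (c Config) Resolve(requested string) (string, Variant, error) {
	if requested == "" {
		requested = c.Default
	}
	v, ok := c.Variants[requested]
	if !ok {
		return "", Variant{}, fmt.Errorf("unknown variant %q", requested)
	}
	return requested, v, nil
}

// exampleConfig builds an illustrative configuration with two variants.
func exampleConfig() Config {
	return Config{
		Default: "default",
		Variants: map[string]Variant{
			"default": {Handlers: map[string]HandlerOptions{"pardo": {"fusion": "off"}}},
			"fast":    {Handlers: map[string]HandlerOptions{"pardo": {"fusion": "on"}}},
		},
	}
}

func main() {
	cfg := exampleConfig()
	if err := cfg.Validate(); err != nil {
		panic(err)
	}
	// Two jobs on the same instance may pick different variants.
	for _, req := range []string{"", "fast", "nope"} {
		name, _, err := cfg.Resolve(req)
		fmt.Printf("requested=%q resolved=%q err=%v\n", req, name, err)
	}
}
```

Validating once at startup means a job can only fail variant selection by naming a variant that does not exist, never because of a malformed configuration.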

All variants should execute the Beam Model faithfully and correctly,
and with few exceptions it should not be possible for there to be an
invalid execution. The machine's the limit.

It's not expected that all handler options will be useful for pipeline authors.
These options should remain useful for SDK developers,
or for more precise issue reproduction.

For more detail on the motivation, see Robert Burke's (@lostluck) Beam Summit 2022 talk:
https://2022.beamsummit.org/sessions/portable-go-beam-runner/.

Here's a non-exhaustive set of variants.

### Variant Highlight: "default"

The "default" variant is testing focused, intending to root out issues at development
time, rather than discovering them on production runners. Notably, this mode should
never use fusion, executing each Transform individually and independently, one at a time.

This variant should be able to execute arbitrary pipelines, correctly, with clarity and
precision when an error occurs. Other features supported by the SDK should be enabled by default to
ensure good coverage, such as caches, or RPC reductions like sending elements in
ProcessBundleRequest and Response, as they should not affect correctness. Composite
transforms like Splittable DoFns and Combines should be expanded to ensure coverage.

Additional validations may be added as time goes on.

This variant does not retry or provide other resilience features, since these may mask errors.

To ensure coverage, there may be sibling variants that use mutually exclusive alternative
executions.
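
To make the "no fusion" behavior concrete, the following Go sketch contrasts two ways of planning stages for a linear chain of transforms: one stage per transform (as the "default" variant describes) and a greedy fused plan. This is a simplified illustration under stated assumptions, not Prism's planner: the `Stage` type, the function names, and the idea of passing fusion barriers as a set are all hypothetical, and real pipelines are graphs rather than simple chains.

```go
package main

import "fmt"

// Stage is a group of transform names executed together as one unit.
type Stage []string

// planUnfused places every transform in its own stage, so each one is
// executed individually and failures point at a single transform.
func planUnfused(chain []string) []Stage {
	stages := make([]Stage, 0, len(chain))
	for _, t := range chain {
		stages = append(stages, Stage{t})
	}
	return stages
}

// planFused greedily merges consecutive transforms into one stage.
// Transforms listed in barrier (for example a GroupByKey) always get a
// stage of their own and end the stage before them.
func planFused(chain []string, barrier map[string]bool) []Stage {
	var stages []Stage
	var cur Stage
	for _, t := range chain {
		if barrier[t] {
			if len(cur) > 0 {
				stages = append(stages, cur)
				cur = nil
			}
			stages = append(stages, Stage{t})
			continue
		}
		cur = append(cur, t)
	}
	if len(cur) > 0 {
		stages = append(stages, cur)
	}
	return stages
}

func main() {
	chain := []string{"Read", "ParseFn", "FilterFn", "GBK", "SumFn"}
	fmt.Println("unfused:", planUnfused(chain))
	fmt.Println("fused:  ", planFused(chain, map[string]bool{"GBK": true}))
}
```

The unfused plan trades throughput for clarity: more stages means more data handoffs, but each stage maps to exactly one user transform.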

### Variant Highlight: "fast"

Not Yet Implemented - Illustrative goal.

The "fast" variant is performance focused, intended for local scale execution.
A pseudo production execution. Fusion optimizations should be performed.
Large PCollections should be offloaded to persistent disk. Bundles should be
dynamically split. Multiple Bundles should be executed simultaneously. And so on.

Pipelines should execute as swiftly as possible within the bounds of correct
execution.

### Variant Highlight: "flink" "dataflow" "spark" AKA Emulations

Not Yet Implemented - Illustrative goal.

Emulation variants have the goal of replicating on the local scale,
the behaviors of other runners. Flink execution never "lifts" Combines, and
doesn't dynamically split. Dataflow has different characteristics for batch
and streaming execution with certain execution characteristics enabled or
disabled.
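
For readers unfamiliar with Combine "lifting", the Go sketch below shows why toggling it should change how much data is shuffled but not the result. It is an illustration only: the `KV` type, the bundle layout, and the function names are assumptions made for this example, and a simple integer sum stands in for an arbitrary CombineFn.

```go
package main

import "fmt"

// KV is a keyed integer element.
type KV struct {
	Key   string
	Value int
}

// combineUnlifted groups every value by key first and only then combines
// each group, so all input elements cross the grouping step.
func combineUnlifted(bundles [][]KV) map[string]int {
	grouped := map[string][]int{}
	for _, b := range bundles {
		for _, kv := range b {
			grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
		}
	}
	out := map[string]int{}
	for k, vs := range grouped {
		sum := 0
		for _, v := range vs {
			sum += v
		}
		out[k] = sum
	}
	return out
}

// combineLifted pre-combines inside each bundle, so only one partial
// accumulator per key per bundle is shuffled, then merges the partials.
// It also reports how many accumulators were shuffled.
func combineLifted(bundles [][]KV) (result map[string]int, shuffled int) {
	result = map[string]int{}
	for _, b := range bundles {
		partial := map[string]int{}
		for _, kv := range b {
			partial[kv.Key] += kv.Value
		}
		shuffled += len(partial)
		for k, acc := range partial {
			result[k] += acc
		}
	}
	return result, shuffled
}

func main() {
	bundles := [][]KV{
		{{"a", 1}, {"a", 2}, {"b", 3}},
		{{"a", 4}, {"b", 5}, {"b", 6}},
	}
	lifted, shuffled := combineLifted(bundles)
	fmt.Println("unlifted:", combineUnlifted(bundles))
	fmt.Println("lifted:  ", lifted, "accumulators shuffled:", shuffled)
}
```

An emulation variant that disables lifting would take the first path for every Combine, which can expose CombineFns that only behave correctly when pre-combined.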

As Prism is intended to implement all facets of Beam Model execution, the handlers
can have features selectively disabled to ensure

## Current Limitations

* Experimental and testing use only.
* Executing docker containers isn't yet implemented.
    * This precludes running the Java and Python SDKs, or their transforms for Cross Language.
    * Loopback execution only.
    * No stand alone execution.
* In Memory Only
    * Not yet suitable for larger jobs, which may have intermediate data that exceeds memory bounds.
    * Doesn't yet support sufficient intermediate data garbage collection for indefinite stream processing.
* Doesn't yet execute all beam pipeline features.
* No UI for job status inspection.

## Implemented so far.

* DoFns
    * Side Inputs
    * Multiple Outputs
* Flattens
* GBKs
    * Includes handling session windows.
    * Global Window
    * Interval Windowing
    * Session Windows.
* Combines lifted and unlifted.
* Expands Splittable DoFns
* Limited support for Process Continuations
    * Residuals are rescheduled for execution immediately.
    * The transform must be finite (and eventually return a stop process continuation)
* Basic Metrics support
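
Session windows are the one windowing item in the list above that requires merging at grouping time, so a small sketch may help. The Go code below assigns each timestamp a proto-window of `[ts, ts+gap)` and merges windows that overlap, which is the general session-windowing idea in the Beam model. It is not Prism's implementation: the `Window` type and `sessionWindows` function are hypothetical, timestamps are plain integers, and the sketch handles a single key only.

```go
package main

import (
	"fmt"
	"sort"
)

// Window is a half-open event-time interval [Start, End).
type Window struct{ Start, End int64 }

// sessionWindows gives every timestamp the window [ts, ts+gap) and merges
// windows that overlap. Windows that merely touch (End == next Start)
// are kept separate.
func sessionWindows(timestamps []int64, gap int64) []Window {
	if len(timestamps) == 0 {
		return nil
	}
	// Sort a copy so the caller's slice is left untouched.
	ts := append([]int64(nil), timestamps...)
	sort.Slice(ts, func(i, j int) bool { return ts[i] < ts[j] })

	merged := []Window{{ts[0], ts[0] + gap}}
	for _, t := range ts[1:] {
		last := &merged[len(merged)-1]
		if t < last.End {
			// The element falls inside the open session: extend it.
			if t+gap > last.End {
				last.End = t + gap
			}
			continue
		}
		merged = append(merged, Window{t, t + gap})
	}
	return merged
}

func main() {
	fmt.Println(sessionWindows([]int64{1, 4, 20, 8, 40}, 5))
}
```

With a gap of 5, the elements at 1, 4, and 8 chain into a single session, while 20 and 40 each start their own.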

## Next feature short list (unordered)

See https://github.com/apache/beam/issues/24789 for current status.

* Resolve watermark advancement for Process Continuations
* Test Stream
* Triggers & Complex Windowing Strategy execution.
* State
* Timers
* "PubSub" Transform
* Support SDK Containers via Testcontainers
* Cross Language Transforms
* FnAPI Optimizations
* Fusion
* Data with ProcessBundleRequest & Response
* Progress tracking
* Channel Splitting
* Dynamic Splitting
* Stand alone execution support
* UI reporting of in progress jobs

This is not a comprehensive feature set, but a set of goals to best
support users of the Go SDK in testing their pipelines.

## How to contribute

Until additional structure is necessary, check the main issue
https://github.com/apache/beam/issues/24789 for the current
status, file an issue for the feature or bug to fix with `[prism]`
in the title, and refer to the main issue, before beginning work
to avoid duplication of effort.

If a feature will take a long time, please send a PR to
link to your issue from this README to help others discover it.

Otherwise, ordinary [Beam contribution guidelines apply](https://beam.apache.org/contribute/).

# Long Term Goals

Once support for containers is implemented, Prism should become a target
for the Java Runner Validation tests, which are the current specification
for correct runner behavior. This will inform further feature development.
62 changes: 62 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Prism internal packages

Go has a mechanism for ["internal" packages](https://go.dev/doc/go1.4#internalpackages)
to prevent use of implementation details outside of their intended use.

This mechanism is used thoroughly for Prism to ensure we can make changes to the
runner's internals without worrying about the exposed surface changes breaking
non-compliant users.

# Structure

Here's a loose description of the current structure of the runner. Leaf packages should
not depend on other parts of the runner. Runner packages can and do depend on other
parts of the SDK, such as for Coder handling.

`config` contains configuration parsing and handling. Leaf package.
Handler configurations are registered by dependent packages.

`urns` contains beam URN strings pulled from the protos. Leaf package.

`engine` contains the core manager for handling elements, watermarks, and windowing strategies.
Determines bundle readiness, and stages to execute. Leaf package.

`jobservices` contains gRPC service handlers for job management and submission.
Should only depend on the `config` and `urns` packages.

`worker` contains interactions with FnAPI services to communicate with worker SDKs. Leaf package
except for dependency on `engine.TentativeData` which will likely be removed at some point.

`internal` AKA the package in this directory root. Contains the job execution
flow. Jobs are sent to it from `jobservices`, and those jobs are then executed by coordinating
with the `engine` and `worker` packages, and the URN handlers.
Most configurable behavior is determined here.
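
The layering rules above can be written down as data and checked mechanically. The Go sketch below encodes the package relationships as described in this section and reports any import that breaks them. It is a hypothetical illustration, not a tool that exists in the repository: the `deps` and `allowed` tables are hand-written from the prose, and the exact import list given for `internal` is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// deps records, for each runner package, which other runner packages it
// imports, following the descriptions in the text. The entry for
// "internal" is an assumption.
var deps = map[string][]string{
	"config":      nil,
	"urns":        nil,
	"engine":      nil,
	"jobservices": {"config", "urns"},
	"worker":      {"engine"}, // the documented engine.TentativeData exception
	"internal":    {"config", "engine", "jobservices", "urns", "worker"},
}

// allowed lists which runner packages a package may import. Packages
// without an entry (here, "internal") are unrestricted.
var allowed = map[string]map[string]bool{
	"config":      {},
	"urns":        {},
	"engine":      {},
	"jobservices": {"config": true, "urns": true},
	"worker":      {"engine": true},
}

// violations returns every "pkg -> import" edge that breaks the rules,
// sorted so the output is deterministic.
func violations(deps map[string][]string) []string {
	var out []string
	for pkg, imports := range deps {
		rule, restricted := allowed[pkg]
		if !restricted {
			continue
		}
		for _, imp := range imports {
			if !rule[imp] {
				out = append(out, pkg+" -> "+imp)
			}
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	fmt.Println("violations in described layout:", violations(deps))

	// A leaf package importing another runner package would be flagged.
	bad := map[string][]string{"engine": {"worker"}}
	fmt.Println("violations in bad layout:", violations(bad))
}
```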

# Testing

The sub packages should have reasonable Unit Test coverage in their own directories, but
most features will be exercised via executing pipelines in this package.

For the time being, test DoFns should be added to the standard build in order to validate execution
coverage, in particular for Combine and Splittable DoFns.

Eventually these behaviors should be covered by using Prism in the main SDK tests.