From 8d96dc550b2616b1c45424a03fdad04cfc743fb6 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Mon, 7 Aug 2023 11:27:38 -0700 Subject: [PATCH] [#24789] Remainder of changes from #27550. (#27822) * Make the prism runner the default Go SDK runner. * Break cycle with ptest. * [DO NOT SUBMIT] Most changes needec to set prism as default Go SDK runner. * rm commented out code. * Avoid unnecessary logs on normal path. * Fix top. * Adjust Go versions? * [prism] Update symbol lookup to not be unit test specific. * [prism] Support for reshuffles. * [prism] move reshuffle test out of unimplemented. * [prism] Add CoGBK test to unimplemented set. * [prism] graduate additional tests. * delint * [prog] guide updates * [prism] Support CoGBKs and wafer thin fusion. * Make window close strict. * quick first pass * chang cleanup * remove unnecessary churn changes * rm execute line * Update sdks/go/pkg/beam/runners/vet/vet.go Co-authored-by: Ritesh Ghorse --------- Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com> Co-authored-by: Ritesh Ghorse --- sdks/go/pkg/beam/core/runtime/harness/datamgr.go | 12 ++++++++++-- sdks/go/pkg/beam/core/runtime/harness/harness.go | 2 +- .../go/pkg/beam/core/runtime/harness/statemgr.go | 9 ++++++++- sdks/go/pkg/beam/core/runtime/symbols.go | 2 +- sdks/go/pkg/beam/create.go | 5 +++++ sdks/go/pkg/beam/create_test.go | 4 ++-- sdks/go/pkg/beam/io/databaseio/database_test.go | 1 + .../go/pkg/beam/io/datastoreio/datastore_test.go | 5 +++++ sdks/go/pkg/beam/pardo_test.go | 4 ++-- .../prism/internal/engine/elementmanager.go | 1 - .../beam/runners/prism/internal/handlerunner.go | 2 +- sdks/go/pkg/beam/runners/prism/internal/stage.go | 8 ++++++-- .../runners/prism/internal/unimplemented_test.go | 1 - .../beam/runners/prism/internal/worker/bundle.go | 2 ++ .../beam/runners/prism/internal/worker/worker.go | 7 ++++--- .../pkg/beam/runners/universal/runnerlib/job.go | 16 ++++++++-------- sdks/go/pkg/beam/runners/vet/vet.go | 2 +- sdks/go/test/integration/integration.go | 3 +-- .../en/documentation/programming-guide.md | 9 ++++++--- 19 files changed, 64 insertions(+), 31 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go index d8c0f4d1d8527..9662ac07c9cdd 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go +++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go @@ -27,6 +27,8 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -128,7 +130,12 @@ func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataCha return nil, err } ch.forceRecreate = func(id string, err error) { - log.Warnf(ctx, "forcing DataChannel[%v] reconnection on port %v due to %v", id, port, err) + switch status.Code(err) { + case codes.Canceled: + // Don't log on context canceled path. + default: + log.Warnf(ctx, "forcing DataChannel[%v] reconnection on port %v due to %v", id, port, err) + } m.mu.Lock() delete(m.ports, port.URL) m.mu.Unlock() @@ -371,7 +378,8 @@ func (c *DataChannel) read(ctx context.Context) { c.terminateStreamOnError(err) c.mu.Unlock() - if err == io.EOF { + st := status.Code(err) + if st == codes.Canceled || err == io.EOF { return } log.Errorf(ctx, "DataChannel.read %v bad: %v", c.id, err) diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index d97b6b7db0791..c5db9a85f3677 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -708,6 +708,6 @@ func fail(ctx context.Context, id instructionID, format string, args ...any) *fn // dial to the specified endpoint. if timeout <=0, call blocks until // grpc.Dial succeeds. func dial(ctx context.Context, endpoint, purpose string, timeout time.Duration) (*grpc.ClientConn, error) { - log.Infof(ctx, "Connecting via grpc @ %s for %s ...", endpoint, purpose) + log.Output(ctx, log.SevDebug, 1, fmt.Sprintf("Connecting via grpc @ %s for %s ...", endpoint, purpose)) return grpcx.Dial(ctx, endpoint, timeout) } diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go index f10f0d92e84ea..76d4e1f32c23a 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go @@ -29,6 +29,8 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/log" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" "github.com/golang/protobuf/proto" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type writeTypeEnum int32 @@ -525,7 +527,12 @@ func (m *StateChannelManager) Open(ctx context.Context, port exec.Port) (*StateC return nil, err } ch.forceRecreate = func(id string, err error) { - log.Warnf(ctx, "forcing StateChannel[%v] reconnection on port %v due to %v", id, port, err) + switch status.Code(err) { + case codes.Canceled: + // Don't log on context canceled path. + default: + log.Warnf(ctx, "forcing StateChannel[%v] reconnection on port %v due to %v", id, port, err) + } m.mu.Lock() delete(m.ports, port.URL) m.mu.Unlock() diff --git a/sdks/go/pkg/beam/core/runtime/symbols.go b/sdks/go/pkg/beam/core/runtime/symbols.go index e8ff532e76373..84afe9b769af3 100644 --- a/sdks/go/pkg/beam/core/runtime/symbols.go +++ b/sdks/go/pkg/beam/core/runtime/symbols.go @@ -105,5 +105,5 @@ func ResolveFunction(name string, t reflect.Type) (any, error) { type failResolver bool func (p failResolver) Sym2Addr(name string) (uintptr, error) { - return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name) + return 0, errors.Errorf("%v not found. Register DoFns and functions with the the beam/register package.", name) } diff --git a/sdks/go/pkg/beam/create.go b/sdks/go/pkg/beam/create.go index d2bd554963ee6..91e9f335ef870 100644 --- a/sdks/go/pkg/beam/create.go +++ b/sdks/go/pkg/beam/create.go @@ -112,6 +112,11 @@ func createList(s Scope, values []any, t reflect.Type) (PCollection, error) { // TODO(herohde) 6/26/2017: make 'create' a SDF once supported. See BEAM-2421. +func init() { + register.DoFn2x1[[]byte, func(T), error]((*createFn)(nil)) + register.Emitter1[T]() +} + type createFn struct { Values [][]byte `json:"values"` Type EncodedType `json:"type"` diff --git a/sdks/go/pkg/beam/create_test.go b/sdks/go/pkg/beam/create_test.go index 9033979d05028..785c3b33db621 100644 --- a/sdks/go/pkg/beam/create_test.go +++ b/sdks/go/pkg/beam/create_test.go @@ -75,8 +75,8 @@ func TestCreateList(t *testing.T) { {[]float64{float64(0.1), float64(0.2), float64(0.3)}}, {[]uint{uint(1), uint(2), uint(3)}}, {[]bool{false, true, true, false, true}}, - {[]wc{wc{"a", 23}, wc{"b", 42}, wc{"c", 5}}}, - {[]*testProto{&testProto{}, &testProto{stringValue("test")}}}, // Test for BEAM-4401 + {[]wc{{"a", 23}, {"b", 42}, {"c", 5}}}, + {[]*testProto{{}, {stringValue("test")}}}, // Test for BEAM-4401 } for _, test := range tests { diff --git a/sdks/go/pkg/beam/io/databaseio/database_test.go b/sdks/go/pkg/beam/io/databaseio/database_test.go index b93d5c9da727b..f6c1355e851a3 100644 --- a/sdks/go/pkg/beam/io/databaseio/database_test.go +++ b/sdks/go/pkg/beam/io/databaseio/database_test.go @@ -22,6 +22,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" _ "github.com/proullon/ramsql/driver" diff --git a/sdks/go/pkg/beam/io/datastoreio/datastore_test.go b/sdks/go/pkg/beam/io/datastoreio/datastore_test.go index 345eaa2a59ef7..b95439e2d56dc 100644 --- a/sdks/go/pkg/beam/io/datastoreio/datastore_test.go +++ b/sdks/go/pkg/beam/io/datastoreio/datastore_test.go @@ -64,6 +64,11 @@ type Foo struct { type Bar struct { } +func init() { + beam.RegisterType(reflect.TypeOf((*Foo)(nil)).Elem()) + beam.RegisterType(reflect.TypeOf((*Bar)(nil)).Elem()) +} + func Test_query(t *testing.T) { testCases := []struct { v any diff --git a/sdks/go/pkg/beam/pardo_test.go b/sdks/go/pkg/beam/pardo_test.go index b88a6d642ea93..56ed7e3e9fa66 100644 --- a/sdks/go/pkg/beam/pardo_test.go +++ b/sdks/go/pkg/beam/pardo_test.go @@ -72,9 +72,9 @@ func testFunction() int64 { func TestFormatParDoError(t *testing.T) { got := formatParDoError(testFunction, 2, 1) - want := "beam.testFunction has 2 outputs, but ParDo requires 1 outputs, use ParDo2 instead." + want := "has 2 outputs, but ParDo requires 1 outputs, use ParDo2 instead." if !strings.Contains(got, want) { - t.Errorf("formatParDoError(testFunction,2,1) = %v, want = %v", got, want) + t.Errorf("formatParDoError(testFunction,2,1) = \n%q want =\n%q", got, want) } } diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index c8721e1a20795..5e1585ffcd1f4 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -707,7 +707,6 @@ func (ss *stageState) bundleReady(em *ElementManager) (mtime.Time, bool) { ready := true for _, side := range ss.sides { pID, ok := em.pcolParents[side] - // These panics indicate pre-process/stage construction problems. if !ok { panic(fmt.Sprintf("stage[%v] no parent ID for side input %v", ss.ID, side)) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go index 27303f03b705b..05b3d3bbaa0ea 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -43,7 +43,7 @@ import ( type RunnerCharacteristic struct { SDKFlatten bool // Sets whether we should force an SDK side flatten. SDKGBK bool // Sets whether the GBK should be handled by the SDK, if possible by the SDK. - SDKReshuffle bool + SDKReshuffle bool // Sets whether we should use the SDK backup implementation to handle a Reshuffle. } func Runner(config any) *runner { diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index e6fe28714b7f3..1a9c2548df83d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -50,6 +50,10 @@ type link struct { // should in principle be able to connect two SDK environments directly // instead of going through the runner at all, which would be a small // efficiency gain, in runner memory use. +// +// That would also warrant an execution mode where fusion is taken into +// account, but all serialization boundaries remain since the pcollections +// would continue to get serialized. type stage struct { ID string transforms []string @@ -145,11 +149,11 @@ progress: if previousIndex == index && !splitsDone { sr, err := b.Split(wk, 0.5 /* fraction of remainder */, nil /* allowed splits */) if err != nil { - slog.Debug("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error()) + slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error()) break progress } if sr.GetChannelSplits() == nil { - slog.Warn("split failed", "bundle", rb) + slog.Debug("SDK returned no splits", "bundle", rb) splitsDone = true continue progress } diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go index f738a299cfd20..5f8d387599982 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go @@ -41,7 +41,6 @@ func TestUnimplemented(t *testing.T) { tests := []struct { pipeline func(s beam.Scope) }{ - // These tests don't terminate, so can't be run. // {pipeline: primitives.Drain}, // Can't test drain automatically yet. {pipeline: primitives.TestStreamBoolSequence}, diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go index 30515fa6f6e83..c931655f000b0 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go @@ -145,6 +145,7 @@ func (b *B) Cleanup(wk *W) { wk.mu.Unlock() } +// Progress sends a progress request for the given bundle to the passed in worker, blocking on the response. func (b *B) Progress(wk *W) (*fnpb.ProcessBundleProgressResponse, error) { resp := wk.sendInstruction(&fnpb.InstructionRequest{ Request: &fnpb.InstructionRequest_ProcessBundleProgress{ @@ -159,6 +160,7 @@ func (b *B) Progress(wk *W) (*fnpb.ProcessBundleProgressResponse, error) { return resp.GetProcessBundleProgress(), nil } +// Split sends a split request for the given bundle to the passed in worker, blocking on the response. func (b *B) Split(wk *W, fraction float64, allowedSplits []int64) (*fnpb.ProcessBundleSplitResponse, error) { resp := wk.sendInstruction(&fnpb.InstructionRequest{ Request: &fnpb.InstructionRequest_ProcessBundleSplit{ diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index eefab54a54cc2..a1d0ff79baf14 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -256,7 +256,6 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error { // TODO: Do more than assume these are ProcessBundleResponses. wk.mu.Lock() if b, ok := wk.activeInstructions[resp.GetInstructionId()]; ok { - // Error is handled in the resonse handler. b.Respond(resp) } else { slog.Debug("ctrl.Recv: %v", resp) @@ -268,7 +267,10 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error { for { select { case req := <-wk.InstReqs: - ctrl.Send(req) + err := ctrl.Send(req) + if err != nil { + return err + } case <-ctrl.Context().Done(): slog.Debug("Control context canceled") return ctrl.Context().Err() @@ -322,7 +324,6 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error { wk.mu.Unlock() } }() - for { select { case req, ok := <-wk.DataReqs: diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go index 8cbb274e184f6..4e50661b3db8e 100644 --- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go +++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go @@ -103,7 +103,7 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID return errors.Wrap(err, "failed to get job stream") } - mostRecentError := errors.New("") + mostRecentError := "" var errReceived, jobFailed bool for { @@ -111,8 +111,8 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID if err != nil { if err == io.EOF { if jobFailed { - // Connection finished with a failed status, so produce what we have. - return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError) + // Connection finished, so time to exit, produce what we have. + return errors.Errorf("job %v failed:\n%v", jobID, mostRecentError) } return nil } @@ -123,7 +123,7 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID case msg.GetStateResponse() != nil: resp := msg.GetStateResponse() - log.Infof(ctx, "Job state: %v", resp.GetState().String()) + log.Infof(ctx, "Job[%v] state: %v", jobID, resp.GetState().String()) switch resp.State { case jobpb.JobState_DONE, jobpb.JobState_CANCELLED: @@ -131,9 +131,9 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID case jobpb.JobState_FAILED: jobFailed = true if errReceived { - return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError) + return errors.Errorf("job %v failed:\n%v", jobID, mostRecentError) } - // Otherwise, wait for at least one error log from the runner, or the connection to close. + // Otherwise we should wait for at least one error log from the runner. } case msg.GetMessageResponse() != nil: @@ -144,10 +144,10 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID if resp.GetImportance() >= jobpb.JobMessage_JOB_MESSAGE_ERROR { errReceived = true - mostRecentError = errors.New(resp.GetMessageText()) + mostRecentError = resp.GetMessageText() if jobFailed { - return errors.Errorf("job %v failed:\n%w", jobID, mostRecentError) + return errors.Errorf("job %v failed:\n%w", jobID, errors.New(mostRecentError)) } } diff --git a/sdks/go/pkg/beam/runners/vet/vet.go b/sdks/go/pkg/beam/runners/vet/vet.go index 131fa0b1ec125..739f5db61c5bc 100644 --- a/sdks/go/pkg/beam/runners/vet/vet.go +++ b/sdks/go/pkg/beam/runners/vet/vet.go @@ -54,7 +54,7 @@ func init() { type disabledResolver bool func (p disabledResolver) Sym2Addr(name string) (uintptr, error) { - return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name) + return 0, errors.Errorf("%v not found. Register DoFns and functions with the beam/register package.", name) } // Execute evaluates the pipeline on whether it can run without reflection. diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 0f9e5984eadde..f66cc1f53bfc2 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -136,7 +136,6 @@ var portableFilters = []string{ "TestSetStateClear", } -// TODO(lostluck): set up a specific run for these. var prismFilters = []string{ // The prism runner does not support the TestStream primitive "TestTestStream.*", @@ -149,7 +148,7 @@ var prismFilters = []string{ // TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners. "TestBigQueryIO.*", "TestSpannerIO.*", - // The prsim runner does not support pipeline drain for SDF. + // The prism runner does not support pipeline drain for SDF. "TestDrain", // FhirIO currently only supports Dataflow runner "TestFhirIO.*", diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index 82ada91f26a4b..b0118df39872d 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -1124,6 +1124,9 @@ words = ... {{< /highlight >}} {{< highlight go >}} + +The Go SDK cannot support anonymous functions outside of the deprecated Go Direct runner. + // words is the input PCollection of strings var words beam.PCollection = ... @@ -1170,8 +1173,8 @@ words = ... {{< /highlight >}} {{< highlight go >}} -// words is the input PCollection of strings -var words beam.PCollection = ... + +The Go SDK cannot support anonymous functions outside of the deprecated Go Direct runner. {{< code_sample "sdks/go/examples/snippets/04transforms.go" model_pardo_apply_anon >}} {{< /highlight >}} @@ -1191,7 +1194,7 @@ words = ... -> **Note:** Anonymous function DoFns may not work on distributed runners. +> **Note:** Anonymous function DoFns do not work on distributed runners. > It's recommended to use named functions and register them with `register.FunctionXxY` in > an `init()` block.