diff --git a/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go b/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go
index aaa049510f525..0a9343199a1ca 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go
@@ -251,6 +251,9 @@ func (s *decodeStream) Read() (*FullValue, error) {
 	}
 	err := s.d.DecodeTo(s.r, &s.ret)
 	if err != nil {
+		if err == io.EOF {
+			return nil, io.EOF
+		}
 		return nil, errors.Wrap(err, "decodeStream value decode failed")
 	}
 	s.next++
@@ -342,6 +345,9 @@ func (s *decodeMultiChunkStream) Read() (*FullValue, error) {
 	if s.chunk == 0 && s.next == 0 {
 		chunk, err := coder.DecodeVarInt(s.r.reader)
 		if err != nil {
+			if err == io.EOF {
+				return nil, io.EOF
+			}
 			return nil, errors.Wrap(err, "decodeMultiChunkStream chunk size decoding failed")
 		}
 		s.chunk = chunk
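Note on the two fullvalue.go hunks: io.EOF is a sentinel error that stream consumers detect with a plain `==` comparison, and errors.Wrap gives it a new identity, so the EOF check has to happen before wrapping. A minimal standalone sketch of the pass-through pattern — hypothetical names, not the Beam decoder types:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// readByte mimics the Read methods above: io.EOF is returned unwrapped so
// callers can use == to detect a clean end of stream, while unexpected
// errors get contextual wrapping.
func readByte(r io.Reader) (byte, error) {
	var buf [1]byte
	if _, err := r.Read(buf[:]); err != nil {
		if err == io.EOF {
			return 0, io.EOF // sentinel: end of stream, not a failure
		}
		return 0, fmt.Errorf("value decode failed: %w", err)
	}
	return buf[0], nil
}

func main() {
	r := bytes.NewReader([]byte{1, 2, 3})
	for {
		b, err := readByte(r)
		if err == io.EOF {
			break // normal termination
		}
		if err != nil {
			panic(err)
		}
		fmt.Println(b)
	}
}
```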
diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go
index d4fb6ad5b3e1b..7d54cb366ffeb 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/environments.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go
@@ -99,6 +99,7 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo
 	pool.StopWorker(context.Background(), &fnpb.StopWorkerRequest{
 		WorkerId: wk.ID,
 	})
+	wk.Stop()
 }
 
 func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.DockerPayload, wk *worker.W, artifactEndpoint string) error {
@@ -170,6 +171,7 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock
 	// Start goroutine to wait on container state.
 	go func() {
 		defer cli.Close()
+		defer wk.Stop()
 
 		statusCh, errCh := cli.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
 		select {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index cf04381b9cbe3..c1ac6ea4488c2 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"io"
 	"sort"
+	"sync/atomic"
 	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
@@ -46,15 +47,14 @@ func RunPipeline(j *jobservices.Job) {
 	// here, we only want and need the go one, operating
 	// in loopback mode.
 	envs := j.Pipeline.GetComponents().GetEnvironments()
-	if len(envs) != 1 {
-		j.Failed(fmt.Errorf("unable to execute multi-environment pipelines;\npipeline has environments: %+v", envs))
-		return
-	}
-	env, _ := getOnlyPair(envs)
-	wk, err := makeWorker(env, j)
-	if err != nil {
-		j.Failed(err)
-		return
+	wks := map[string]*worker.W{}
+	for envID := range envs {
+		wk, err := makeWorker(envID, j)
+		if err != nil {
+			j.Failed(err)
+			return
+		}
+		wks[envID] = wk
 	}
 	// When this function exits, we cancel the context to clear
 	// any related job resources.
@@ -65,15 +65,12 @@ func RunPipeline(j *jobservices.Job) {
 	j.SendMsg("running " + j.String())
 	j.Running()
 
-	if err := executePipeline(j.RootCtx, wk, j); err != nil {
+	if err := executePipeline(j.RootCtx, wks, j); err != nil {
 		j.Failed(err)
 		return
 	}
 	j.SendMsg("pipeline completed " + j.String())
 
-	// Stop the worker.
-	wk.Stop()
-
 	j.SendMsg("terminating " + j.String())
 	j.Done()
 }
@@ -95,7 +92,7 @@ func makeWorker(env string, j *jobservices.Job) (*worker.W, error) {
 	// Check for connection succeeding after we've created the environment successfully.
 	timeout := 1 * time.Minute
 	time.AfterFunc(timeout, func() {
-		if wk.Connected() {
+		if wk.Connected() || wk.Stopped() {
 			return
 		}
 		err := fmt.Errorf("prism %v didn't get control connection to %v after %v", wk, wk.Endpoint(), timeout)
@@ -115,7 +112,7 @@ type processor struct {
 	transformExecuters map[string]transformExecuter
 }
 
-func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) error {
+func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservices.Job) error {
 	pipeline := j.Pipeline
 	comps := proto.Clone(pipeline.GetComponents()).(*pipepb.Components)
@@ -158,7 +155,12 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
 	// TODO move this loop and code into the preprocessor instead.
 	stages := map[string]*stage{}
 	var impulses []string
-	for _, stage := range topo {
+
+	// Initialize the "dataservice cache" to support side inputs.
+	// TODO(https://github.com/apache/beam/issues/28543), remove this concept.
+	ds := &worker.DataService{}
+
+	for i, stage := range topo {
 		tid := stage.transforms[0]
 		t := ts[tid]
 		urn := t.GetSpec().GetUrn()
@@ -169,11 +171,11 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
 		if stage.exe != nil {
 			stage.envID = stage.exe.ExecuteWith(t)
 		}
-		stage.ID = wk.NextStage()
+		stage.ID = fmt.Sprintf("stage-%03d", i)
+		wk := wks[stage.envID]
 
 		switch stage.envID {
 		case "": // Runner Transforms
-
 			var onlyOut string
 			for _, out := range t.GetOutputs() {
 				onlyOut = out
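The hunks above drop the single-worker assumption: RunPipeline builds one worker per environment, stage IDs are minted from the stable topological index rather than any one worker's counter, and each stage is paired with the worker serving its environment. A rough standalone sketch of that lookup shape — fakeWorker, envOf, and assignStages are illustrative stand-ins, not prism types:

```go
package main

import "fmt"

type fakeWorker struct{ env string }

// assignStages mints deterministic stage IDs from the topological order and
// pairs each stage with the worker serving its environment.
func assignStages(topo []string, envOf map[string]string, wks map[string]*fakeWorker) map[string]*fakeWorker {
	byStage := map[string]*fakeWorker{}
	for i, tid := range topo {
		stageID := fmt.Sprintf("stage-%03d", i) // stable across workers
		byStage[stageID] = wks[envOf[tid]]
	}
	return byStage
}

func main() {
	wks := map[string]*fakeWorker{"go": {env: "go"}, "java": {env: "java"}}
	envOf := map[string]string{"t0": "go", "t1": "java"}
	for id, wk := range assignStages([]string{"t0", "t1"}, envOf, wks) {
		fmt.Println(id, "->", wk.env)
	}
}
```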
@@ -232,10 +234,8 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
 				em.AddStage(stage.ID, inputs, nil, []string{getOnlyValue(t.GetOutputs())})
 			}
 			stages[stage.ID] = stage
-			wk.Descriptors[stage.ID] = stage.desc
 		case wk.Env:
-			// Great! this is for this environment.
 			// Broken abstraction.
-			if err := buildDescriptor(stage, comps, wk); err != nil {
+			if err := buildDescriptor(stage, comps, wk, ds); err != nil {
 				return fmt.Errorf("prism error building stage %v: \n%w", stage.ID, err)
 			}
 			stages[stage.ID] = stage
@@ -259,7 +259,12 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
 	maxParallelism := make(chan struct{}, 8)
 	// Execute stages here
 	bundleFailed := make(chan error)
-	bundles := em.Bundles(ctx, wk.NextInst)
+
+	var instID uint64
+	bundles := em.Bundles(ctx, func() string {
+		return fmt.Sprintf("inst%03d", atomic.AddUint64(&instID, 1))
+	})
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -273,7 +278,8 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
 			go func(rb engine.RunBundle) {
 				defer func() { <-maxParallelism }()
 				s := stages[rb.StageID]
-				if err := s.Execute(ctx, j, wk, comps, em, rb); err != nil {
+				wk := wks[s.envID]
+				if err := s.Execute(ctx, j, wk, ds, comps, em, rb); err != nil {
					// Ensure we clean up on bundle failure
 					em.FailBundle(rb)
 					bundleFailed <- err
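Since em.Bundles can no longer borrow a single worker's NextInst, the instruction-ID generator above is hoisted to job scope. A standalone sketch of the same atomic-counter closure, safe to call from the concurrent bundle goroutines:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var instID uint64
	// nextInst mirrors the closure passed to em.Bundles: one counter for
	// the whole job, incremented atomically by concurrent callers.
	nextInst := func() string {
		return fmt.Sprintf("inst%03d", atomic.AddUint64(&instID, 1))
	}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(nextInst()) // inst001..inst004, in some interleaving
		}()
	}
	wg.Wait()
}
```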
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
index 213e33a783795..0fd7381e17f4b 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -93,14 +93,18 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
 		return nil, err
 	}
 	var errs []error
-	check := func(feature string, got, want any) {
-		if got != want {
-			err := unimplementedError{
-				feature: feature,
-				value:   got,
+	check := func(feature string, got any, wants ...any) {
+		for _, want := range wants {
+			if got == want {
+				return
 			}
-			errs = append(errs, err)
 		}
+
+		err := unimplementedError{
+			feature: feature,
+			value:   got,
+		}
+		errs = append(errs, err)
 	}
 
 	// Inspect Transforms for unsupported features.
@@ -114,6 +118,8 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
 			urns.TransformGBK,
 			urns.TransformFlatten,
 			urns.TransformCombinePerKey,
+			urns.TransformCombineGlobally,      // Used by Java SDK
+			urns.TransformCombineGroupedValues, // Used by Java SDK
 			urns.TransformAssignWindows:
 		// Very few expected transforms types for submitted pipelines.
 		// Most URNs are for the runner to communicate back to the SDK for execution.
@@ -154,7 +160,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
 			check("WindowingStrategy.MergeStatus", ws.GetMergeStatus(), pipepb.MergeStatus_NON_MERGING)
 		}
 		if !bypassedWindowingStrategies[wsID] {
-			check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY)
+			check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY, pipepb.OnTimeBehavior_FIRE_ALWAYS)
 			check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW)
 			// Non nil triggers should fail.
 			if ws.GetTrigger().GetDefault() == nil {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 4d8d4621168de..4ce3ce7ffeb6e 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -75,7 +75,7 @@ type stage struct {
 	OutputsToCoders map[string]engine.PColInfo
 }
 
-func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) error {
+func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, ds *worker.DataService, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) error {
 	slog.Debug("Execute: starting bundle", "bundle", rb)
 
 	var b *worker.B
@@ -204,8 +204,8 @@ progress:
 			md := wk.MonitoringMetadata(ctx, unknownIDs)
 			j.AddMetricShortIDs(md)
 		}
-		// TODO handle side input data properly.
-		wk.D.Commit(b.OutputData)
+		// TODO(https://github.com/apache/beam/issues/28543) handle side input data properly.
+		ds.Commit(b.OutputData)
 		var residualData [][]byte
 		var minOutputWatermark map[string]mtime.Time
 		for _, rr := range resp.GetResidualRoots() {
@@ -270,7 +270,7 @@ func portFor(wInCid string, wk *worker.W) []byte {
 // It assumes that the side inputs are not sourced from PCollections generated by any transform in this stage.
 //
 // Because we need the local ids for routing the sources/sinks information.
-func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
+func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, ds *worker.DataService) error {
 	// Assume stage has an indicated primary input
 
 	coders := map[string]*pipepb.Coder{}
@@ -327,7 +327,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
 			// Update side inputs to point to new PCollection with any replaced coders.
 			transforms[si.transform].GetInputs()[si.local] = newGlobal
 		}
-		prepSide, err := handleSideInput(si.transform, si.local, si.global, comps, coders, wk)
+		prepSide, err := handleSideInput(si.transform, si.local, si.global, comps, coders, ds)
 		if err != nil {
 			slog.Error("buildDescriptor: handleSideInputs", err, slog.String("transformID", si.transform))
 			return err
@@ -392,7 +392,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
 }
 
 // handleSideInput returns a closure that will look up the data for a side input appropriate for the given watermark.
-func handleSideInput(tid, local, global string, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W) (func(b *worker.B, watermark mtime.Time), error) {
+func handleSideInput(tid, local, global string, comps *pipepb.Components, coders map[string]*pipepb.Coder, ds *worker.DataService) (func(b *worker.B, watermark mtime.Time), error) {
 	t := comps.GetTransforms()[tid]
 	sis, err := getSideInputs(t)
 	if err != nil {
@@ -412,7 +412,7 @@ func handleSideInput(tid, local, global string, comps *pipepb.Components, coders
 		global, local := global, local
 		return func(b *worker.B, watermark mtime.Time) {
-			data := wk.D.GetAllData(global)
+			data := ds.GetAllData(global)
 
 			if b.IterableSideInputData == nil {
 				b.IterableSideInputData = map[string]map[string]map[typex.Window][][]byte{}
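Threading ds through Execute, buildDescriptor, and handleSideInput is what detaches the side-input cache from any single worker: data committed by a stage in one environment must be readable by a consuming stage in another, so the cache becomes job-scoped. A simplified sketch of such a cache — field names and the append semantics are illustrative, not the actual worker.DataService:

```go
package main

import (
	"fmt"
	"sync"
)

// dataService is a mutex-guarded map from PCollection ID to encoded
// elements, shared by every stage in the job.
type dataService struct {
	mu   sync.Mutex
	data map[string][][]byte
}

// Commit merges a bundle's output into the job-wide cache.
func (d *dataService) Commit(out map[string][][]byte) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.data == nil {
		d.data = map[string][][]byte{}
	}
	for pcol, vs := range out {
		d.data[pcol] = append(d.data[pcol], vs...)
	}
}

// GetAllData returns everything committed so far for one PCollection.
func (d *dataService) GetAllData(pcol string) [][]byte {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.data[pcol]
}

func main() {
	ds := &dataService{}
	ds.Commit(map[string][][]byte{"pcol0": {{1, 2}, {3}}})
	fmt.Println(len(ds.GetAllData("pcol0"))) // 2
}
```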
@@ -447,7 +447,7 @@ func handleSideInput(tid, local, global string, comps *pipepb.Components, coders
 		global, local := global, local
 		return func(b *worker.B, watermark mtime.Time) {
 			// May be of zero length, but that's OK. Side inputs can be empty.
-			data := wk.D.GetAllData(global)
+			data := ds.GetAllData(global)
 			if b.MultiMapSideInputData == nil {
 				b.MultiMapSideInputData = map[string]map[string]map[typex.Window]map[string][][]byte{}
 			}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go b/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go
index 9fc2c1a923c5d..bf1e36656661b 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go
@@ -57,7 +57,9 @@ var (
 	// SDK transforms.
 	TransformParDo                = ptUrn(pipepb.StandardPTransforms_PAR_DO)
 	TransformCombinePerKey        = ctUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY)
+	TransformCombineGlobally      = ctUrn(pipepb.StandardPTransforms_COMBINE_GLOBALLY)
 	TransformReshuffle            = ctUrn(pipepb.StandardPTransforms_RESHUFFLE)
+	TransformCombineGroupedValues = cmbtUrn(pipepb.StandardPTransforms_COMBINE_GROUPED_VALUES)
 	TransformPreCombine           = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_PRECOMBINE)
 	TransformMerge                = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_MERGE_ACCUMULATORS)
 	TransformExtract              = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_EXTRACT_OUTPUTS)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
index 98479e3db0710..573bdf4aeb9db 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -93,7 +93,7 @@ func (b *B) Respond(resp *fnpb.InstructionResponse) {
 	}
 	b.responded = true
 	if resp.GetError() != "" {
-		b.BundleErr = fmt.Errorf("bundle %v failed:%v", resp.GetInstructionId(), resp.GetError())
+		b.BundleErr = fmt.Errorf("bundle %v %v failed:%v", resp.GetInstructionId(), b.PBDID, resp.GetError())
 		close(b.Resp)
 		return
 	}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index f33ff178c46d4..4968c9eb433e3 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -67,7 +67,7 @@ type W struct {
 	server *grpc.Server
 
 	// These are the ID sources
-	inst, bund uint64
+	inst               uint64
 	connected, stopped atomic.Bool
 
 	InstReqs chan *fnpb.InstructionRequest
@@ -76,8 +76,6 @@ type W struct {
 	mu                 sync.Mutex
 	activeInstructions map[string]controlResponder              // Active instructions keyed by InstructionID
 	Descriptors        map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID
-
-	D *DataService
 }
 
 type controlResponder interface {
@@ -104,8 +102,6 @@ func New(id, env string) *W {
 
 		activeInstructions: make(map[string]controlResponder),
 		Descriptors:        make(map[string]*fnpb.ProcessBundleDescriptor),
-
-		D: &DataService{},
 	}
 	slog.Debug("Serving Worker components", slog.String("endpoint", wk.Endpoint()))
 	fnpb.RegisterBeamFnControlServer(wk.server, wk)
@@ -149,11 +145,7 @@ func (wk *W) Stop() {
 }
 
 func (wk *W) NextInst() string {
-	return fmt.Sprintf("inst%03d", atomic.AddUint64(&wk.inst, 1))
-}
-
-func (wk *W) NextStage() string {
-	return fmt.Sprintf("stage%03d", atomic.AddUint64(&wk.bund, 1))
+	return fmt.Sprintf("inst-%v-%03d", wk.Env, atomic.AddUint64(&wk.inst, 1))
 }
 
 // TODO set logging level.
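With one counter per worker, the old plain inst%03d format could collide across environments; folding wk.Env into the ID keeps instruction IDs unique across all workers. A tiny standalone illustration of the scheme (type and method names here are hypothetical):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type w struct {
	env  string
	inst uint64
}

// nextInst scopes each ID to the worker's environment, so two workers can
// both mint their first instruction without producing the same string.
func (wk *w) nextInst() string {
	return fmt.Sprintf("inst-%v-%03d", wk.env, atomic.AddUint64(&wk.inst, 1))
}

func main() {
	goWk, javaWk := &w{env: "go"}, &w{env: "java"}
	fmt.Println(goWk.nextInst(), javaWk.nextInst()) // inst-go-001 inst-java-001
	fmt.Println(goWk.nextInst())                    // inst-go-002
}
```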
@@ -263,6 +255,11 @@ func (wk *W) Connected() bool {
 	return wk.connected.Load()
 }
 
+// Stopped indicates that the worker has stopped.
+func (wk *W) Stopped() bool {
+	return wk.stopped.Load()
+}
+
 // Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
 //
 // Requests come from the runner, and are sent to the client in the SDK.
@@ -312,10 +309,12 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
 		wk.mu.Lock()
 		// Fail extant instructions
 		slog.Debug("SDK Disconnected", "worker", wk, "ctx_error", ctrl.Context().Err(), "outstanding_instructions", len(wk.activeInstructions))
+
+		msg := fmt.Sprintf("SDK worker disconnected: %v, %v active instructions", wk.String(), len(wk.activeInstructions))
 		for instID, b := range wk.activeInstructions {
 			b.Respond(&fnpb.InstructionResponse{
 				InstructionId: instID,
-				Error:         "SDK Disconnected",
+				Error:         msg,
 			})
 		}
 		wk.mu.Unlock()
@@ -536,7 +535,7 @@ func (wk *W) sendInstruction(ctx context.Context, req *fnpb.InstructionRequest)
 
 	req.InstructionId = progInst
 
-	if wk.stopped.Load() {
+	if wk.Stopped() {
 		return nil
 	}
 	wk.InstReqs <- req
@@ -566,6 +565,7 @@ func (wk *W) MonitoringMetadata(ctx context.Context, unknownIDs []string) *fnpb.
 
 // DataService is slated to be deleted in favour of stage based state
 // management for side inputs.
+// TODO(https://github.com/apache/beam/issues/28543), remove this concept.
 type DataService struct {
 	mu sync.Mutex
 	// TODO actually quick process the data to windows here as well.
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
index ed61f484481ca..6a90b463c45d8 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
@@ -50,18 +50,6 @@ func TestWorker_NextInst(t *testing.T) {
 	}
 }
 
-func TestWorker_NextStage(t *testing.T) {
-	w := New("test", "testEnv")
-
-	stageIDs := map[string]struct{}{}
-	for i := 0; i < 100; i++ {
-		stageIDs[w.NextStage()] = struct{}{}
-	}
-	if got, want := len(stageIDs), 100; got != want {
-		t.Errorf("calling w.NextStage() got %v unique ids, want %v", got, want)
-	}
-}
-
 func TestWorker_GetProcessBundleDescriptor(t *testing.T) {
 	w := New("test", "testEnv")
 
@@ -189,7 +177,7 @@ func TestWorker_Data_HappyPath(t *testing.T) {
 
 	b := &B{
 		InstID: instID,
-		PBDID:  wk.NextStage(),
+		PBDID:  "teststageID",
 		InputData: [][]byte{
 			{1, 1, 1, 1, 1, 1},
 		},
diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go
index bb7f5275a1638..f3cffd1761109 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -140,8 +140,8 @@ var portableFilters = []string{
 }
 
 var prismFilters = []string{
-	// The prism runner does not yet support cross-language.
-	"TestXLang.*",
+	// The prism runner does not yet support Java's CoGBK.
+	"TestXLang_CoGroupBy",
 	// The prism runner does not support the TestStream primitive
 	"TestTestStream.*",
 	// The trigger and pane tests uses TestStream