diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 76c60e810d40..2c4e08bcd094 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -34,6 +34,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
 	"golang.org/x/exp/maps"
 	"golang.org/x/exp/slog"
 )
@@ -290,7 +291,7 @@ func (rb RunBundle) LogValue() slog.Value {
 // Bundles is the core execution loop. It produces a sequences of bundles able to be executed.
 // The returned channel is closed when the context is canceled, or there are no pending elements
 // remaining.
-func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string) <-chan RunBundle {
+func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.CancelCauseFunc, nextBundID func() string) <-chan RunBundle {
 	runStageCh := make(chan RunBundle)
 	ctx, cancelFn := context.WithCancelCause(ctx)
 	go func() {
@@ -384,7 +385,9 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
 					}
 				}
 			}
-			em.checkForQuiescence(advanced)
+			if err := em.checkForQuiescence(advanced); err != nil {
+				upstreamCancelFn(err)
+			}
 		}
 	}()
 	return runStageCh
@@ -400,11 +403,11 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
 // executing off the next TestStream event.
 //
 // Must be called while holding em.refreshCond.L.
-func (em *ElementManager) checkForQuiescence(advanced set[string]) {
+func (em *ElementManager) checkForQuiescence(advanced set[string]) error {
 	defer em.refreshCond.L.Unlock()
 	if len(em.inprogressBundles) > 0 {
 		// If there are bundles in progress, then there may be watermark refreshes when they terminate.
-		return
+		return nil
 	}
 	if len(em.watermarkRefreshes) > 0 {
 		// If there are watermarks to refresh, we aren't yet stuck.
@@ -414,12 +417,12 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
 			slog.Int("refreshCount", len(em.watermarkRefreshes)),
 			slog.Int64("pendingElementCount", v),
 		)
-		return
+		return nil
 	}
 	if em.testStreamHandler == nil && len(em.processTimeEvents.events) > 0 {
 		// If there's no test stream involved, and processing time events exist, then
 		// it's only a matter of time.
-		return
+		return nil
 	}

 	// The job has quiesced!
@@ -433,12 +436,12 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
 		// Note: it's a prism bug if test stream never causes a refresh to occur for a given event.
 		// It's not correct to move to the next event if no refreshes would occur.
 		if len(em.watermarkRefreshes) > 0 {
-			return
+			return nil
 		} else if _, ok := nextEvent.(tsProcessingTimeEvent); ok {
 			// It's impossible to fully control processing time SDK side handling for processing time
 			// Runner side, so we specialize refresh handling here to avoid spuriously getting stuck.
 			em.watermarkRefreshes.insert(em.testStreamHandler.ID)
-			return
+			return nil
 		}
 		// If there are no refreshes, then there's no mechanism to make progress, so it's time to fast fail.
 	}
@@ -446,7 +449,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
 	v := em.livePending.Load()
 	if v == 0 {
 		// Since there are no further pending elements, the job will be terminating successfully.
-		return
+		return nil
 	}
 	// The job is officially stuck. Fail fast and produce debugging information.
 	// Jobs must never get stuck so this indicates a bug in prism to be investigated.
@@ -469,7 +472,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
 		upS := em.pcolParents[upPCol]
 		stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", ss.watermarkHolds.counts, "holdsInBundle", ss.inprogressHoldsByBundle, "pttEvents", ss.processingTimeTimers.toFire))
 	}
-	panic(fmt.Sprintf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, "")))
+	return errors.Errorf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, ""))
 }

 // InputForBundle returns pre-allocated data for the given bundle, encoding the elements using
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go
index 275dd790d2b1..d5904b13fb88 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go
@@ -316,6 +316,7 @@ func TestStageState_updateWatermarks(t *testing.T) {

 func TestElementManager(t *testing.T) {
 	t.Run("impulse", func(t *testing.T) {
+		ctx, cancelFn := context.WithCancelCause(context.Background())
 		em := NewElementManager(Config{})
 		em.AddStage("impulse", nil, []string{"output"}, nil)
 		em.AddStage("dofn", []string{"output"}, nil, nil)
@@ -327,7 +328,7 @@ func TestElementManager(t *testing.T) {
 		}

 		var i int
-		ch := em.Bundles(context.Background(), func() string {
+		ch := em.Bundles(ctx, cancelFn, func() string {
 			defer func() { i++ }()
 			return fmt.Sprintf("%v", i)
 		})
@@ -371,6 +372,7 @@ func TestElementManager(t *testing.T) {
 	}

 	t.Run("dofn", func(t *testing.T) {
+		ctx, cancelFn := context.WithCancelCause(context.Background())
 		em := NewElementManager(Config{})
 		em.AddStage("impulse", nil, []string{"input"}, nil)
 		em.AddStage("dofn1", []string{"input"}, []string{"output"}, nil)
@@ -378,7 +380,7 @@ func TestElementManager(t *testing.T) {
 		em.Impulse("impulse")

 		var i int
-		ch := em.Bundles(context.Background(), func() string {
+		ch := em.Bundles(ctx, cancelFn, func() string {
 			defer func() { i++ }()
 			t.Log("generating bundle", i)
 			return fmt.Sprintf("%v", i)
@@ -422,6 +424,7 @@ func TestElementManager(t *testing.T) {
 	})

 	t.Run("side", func(t *testing.T) {
+		ctx, cancelFn := context.WithCancelCause(context.Background())
 		em := NewElementManager(Config{})
 		em.AddStage("impulse", nil, []string{"input"}, nil)
 		em.AddStage("dofn1", []string{"input"}, []string{"output"}, nil)
@@ -429,7 +432,7 @@ func TestElementManager(t *testing.T) {
 		em.Impulse("impulse")

 		var i int
-		ch := em.Bundles(context.Background(), func() string {
+		ch := em.Bundles(ctx, cancelFn, func() string {
 			defer func() { i++ }()
 			t.Log("generating bundle", i)
 			return fmt.Sprintf("%v", i)
@@ -473,13 +476,14 @@ func TestElementManager(t *testing.T) {
 		}
 	})
 	t.Run("residual", func(t *testing.T) {
+		ctx, cancelFn := context.WithCancelCause(context.Background())
 		em := NewElementManager(Config{})
 		em.AddStage("impulse", nil, []string{"input"}, nil)
 		em.AddStage("dofn", []string{"input"}, nil, nil)
 		em.Impulse("impulse")

 		var i int
-		ch := em.Bundles(context.Background(), func() string {
+		ch := em.Bundles(ctx, cancelFn, func() string {
 			defer func() { i++ }()
 			t.Log("generating bundle", i)
 			return fmt.Sprintf("%v", i)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 7276b725a7e0..08ab9f687c5f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -335,7 +335,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
 	eg.SetLimit(8)

 	var instID uint64
-	bundles := em.Bundles(egctx, func() string {
+	bundles := em.Bundles(egctx, j.CancelFn, func() string {
 		return fmt.Sprintf("inst%03d", atomic.AddUint64(&instID, 1))
 	})
 	for {
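Note on the new contract (reviewer sketch, not part of the patch): Bundles now threads a context.CancelCauseFunc from the caller into the watermark loop, so a quiescence failure cancels the whole job with a descriptive cause instead of panicking the runner; execute.go supplies j.CancelFn for this. The standalone Go sketch below illustrates only the cancellation wiring using the standard library; runBundles and the bundles channel are hypothetical stand-ins for prism's execution loop, not Beam APIs.

package main

import (
	"context"
	"errors"
	"fmt"
)

// runBundles is a hypothetical stand-in for prism's execution loop. It drains
// the channel returned by em.Bundles(ctx, cancelFn, nextBundID) until the
// channel closes (successful quiescence) or the shared context is canceled.
func runBundles(ctx context.Context, bundles <-chan string) error {
	for {
		select {
		case <-ctx.Done():
			// context.Cause recovers the error handed to the CancelCauseFunc,
			// e.g. the "nothing in progress and no refreshes ..." diagnostic.
			return context.Cause(ctx)
		case b, ok := <-bundles:
			if !ok {
				return nil // channel closed: all pending elements completed
			}
			fmt.Println("executing bundle", b)
		}
	}
}

func main() {
	ctx, cancelFn := context.WithCancelCause(context.Background())
	bundles := make(chan string)

	// Engine side: when checkForQuiescence returns an error, it is passed to
	// the upstream cancel function, as the patched Bundles loop now does.
	go cancelFn(errors.New("nothing in progress and no refreshes with non zero pending elements"))

	if err := runBundles(ctx, bundles); err != nil {
		fmt.Println("job failed:", err)
	}
}

The recovery side needs nothing beyond the standard library: context.Cause returns the error passed to the cancel function, which is presumably how the jobservices layer surfaces checkForQuiescence's diagnostic as the job failure state, assuming j.CancelFn is a context.CancelCauseFunc as the execute.go change implies.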