diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 28ea75ac9e52..ba0ab6f13d27 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -153,7 +153,8 @@ type Config struct { type ElementManager struct { config Config - stages map[string]*stageState // The state for each stage. + impulses set[string] // List of impulse stages. + stages map[string]*stageState // The state for each stage. consumers map[string][]string // Map from pcollectionID to stageIDs that consumes them as primary input. sideConsumers map[string][]LinkID // Map from pcollectionID to the stage+transform+input that consumes them as side input. @@ -254,6 +255,14 @@ func (em *ElementManager) Impulse(stageID string) { em.addPending(count) } refreshes := stage.updateWatermarks(em) + + // Since impulses are synthetic, we need to simulate them properly + // if a pipeline is only test stream driven. + if em.impulses == nil { + em.impulses = refreshes + } else { + em.impulses.merge(refreshes) + } em.addRefreshes(refreshes) } @@ -286,6 +295,13 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string) // Watermark evaluation goroutine. go func() { defer close(runStageCh) + + // If we have a test stream, clear out existing refreshes, so the test stream can + // insert any elements it needs. + if em.testStreamHandler != nil { + em.watermarkRefreshes = singleSet(em.testStreamHandler.ID) + } + for { em.refreshCond.L.Lock() // If there are no watermark refreshes available, we wait until there are. @@ -370,7 +386,13 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) { nextEvent.Execute(em) // Decrement pending for the event being processed. em.addPending(-1) - return + // If there are refreshes scheduled, then test stream permitted execution to continue. + // Note: it's a prism bug if test stream never causes a refresh to occur for a given event. + // It's not correct to move to the next event if no refreshes would occur. + if len(em.watermarkRefreshes) > 0 { + return + } + // If there are no refreshes, then there's no mechanism to make progress, so it's time to fast fail. } v := em.livePending.Load() diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go index 04269e3dd6af..0c042d731d6a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go @@ -186,6 +186,11 @@ func TestTestStream(t *testing.T) { {pipeline: primitives.TestStreamTwoFloat64Sequences}, {pipeline: primitives.TestStreamTwoInt64Sequences}, {pipeline: primitives.TestStreamTwoUserTypeSequences}, + + {pipeline: primitives.TestStreamSimple}, + {pipeline: primitives.TestStreamSimple_InfinityDefault}, + {pipeline: primitives.TestStreamToGBK}, + {pipeline: primitives.TestStreamTimersEventTime}, } configs := []struct { diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go index c0a0ff8ebe7d..f0350064d526 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go @@ -16,6 +16,7 @@ package engine import ( + "container/heap" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" @@ -46,13 +47,15 @@ type testStreamHandler struct { tagState map[string]tagState // Map from event tag to related outputs. - completed bool // indicates that no further test stream events exist, and all watermarks are advanced to infinity. Used to send the final event, once. + currentHold mtime.Time // indicates if the default watermark hold has been lifted. + completed bool // indicates that no further test stream events exist, and all watermarks are advanced to infinity. Used to send the final event, once. } func makeTestStreamHandler(id string) *testStreamHandler { return &testStreamHandler{ - ID: id, - tagState: map[string]tagState{}, + ID: id, + tagState: map[string]tagState{}, + currentHold: mtime.MinTimestamp, } } @@ -124,6 +127,35 @@ func (ts *testStreamHandler) NextEvent() tsEvent { return ev } +// UpdateHold restrains the watermark based on upcoming elements in the test stream queue +// This uses the element manager's normal hold mechnanisms to avoid premature pipeline termination, +// when there are still remaining events to process. +func (ts *testStreamHandler) UpdateHold(em *ElementManager, newHold mtime.Time) { + if ts == nil { + return + } + + ss := em.stages[ts.ID] + ss.mu.Lock() + defer ss.mu.Unlock() + + if ss.watermarkHoldsCounts[ts.currentHold] > 0 { + heap.Pop(&ss.watermarkHoldHeap) + ss.watermarkHoldsCounts[ts.currentHold] = ss.watermarkHoldsCounts[ts.currentHold] - 1 + } + ts.currentHold = newHold + heap.Push(&ss.watermarkHoldHeap, ts.currentHold) + ss.watermarkHoldsCounts[ts.currentHold] = 1 + + // kick the TestStream and Impulse stages too. + kick := singleSet(ts.ID) + kick.merge(em.impulses) + + // This executes under the refreshCond lock, so we can't call em.addRefreshes. + em.watermarkRefreshes.merge(kick) + em.refreshCond.Broadcast() +} + // TestStreamElement wraps the provided bytes and timestamp for ingestion and use. type TestStreamElement struct { Encoded []byte @@ -195,6 +227,8 @@ func (ev tsWatermarkEvent) Execute(em *ElementManager) { ss.updateUpstreamWatermark(ss.inputID, t.watermark) em.watermarkRefreshes.insert(sID) } + // Clear the default hold after the inserts have occured. + em.testStreamHandler.UpdateHold(em, t.watermark) } // tsProcessingTimeEvent implements advancing the synthetic processing time. @@ -215,7 +249,7 @@ type tsFinalEvent struct { } func (ev tsFinalEvent) Execute(em *ElementManager) { - em.addPending(1) // We subtrack a pending after event execution, so add one now. + em.testStreamHandler.UpdateHold(em, mtime.MaxTimestamp) ss := em.stages[ev.stageID] kickSet := ss.updateWatermarks(em) kickSet.insert(ev.stageID) @@ -242,6 +276,13 @@ var ( func (tsi *testStreamImpl) initHandler(id string) { if tsi.em.testStreamHandler == nil { tsi.em.testStreamHandler = makeTestStreamHandler(id) + + ss := tsi.em.stages[id] + tsi.em.addPending(1) // We subtrack a pending after event execution, so add one now for the final event to avoid a race condition. + + // Arrest the watermark initially to prevent terminal advancement. + heap.Push(&ss.watermarkHoldHeap, tsi.em.testStreamHandler.currentHold) + ss.watermarkHoldsCounts[tsi.em.testStreamHandler.currentHold] = 1 } } diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go index 35d23e7024a5..842c5fdfc19d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go @@ -73,9 +73,10 @@ type B struct { func (b *B) Init() { // We need to see final data and timer signals that match the number of // outputs the stage this bundle executes posesses - b.dataSema.Store(int32(b.OutputCount + len(b.HasTimers))) + outCap := int32(b.OutputCount + len(b.HasTimers)) + b.dataSema.Store(outCap) b.DataWait = make(chan struct{}) - if b.OutputCount == 0 { + if outCap == 0 { close(b.DataWait) // Can happen if there are no outputs for the bundle. } b.Resp = make(chan *fnpb.ProcessBundleResponse, 1) diff --git a/sdks/go/test/integration/primitives/teststream.go b/sdks/go/test/integration/primitives/teststream.go index c8ba9b565c0f..404817fd4703 100644 --- a/sdks/go/test/integration/primitives/teststream.go +++ b/sdks/go/test/integration/primitives/teststream.go @@ -16,7 +16,10 @@ package primitives import ( + "fmt" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream" ) @@ -172,3 +175,74 @@ func TestStreamInt16Sequence(s beam.Scope) { passert.Count(s, col, "teststream int15", 3) passert.EqualsList(s, col, ele) } + +// panicIfNot42 panics if the value is not 42. +func panicIfNot42(v int) { + if v != 42 { + panic(fmt.Sprintf("got %v, want 42", v)) + } +} + +// dropKeyEmitValues drops the key and emits the value. +func dropKeyEmitValues(_ int, vs func(*int) bool, emit func(int)) { + var v int + for vs(&v) { + emit(v) + } +} + +func init() { + register.Function1x0(panicIfNot42) + register.Function3x0(dropKeyEmitValues) +} + +// TestStreamSimple is a trivial pipeline where teststream sends +// a single element to a DoFn that checks that it's received the value. +// Intended for runner validation. +func TestStreamSimple(s beam.Scope) { + con := teststream.NewConfig() + ele := []int{42} + con.AddElementList(100, ele) + con.AdvanceWatermarkToInfinity() + + col := teststream.Create(s, con) + beam.ParDo0(s, panicIfNot42, col) +} + +// TestStreamSimple_InfinityDefault is the same trivial pipeline that +// validates that the watermark is automatically advanced to infinity +// even when the user doesn't set it. +// Intended for runner validation. +func TestStreamSimple_InfinityDefault(s beam.Scope) { + con := teststream.NewConfig() + ele := []int{42} + con.AddElementList(100, ele) + + col := teststream.Create(s, con) + beam.ParDo0(s, panicIfNot42, col) +} + +// TestStreamToGBK is a trivial pipeline where teststream sends +// a single element to a GBK. +func TestStreamToGBK(s beam.Scope) { + con := teststream.NewConfig() + ele := []int{42} + con.AddElementList(100, ele) + con.AdvanceWatermarkToInfinity() + + col := teststream.Create(s, con) + keyed := beam.AddFixedKey(s, col) + gbk := beam.GroupByKey(s, keyed) + dropped := beam.ParDo(s, dropKeyEmitValues, gbk) + beam.ParDo0(s, panicIfNot42, dropped) +} + +// TestStreamTimersEventTime validates event time timers in a test stream "driven" pipeline. +func TestStreamTimersEventTime(s beam.Scope) { + timersEventTimePipelineBuilder(func(s beam.Scope) beam.PCollection { + c := teststream.NewConfig() + c.AddElements(123456, []byte{42}) + c.AdvanceWatermarkToInfinity() + return teststream.Create(s, c) + })(s) +} diff --git a/sdks/go/test/integration/primitives/teststream_test.go b/sdks/go/test/integration/primitives/teststream_test.go index b0144f148cb0..3bd1a7d4de45 100644 --- a/sdks/go/test/integration/primitives/teststream_test.go +++ b/sdks/go/test/integration/primitives/teststream_test.go @@ -71,3 +71,23 @@ func TestTestStreamTwoUserTypeSequences(t *testing.T) { integration.CheckFilters(t) ptest.BuildAndRun(t, TestStreamTwoUserTypeSequences) } + +func TestTestStreamSimple(t *testing.T) { + integration.CheckFilters(t) + ptest.BuildAndRun(t, TestStreamSimple) +} + +func TestTestStreamSimple_InfinityDefault(t *testing.T) { + integration.CheckFilters(t) + ptest.BuildAndRun(t, TestStreamSimple_InfinityDefault) +} + +func TestTestStreamToGBK(t *testing.T) { + integration.CheckFilters(t) + ptest.BuildAndRun(t, TestStreamToGBK) +} + +func TestTestStreamTimersEventTime(t *testing.T) { + integration.CheckFilters(t) + ptest.BuildAndRun(t, TestStreamTimersEventTime) +} diff --git a/sdks/go/test/integration/primitives/timers.go b/sdks/go/test/integration/primitives/timers.go index b4443296ecce..4b7f6e9765f0 100644 --- a/sdks/go/test/integration/primitives/timers.go +++ b/sdks/go/test/integration/primitives/timers.go @@ -93,7 +93,7 @@ func (fn *eventTimeFn) OnTimer(ctx context.Context, ts beam.EventTime, sp state. } } -// TimersEventTime takes in an impulse transform and produces a pipeline construction +// timersEventTimePipelineBuilder takes in an impulse transform and produces a pipeline construction // function that validates EventTime timers. // // The impulse is provided outside to swap between a bounded impulse, and