Commit 48adde9

[#29917][prism] Initial TestStream support (#30072)

lostluck authored Feb 16, 2024
1 parent 0d46e30 commit 48adde9

Showing 10 changed files with 548 additions and 50 deletions.
99 changes: 73 additions & 26 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -166,6 +166,8 @@ type ElementManager struct {

livePending atomic.Int64 // An accessible live pending count. DEBUG USE ONLY
pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully.

testStreamHandler *testStreamHandler // Optional test stream handler when a test stream is in the pipeline.
}

func (em *ElementManager) addPending(v int) {
@@ -223,6 +225,15 @@ func (em *ElementManager) StageStateful(ID string) {
em.stages[ID].stateful = true
}

// AddTestStream provides a builder interface for the execution layer to build the test stream from
// the protos.
func (em *ElementManager) AddTestStream(id string, tagToPCol map[string]string) TestStreamBuilder {
impl := &testStreamImpl{em: em}
impl.initHandler(id)
impl.TagsToPCollections(tagToPCol)
return impl
}
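
A rough usage sketch follows, showing how the execution layer might drive this builder. The event-adding method names (AddElementEvent, AddWatermarkEvent) and their arguments are assumptions for illustration; only AddTestStream and TagsToPCollections appear in this diff:

// Hypothetical sketch; the method names below are assumed, not confirmed by this diff.
tsb := em.AddTestStream("ts1", map[string]string{"out": "pcol_out"})
tsb.AddElementEvent("out", elements)             // elements: placeholder for decoded test stream elements
tsb.AddWatermarkEvent("out", mtime.MaxTimestamp) // queue a watermark advance for the tagged PCollection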

// Impulse marks and initializes the given stage as an impulse, which
// is a root transform that starts processing.
func (em *ElementManager) Impulse(stageID string) {
@@ -319,37 +330,72 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
em.refreshCond.L.Lock()
}
}
if len(em.inprogressBundles) == 0 && len(em.watermarkRefreshes) == 0 {
v := em.livePending.Load()
slog.Debug("Bundles: nothing in progress and no refreshes", slog.Int64("pendingElementCount", v))
if v > 0 {
var stageState []string
ids := maps.Keys(em.stages)
sort.Strings(ids)
for _, id := range ids {
ss := em.stages[id]
inW := ss.InputWatermark()
outW := ss.OutputWatermark()
upPCol, upW := ss.UpstreamWatermark()
upS := em.pcolParents[upPCol]
stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHoldHeap, "holdCounts", ss.watermarkHoldsCounts))
}
panic(fmt.Sprintf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, "")))
}
} else if len(em.inprogressBundles) == 0 {
v := em.livePending.Load()
slog.Debug("Bundles: nothing in progress after advance",
slog.Any("advanced", advanced),
slog.Int("refreshCount", len(em.watermarkRefreshes)),
slog.Int64("pendingElementCount", v),
)
}
em.refreshCond.L.Unlock()
em.checkForQuiescence(advanced)
}
}()
return runStageCh
}

// checkForQuiescence checks whether this element manager is no longer able to do any pending work or make progress.
//
// Quiescence can happen if there are no in-progress bundles and no further watermark refreshes, which
// are the only way to access new pending elements. If there are also no pending elements, then the pipeline
// terminates successfully.
//
// Otherwise, take appropriate action to make progress, such as executing the next TestStream event, or,
// failing that, produce information for debugging why the pipeline is stuck.
//
// Must be called while holding em.refreshCond.L.
func (em *ElementManager) checkForQuiescence(advanced set[string]) {
defer em.refreshCond.L.Unlock()
if len(em.inprogressBundles) > 0 {
// If there are bundles in progress, then there may be watermark refreshes when they terminate.
return
}
if len(em.watermarkRefreshes) > 0 {
// If there are watermarks to refresh, we aren't yet stuck.
v := em.livePending.Load()
slog.Debug("Bundles: nothing in progress after advance",
slog.Any("advanced", advanced),
slog.Int("refreshCount", len(em.watermarkRefreshes)),
slog.Int64("pendingElementCount", v),
)
return
}
// The job has quiesced!

// There are no further incoming watermark changes; see if there are test stream events for this job.
nextEvent := em.testStreamHandler.NextEvent()
if nextEvent != nil {
nextEvent.Execute(em)
// Decrement pending for the event being processed.
em.addPending(-1)
return
}

v := em.livePending.Load()
if v == 0 {
// Since there are no further pending elements, the job will terminate successfully.
return
}
// The job is officially stuck. Fail fast and produce debugging information.
// Jobs must never get stuck, so this indicates a bug in prism to be investigated.

slog.Debug("Bundles: nothing in progress and no refreshes", slog.Int64("pendingElementCount", v))
var stageState []string
ids := maps.Keys(em.stages)
sort.Strings(ids)
for _, id := range ids {
ss := em.stages[id]
inW := ss.InputWatermark()
outW := ss.OutputWatermark()
upPCol, upW := ss.UpstreamWatermark()
upS := em.pcolParents[upPCol]
stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHoldHeap, "holdCounts", ss.watermarkHoldsCounts))
}
panic(fmt.Sprintf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, "")))
}
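
For orientation, the contract that NextEvent and Execute imply could look like the sketch below. The actual event types ship with the testStreamHandler elsewhere in this PR, so treat this shape as an assumption:

// Sketch (assumed shape): each queued TestStream event applies itself to the
// ElementManager, injecting elements, advancing a watermark, or advancing
// processing time, which unblocks the otherwise quiescent pipeline.
type tsEvent interface {
	Execute(em *ElementManager)
}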

// InputForBundle returns pre-allocated data for the given bundle, encoding the elements using
// the PCollection's coders.
func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte {
@@ -429,6 +475,7 @@ const (
BlockTimer // BlockTimer represents timers for the bundle.
)

// Block represents a contiguous set of data or timers for the same destination.
type Block struct {
Kind BlockKind
Bytes [][]byte
47 changes: 47 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go
@@ -169,3 +169,50 @@ func TestElementManagerCoverage(t *testing.T) {
})
}
}

func TestTestStream(t *testing.T) {
initRunner(t)

tests := []struct {
pipeline func(s beam.Scope)
}{
{pipeline: primitives.TestStreamBoolSequence},
{pipeline: primitives.TestStreamByteSliceSequence},
{pipeline: primitives.TestStreamFloat64Sequence},
{pipeline: primitives.TestStreamInt64Sequence},
{pipeline: primitives.TestStreamInt16Sequence},
{pipeline: primitives.TestStreamStrings},
{pipeline: primitives.TestStreamTwoBoolSequences},
{pipeline: primitives.TestStreamTwoFloat64Sequences},
{pipeline: primitives.TestStreamTwoInt64Sequences},
{pipeline: primitives.TestStreamTwoUserTypeSequences},
}

configs := []struct {
name string
OneElementPerKey, OneKeyPerBundle bool
}{
{"Greedy", false, false},
{"AllElementsPerKey", false, true},
{"OneElementPerKey", true, false},
{"OneElementPerBundle", true, true},
}
for _, config := range configs {
for _, test := range tests {
t.Run(initTestName(test.pipeline)+"_"+config.name, func(t *testing.T) {
t.Cleanup(func() {
engine.OneElementPerKey = false
engine.OneKeyPerBundle = false
})
engine.OneElementPerKey = config.OneElementPerKey
engine.OneKeyPerBundle = config.OneKeyPerBundle
p, s := beam.NewPipelineWithRoot()
test.pipeline(s)
_, err := executeWithT(context.Background(), t, p)
if err != nil {
t.Fatalf("pipeline failed, but the feature should be implemented in Prism: %v", err)
}
})
}
}
}
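
The primitives.TestStream* pipelines exercised above are defined elsewhere in the repository. As a rough idea of their shape, a minimal TestStream pipeline in the Go SDK looks something like this sketch, built on the beam/testing/teststream API; the real primitives may differ in values and assertions:

package main

import (
	"context"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func main() {
	p, s := beam.NewPipelineWithRoot()

	// Queue three int64 elements at a timestamp, then advance the watermark
	// to infinity so the pipeline can drain and terminate.
	con := teststream.NewConfig()
	con.AddElements(100, int64(1), int64(2), int64(3))
	con.AdvanceWatermarkToInfinity()

	col := teststream.Create(s, con)
	passert.Count(s, col, "teststream int64s", 3)

	if err := beamx.Run(context.Background(), p); err != nil {
		panic(err)
	}
}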