From 0781bc37db3c33c48e2de54a2aac5facbb43f7ec Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 25 Apr 2024 09:26:14 -0700 Subject: [PATCH 1/2] [prism] Factor out hold tracking to dedicated structures --- .../prism/internal/engine/elementmanager.go | 77 ++---------- .../internal/engine/elementmanager_test.go | 2 +- .../runners/prism/internal/engine/holds.go | 105 ++++++++++++++++ .../prism/internal/engine/holds_test.go | 115 ++++++++++++++++++ .../prism/internal/engine/teststream.go | 12 +- 5 files changed, 236 insertions(+), 75 deletions(-) create mode 100644 sdks/go/pkg/beam/runners/prism/internal/engine/holds.go create mode 100644 sdks/go/pkg/beam/runners/prism/internal/engine/holds_test.go diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index e40f5513dae9..5d665edf2862 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -414,7 +414,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) { outW := ss.OutputWatermark() upPCol, upW := ss.UpstreamWatermark() upS := em.pcolParents[upPCol] - stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHoldHeap, "holdCounts", ss.watermarkHoldsCounts)) + stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", ss.watermarkHolds.counts)) } panic(fmt.Sprintf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, ""))) } @@ -706,18 +706,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol delete(stage.inprogressKeysByBundle, rb.BundleID) for hold, v := range stage.inprogressHoldsByBundle[rb.BundleID] { - n := stage.watermarkHoldsCounts[hold] - v - if n == 0 { - delete(stage.watermarkHoldsCounts, hold) - for i, h := range stage.watermarkHoldHeap { - if hold == h { - heap.Remove(&stage.watermarkHoldHeap, i) - break - } - } - } else { - stage.watermarkHoldsCounts[hold] = n - } + stage.watermarkHolds.Drop(hold, v) } delete(stage.inprogressHoldsByBundle, rb.BundleID) @@ -918,8 +907,7 @@ type stageState struct { // We track the count of timers with the same hold, and clear it from // the map and heap when the count goes to zero. // This avoids scanning the heap to remove or access a hold for each element. - watermarkHoldsCounts map[mtime.Time]int - watermarkHoldHeap holdHeap + watermarkHolds *holdTracker inprogressHoldsByBundle map[string]map[mtime.Time]int // bundle to associated holds. } @@ -940,37 +928,15 @@ type dataAndTimers struct { timers map[timerKey]timerTimes } -// holdHeap orders holds based on their timestamps -// so we can always find the minimum timestamp of pending holds. -type holdHeap []mtime.Time - -func (h holdHeap) Len() int { return len(h) } -func (h holdHeap) Less(i, j int) bool { return h[i] < h[j] } -func (h holdHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } - -func (h *holdHeap) Push(x any) { - // Push and Pop use pointer receivers because they modify the slice's length, - // not just its contents. - *h = append(*h, x.(mtime.Time)) -} - -func (h *holdHeap) Pop() any { - old := *h - n := len(old) - x := old[n-1] - *h = old[0 : n-1] - return x -} - // makeStageState produces an initialized stageState. func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *stageState { ss := &stageState{ - ID: ID, - outputIDs: outputIDs, - sides: sides, - strat: defaultStrat{}, - state: map[LinkID]map[typex.Window]map[string]StateData{}, - watermarkHoldsCounts: map[mtime.Time]int{}, + ID: ID, + outputIDs: outputIDs, + sides: sides, + strat: defaultStrat{}, + state: map[LinkID]map[typex.Window]map[string]StateData{}, + watermarkHolds: newHoldTracker(), input: mtime.MinTimestamp, output: mtime.MinTimestamp, @@ -1016,29 +982,13 @@ func (ss *stageState) AddPending(newPending []element) int { // don't increase the count this time, as "this" timer is already pending. count-- // clear out the existing hold for accounting purposes. - v := ss.watermarkHoldsCounts[lastSet.hold] - 1 - if v == 0 { - delete(ss.watermarkHoldsCounts, lastSet.hold) - for i, hold := range ss.watermarkHoldHeap { - if hold == lastSet.hold { - heap.Remove(&ss.watermarkHoldHeap, i) - break - } - } - } else { - ss.watermarkHoldsCounts[lastSet.hold] = v - } + ss.watermarkHolds.Drop(lastSet.hold, 1) } // Update the last set time on the timer. dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp} // Mark the hold in the heap. - ss.watermarkHoldsCounts[e.holdTimestamp] = ss.watermarkHoldsCounts[e.holdTimestamp] + 1 - - if len(ss.watermarkHoldsCounts) != len(ss.watermarkHoldHeap) { - // The hold should not be in the heap, so we add it. - heap.Push(&ss.watermarkHoldHeap, e.holdTimestamp) - } + ss.watermarkHolds.Add(e.holdTimestamp, 1) } } return count @@ -1308,10 +1258,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] { defer ss.mu.Unlock() minPending := ss.minPendingTimestampLocked() - minWatermarkHold := mtime.MaxTimestamp - if ss.watermarkHoldHeap.Len() > 0 { - minWatermarkHold = ss.watermarkHoldHeap[0] - } + minWatermarkHold := ss.watermarkHolds.Min() // PCollection watermarks are based on their parents's output watermark. _, newIn := ss.UpstreamWatermark() diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go index 7235508f164a..275dd790d2b1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go @@ -295,7 +295,7 @@ func TestStageState_updateWatermarks(t *testing.T) { ss.output = test.initOutput ss.updateUpstreamWatermark(inputCol, test.upstream) ss.pending = append(ss.pending, element{timestamp: test.minPending}) - ss.watermarkHoldHeap = append(ss.watermarkHoldHeap, test.minStateHold) + ss.watermarkHolds.Add(test.minStateHold, 1) ss.updateWatermarks(em) if got, want := ss.input, test.wantInput; got != want { pcol, up := ss.UpstreamWatermark() diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go b/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go new file mode 100644 index 000000000000..a6e7e562e3dd --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package engine + +import ( + "container/heap" + "fmt" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" +) + +// holdHeap orders holds based on their timestamps +// so we can always find the minimum timestamp of pending holds. +type holdHeap []mtime.Time + +func (h holdHeap) Len() int { return len(h) } +func (h holdHeap) Less(i, j int) bool { return h[i] < h[j] } +func (h holdHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *holdHeap) Push(x any) { + // Push and Pop use pointer receivers because they modify the slice's length, + // not just its contents. + *h = append(*h, x.(mtime.Time)) +} + +func (h *holdHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +// holdTracker track the watermark holds for a stage. +// +// Timers hold back the watermark until they fire, but multiple +// timers may set the same watermark hold. +// To track when the watermark may advance further this structure maintains +// counts for each set watermark hold. +// As timers are processed, their associated holds are removed, reducing the counts. +// +// A heap of the hold times is kept so we have quick access to the minimum hold, for calculating +// how to advance the watermark. +type holdTracker struct { + heap holdHeap + counts map[mtime.Time]int +} + +func newHoldTracker() *holdTracker { + return &holdTracker{ + counts: map[mtime.Time]int{}, + } +} + +// Drop the given hold count. When the count of a hold time reaches zero, it's +// removed from the heap. Drop panics if holds become negative. +func (ht *holdTracker) Drop(hold mtime.Time, v int) { + n := ht.counts[hold] - v + if n == 0 { + delete(ht.counts, hold) + for i, h := range ht.heap { + if hold == h { + heap.Remove(&ht.heap, i) + break + } + } + } else if n > 0 { + ht.counts[hold] = n + } else { + panic(fmt.Sprintf("prism error: negative watermark hold count %v for time %v", n, hold)) + } +} + +// Add a hold a number of times to heap. If the hold time isn't already present in the heap, it is added. +func (ht *holdTracker) Add(hold mtime.Time, v int) { + // Mark the hold in the heap. + ht.counts[hold] = ht.counts[hold] + v + + if len(ht.counts) != len(ht.heap) { + // Since there's a difference, the hold should not be in the heap, so we add it. + heap.Push(&ht.heap, hold) + } +} + +// Min returns the earliest hold in the heap. Returns [mtime.MaxTimestamp] if the heap is empty. +func (ht *holdTracker) Min() mtime.Time { + minWatermarkHold := mtime.MaxTimestamp + if len(ht.heap) > 0 { + minWatermarkHold = ht.heap[0] + } + return minWatermarkHold +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/holds_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/holds_test.go new file mode 100644 index 000000000000..91de51bc1af0 --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/holds_test.go @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package engine + +import ( + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" +) + +func TestHoldTracker(t *testing.T) { + + type op func(*holdTracker) + add := func(hold mtime.Time, count int) op { + return func(ht *holdTracker) { + ht.Add(hold, count) + } + } + + drop := func(hold mtime.Time, count int) op { + return func(ht *holdTracker) { + ht.Drop(hold, count) + } + } + + tests := []struct { + name string + ops []op + wantMin mtime.Time + wantLen int + }{ + { + name: "zero-max", + wantMin: mtime.MaxTimestamp, + wantLen: 0, + }, { + + name: "one-min", + ops: []op{ + add(mtime.MinTimestamp, 1), + }, + wantMin: mtime.MinTimestamp, + wantLen: 1, + }, { + + name: "cleared-max", + ops: []op{ + add(mtime.MinTimestamp, 1), + drop(mtime.MinTimestamp, 1), + }, + wantMin: mtime.MaxTimestamp, + wantLen: 0, + }, { + name: "cleared-non-eogw", + ops: []op{ + add(mtime.MinTimestamp, 1), + add(mtime.EndOfGlobalWindowTime, 1), + drop(mtime.MinTimestamp, 1), + }, + wantMin: mtime.EndOfGlobalWindowTime, + wantLen: 1, + }, { + name: "uncleared-non-min", + ops: []op{ + add(mtime.MinTimestamp, 2), + add(mtime.EndOfGlobalWindowTime, 1), + drop(mtime.MinTimestamp, 1), + }, + wantMin: mtime.MinTimestamp, + wantLen: 2, + }, { + name: "uncleared-non-min", + ops: []op{ + add(1, 1), + add(2, 1), + add(3, 1), + drop(2, 1), + add(4, 1), + add(3, 1), + drop(1, 1), + add(2, 1), + drop(4, 1), + }, + wantMin: 2, + wantLen: 2, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tracker := newHoldTracker() + for _, op := range test.ops { + op(tracker) + } + if got, want := tracker.Min(), test.wantMin; got != want { + t.Errorf("tracker.heap.Min() = %v, want %v", got, want) + } + if got, want := tracker.heap.Len(), test.wantLen; got != want { + t.Errorf("tracker.heap.Len() = %v, want %v", got, want) + } + }) + } +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go index f0350064d526..34b79d455ce1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go @@ -16,7 +16,6 @@ package engine import ( - "container/heap" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" @@ -139,13 +138,9 @@ func (ts *testStreamHandler) UpdateHold(em *ElementManager, newHold mtime.Time) ss.mu.Lock() defer ss.mu.Unlock() - if ss.watermarkHoldsCounts[ts.currentHold] > 0 { - heap.Pop(&ss.watermarkHoldHeap) - ss.watermarkHoldsCounts[ts.currentHold] = ss.watermarkHoldsCounts[ts.currentHold] - 1 - } + ss.watermarkHolds.Drop(ts.currentHold, 1) ts.currentHold = newHold - heap.Push(&ss.watermarkHoldHeap, ts.currentHold) - ss.watermarkHoldsCounts[ts.currentHold] = 1 + ss.watermarkHolds.Add(ts.currentHold, 1) // kick the TestStream and Impulse stages too. kick := singleSet(ts.ID) @@ -281,8 +276,7 @@ func (tsi *testStreamImpl) initHandler(id string) { tsi.em.addPending(1) // We subtrack a pending after event execution, so add one now for the final event to avoid a race condition. // Arrest the watermark initially to prevent terminal advancement. - heap.Push(&ss.watermarkHoldHeap, tsi.em.testStreamHandler.currentHold) - ss.watermarkHoldsCounts[tsi.em.testStreamHandler.currentHold] = 1 + ss.watermarkHolds.Add(tsi.em.testStreamHandler.currentHold, 1) } } From a558dabbc7479a9f36d971b99fe09bf44b7fb404 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 26 Apr 2024 13:19:14 -0700 Subject: [PATCH 2/2] review comment-reorder move code out of ladder. --- .../runners/prism/internal/engine/holds.go | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go b/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go index a6e7e562e3dd..9077b3f439d6 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/holds.go @@ -69,19 +69,19 @@ func newHoldTracker() *holdTracker { // removed from the heap. Drop panics if holds become negative. func (ht *holdTracker) Drop(hold mtime.Time, v int) { n := ht.counts[hold] - v - if n == 0 { - delete(ht.counts, hold) - for i, h := range ht.heap { - if hold == h { - heap.Remove(&ht.heap, i) - break - } - } - } else if n > 0 { + if n > 0 { ht.counts[hold] = n - } else { + return + } else if n < 0 { panic(fmt.Sprintf("prism error: negative watermark hold count %v for time %v", n, hold)) } + delete(ht.counts, hold) + for i, h := range ht.heap { + if hold == h { + heap.Remove(&ht.heap, i) + break + } + } } // Add a hold a number of times to heap. If the hold time isn't already present in the heap, it is added.