Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#30083][prism] Factor out hold tracking to dedicated structures #31105

Merged
merged 2 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 12 additions & 65 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
outW := ss.OutputWatermark()
upPCol, upW := ss.UpstreamWatermark()
upS := em.pcolParents[upPCol]
stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHoldHeap, "holdCounts", ss.watermarkHoldsCounts))
stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", ss.watermarkHolds.counts))
}
panic(fmt.Sprintf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, "")))
}
Expand Down Expand Up @@ -706,18 +706,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
delete(stage.inprogressKeysByBundle, rb.BundleID)

for hold, v := range stage.inprogressHoldsByBundle[rb.BundleID] {
n := stage.watermarkHoldsCounts[hold] - v
if n == 0 {
delete(stage.watermarkHoldsCounts, hold)
for i, h := range stage.watermarkHoldHeap {
if hold == h {
heap.Remove(&stage.watermarkHoldHeap, i)
break
}
}
} else {
stage.watermarkHoldsCounts[hold] = n
}
stage.watermarkHolds.Drop(hold, v)
}
delete(stage.inprogressHoldsByBundle, rb.BundleID)

Expand Down Expand Up @@ -918,8 +907,7 @@ type stageState struct {
// We track the count of timers with the same hold, and clear it from
// the map and heap when the count goes to zero.
// This avoids scanning the heap to remove or access a hold for each element.
watermarkHoldsCounts map[mtime.Time]int
watermarkHoldHeap holdHeap
watermarkHolds *holdTracker
inprogressHoldsByBundle map[string]map[mtime.Time]int // bundle to associated holds.
}

Expand All @@ -940,37 +928,15 @@ type dataAndTimers struct {
timers map[timerKey]timerTimes
}

// holdHeap orders holds based on their timestamps
// so we can always find the minimum timestamp of pending holds.
type holdHeap []mtime.Time

func (h holdHeap) Len() int { return len(h) }
func (h holdHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h holdHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *holdHeap) Push(x any) {
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
*h = append(*h, x.(mtime.Time))
}

func (h *holdHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

// makeStageState produces an initialized stageState.
func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *stageState {
ss := &stageState{
ID: ID,
outputIDs: outputIDs,
sides: sides,
strat: defaultStrat{},
state: map[LinkID]map[typex.Window]map[string]StateData{},
watermarkHoldsCounts: map[mtime.Time]int{},
ID: ID,
outputIDs: outputIDs,
sides: sides,
strat: defaultStrat{},
state: map[LinkID]map[typex.Window]map[string]StateData{},
watermarkHolds: newHoldTracker(),

input: mtime.MinTimestamp,
output: mtime.MinTimestamp,
Expand Down Expand Up @@ -1016,29 +982,13 @@ func (ss *stageState) AddPending(newPending []element) int {
// don't increase the count this time, as "this" timer is already pending.
count--
// clear out the existing hold for accounting purposes.
v := ss.watermarkHoldsCounts[lastSet.hold] - 1
if v == 0 {
delete(ss.watermarkHoldsCounts, lastSet.hold)
for i, hold := range ss.watermarkHoldHeap {
if hold == lastSet.hold {
heap.Remove(&ss.watermarkHoldHeap, i)
break
}
}
} else {
ss.watermarkHoldsCounts[lastSet.hold] = v
}
ss.watermarkHolds.Drop(lastSet.hold, 1)
}
// Update the last set time on the timer.
dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp}

// Mark the hold in the heap.
ss.watermarkHoldsCounts[e.holdTimestamp] = ss.watermarkHoldsCounts[e.holdTimestamp] + 1

if len(ss.watermarkHoldsCounts) != len(ss.watermarkHoldHeap) {
// The hold should not be in the heap, so we add it.
heap.Push(&ss.watermarkHoldHeap, e.holdTimestamp)
}
ss.watermarkHolds.Add(e.holdTimestamp, 1)
}
}
return count
Expand Down Expand Up @@ -1308,10 +1258,7 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
defer ss.mu.Unlock()

minPending := ss.minPendingTimestampLocked()
minWatermarkHold := mtime.MaxTimestamp
if ss.watermarkHoldHeap.Len() > 0 {
minWatermarkHold = ss.watermarkHoldHeap[0]
}
minWatermarkHold := ss.watermarkHolds.Min()

// PCollection watermarks are based on their parents's output watermark.
_, newIn := ss.UpstreamWatermark()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func TestStageState_updateWatermarks(t *testing.T) {
ss.output = test.initOutput
ss.updateUpstreamWatermark(inputCol, test.upstream)
ss.pending = append(ss.pending, element{timestamp: test.minPending})
ss.watermarkHoldHeap = append(ss.watermarkHoldHeap, test.minStateHold)
ss.watermarkHolds.Add(test.minStateHold, 1)
ss.updateWatermarks(em)
if got, want := ss.input, test.wantInput; got != want {
pcol, up := ss.UpstreamWatermark()
Expand Down
105 changes: 105 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/holds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package engine

import (
"container/heap"
"fmt"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
)

// holdHeap orders holds based on their timestamps
// so we can always find the minimum timestamp of pending holds.
type holdHeap []mtime.Time

func (h holdHeap) Len() int { return len(h) }
func (h holdHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h holdHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *holdHeap) Push(x any) {
lostluck marked this conversation as resolved.
Show resolved Hide resolved
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
*h = append(*h, x.(mtime.Time))
}

func (h *holdHeap) Pop() any {
lostluck marked this conversation as resolved.
Show resolved Hide resolved
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

// holdTracker track the watermark holds for a stage.
//
// Timers hold back the watermark until they fire, but multiple
// timers may set the same watermark hold.
// To track when the watermark may advance further this structure maintains
// counts for each set watermark hold.
// As timers are processed, their associated holds are removed, reducing the counts.
//
// A heap of the hold times is kept so we have quick access to the minimum hold, for calculating
// how to advance the watermark.
type holdTracker struct {
heap holdHeap
counts map[mtime.Time]int
}

func newHoldTracker() *holdTracker {
return &holdTracker{
counts: map[mtime.Time]int{},
}
}

// Drop the given hold count. When the count of a hold time reaches zero, it's
// removed from the heap. Drop panics if holds become negative.
func (ht *holdTracker) Drop(hold mtime.Time, v int) {
n := ht.counts[hold] - v
if n > 0 {
ht.counts[hold] = n
return
} else if n < 0 {
panic(fmt.Sprintf("prism error: negative watermark hold count %v for time %v", n, hold))
}
delete(ht.counts, hold)
for i, h := range ht.heap {
if hold == h {
heap.Remove(&ht.heap, i)
break
}
}
}

// Add a hold a number of times to heap. If the hold time isn't already present in the heap, it is added.
func (ht *holdTracker) Add(hold mtime.Time, v int) {
// Mark the hold in the heap.
ht.counts[hold] = ht.counts[hold] + v

if len(ht.counts) != len(ht.heap) {
// Since there's a difference, the hold should not be in the heap, so we add it.
heap.Push(&ht.heap, hold)
}
}

// Min returns the earliest hold in the heap. Returns [mtime.MaxTimestamp] if the heap is empty.
func (ht *holdTracker) Min() mtime.Time {
minWatermarkHold := mtime.MaxTimestamp
if len(ht.heap) > 0 {
minWatermarkHold = ht.heap[0]
}
return minWatermarkHold
}
115 changes: 115 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/holds_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package engine

import (
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
)

func TestHoldTracker(t *testing.T) {

type op func(*holdTracker)
add := func(hold mtime.Time, count int) op {
return func(ht *holdTracker) {
ht.Add(hold, count)
}
}

drop := func(hold mtime.Time, count int) op {
return func(ht *holdTracker) {
ht.Drop(hold, count)
}
}

tests := []struct {
name string
ops []op
wantMin mtime.Time
wantLen int
}{
{
name: "zero-max",
wantMin: mtime.MaxTimestamp,
wantLen: 0,
}, {

name: "one-min",
ops: []op{
add(mtime.MinTimestamp, 1),
},
wantMin: mtime.MinTimestamp,
wantLen: 1,
}, {

name: "cleared-max",
ops: []op{
add(mtime.MinTimestamp, 1),
drop(mtime.MinTimestamp, 1),
},
wantMin: mtime.MaxTimestamp,
wantLen: 0,
}, {
name: "cleared-non-eogw",
ops: []op{
add(mtime.MinTimestamp, 1),
add(mtime.EndOfGlobalWindowTime, 1),
drop(mtime.MinTimestamp, 1),
},
wantMin: mtime.EndOfGlobalWindowTime,
wantLen: 1,
}, {
name: "uncleared-non-min",
ops: []op{
add(mtime.MinTimestamp, 2),
add(mtime.EndOfGlobalWindowTime, 1),
drop(mtime.MinTimestamp, 1),
},
wantMin: mtime.MinTimestamp,
wantLen: 2,
}, {
name: "uncleared-non-min",
ops: []op{
add(1, 1),
add(2, 1),
add(3, 1),
drop(2, 1),
add(4, 1),
add(3, 1),
drop(1, 1),
add(2, 1),
drop(4, 1),
},
wantMin: 2,
wantLen: 2,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
tracker := newHoldTracker()
for _, op := range test.ops {
op(tracker)
}
if got, want := tracker.Min(), test.wantMin; got != want {
t.Errorf("tracker.heap.Min() = %v, want %v", got, want)
}
if got, want := tracker.heap.Len(), test.wantLen; got != want {
t.Errorf("tracker.heap.Len() = %v, want %v", got, want)
}
})
}
}
12 changes: 3 additions & 9 deletions sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package engine

import (
"container/heap"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
Expand Down Expand Up @@ -139,13 +138,9 @@ func (ts *testStreamHandler) UpdateHold(em *ElementManager, newHold mtime.Time)
ss.mu.Lock()
defer ss.mu.Unlock()

if ss.watermarkHoldsCounts[ts.currentHold] > 0 {
heap.Pop(&ss.watermarkHoldHeap)
ss.watermarkHoldsCounts[ts.currentHold] = ss.watermarkHoldsCounts[ts.currentHold] - 1
}
ss.watermarkHolds.Drop(ts.currentHold, 1)
ts.currentHold = newHold
heap.Push(&ss.watermarkHoldHeap, ts.currentHold)
ss.watermarkHoldsCounts[ts.currentHold] = 1
ss.watermarkHolds.Add(ts.currentHold, 1)

// kick the TestStream and Impulse stages too.
kick := singleSet(ts.ID)
Expand Down Expand Up @@ -281,8 +276,7 @@ func (tsi *testStreamImpl) initHandler(id string) {
tsi.em.addPending(1) // We subtrack a pending after event execution, so add one now for the final event to avoid a race condition.

// Arrest the watermark initially to prevent terminal advancement.
heap.Push(&ss.watermarkHoldHeap, tsi.em.testStreamHandler.currentHold)
ss.watermarkHoldsCounts[tsi.em.testStreamHandler.currentHold] = 1
ss.watermarkHolds.Add(tsi.em.testStreamHandler.currentHold, 1)
}
}

Expand Down
Loading