Skip to content

Commit

Permalink
Fix backlog reported by periodicsequence in python and go sdks to not…
Browse files Browse the repository at this point in the history
… include future outputs (#32513)
  • Loading branch information
scwhittle authored Sep 24, 2024
1 parent 72d46ce commit d93f93a
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 4 deletions.
13 changes: 11 additions & 2 deletions sdks/go/pkg/beam/transforms/periodic/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ func NewSequenceDefinition(start, end time.Time, interval time.Duration) Sequenc
}
}

// calculateSequenceByteSize reports the size of the output that the sequence
// should have emitted up to now for the given restriction.
//
// The index reached by now is the whole number of sd.Interval periods between
// sd.Start and now. If that index lies before rest.Start the result is 0;
// otherwise it is 8 times the distance from rest.Start to the reached index,
// with the reached index capped at rest.End.
func calculateSequenceByteSize(now time.Time, sd SequenceDefinition, rest offsetrange.Restriction) int64 {
	elapsed := now.Sub(mtime.Time(sd.Start).ToTime())
	reached := int64(elapsed / sd.Interval)
	if reached < rest.Start {
		return 0
	}
	// Never count past the end of the restriction.
	upper := rest.End
	if reached < upper {
		upper = reached
	}
	return (upper - rest.Start) * 8
}

// sequenceGenDoFn is a field-less type whose methods (CreateInitialRestriction,
// CreateTracker, RestrictionSize, SplitRestriction) create, track, size and
// split the offset-range restriction associated with a SequenceDefinition.
type sequenceGenDoFn struct{}

func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction {
Expand All @@ -75,8 +84,8 @@ func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.Lock
return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
}

func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest offsetrange.Restriction) float64 {
return rest.Size()
// RestrictionSize returns, as a float64, the value calculateSequenceByteSize
// computes for sd and rest at the current wall-clock time.
func (fn *sequenceGenDoFn) RestrictionSize(sd SequenceDefinition, rest offsetrange.Restriction) float64 {
	backlogBytes := calculateSequenceByteSize(time.Now(), sd, rest)
	return float64(backlogBytes)
}

func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction {
Expand Down
36 changes: 36 additions & 0 deletions sdks/go/pkg/beam/transforms/periodic/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
Expand Down Expand Up @@ -56,3 +57,38 @@ func TestImpulse(t *testing.T) {
passert.Count(s, out, "SecondsInMinute", 60)
ptest.RunAndValidate(t, p)
}

// TestSize checks calculateSequenceByteSize against a table of cases. Each
// case supplies the current time as Unix seconds, the start and end index of
// the restriction, and the expected result.
func TestSize(t *testing.T) {
	sd := SequenceDefinition{
		Interval: 10 * time.Second,
		Start:    0,
		// 1000 minutes expressed in milliseconds.
		End: 1000 * time.Minute.Milliseconds(),
	}
	// Number of 10-second intervals contained in 1000 minutes.
	end := int64((1000 * time.Minute) / (10 * time.Second))

	sizeTests := []struct {
		now, startIndex, endIndex, want int64
	}{
		{100, 10, end, 0},
		{100, 9, end, 8},
		{100, 8, end, 16},
		{101, 9, end, 8},
		{10000, 0, end, 8 * 10000 / 10},
		{10000, 1002, 1003, 0},
		{10100, 1002, 1003, 8},
	}

	for _, test := range sizeTests {
		got := calculateSequenceByteSize(
			time.Unix(test.now, 0),
			sd,
			offsetrange.Restriction{
				Start: test.startIndex,
				End:   test.endIndex,
			})
		if got != test.want {
			// Name the function actually under test so a failure is easy to trace.
			t.Errorf("calculateSequenceByteSize(now=%v, rest={Start:%v End:%v}) = %v, want %v",
				test.now, test.startIndex, test.endIndex, got, test.want)
		}
	}
}
21 changes: 19 additions & 2 deletions sdks/python/apache_beam/transforms/periodicsequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,31 @@ def initial_restriction(self, element):
def create_tracker(self, restriction):
return OffsetRestrictionTracker(restriction)

def restriction_size(self, unused_element, restriction):
return restriction.size()
def restriction_size(self, element, restriction):
  '''
  Returns the result of _sequence_backlog_bytes for the element and
  restriction, evaluated at the current wall-clock time.
  '''
  current_time = time.time()
  return _sequence_backlog_bytes(element, current_time, restriction)

def truncate(self, unused_element, unused_restriction):
  '''
  On drain, immediately stop emitting new elements.

  Both arguments are ignored and None is always returned.
  '''
  return None


def _sequence_backlog_bytes(element, now, offset_range):
'''
Calculates size of the output that the sequence should have emitted up to now.
'''
start, _, interval = element
if isinstance(start, Timestamp):
start = start.micros / 1000000
assert interval > 0

now_index = math.floor((now - start) / interval)
if now_index < offset_range.start:
return 0
# We attempt to be precise as some runners scale based upon bytes and
# output byte throughput.
return 8 * (min(offset_range.stop, now_index) - offset_range.start)


class ImpulseSeqGenDoFn(beam.DoFn):
'''
ImpulseSeqGenDoFn fn receives tuple elements with three parts:
Expand Down
20 changes: 20 additions & 0 deletions sdks/python/apache_beam/transforms/periodicsequence_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import unittest

import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.periodicsequence import PeriodicSequence
from apache_beam.transforms.periodicsequence import _sequence_backlog_bytes

# Disable frequent lint warning due to pipe operator for chaining transforms.
# pylint: disable=expression-not-assigned
Expand Down Expand Up @@ -112,6 +114,24 @@ def test_periodicsequence_outputs_valid_sequence_in_past(self):
self.assertEqual(result.is_bounded, False)
assert_that(result, equal_to(k))

def test_periodicsequence_output_size(self):
  '''
  Checks _sequence_backlog_bytes for a sequence starting at 0 with an
  interval of 10, across several current times and offset ranges.
  '''
  element = [0, 1000000000, 10]
  # Each case is (now, offset range, expected size).
  cases = [
      (100, OffsetRange(10, 100000000), 0),
      (100, OffsetRange(9, 100000000), 8),
      (100, OffsetRange(8, 100000000), 16),
      (101, OffsetRange(9, 100000000), 8),
      (10000, OffsetRange(0, 100000000), 8 * 10000 / 10),
      (10000, OffsetRange(1002, 1003), 0),
      (10100, OffsetRange(1002, 1003), 8),
  ]
  for now, offset_range, expected in cases:
    self.assertEqual(
        _sequence_backlog_bytes(element, now, offset_range), expected)


if __name__ == '__main__':
unittest.main()

0 comments on commit d93f93a

Please sign in to comment.