[apache#22737] Re-write Go SDK data plane to support timers. (apache#25982)

* commit with lock

* add throughput benchmark

* Move closed signal to atomic.

* gofmt datasource_test.go

* Retain session runner receives in this PR.

* timerWriter copypasta

---------

Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
2 people authored and jakubrauch committed Apr 27, 2023
1 parent c47b343 commit 8d61e57
Showing 6 changed files with 862 additions and 380 deletions.
16 changes: 12 additions & 4 deletions sdks/go/pkg/beam/core/runtime/exec/data.go
@@ -57,10 +57,12 @@ type SideCache interface {
// DataManager manages external data byte streams. Each data stream can be
// opened by one consumer only.
type DataManager interface {
// OpenRead opens a closable byte stream for reading.
OpenRead(ctx context.Context, id StreamID) (io.ReadCloser, error)
// OpenWrite opens a closable byte stream for writing.
// OpenElementChan opens a channel for data and timers.
OpenElementChan(ctx context.Context, id StreamID, expectedTimerTransforms []string) (<-chan Elements, error)
// OpenWrite opens a closable byte stream for data writing.
OpenWrite(ctx context.Context, id StreamID) (io.WriteCloser, error)
// OpenTimerWrite opens a byte stream for writing timers
OpenTimerWrite(ctx context.Context, id StreamID, family string) (io.WriteCloser, error)
}

// StateReader is the interface for reading side input data.
@@ -91,4 +93,10 @@ type StateReader interface {
GetSideInputCache() SideCache
}

// TODO(herohde) 7/20/2018: user state management
// Elements holds data or timers sent across the data channel.
// If TimerFamilyID is populated, it's a timer, otherwise it's
// data elements.
type Elements struct {
Data, Timers []byte
TimerFamilyID, PtransformID string
}
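
Taken together, OpenElementChan and the Elements struct replace the old one-stream-per-read model: a single channel now carries both data and timer payloads, distinguished per element by whether TimerFamilyID is set. Below is a minimal, self-contained sketch of draining such a channel; the Elements mirror and the drain loop are illustrative rather than part of the diff (the real consumer is DataSource.process later in this commit).

package main

import (
	"context"
	"fmt"
)

// Elements mirrors the struct added above.
type Elements struct {
	Data, Timers                []byte
	TimerFamilyID, PtransformID string
}

// drain shows the intended protocol: one channel carries both data and
// timer payloads, and a populated TimerFamilyID marks an element as a timer.
func drain(ctx context.Context, elms <-chan Elements) {
	for {
		select {
		case e, ok := <-elms:
			if !ok {
				return // closed channel: all input for the bundle has arrived
			}
			if len(e.Timers) > 0 {
				fmt.Printf("timers for %s, family %s: %d bytes\n", e.PtransformID, e.TimerFamilyID, len(e.Timers))
			}
			if len(e.Data) > 0 {
				fmt.Printf("data for %s: %d bytes\n", e.PtransformID, len(e.Data))
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	elms := make(chan Elements, 2)
	elms <- Elements{PtransformID: "pt1", Data: []byte{1, 2, 3}}
	elms <- Elements{PtransformID: "pt1", TimerFamilyID: "fam", Timers: []byte{4, 5}}
	close(elms)
	drain(context.Background(), elms)
}
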
182 changes: 122 additions & 60 deletions sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"golang.org/x/exp/maps"
)

// DataSource is a Root execution unit.
@@ -40,9 +41,12 @@ type DataSource struct {
Coder *coder.Coder
Out Node
PCol PCollection // Handles size metrics. Value instead of pointer so it's initialized by default in tests.
// OnTimerTransforms maps PtransformIDs to their execution nodes that handle OnTimer callbacks.
OnTimerTransforms map[string]*ParDo

source DataManager
state StateReader
source DataManager
state StateReader
curInst string

index int64
splitIdx int64
Expand Down Expand Up @@ -94,20 +98,79 @@ func (n *DataSource) Up(ctx context.Context) error {
// StartBundle initializes this datasource for the bundle.
func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContext) error {
n.mu.Lock()
n.curInst = id
n.source = data.Data
n.state = data.State
n.start = time.Now()
n.index = -1
n.index = 0
n.splitIdx = math.MaxInt64
n.mu.Unlock()
return n.Out.StartBundle(ctx, id, data)
}

// splitSuccess is a marker error to indicate we've reached the split index.
// Akin to io.EOF.
var splitSuccess = errors.New("split index reached")

// process handles converting elements from the data source to timers.
//
// The data and timer callback functions must return an io.EOF if the reader terminates to signal that an additional
// buffer is desired. On successful splits, [splitSuccess] must be returned to indicate that the
// PTransform is done processing data for this instruction.
func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader, ptransformID string) error, timer func(bcr *byteCountReader, ptransformID, timerFamilyID string) error) error {
// The SID contains this instruction's expected data processing transform (this one).
elms, err := n.source.OpenElementChan(ctx, n.SID, maps.Keys(n.OnTimerTransforms))
if err != nil {
return err
}

n.PCol.resetSize() // initialize the size distribution for this bundle.
var r bytes.Reader

var byteCount int
bcr := byteCountReader{reader: &r, count: &byteCount}

splitPrimaryComplete := map[string]bool{}
for {
var err error
select {
case e, ok := <-elms:
// Channel closed, so time to exit
if !ok {
return nil
}
if splitPrimaryComplete[e.PtransformID] {
continue
}
if len(e.Data) > 0 {
r.Reset(e.Data)
err = data(&bcr, e.PtransformID)
}
if len(e.Timers) > 0 {
r.Reset(e.Timers)
err = timer(&bcr, e.PtransformID, e.TimerFamilyID)
}

if err == splitSuccess {
// Returning splitSuccess means we've split, and aren't consuming the remaining buffer.
// We mark the PTransform done to ignore further data.
splitPrimaryComplete[e.PtransformID] = true
} else if err != nil && err != io.EOF {
return errors.Wrap(err, "source failed")
}
// io.EOF means the reader successfully drained.
// We're ready for a new buffer.
case <-ctx.Done():
return nil
}
}
}

// ByteCountReader is a passthrough reader that counts all the bytes read through it.
// It trusts the nested reader to return accurate byte information.
type byteCountReader struct {
count *int
reader io.ReadCloser
reader io.Reader
}

func (r *byteCountReader) Read(p []byte) (int, error) {
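
The hunk above centers on the callback contract documented on process, which is easy to misread: io.EOF signals a cleanly drained buffer and a request for the next one, splitSuccess signals a finished split, and anything else fails the bundle. Here is a standalone sketch of a conforming data callback; decodeOne, emit, and splitReached are hypothetical stand-ins for the real decoding, downstream processing, and split bookkeeping.

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// splitSuccess mirrors the marker error introduced in this hunk.
var splitSuccess = errors.New("split index reached")

// decodeOne, emit, and splitReached are hypothetical stand-ins.
func decodeOne(r io.Reader) (byte, error) {
	b := make([]byte, 1)
	_, err := r.Read(b)
	return b[0], err
}

func emit(b byte) error { fmt.Println("element:", b); return nil }

func splitReached() bool { return false }

// dataCallback follows the process contract: io.EOF reports a cleanly
// drained buffer (send the next one), splitSuccess reports a finished
// split, and any other error fails the bundle.
func dataCallback(r io.Reader, ptransformID string) error {
	for {
		elem, err := decodeOne(r)
		if err != nil {
			return err // io.EOF propagates unchanged
		}
		if err := emit(elem); err != nil {
			return err
		}
		if splitReached() {
			return splitSuccess
		}
	}
}

func main() {
	err := dataCallback(bytes.NewReader([]byte{1, 2, 3}), "pt1")
	fmt.Println("done:", err) // done: EOF
}
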
@@ -117,7 +180,10 @@ func (r *byteCountReader) Read(p []byte) (int, error) {
}

func (r *byteCountReader) Close() error {
return r.reader.Close()
if c, ok := r.reader.(io.Closer); ok {
c.Close()
}
return nil
}

func (r *byteCountReader) reset() int {
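
byteCountReader now wraps a plain io.Reader over a reusable bytes.Reader rather than owning a closable stream. A self-contained sketch of the counting pattern follows, assuming (consistently with the addSize call site later in this diff) that Read adds the bytes read to the shared counter and reset returns the tally and zeroes it.

package main

import (
	"fmt"
	"io"
	"strings"
)

// Simplified mirror of the byteCountReader in this diff.
type byteCountReader struct {
	count  *int
	reader io.Reader
}

func (r *byteCountReader) Read(p []byte) (int, error) {
	n, err := r.reader.Read(p)
	*r.count += n
	return n, err
}

func (r *byteCountReader) reset() int {
	c := *r.count
	*r.count = 0
	return c
}

func main() {
	var n int
	bcr := byteCountReader{count: &n, reader: strings.NewReader("hello")}
	io.Copy(io.Discard, &bcr)
	fmt.Println(bcr.reset()) // 5: bytes consumed for the "element"
	fmt.Println(*bcr.count)  // 0: ready to size the next element
}
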
@@ -128,15 +194,6 @@ func (r *byteCountReader) reset() int {

// Process opens the data source, reads and decodes data, kicking off element processing.
func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
r, err := n.source.OpenRead(ctx, n.SID)
if err != nil {
return nil, err
}
defer r.Close()
n.PCol.resetSize() // initialize the size distribution for this bundle.
var byteCount int
bcr := byteCountReader{reader: r, count: &byteCount}

c := coder.SkipW(n.Coder)
wc := MakeWindowDecoder(n.Coder.Window)

@@ -155,58 +212,63 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
}

var checkpoints []*Checkpoint
for {
if n.incrementIndexAndCheckSplit() {
break
}
// TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes?
ws, t, pn, err := DecodeWindowedValueHeader(wc, r)
if err != nil {
if err == io.EOF {
break
err := n.process(ctx, func(bcr *byteCountReader, ptransformID string) error {
for {
// TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes?
ws, t, pn, err := DecodeWindowedValueHeader(wc, bcr.reader)
if err != nil {
return err
}
return nil, errors.Wrap(err, "source failed")
}

// Decode key or parallel element.
pe, err := cp.Decode(&bcr)
if err != nil {
return nil, errors.Wrap(err, "source decode failed")
}
pe.Timestamp = t
pe.Windows = ws
pe.Pane = pn

var valReStreams []ReStream
for _, cv := range cvs {
values, err := n.makeReStream(ctx, cv, &bcr, len(cvs) == 1 && n.singleIterate)
// Decode key or parallel element.
pe, err := cp.Decode(bcr)
if err != nil {
return nil, err
return errors.Wrap(err, "source decode failed")
}
valReStreams = append(valReStreams, values)
}
pe.Timestamp = t
pe.Windows = ws
pe.Pane = pn

if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
return nil, err
}
// Collect the actual size of the element, and reset the bytecounter reader.
n.PCol.addSize(int64(bcr.reset()))
bcr.reader = r

// Check if there's a continuation and return residuals
// Needs to be done immeadiately after processing to not lose the element.
if c := n.getProcessContinuation(); c != nil {
cp, err := n.checkpointThis(ctx, c)
if err != nil {
// Errors during checkpointing should fail a bundle.
return nil, err
var valReStreams []ReStream
for _, cv := range cvs {
values, err := n.makeReStream(ctx, cv, bcr, len(cvs) == 1 && n.singleIterate)
if err != nil {
return err
}
valReStreams = append(valReStreams, values)
}
if cp != nil {
checkpoints = append(checkpoints, cp)

if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
return err
}
// Collect the actual size of the element, and reset the bytecounter reader.
n.PCol.addSize(int64(bcr.reset()))

// Check if there's a continuation and return residuals
// Needs to be done immediately after processing to not lose the element.
if c := n.getProcessContinuation(); c != nil {
cp, err := n.checkpointThis(ctx, c)
if err != nil {
// Errors during checkpointing should fail a bundle.
return err
}
if cp != nil {
checkpoints = append(checkpoints, cp)
}
}
// We've finished processing an element, check if we have finished a split.
if n.incrementIndexAndCheckSplit() {
return splitSuccess
}
}
}
return checkpoints, nil
},
func(bcr *byteCountReader, ptransformID, timerFamilyID string) error {
tmap, err := decodeTimer(cp, wc, bcr)
log.Infof(ctx, "DEBUGLOG: timer received for: %v and %v - %+v err: %v", ptransformID, timerFamilyID, tmap, err)
return nil
})

return checkpoints, err
}

func (n *DataSource) makeReStream(ctx context.Context, cv ElementDecoder, bcr *byteCountReader, onlyStream bool) (ReStream, error) {
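
Note that the timer callback in this hunk only decodes the timer and logs it (the DEBUGLOG line above); the OnTimerTransforms map added to DataSource hints at where dispatch should eventually land. The following standalone sketch of that routing is purely hypothetical: only the shape of the map comes from the commit, while the ParDo stand-in and its onTimer method are invented for illustration.

package main

import "fmt"

// ParDo is a stand-in for the exec.ParDo node referenced by OnTimerTransforms.
type ParDo struct{ name string }

func (p *ParDo) onTimer(family string, payload []byte) error {
	fmt.Printf("%s handles timer family %s (%d bytes)\n", p.name, family, len(payload))
	return nil
}

// dispatchTimer shows one presumable end state: route a decoded timer to the
// node registered for its PTransform instead of logging it.
func dispatchTimer(onTimerTransforms map[string]*ParDo, ptransformID, family string, payload []byte) error {
	pardo, ok := onTimerTransforms[ptransformID]
	if !ok {
		return fmt.Errorf("no OnTimer transform registered for %v", ptransformID)
	}
	return pardo.onTimer(family, payload)
}

func main() {
	m := map[string]*ParDo{"pt7": {name: "pt7-pardo"}}
	_ = dispatchTimer(m, "pt7", "processingTime", []byte{1})
}
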
Expand Down Expand Up @@ -313,7 +375,7 @@ func (n *DataSource) makeReStream(ctx context.Context, cv ElementDecoder, bcr *b
}
}

func readStreamToBuffer(cv ElementDecoder, r io.ReadCloser, size int64, buf []FullValue) ([]FullValue, error) {
func readStreamToBuffer(cv ElementDecoder, r io.Reader, size int64, buf []FullValue) ([]FullValue, error) {
for i := int64(0); i < size; i++ {
value, err := cv.Decode(r)
if err != nil {
Expand Down Expand Up @@ -472,7 +534,7 @@ func (n *DataSource) checkpointThis(ctx context.Context, pc sdf.ProcessContinuat
// The bufSize param specifies the estimated number of elements that will be
// sent to this DataSource, and is used to be able to perform accurate splits
// even if the DataSource has not yet received all its elements. A bufSize of
// 0 or less indicates that its unknown, and so uses the current known size.
// 0 or less indicates that it's unknown, and so uses the current known size.
func (n *DataSource) Split(ctx context.Context, splits []int64, frac float64, bufSize int64) (SplitResult, error) {
if n == nil {
return SplitResult{}, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)
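
To make the bufSize semantics concrete, here is a hypothetical runner-driven call; ds, ctx, and all literal values are illustrative rather than taken from this diff.

// Ask for a split at ~50% of the remaining work, constrained to the
// runner-allowed split indices 8, 16, and 24. A bufSize of 0 means the
// total element count is unknown, so the current known size is used.
res, err := ds.Split(ctx, []int64{8, 16, 24}, 0.5, 0)
if err != nil {
	// The split was refused or failed; the bundle continues unsplit.
}
_ = res
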