Skip to content

Commit

Permalink
[prism] Basic Xlang support.
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Sep 19, 2023
1 parent 6dc03eb commit f321ab0
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 63 deletions.
6 changes: 6 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/fullvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ func (s *decodeStream) Read() (*FullValue, error) {
}
err := s.d.DecodeTo(s.r, &s.ret)
if err != nil {
if err == io.EOF {
return nil, io.EOF
}
return nil, errors.Wrap(err, "decodeStream value decode failed")
}
s.next++
Expand Down Expand Up @@ -342,6 +345,9 @@ func (s *decodeMultiChunkStream) Read() (*FullValue, error) {
if s.chunk == 0 && s.next == 0 {
chunk, err := coder.DecodeVarInt(s.r.reader)
if err != nil {
if err == io.EOF {
return nil, io.EOF
}
return nil, errors.Wrap(err, "decodeMultiChunkStream chunk size decoding failed")
}
s.chunk = chunk
Expand Down
49 changes: 29 additions & 20 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io"
"sort"
"sync/atomic"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
Expand All @@ -46,15 +47,14 @@ func RunPipeline(j *jobservices.Job) {
// here, we only want and need the go one, operating
// in loopback mode.
envs := j.Pipeline.GetComponents().GetEnvironments()
if len(envs) != 1 {
j.Failed(fmt.Errorf("unable to execute multi-environment pipelines;\npipeline has environments: %+v", envs))
return
}
env, _ := getOnlyPair(envs)
wk, err := makeWorker(env, j)
if err != nil {
j.Failed(err)
return
wks := map[string]*worker.W{}
for envID := range envs {
wk, err := makeWorker(envID, j)
if err != nil {
j.Failed(err)
return
}
wks[envID] = wk
}
// When this function exits, we cancel the context to clear
// any related job resources.
Expand All @@ -65,7 +65,7 @@ func RunPipeline(j *jobservices.Job) {
j.SendMsg("running " + j.String())
j.Running()

if err := executePipeline(j.RootCtx, wk, j); err != nil {
if err := executePipeline(j.RootCtx, wks, j); err != nil {
j.Failed(err)
return
}
Expand All @@ -92,7 +92,7 @@ func makeWorker(env string, j *jobservices.Job) (*worker.W, error) {
// Check for connection succeeding after we've created the environment successfully.
timeout := 1 * time.Minute
time.AfterFunc(timeout, func() {
if wk.Connected() {
if wk.Connected() || wk.Stopped() {
return
}
err := fmt.Errorf("prism %v didn't get control connection to %v after %v", wk, wk.Endpoint(), timeout)
Expand All @@ -112,7 +112,7 @@ type processor struct {
transformExecuters map[string]transformExecuter
}

func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) error {
func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservices.Job) error {
pipeline := j.Pipeline
comps := proto.Clone(pipeline.GetComponents()).(*pipepb.Components)

Expand Down Expand Up @@ -155,7 +155,12 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
// TODO move this loop and code into the preprocessor instead.
stages := map[string]*stage{}
var impulses []string
for _, stage := range topo {

// Inialize the "dataservice cache" to support side inputs.
// TODO(https://github.com/apache/beam/issues/28543), remove this concept.
ds := &worker.DataService{}

for i, stage := range topo {
tid := stage.transforms[0]
t := ts[tid]
urn := t.GetSpec().GetUrn()
Expand All @@ -166,11 +171,11 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
if stage.exe != nil {
stage.envID = stage.exe.ExecuteWith(t)
}
stage.ID = wk.NextStage()
stage.ID = fmt.Sprintf("stage-%03d", i)
wk := wks[stage.envID]

switch stage.envID {
case "": // Runner Transforms

var onlyOut string
for _, out := range t.GetOutputs() {
onlyOut = out
Expand Down Expand Up @@ -229,10 +234,8 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
em.AddStage(stage.ID, inputs, nil, []string{getOnlyValue(t.GetOutputs())})
}
stages[stage.ID] = stage
wk.Descriptors[stage.ID] = stage.desc
case wk.Env:
// Great! this is for this environment. // Broken abstraction.
if err := buildDescriptor(stage, comps, wk); err != nil {
if err := buildDescriptor(stage, comps, wk, ds); err != nil {
return fmt.Errorf("prism error building stage %v: \n%w", stage.ID, err)
}
stages[stage.ID] = stage
Expand All @@ -256,7 +259,12 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
maxParallelism := make(chan struct{}, 8)
// Execute stages here
bundleFailed := make(chan error)
bundles := em.Bundles(ctx, wk.NextInst)

var instID uint64
bundles := em.Bundles(ctx, func() string {
return fmt.Sprintf("inst%03d", atomic.AddUint64(&instID, 1))
})

for {
select {
case <-ctx.Done():
Expand All @@ -270,7 +278,8 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
go func(rb engine.RunBundle) {
defer func() { <-maxParallelism }()
s := stages[rb.StageID]
if err := s.Execute(ctx, j, wk, comps, em, rb); err != nil {
wk := wks[s.envID]
if err := s.Execute(ctx, j, wk, ds, comps, em, rb); err != nil {
// Ensure we clean up on bundle failure
em.FailBundle(rb)
bundleFailed <- err
Expand Down
20 changes: 13 additions & 7 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,18 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
return nil, err
}
var errs []error
check := func(feature string, got, want any) {
if got != want {
err := unimplementedError{
feature: feature,
value: got,
check := func(feature string, got any, wants ...any) {
for _, want := range wants {
if got == want {
return
}
errs = append(errs, err)
}

err := unimplementedError{
feature: feature,
value: got,
}
errs = append(errs, err)
}

// Inspect Transforms for unsupported features.
Expand All @@ -114,6 +118,8 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
urns.TransformGBK,
urns.TransformFlatten,
urns.TransformCombinePerKey,
urns.TransformCombineGlobally, // Used by Java SDK
urns.TransformCombineGroupedValues, // Used by Java SDK
urns.TransformAssignWindows:
// Very few expected transforms types for submitted pipelines.
// Most URNs are for the runner to communicate back to the SDK for execution.
Expand Down Expand Up @@ -154,7 +160,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
check("WindowingStrategy.MergeStatus", ws.GetMergeStatus(), pipepb.MergeStatus_NON_MERGING)
}
if !bypassedWindowingStrategies[wsID] {
check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY)
check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY, pipepb.OnTimeBehavior_FIRE_ALWAYS)
check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW)
// Non nil triggers should fail.
if ws.GetTrigger().GetDefault() == nil {
Expand Down
16 changes: 8 additions & 8 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type stage struct {
OutputsToCoders map[string]engine.PColInfo
}

func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) error {
func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, ds *worker.DataService, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) error {
slog.Debug("Execute: starting bundle", "bundle", rb)

var b *worker.B
Expand Down Expand Up @@ -204,8 +204,8 @@ progress:
md := wk.MonitoringMetadata(ctx, unknownIDs)
j.AddMetricShortIDs(md)
}
// TODO handle side input data properly.
wk.D.Commit(b.OutputData)
// TODO(https://github.com/apache/beam/issues/28543) handle side input data properly.
ds.Commit(b.OutputData)
var residualData [][]byte
var minOutputWatermark map[string]mtime.Time
for _, rr := range resp.GetResidualRoots() {
Expand Down Expand Up @@ -270,7 +270,7 @@ func portFor(wInCid string, wk *worker.W) []byte {
// It assumes that the side inputs are not sourced from PCollections generated by any transform in this stage.
//
// Because we need the local ids for routing the sources/sinks information.
func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W, ds *worker.DataService) error {
// Assume stage has an indicated primary input

coders := map[string]*pipepb.Coder{}
Expand Down Expand Up @@ -327,7 +327,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
// Update side inputs to point to new PCollection with any replaced coders.
transforms[si.transform].GetInputs()[si.local] = newGlobal
}
prepSide, err := handleSideInput(si.transform, si.local, si.global, comps, coders, wk)
prepSide, err := handleSideInput(si.transform, si.local, si.global, comps, coders, ds)
if err != nil {
slog.Error("buildDescriptor: handleSideInputs", err, slog.String("transformID", si.transform))
return err
Expand Down Expand Up @@ -392,7 +392,7 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
}

// handleSideInput returns a closure that will look up the data for a side input appropriate for the given watermark.
func handleSideInput(tid, local, global string, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W) (func(b *worker.B, watermark mtime.Time), error) {
func handleSideInput(tid, local, global string, comps *pipepb.Components, coders map[string]*pipepb.Coder, ds *worker.DataService) (func(b *worker.B, watermark mtime.Time), error) {
t := comps.GetTransforms()[tid]
sis, err := getSideInputs(t)
if err != nil {
Expand All @@ -412,7 +412,7 @@ func handleSideInput(tid, local, global string, comps *pipepb.Components, coders

global, local := global, local
return func(b *worker.B, watermark mtime.Time) {
data := wk.D.GetAllData(global)
data := ds.GetAllData(global)

if b.IterableSideInputData == nil {
b.IterableSideInputData = map[string]map[string]map[typex.Window][][]byte{}
Expand Down Expand Up @@ -447,7 +447,7 @@ func handleSideInput(tid, local, global string, comps *pipepb.Components, coders
global, local := global, local
return func(b *worker.B, watermark mtime.Time) {
// May be of zero length, but that's OK. Side inputs can be empty.
data := wk.D.GetAllData(global)
data := ds.GetAllData(global)
if b.MultiMapSideInputData == nil {
b.MultiMapSideInputData = map[string]map[string]map[typex.Window]map[string][][]byte{}
}
Expand Down
2 changes: 2 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/urns/urns.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ var (
// SDK transforms.
TransformParDo = ptUrn(pipepb.StandardPTransforms_PAR_DO)
TransformCombinePerKey = ctUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY)
TransformCombineGlobally = ctUrn(pipepb.StandardPTransforms_COMBINE_GLOBALLY)
TransformReshuffle = ctUrn(pipepb.StandardPTransforms_RESHUFFLE)
TransformCombineGroupedValues = cmbtUrn(pipepb.StandardPTransforms_COMBINE_GROUPED_VALUES)
TransformPreCombine = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_PRECOMBINE)
TransformMerge = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_MERGE_ACCUMULATORS)
TransformExtract = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_EXTRACT_OUTPUTS)
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (b *B) Respond(resp *fnpb.InstructionResponse) {
}
b.responded = true
if resp.GetError() != "" {
b.BundleErr = fmt.Errorf("bundle %v failed:%v", resp.GetInstructionId(), resp.GetError())
b.BundleErr = fmt.Errorf("bundle %v %v failed:%v", resp.GetInstructionId(), b.PBDID, resp.GetError())
close(b.Resp)
return
}
Expand Down
24 changes: 12 additions & 12 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type W struct {
server *grpc.Server

// These are the ID sources
inst, bund uint64
inst uint64
connected, stopped atomic.Bool

InstReqs chan *fnpb.InstructionRequest
Expand All @@ -76,8 +76,6 @@ type W struct {
mu sync.Mutex
activeInstructions map[string]controlResponder // Active instructions keyed by InstructionID
Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID

D *DataService
}

type controlResponder interface {
Expand All @@ -104,8 +102,6 @@ func New(id, env string) *W {

activeInstructions: make(map[string]controlResponder),
Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor),

D: &DataService{},
}
slog.Debug("Serving Worker components", slog.String("endpoint", wk.Endpoint()))
fnpb.RegisterBeamFnControlServer(wk.server, wk)
Expand Down Expand Up @@ -149,11 +145,7 @@ func (wk *W) Stop() {
}

func (wk *W) NextInst() string {
return fmt.Sprintf("inst%03d", atomic.AddUint64(&wk.inst, 1))
}

func (wk *W) NextStage() string {
return fmt.Sprintf("stage%03d", atomic.AddUint64(&wk.bund, 1))
return fmt.Sprintf("inst-%v-%03d", wk.Env, atomic.AddUint64(&wk.inst, 1))
}

// TODO set logging level.
Expand Down Expand Up @@ -263,6 +255,11 @@ func (wk *W) Connected() bool {
return wk.connected.Load()
}

// Stopped indicates that the worker has stopped.
func (wk *W) Stopped() bool {
return wk.stopped.Load()
}

// Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
//
// Requests come from the runner, and are sent to the client in the SDK.
Expand Down Expand Up @@ -312,10 +309,12 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
wk.mu.Lock()
// Fail extant instructions
slog.Debug("SDK Disconnected", "worker", wk, "ctx_error", ctrl.Context().Err(), "outstanding_instructions", len(wk.activeInstructions))

msg := fmt.Sprintf("SDK worker disconnected: %v, %v active instructions", wk.String(), len(wk.activeInstructions))
for instID, b := range wk.activeInstructions {
b.Respond(&fnpb.InstructionResponse{
InstructionId: instID,
Error: "SDK Disconnected",
Error: msg,
})
}
wk.mu.Unlock()
Expand Down Expand Up @@ -536,7 +535,7 @@ func (wk *W) sendInstruction(ctx context.Context, req *fnpb.InstructionRequest)

req.InstructionId = progInst

if wk.stopped.Load() {
if wk.Stopped() {
return nil
}
wk.InstReqs <- req
Expand Down Expand Up @@ -566,6 +565,7 @@ func (wk *W) MonitoringMetadata(ctx context.Context, unknownIDs []string) *fnpb.

// DataService is slated to be deleted in favour of stage based state
// management for side inputs.
// TODO(https://github.com/apache/beam/issues/28543), remove this concept.
type DataService struct {
mu sync.Mutex
// TODO actually quick process the data to windows here as well.
Expand Down
14 changes: 1 addition & 13 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,6 @@ func TestWorker_NextInst(t *testing.T) {
}
}

func TestWorker_NextStage(t *testing.T) {
w := New("test", "testEnv")

stageIDs := map[string]struct{}{}
for i := 0; i < 100; i++ {
stageIDs[w.NextStage()] = struct{}{}
}
if got, want := len(stageIDs), 100; got != want {
t.Errorf("calling w.NextStage() got %v unique ids, want %v", got, want)
}
}

func TestWorker_GetProcessBundleDescriptor(t *testing.T) {
w := New("test", "testEnv")

Expand Down Expand Up @@ -189,7 +177,7 @@ func TestWorker_Data_HappyPath(t *testing.T) {

b := &B{
InstID: instID,
PBDID: wk.NextStage(),
PBDID: "teststageID",
InputData: [][]byte{
{1, 1, 1, 1, 1, 1},
},
Expand Down
4 changes: 2 additions & 2 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ var portableFilters = []string{
}

var prismFilters = []string{
// The prism runner does not yet support cross-language.
"TestXLang.*",
// The prism runner does not yet support Java's CoGBK.
"TestXLang_CoGroupBy",
// The prism runner does not support the TestStream primitive
"TestTestStream.*",
// The trigger and pane tests uses TestStream
Expand Down

0 comments on commit f321ab0

Please sign in to comment.