Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#24339] Make Slices use iterable coder instead of custom coder. #24346

Merged
merged 8 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions sdks/go/pkg/beam/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,23 @@ func NewCoder(t FullType) Coder {

func inferCoder(t FullType) (*coder.Coder, error) {
switch t.Class() {
case typex.Concrete, typex.Container:
case typex.Container:
switch t.Type() {
case reflectx.ByteSlice:
return &coder.Coder{Kind: coder.Bytes, T: t}, nil
}
switch t.Type().Kind() {
case reflect.Slice:
c, err := inferCoder(t.Components()[0])
if err != nil {
return nil, err
}
return &coder.Coder{Kind: coder.Iterable, T: t, Components: []*coder.Coder{c}}, nil

default:
panic(fmt.Sprintf("inferCoder: unknown container kind %v", t))
}
case typex.Concrete:
switch t.Type() {
case reflectx.Int64:
// use the beam varint coder.
Expand Down Expand Up @@ -183,9 +199,6 @@ func inferCoder(t FullType) (*coder.Coder, error) {
case reflectx.String:
return &coder.Coder{Kind: coder.String, T: t}, nil

case reflectx.ByteSlice:
return &coder.Coder{Kind: coder.Bytes, T: t}, nil

case reflectx.Bool:
return &coder.Coder{Kind: coder.Bool, T: t}, nil

Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/exec/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (n *DataSource) Down(ctx context.Context) error {
}

func (n *DataSource) String() string {
return fmt.Sprintf("DataSource[%v, %v] Coder:%v Out:%v", n.SID, n.Name, n.Coder, n.Out.ID())
return fmt.Sprintf("DataSource[%v, %v] Out:%v Coder:%v ", n.SID, n.Name, n.Out.ID(), n.Coder)
}

// incrementIndexAndCheckSplit increments DataSource.index by one and checks if
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/exec/pardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,5 +403,5 @@ func (n *ParDo) fail(err error) error {
}

func (n *ParDo) String() string {
return fmt.Sprintf("ParDo[%v] Out:%v", path.Base(n.Fn.Name()), IDs(n.Out...))
return fmt.Sprintf("ParDo[%v] Out:%v Sig: %v", path.Base(n.Fn.Name()), IDs(n.Out...), n.Fn.ProcessElementFn().Fn.Type())
}
57 changes: 57 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strconv"
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
Expand Down Expand Up @@ -95,11 +96,67 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) {
b.units = b.units[:len(b.units)-1]
}

mayFixDataSourceCoder(u)
b.units = append(b.units, u)
}
return b.build()
}

// mayFixDataSourceCoder checks the node downstream of the DataSource and if applicable, changes
// a KV<k, Iter<V>> coder to a CoGBK<k, v>. This requires knowledge of the downstream node because
// coder interpretation is ambiguous to received types in DoFns, and we can only interpret it right
// at execution time with knowledge of both.
func mayFixDataSourceCoder(u *DataSource) {
if !coder.IsKV(coder.SkipW(u.Coder)) {
return // If it's not a KV, there's nothing to do here.
}
if coder.SkipW(u.Coder).Components[1].Kind != coder.Iterable {
return // If the V is not an iterable, we don't care.
}
out := u.Out
if mp, ok := out.(*Multiplex); ok {
// Here we trust that the Multiplex Outs are all the same signature, since we've validated
// that at construction time.
out = mp.Out[0]
}

switch n := out.(type) {
// These nodes always expect CoGBK behavior.
case *Expand, *MergeAccumulators, *ReshuffleOutput, *Combine:
u.Coder = convertToCoGBK(u.Coder)
return
case *ParDo:
// So we now know we have a KV<k, Iter<V>>. So we need to validate whether the DoFn has an
// iter function in the value slot. If it does, we need to use a CoGBK coder.
sig := n.Fn.ProcessElementFn()
// Get all valid inputs and side inputs.
in := sig.Params(funcx.FnValue | funcx.FnIter | funcx.FnReIter)

if len(in) < 2 {
return // Somehow there's only a single value, so we're done. (Defense against generic KVs)
}
// It's an iterator, so we can assume it's a GBK, due to previous pre-conditions.
if sig.Param[in[1]].Kind == funcx.FnIter {
u.Coder = convertToCoGBK(u.Coder)
return
}
}
}

func convertToCoGBK(oc *coder.Coder) *coder.Coder {
ocnw := coder.SkipW(oc)
// Validate that all values from the coder are iterables.
comps := make([]*coder.Coder, 0, len(ocnw.Components))
comps = append(comps, ocnw.Components[0])
for _, c := range ocnw.Components[1:] {
if c.Kind != coder.Iterable {
panic(fmt.Sprintf("want all values to be iterables: %v", oc))
}
comps = append(comps, c.Components[0])
}
return coder.NewW(coder.NewCoGBK(comps), oc.Window)
}

type builder struct {
desc *fnpb.ProcessBundleDescriptor
coders *graphx.CoderUnmarshaller
Expand Down
94 changes: 94 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/translate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
Expand Down Expand Up @@ -90,6 +91,99 @@ func TestUnmarshalReshuffleCoders(t *testing.T) {
}
}

func TestMayFixDataSourceCoder(t *testing.T) {
knownStart := coder.NewW(
coder.NewKV([]*coder.Coder{coder.NewBytes(), coder.NewI(coder.NewString())}),
coder.NewGlobalWindow())
knownWant := coder.NewW(
coder.NewCoGBK([]*coder.Coder{coder.NewBytes(), coder.NewString()}),
coder.NewGlobalWindow())

makeParDo := func(t *testing.T, fn any) *ParDo {
t.Helper()
dfn, err := graph.NewDoFn(fn)
if err != nil {
t.Fatalf("couldn't construct ParDo with Sig: %T %v", fn, err)
}
return &ParDo{Fn: dfn}
}

tests := []struct {
name string
start, want *coder.Coder
out Node
}{
{
name: "bytes",
start: coder.NewBytes(),
}, {
name: "W<bytes>",
start: coder.NewW(coder.NewBytes(), coder.NewGlobalWindow()),
}, {
name: "W<KV<bytes,bool>",
start: coder.NewW(
coder.NewKV([]*coder.Coder{coder.NewBytes(), coder.NewBool()}),
coder.NewGlobalWindow()),
}, {
name: "W<KV<bytes,Iterable<string>>_nil",
start: knownStart,
}, {
name: "W<KV<bytes,Iterable<string>>_Expand",
out: &Expand{},
start: knownStart,
want: knownWant,
}, {
name: "W<KV<bytes,Iterable<string>>_Combine",
out: &Combine{},
start: knownStart,
want: knownWant,
}, {
name: "W<KV<bytes,Iterable<string>>_ReshuffleOutput",
out: &ReshuffleOutput{},
start: knownStart,
want: knownWant,
}, {
name: "W<KV<bytes,Iterable<string>>_MergeAccumulators",
out: &MergeAccumulators{},
start: knownStart,
want: knownWant,
}, {
name: "W<KV<bytes,Iterable<string>>_Multiplex_Expand",
out: &Multiplex{Out: []Node{&Expand{}}},
start: knownStart,
want: knownWant,
}, {
name: "W<KV<bytes,Iterable<string>>_Multiplex_ParDo_KV",
out: &Multiplex{Out: []Node{makeParDo(t, func([]byte, []string) {})}},
start: knownStart,
}, {
name: "W<KV<bytes,Iterable<string>>_Multiplex_ParDo_GBK",
out: &Multiplex{Out: []Node{makeParDo(t, func([]byte, func(*string) bool) {})}},
start: knownStart,
want: knownWant,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// If want is nil, we expect no changes.
if test.want == nil {
test.want = test.start
}

u := &DataSource{
Coder: test.start,
Out: test.out,
}
mayFixDataSourceCoder(u)
if !test.want.Equals(u.Coder) {
t.Errorf("mayFixDataSourceCoder(Datasource[Coder: %v, Out: %T]), got %v, want %v", test.start, test.out, u.Coder, test.want)
}

})
}
}

func TestUnmarshallWindowFn(t *testing.T) {
tests := []struct {
name string
Expand Down
30 changes: 18 additions & 12 deletions sdks/go/pkg/beam/core/runtime/graphx/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,44 +216,43 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
}

id := components[1]
kind := coder.KV
root := typex.KVType

elm, err := b.peek(id)
if err != nil {
return nil, err
}

switch elm.GetSpec().GetUrn() {
case urnIterableCoder, urnStateBackedIterableCoder:
id = elm.GetComponentCoderIds()[0]
kind = coder.CoGBK
root = typex.CoGBKType
iterElmID := elm.GetComponentCoderIds()[0]

// TODO(https://github.com/apache/beam/issues/18032): If CoGBK with > 1 input, handle as special GBK. We expect
// it to be encoded as CoGBK<K,LP<CoGBKList<V,W,..>>>. Remove this handling once
// CoGBK has a first-class representation.

if ids, ok := b.isCoGBKList(id); ok {
// If the value is an iterable, and a special CoGBK type, then expand it to the real
// CoGBK signature, instead of the special type.
if ids, ok := b.isCoGBKList(iterElmID); ok {
// CoGBK<K,V,W,..>

values, err := b.Coders(ids)
if err != nil {
return nil, err
}

t := typex.New(root, append([]typex.FullType{key.T}, coder.Types(values)...)...)
return &coder.Coder{Kind: kind, T: t, Components: append([]*coder.Coder{key}, values...)}, nil
t := typex.New(typex.CoGBKType, append([]typex.FullType{key.T}, coder.Types(values)...)...)
return &coder.Coder{Kind: coder.CoGBK, T: t, Components: append([]*coder.Coder{key}, values...)}, nil
}
// It's valid to have a KV<k,Iter<v>> without being a CoGBK, and validating if we need to change to
// a CoGBK is done at the DataSource, since that's when we can check against the downstream nodes.
}

value, err := b.Coder(id)
if err != nil {
return nil, err
}

t := typex.New(root, key.T, value.T)
return &coder.Coder{Kind: kind, T: t, Components: []*coder.Coder{key, value}}, nil
t := typex.New(typex.KVType, key.T, value.T)
return &coder.Coder{Kind: coder.KV, T: t, Components: []*coder.Coder{key, value}}, nil

case urnLengthPrefixCoder:
if len(components) != 1 {
Expand Down Expand Up @@ -338,7 +337,7 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
}
return c, nil

case urnIterableCoder:
case urnIterableCoder, urnStateBackedIterableCoder:
if len(components) != 1 {
return nil, errors.Errorf("could not unmarshal iterable coder from %v, expected one component but got %d", c, len(components))
}
Expand Down Expand Up @@ -553,6 +552,13 @@ func (b *CoderMarshaller) Add(c *coder.Coder) (string, error) {

return b.internBuiltInCoder(urnTimerCoder, comp...), nil

case coder.Iterable:
comp, err := b.AddMulti(c.Components)
if err != nil {
return "", errors.Wrapf(err, "failed to marshal iterable coder %v", c)
}
return b.internBuiltInCoder(urnIterableCoder, comp...), nil

default:
err := errors.Errorf("unexpected coder kind: %v", c.Kind)
return "", errors.WithContextf(err, "failed to marshal coder %v", c)
Expand Down
Loading