From 317217fbe8a08e7bdb183c277cac1f94c4b363cc Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Tue, 27 Dec 2022 10:45:12 -0800 Subject: [PATCH] [#24789] Spot fix fullvalue wrapping. --- sdks/go/pkg/beam/core/runtime/exec/sdf.go | 25 +++++++++++++---------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/sdf.go b/sdks/go/pkg/beam/core/runtime/exec/sdf.go index be4f491b04cf8..273f1dc21b052 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/sdf.go +++ b/sdks/go/pkg/beam/core/runtime/exec/sdf.go @@ -175,22 +175,25 @@ func (n *SplitAndSizeRestrictions) StartBundle(ctx context.Context, id string, d // // Output Diagram: // -// *FullValue { -// Elm: *FullValue { -// Elm: *FullValue (original input) -// Elm2: *FullValue { -// Elm: Restriction -// Elm2: Watermark estimator state -// } +// *FullValue { +// Elm: *FullValue { +// Elm: *FullValue (original input) +// Elm2: *FullValue { +// Elm: Restriction +// Elm2: Watermark estimator state // } -// Elm2: float64 (size) -// Windows -// Timestamps // } +// Elm2: float64 (size) +// Windows +// Timestamps +// } func (n *SplitAndSizeRestrictions) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error { rest := elm.Elm2.(*FullValue).Elm ws := elm.Elm2.(*FullValue).Elm2 - mainElm := elm.Elm.(*FullValue) + + // If receiving directly from a datasource, + // the element may not be wrapped in a *FullValue + mainElm := convertIfNeeded(elm.Elm, &FullValue{}) splitRests := n.splitInv.Invoke(mainElm, rest)