From 85ebc2f46b07ed245c6b6094781e4b512aeeb5c7 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Mon, 20 Feb 2023 08:57:02 -0800 Subject: [PATCH] [#24789][prism] Handlers for combine, ParDo, GBK, Flatten (#25558) * [prism] add in handlers * [prism] executor interface * delint - rm processor * [prism] comments --- .../beam/runners/prism/internal/execute.go | 12 + .../runners/prism/internal/handlecombine.go | 209 ++++++++++++ .../runners/prism/internal/handlepardo.go | 244 ++++++++++++++ .../runners/prism/internal/handlerunner.go | 298 ++++++++++++++++++ 4 files changed, 763 insertions(+) create mode 100644 sdks/go/pkg/beam/runners/prism/internal/handlecombine.go create mode 100644 sdks/go/pkg/beam/runners/prism/internal/handlepardo.go create mode 100644 sdks/go/pkg/beam/runners/prism/internal/handlerunner.go diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index b685df63cf64..7c979ebf730e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -15,8 +15,20 @@ package internal +import ( + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" + pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker" +) + // stage represents a fused subgraph. // temporary implementation to break up PRs. type stage struct { transforms []string } + +type transformExecuter interface { + ExecuteUrns() []string + ExecuteWith(t *pipepb.PTransform) string + ExecuteTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components, watermark mtime.Time, data [][]byte) *worker.B +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlecombine.go b/sdks/go/pkg/beam/runners/prism/internal/handlecombine.go new file mode 100644 index 000000000000..ff9bd1e1c88a --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/handlecombine.go @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "fmt" + "reflect" + + pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" + "google.golang.org/protobuf/proto" +) + +// This file retains the logic for the combine handler + +// CombineCharacteristic holds the configuration for Combines. +type CombineCharacteristic struct { + EnableLifting bool // Sets whether a combine composite does combiner lifting or not. +} + +// TODO figure out the factory we'd like. + +func Combine(config any) *combine { + return &combine{config: config.(CombineCharacteristic)} +} + +// combine represents an instance of the combine handler. +type combine struct { + config CombineCharacteristic +} + +// ConfigURN returns the name for combine in the configuration file. +func (*combine) ConfigURN() string { + return "combine" +} + +func (*combine) ConfigCharacteristic() reflect.Type { + return reflect.TypeOf((*CombineCharacteristic)(nil)).Elem() +} + +var _ transformPreparer = (*combine)(nil) + +func (*combine) PrepareUrns() []string { + return []string{urns.TransformCombinePerKey} +} + +// PrepareTransform returns lifted combines and removes the leaves if enabled. Otherwise returns nothing. +func (h *combine) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string) { + // If we aren't lifting, the "default impl" for combines should be sufficient. + if !h.config.EnableLifting { + return nil, nil + } + + // To lift a combine, the spec should contain a CombinePayload. + // That contains the actual FunctionSpec for the DoFn, and the + // id for the accumulator coder. + // We can synthetically produce/determine the remaining coders for + // the Input and Output types from the existing PCollections. + // + // This means we also need to synthesize pcollections with the accumulator coder too. + + // What we have: + // Input PCol: KV -- INPUT + // -> GBK := KV> -- GROUPED_I + // -> Combine := KV -- OUTPUT + // + // What we want: + // Input PCol: KV -- INPUT + // -> PreCombine := KV -- LIFTED + // -> GBK -> KV> -- GROUPED_A + // -> MergeAccumulators := KV -- MERGED_A + // -> ExtractOutput -> KV -- OUTPUT + // + // First we need to produce new coders for Iter, KV>, and KV. + // The A coder ID is in the combine payload. + // + // Then we can produce the PCollections. + // We can reuse the INPUT and OUTPUT PCollections. + // We need LIFTED to have KV kv_k_a + // We need GROUPED_A to have KV> kv_k_iter_a + // We need MERGED_A to have KV kv_k_a + // + // GROUPED_I ends up unused. + // + // The PCollections inherit the properties of the Input PCollection + // such as Boundedness, and Windowing Strategy. + // + // With these, we can produce the PTransforms with the appropriate URNs for the + // different parts of the composite, and return the new components. + + cmbPayload := t.GetSpec().GetPayload() + cmb := &pipepb.CombinePayload{} + if err := (proto.UnmarshalOptions{}).Unmarshal(cmbPayload, cmb); err != nil { + panic(fmt.Sprintf("unable to decode ParDoPayload for transform[%v]", t.GetUniqueName())) + } + + // First lets get the key coder ID. + var pcolInID string + // There's only one input. + for _, pcol := range t.GetInputs() { + pcolInID = pcol + } + inputPCol := comps.GetPcollections()[pcolInID] + kvkiID := inputPCol.GetCoderId() + kID := comps.GetCoders()[kvkiID].GetComponentCoderIds()[0] + + // Now we can start synthesis! + // Coder IDs + aID := cmb.AccumulatorCoderId + + ckvprefix := "c" + tid + "_kv_" + + iterACID := "c" + tid + "_iter_" + aID + kvkaCID := ckvprefix + kID + "_" + aID + kvkIterACID := ckvprefix + kID + "_iter" + aID + + // PCollection IDs + nprefix := "n" + tid + "_" + liftedNID := nprefix + "lifted" + groupedNID := nprefix + "grouped" + mergedNID := nprefix + "merged" + + // Now we need the output collection ID + var pcolOutID string + // There's only one input. + for _, pcol := range t.GetOutputs() { + pcolOutID = pcol + } + + // Transform IDs + eprefix := "e" + tid + "_" + liftEID := eprefix + "lift" + gbkEID := eprefix + "gbk" + mergeEID := eprefix + "merge" + extractEID := eprefix + "extract" + + coder := func(urn string, componentIDs ...string) *pipepb.Coder { + return &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: urn, + }, + ComponentCoderIds: componentIDs, + } + } + + pcol := func(name, coderID string) *pipepb.PCollection { + return &pipepb.PCollection{ + UniqueName: name, + CoderId: coderID, + IsBounded: inputPCol.GetIsBounded(), + WindowingStrategyId: inputPCol.GetWindowingStrategyId(), + } + } + + tform := func(name, urn, in, out, env string) *pipepb.PTransform { + return &pipepb.PTransform{ + UniqueName: name, + Spec: &pipepb.FunctionSpec{ + Urn: urn, + Payload: cmbPayload, + }, + Inputs: map[string]string{ + "i0": in, + }, + Outputs: map[string]string{ + "i0": out, + }, + EnvironmentId: env, + } + } + + newComps := &pipepb.Components{ + Coders: map[string]*pipepb.Coder{ + iterACID: coder(urns.CoderIterable, aID), + kvkaCID: coder(urns.CoderKV, kID, aID), + kvkIterACID: coder(urns.CoderKV, kID, iterACID), + }, + Pcollections: map[string]*pipepb.PCollection{ + liftedNID: pcol(liftedNID, kvkaCID), + groupedNID: pcol(groupedNID, kvkIterACID), + mergedNID: pcol(mergedNID, kvkaCID), + }, + Transforms: map[string]*pipepb.PTransform{ + liftEID: tform(liftEID, urns.TransformPreCombine, pcolInID, liftedNID, t.GetEnvironmentId()), + gbkEID: tform(gbkEID, urns.TransformGBK, liftedNID, groupedNID, ""), + mergeEID: tform(mergeEID, urns.TransformMerge, groupedNID, mergedNID, t.GetEnvironmentId()), + extractEID: tform(mergeEID, urns.TransformExtract, mergedNID, pcolOutID, t.GetEnvironmentId()), + }, + } + + // Now we return everything! + // TODO recurse through sub transforms to remove? + // We don't need to remove the composite, since we don't add it in + // when we return the new transforms, so it's not in the topology. + return newComps, t.GetSubtransforms() +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go new file mode 100644 index 000000000000..2ac5ca5bbf59 --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "fmt" + "reflect" + + pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" + "golang.org/x/exp/maps" + "google.golang.org/protobuf/proto" +) + +// This file retains the logic for the pardo handler + +// ParDoCharacteristic holds the configuration for ParDos. +type ParDoCharacteristic struct { + DisableSDF bool // Sets whether a pardo supports SDFs or not. +} + +func ParDo(config any) *pardo { + return &pardo{config: config.(ParDoCharacteristic)} +} + +// pardo represents an instance of the pardo handler. +type pardo struct { + config ParDoCharacteristic +} + +// ConfigURN returns the name for combine in the configuration file. +func (*pardo) ConfigURN() string { + return "pardo" +} + +func (*pardo) ConfigCharacteristic() reflect.Type { + return reflect.TypeOf((*ParDoCharacteristic)(nil)).Elem() +} + +var _ transformPreparer = (*pardo)(nil) + +func (*pardo) PrepareUrns() []string { + return []string{urns.TransformParDo} +} + +// PrepareTransform handles special processing with respect to ParDos, since their handling is dependant on supported features +// and requirements. +func (h *pardo) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string) { + + // ParDos are a pain in the butt. + // Combines, by comparison, are dramatically simpler. + // This is because for ParDos, how they are handled, and what kinds of transforms are in + // and around the ParDo, the actual shape of the graph will change. + // At their simplest, it's something a DoFn will handle on their own. + // At their most complex, they require intimate interaction with the subgraph + // bundling process, the data layer, state layers, and control layers. + // But unlike combines, which have a clear urn for composite + special payload, + // ParDos have the standard URN for composites with the standard payload. + // So always, we need to first unmarshal the payload. + + pardoPayload := t.GetSpec().GetPayload() + pdo := &pipepb.ParDoPayload{} + if err := (proto.UnmarshalOptions{}).Unmarshal(pardoPayload, pdo); err != nil { + panic(fmt.Sprintf("unable to decode ParDoPayload for transform[%v]", t.GetUniqueName())) + } + + // Lets check for and remove anything that makes things less simple. + if pdo.OnWindowExpirationTimerFamilySpec == "" && + !pdo.RequestsFinalization && + !pdo.RequiresStableInput && + !pdo.RequiresTimeSortedInput && + len(pdo.StateSpecs) == 0 && + len(pdo.TimerFamilySpecs) == 0 && + pdo.RestrictionCoderId == "" { + // Which inputs are Side inputs don't change the graph further, + // so they're not included here. Any nearly any ParDo can have them. + + // At their simplest, we don't need to do anything special at pre-processing time, and simply pass through as normal. + return &pipepb.Components{ + Transforms: map[string]*pipepb.PTransform{ + tid: t, + }, + }, nil + } + + // Side inputs add to topology and make fusion harder to deal with + // (side input producers can't be in the same stage as their consumers) + // But we don't have fusion yet, so no worries. + + // State, Timers, Stable Input, Time Sorted Input, and some parts of SDF + // Are easier to deal including a fusion break. But We can do that with a + // runner specific transform for stable input, and another for timesorted + // input. + + // SplittableDoFns have 3 required phases and a 4th optional phase. + // + // PAIR_WITH_RESTRICTION which pairs elements with their restrictions + // Input: element; := INPUT + // Output: KV(element, restriction) := PWR + // + // SPLIT_AND_SIZE_RESTRICTIONS splits the pairs into sub element ranges + // and a relative size for each, in a float64 format. + // Input: KV(element, restriction) := PWR + // Output: KV(KV(element, restriction), float64) := SPLITnSIZED + // + // PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS actually processes the + // elements. This is also where splits need to be handled. + // In particular, primary and residual splits have the same format as the input. + // Input: KV(KV(element, restriction), size) := SPLITnSIZED + // Output: DoFn's output. := OUTPUT + // + // TRUNCATE_SIZED_RESTRICTION is how the runner has an SDK turn an + // unbounded transform into a bound one. Not needed until the pipeline + // is told to drain. + // Input: KV(KV(element, restriction), float64) := synthetic split results from above + // Output: KV(KV(element, restriction), float64). := synthetic, truncated results sent as Split n Sized + // + // So with that, we can figure out the coders we need. + // + // cE - Element Coder (same as input coder) + // cR - Restriction Coder + // cS - Size Coder (float64) + // ckvER - KV + // ckvERS - KV, Size> + // + // There could be a few output coders, but the outputs can be copied from + // the original transform directly. + + // First lets get the parallel input coder ID. + var pcolInID, inputLocalID string + for localID, globalID := range t.GetInputs() { + // The parallel input is the one that isn't a side input. + if _, ok := pdo.SideInputs[localID]; !ok { + inputLocalID = localID + pcolInID = globalID + break + } + } + inputPCol := comps.GetPcollections()[pcolInID] + cEID := inputPCol.GetCoderId() + cRID := pdo.RestrictionCoderId + cSID := "c" + tid + "size" + ckvERID := "c" + tid + "kv_ele_rest" + ckvERSID := ckvERID + "_size" + + coder := func(urn string, componentIDs ...string) *pipepb.Coder { + return &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: urn, + }, + ComponentCoderIds: componentIDs, + } + } + + coders := map[string]*pipepb.Coder{ + ckvERID: coder(urns.CoderKV, cEID, cRID), + cSID: coder(urns.CoderDouble), + ckvERSID: coder(urns.CoderKV, ckvERID, cSID), + } + + // PCollections only have two new ones. + // INPUT -> same as ordinary DoFn + // PWR, uses ckvER + // SPLITnSIZED, uses ckvERS + // OUTPUT -> same as ordinary outputs + + nPWRID := "n" + tid + "_pwr" + nSPLITnSIZEDID := "n" + tid + "_splitnsized" + + pcol := func(name, coderID string) *pipepb.PCollection { + return &pipepb.PCollection{ + UniqueName: name, + CoderId: coderID, + IsBounded: inputPCol.GetIsBounded(), + WindowingStrategyId: inputPCol.GetWindowingStrategyId(), + } + } + + pcols := map[string]*pipepb.PCollection{ + nPWRID: pcol(nPWRID, ckvERID), + nSPLITnSIZEDID: pcol(nSPLITnSIZEDID, ckvERSID), + } + + // PTransforms have 3 new ones, with process sized elements and restrictions + // taking the brunt of the complexity, consuming the inputs + + ePWRID := "e" + tid + "_pwr" + eSPLITnSIZEDID := "e" + tid + "_splitnsize" + eProcessID := "e" + tid + "_processandsplit" + + tform := func(name, urn, in, out string) *pipepb.PTransform { + return &pipepb.PTransform{ + UniqueName: name, + Spec: &pipepb.FunctionSpec{ + Urn: urn, + Payload: pardoPayload, + }, + Inputs: map[string]string{ + inputLocalID: in, + }, + Outputs: map[string]string{ + "i0": out, + }, + EnvironmentId: t.GetEnvironmentId(), + } + } + + newInputs := maps.Clone(t.GetInputs()) + newInputs[inputLocalID] = nSPLITnSIZEDID + + tforms := map[string]*pipepb.PTransform{ + ePWRID: tform(ePWRID, urns.TransformPairWithRestriction, pcolInID, nPWRID), + eSPLITnSIZEDID: tform(eSPLITnSIZEDID, urns.TransformSplitAndSize, nPWRID, nSPLITnSIZEDID), + eProcessID: { + UniqueName: eProcessID, + Spec: &pipepb.FunctionSpec{ + Urn: urns.TransformProcessSizedElements, + Payload: pardoPayload, + }, + Inputs: newInputs, + Outputs: t.GetOutputs(), + EnvironmentId: t.GetEnvironmentId(), + }, + } + + return &pipepb.Components{ + Coders: coders, + Pcollections: pcols, + Transforms: tforms, + }, t.GetSubtransforms() +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go new file mode 100644 index 000000000000..e841620625e9 --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go @@ -0,0 +1,298 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "bytes" + "fmt" + "io" + "reflect" + "sort" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" + pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker" + "golang.org/x/exp/slog" + "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/proto" +) + +// This file retains the logic for the pardo handler + +// RunnerCharacteristic holds the configuration for Runner based transforms, +// such as GBKs, Flattens. +type RunnerCharacteristic struct { + SDKFlatten bool // Sets whether we should force an SDK side flatten. + SDKGBK bool // Sets whether the GBK should be handled by the SDK, if possible by the SDK. +} + +func Runner(config any) *runner { + return &runner{config: config.(RunnerCharacteristic)} +} + +// runner represents an instance of the runner transform handler. +type runner struct { + config RunnerCharacteristic +} + +// ConfigURN returns the name for combine in the configuration file. +func (*runner) ConfigURN() string { + return "runner" +} + +func (*runner) ConfigCharacteristic() reflect.Type { + return reflect.TypeOf((*RunnerCharacteristic)(nil)).Elem() +} + +var _ transformExecuter = (*runner)(nil) + +func (*runner) ExecuteUrns() []string { + return []string{urns.TransformFlatten, urns.TransformGBK} +} + +// ExecuteWith returns what environment the +func (h *runner) ExecuteWith(t *pipepb.PTransform) string { + urn := t.GetSpec().GetUrn() + if urn == urns.TransformFlatten && !h.config.SDKFlatten { + return "" + } + if urn == urns.TransformGBK && !h.config.SDKGBK { + return "" + } + return t.GetEnvironmentId() +} + +// ExecuteTransform handles special processing with respect to runner specific transforms +func (h *runner) ExecuteTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components, watermark mtime.Time, inputData [][]byte) *worker.B { + urn := t.GetSpec().GetUrn() + var data [][]byte + var onlyOut string + for _, out := range t.GetOutputs() { + onlyOut = out + } + + switch urn { + case urns.TransformFlatten: + // Already done and collated. + data = inputData + + case urns.TransformGBK: + ws := windowingStrategy(comps, tid) + kvc := onlyInputCoderForTransform(comps, tid) + + coders := map[string]*pipepb.Coder{} + + // TODO assert this is a KV. It's probably fine, but we should fail anyway. + wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders()) + kcID := lpUnknownCoders(kvc.GetComponentCoderIds()[0], coders, comps.GetCoders()) + ecID := lpUnknownCoders(kvc.GetComponentCoderIds()[1], coders, comps.GetCoders()) + reconcileCoders(coders, comps.GetCoders()) + + wc := coders[wcID] + kc := coders[kcID] + ec := coders[ecID] + + data = append(data, gbkBytes(ws, wc, kc, ec, inputData, coders, watermark)) + if len(data[0]) == 0 { + panic("no data for GBK") + } + default: + panic(fmt.Sprintf("unimplemented runner transform[%v]", urn)) + } + + // To avoid conflicts with these single transform + // bundles, we suffix the transform IDs. + var localID string + for key := range t.GetOutputs() { + localID = key + } + + if localID == "" { + panic(fmt.Sprintf("bad transform: %v", prototext.Format(t))) + } + output := engine.TentativeData{} + for _, d := range data { + output.WriteData(onlyOut, d) + } + + dataID := tid + "_" + localID // The ID from which the consumer will read from. + b := &worker.B{ + InputTransformID: dataID, + SinkToPCollection: map[string]string{ + dataID: onlyOut, + }, + OutputData: output, + } + return b +} + +// windowingStrategy sources the transform's windowing strategy from a single parallel input. +func windowingStrategy(comps *pipepb.Components, tid string) *pipepb.WindowingStrategy { + t := comps.GetTransforms()[tid] + var inputPColID string + for _, pcolID := range t.GetInputs() { + inputPColID = pcolID + } + pcol := comps.GetPcollections()[inputPColID] + return comps.GetWindowingStrategies()[pcol.GetWindowingStrategyId()] +} + +// gbkBytes re-encodes gbk inputs in a gbk result. +func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregate [][]byte, coders map[string]*pipepb.Coder, watermark mtime.Time) []byte { + var outputTime func(typex.Window, mtime.Time) mtime.Time + switch ws.GetOutputTime() { + case pipepb.OutputTime_END_OF_WINDOW: + outputTime = func(w typex.Window, et mtime.Time) mtime.Time { + return w.MaxTimestamp() + } + default: + // TODO need to correct session logic if output time is different. + panic(fmt.Sprintf("unsupported OutputTime behavior: %v", ws.GetOutputTime())) + } + wDec, wEnc := makeWindowCoders(wc) + + type keyTime struct { + key []byte + w typex.Window + time mtime.Time + values [][]byte + } + // Map windows to a map of keys to a map of keys to time. + // We ultimately emit the window, the key, the time, and the iterable of elements, + // all contained in the final value. + windows := map[typex.Window]map[string]keyTime{} + + kd := pullDecoder(kc, coders) + vd := pullDecoder(vc, coders) + + // Right, need to get the key coder, and the element coder. + // Cus I'll need to pull out anything the runner knows how to deal with. + // And repeat. + for _, data := range toAggregate { + // Parse out each element's data, and repeat. + buf := bytes.NewBuffer(data) + for { + ws, tm, _, err := exec.DecodeWindowedValueHeader(wDec, buf) + if err == io.EOF { + break + } + if err != nil { + panic(fmt.Sprintf("can't decode windowed value header with %v: %v", wc, err)) + } + + keyByt := kd(buf) + key := string(keyByt) + value := vd(buf) + for _, w := range ws { + ft := outputTime(w, tm) + wk, ok := windows[w] + if !ok { + wk = make(map[string]keyTime) + windows[w] = wk + } + kt := wk[key] + kt.time = ft + kt.key = keyByt + kt.w = w + kt.values = append(kt.values, value) + wk[key] = kt + } + } + } + + // If the strategy is session windows, then we need to get all the windows, sort them + // and see which ones need to be merged together. + if ws.GetWindowFn().GetUrn() == urns.WindowFnSession { + slog.Debug("sorting by session window") + session := &pipepb.SessionWindowsPayload{} + if err := (proto.UnmarshalOptions{}).Unmarshal(ws.GetWindowFn().GetPayload(), session); err != nil { + panic("unable to decode SessionWindowsPayload") + } + gapSize := mtime.Time(session.GetGapSize().AsDuration()) + + ordered := make([]window.IntervalWindow, 0, len(windows)) + for k := range windows { + ordered = append(ordered, k.(window.IntervalWindow)) + } + // Use a decreasing sort (latest to earliest) so we can correct + // the output timestamp to the new end of window immeadiately. + // TODO need to correct this if output time is different. + sort.Slice(ordered, func(i, j int) bool { + return ordered[i].MaxTimestamp() > ordered[j].MaxTimestamp() + }) + + cur := ordered[0] + sessionData := windows[cur] + for _, iw := range ordered[1:] { + // If they overlap, then we merge the data. + if iw.End+gapSize < cur.Start { + // Start a new session. + windows[cur] = sessionData + cur = iw + sessionData = windows[iw] + continue + } + // Extend the session + cur.Start = iw.Start + toMerge := windows[iw] + delete(windows, iw) + for k, kt := range toMerge { + skt := sessionData[k] + skt.key = kt.key + skt.w = cur + skt.values = append(skt.values, kt.values...) + sessionData[k] = skt + } + } + } + // Everything's aggregated! + // Time to turn things into a windowed KV> + + var buf bytes.Buffer + for _, w := range windows { + for _, kt := range w { + exec.EncodeWindowedValueHeader( + wEnc, + []typex.Window{kt.w}, + kt.time, + typex.NoFiringPane(), + &buf, + ) + buf.Write(kt.key) + coder.EncodeInt32(int32(len(kt.values)), &buf) + for _, value := range kt.values { + buf.Write(value) + } + } + } + return buf.Bytes() +} + +func onlyInputCoderForTransform(comps *pipepb.Components, tid string) *pipepb.Coder { + t := comps.GetTransforms()[tid] + var inputPColID string + for _, pcolID := range t.GetInputs() { + inputPColID = pcolID + } + pcol := comps.GetPcollections()[inputPColID] + return comps.GetCoders()[pcol.GetCoderId()] +}