From fe2a22461530d2a5f2f71ca50f3518ac6f02abfa Mon Sep 17 00:00:00 2001 From: Sergei Lilichenko Date: Fri, 29 Mar 2024 06:21:04 -0700 Subject: [PATCH] Add Ordered Processing PTransform to Java SDK (#30735) * Initial check-in of the ordered processing extension in Java. * Address PR comments. * Address PR comments. * Added JavaDocs to OrderedProcessingStatus.java * Added batch tests. Added DLQ for events with the sequence outside of the valid range. * Added tests for windowed input. Added references to the unresolved TODO's captured as Beam's issues. * Added DLQ handling of checked exceptions happening during the state mutations. --- sdks/java/extensions/ordered/build.gradle | 33 + .../sdk/extensions/ordered/EventExaminer.java | 62 ++ .../sdk/extensions/ordered/MutableState.java | 42 + .../ordered/OrderedEventProcessor.java | 675 +++++++++++++ .../ordered/OrderedEventProcessorResult.java | 107 +++ .../ordered/OrderedProcessingHandler.java | 220 +++++ .../ordered/OrderedProcessingStatus.java | 152 +++ .../extensions/ordered/ProcessingState.java | 345 +++++++ .../extensions/ordered/UnprocessedEvent.java | 119 +++ .../sdk/extensions/ordered/package-info.java | 23 + .../beam/sdk/extensions/ordered/Event.java | 47 + .../ordered/OrderedEventProcessorTest.java | 906 ++++++++++++++++++ .../StringBufferOrderedProcessingHandler.java | 42 + .../ordered/StringBuilderState.java | 152 +++ .../ordered/StringEventExaminer.java | 46 + settings.gradle.kts | 1 + 16 files changed, 2972 insertions(+) create mode 100644 sdks/java/extensions/ordered/build.gradle create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/EventExaminer.java create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/MutableState.java create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorResult.java create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingStatus.java create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessingState.java create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/package-info.java create mode 100644 sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/Event.java create mode 100644 sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java create mode 100644 sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBufferOrderedProcessingHandler.java create mode 100644 sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBuilderState.java create mode 100644 sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringEventExaminer.java diff --git a/sdks/java/extensions/ordered/build.gradle b/sdks/java/extensions/ordered/build.gradle new file mode 100644 index 000000000000..3c183f03162c --- /dev/null +++ b/sdks/java/extensions/ordered/build.gradle @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +plugins { id 'org.apache.beam.module' } +applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.sorter') + +description = "Apache Beam :: SDKs :: Java :: Extensions :: Ordered" + +dependencies { + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.slf4j_api + implementation library.java.joda_time + implementation library.java.commons_lang3 + implementation library.java.vendored_guava_32_1_2_jre + testImplementation library.java.junit + testImplementation library.java.hamcrest + testImplementation project(path: ':sdks:java:core') + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") +} \ No newline at end of file diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/EventExaminer.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/EventExaminer.java new file mode 100644 index 000000000000..1e4fe7565517 --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/EventExaminer.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +import java.io.Serializable; +import org.checkerframework.checker.nullness.qual.NonNull; + +/** + * Classes extending this interface will be called by {@link OrderedEventProcessor} to examine every + * incoming event. + * + * @param + * @param + */ +public interface EventExaminer> + extends Serializable { + + /** + * Is this event the first expected event for the given key and window? + * + * @param sequenceNumber the sequence number of the event as defined by the key of the input + * PCollection to {@link OrderedEventProcessor} + * @param event being processed + * @return true if this is the initial sequence. + */ + boolean isInitialEvent(long sequenceNumber, EventT event); + + /** + * If the event was the first event in the sequence, create the state to hold the required data + * needed for processing. This data will be persisted. + * + * @param event the first event in the sequence. + * @return the state to persist. + */ + @NonNull + StateT createStateOnInitialEvent(EventT event); + + /** + * Is this event the last expected event for a given key and window? + * + * @param sequenceNumber of the event + * @param event being processed + * @return true if the last event. There are cases where it's impossible to know whether it's the + * last event. False should be returned in those cases. + */ + boolean isLastEvent(long sequenceNumber, EventT event); +} diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/MutableState.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/MutableState.java new file mode 100644 index 000000000000..3055bf7a446b --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/MutableState.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +import java.io.Serializable; + +/** Mutable state mutates when events apply to it. It is stored in a Beam state. */ +public interface MutableState extends Serializable { + + /** + * The interface assumes that events will mutate the state without the possibility of throwing an + * error. + * + * @param event to be processed + * @throws Exception if a checked exception is thrown, the event will be output into {@link + * OrderedEventProcessorResult#unprocessedEvents()} with + */ + void mutate(EventT event) throws Exception; + + /** + * This method is called after each state mutation. + * + * @return Result of the processing. Can be null if nothing needs to be output after this + * mutation. + */ + ResultT produceResult(); +} diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java new file mode 100644 index 000000000000..935647c0e7e5 --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java @@ -0,0 +1,675 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +import com.google.auto.value.AutoValue; +import java.util.Arrays; +import java.util.Iterator; +import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.extensions.ordered.ProcessingState.ProcessingStateCoder; +import org.apache.beam.sdk.extensions.ordered.UnprocessedEvent.Reason; +import org.apache.beam.sdk.extensions.ordered.UnprocessedEvent.UnprocessedEventCoder; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.state.OrderedListState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Transform for processing ordered events. Events are grouped by the key and within each key they + * are applied according to the provided sequence. Events which arrive out of sequence are buffered + * and processed after all the missing events for a given key have arrived. + * + * @param + * @param + * @param + */ +@AutoValue +@SuppressWarnings({"nullness", "TypeNameShadowing"}) +public abstract class OrderedEventProcessor< + EventT, EventKeyT, ResultT, StateT extends MutableState> + extends PTransform< + PCollection>>, + OrderedEventProcessorResult> { + + public static < + EventTypeT, + EventKeyTypeT, + ResultTypeT, + StateTypeT extends MutableState> + OrderedEventProcessor create( + OrderedProcessingHandler handler) { + return new AutoValue_OrderedEventProcessor<>(handler); + } + + @Nullable + abstract OrderedProcessingHandler getHandler(); + + @Override + public OrderedEventProcessorResult expand( + PCollection>> input) { + final TupleTag> mainOutput = + new TupleTag>("mainOutput") {}; + final TupleTag> statusOutput = + new TupleTag>("status") {}; + + final TupleTag>>> unprocessedEventOutput = + new TupleTag>>>("unprocessed-events") {}; + + OrderedProcessingHandler handler = getHandler(); + Pipeline pipeline = input.getPipeline(); + + Coder keyCoder; + try { + keyCoder = handler.getKeyCoder(pipeline, input.getCoder()); + } catch (CannotProvideCoderException e) { + throw new RuntimeException("Unable to get key coder", e); + } + + Coder eventCoder; + try { + eventCoder = handler.getEventCoder(pipeline, input.getCoder()); + } catch (CannotProvideCoderException e) { + throw new RuntimeException("Unable to get event coder", e); + } + + Coder stateCoder; + try { + stateCoder = handler.getStateCoder(pipeline); + } catch (CannotProvideCoderException e) { + throw new RuntimeException("Unable to get state coder", e); + } + + Coder resultCoder; + try { + resultCoder = handler.getResultCoder(pipeline); + } catch (CannotProvideCoderException e) { + throw new RuntimeException("Unable to get result coder", e); + } + + PCollectionTuple processingResult = + input.apply( + ParDo.of( + new OrderedProcessorDoFn<>( + handler.getEventExaminer(), + eventCoder, + stateCoder, + keyCoder, + mainOutput, + statusOutput, + handler.getStatusUpdateFrequency(), + unprocessedEventOutput, + handler.isProduceStatusUpdateOnEveryEvent(), + handler.getMaxOutputElementsPerBundle())) + .withOutputTags( + mainOutput, + TupleTagList.of(Arrays.asList(statusOutput, unprocessedEventOutput)))); + + KvCoder mainOutputCoder = KvCoder.of(keyCoder, resultCoder); + KvCoder processingStatusCoder = + KvCoder.of(keyCoder, getOrderedProcessingStatusCoder(pipeline)); + KvCoder>> unprocessedEventsCoder = + KvCoder.of( + keyCoder, KvCoder.of(VarLongCoder.of(), new UnprocessedEventCoder<>(eventCoder))); + return new OrderedEventProcessorResult<>( + pipeline, + processingResult.get(mainOutput).setCoder(mainOutputCoder), + mainOutput, + processingResult.get(statusOutput).setCoder(processingStatusCoder), + statusOutput, + processingResult.get(unprocessedEventOutput).setCoder(unprocessedEventsCoder), + unprocessedEventOutput); + } + + private static Coder getOrderedProcessingStatusCoder(Pipeline pipeline) { + SchemaRegistry schemaRegistry = pipeline.getSchemaRegistry(); + Coder result; + try { + result = + SchemaCoder.of( + schemaRegistry.getSchema(OrderedProcessingStatus.class), + TypeDescriptor.of(OrderedProcessingStatus.class), + schemaRegistry.getToRowFunction(OrderedProcessingStatus.class), + schemaRegistry.getFromRowFunction(OrderedProcessingStatus.class)); + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + return result; + } + + /** + * Main DoFn for processing ordered events. + * + * @param + * @param + * @param + */ + static class OrderedProcessorDoFn< + EventTypeT, + EventKeyTypeT, + ResultTypeT, + StateTypeT extends MutableState> + extends DoFn>, KV> { + + private static final Logger LOG = LoggerFactory.getLogger(OrderedProcessorDoFn.class); + + private static final String PROCESSING_STATE = "processingState"; + private static final String MUTABLE_STATE = "mutableState"; + private static final String BUFFERED_EVENTS = "bufferedEvents"; + private static final String STATUS_EMISSION_TIMER = "statusTimer"; + private static final String LARGE_BATCH_EMISSION_TIMER = "largeBatchTimer"; + private static final String WINDOW_CLOSED = "windowClosed"; + private final EventExaminer eventExaminer; + + @StateId(BUFFERED_EVENTS) + @SuppressWarnings("unused") + private final StateSpec> bufferedEventsSpec; + + @StateId(PROCESSING_STATE) + @SuppressWarnings("unused") + private final StateSpec>> processingStateSpec; + + @SuppressWarnings("unused") + @StateId(MUTABLE_STATE) + private final StateSpec> mutableStateSpec; + + @StateId(WINDOW_CLOSED) + @SuppressWarnings("unused") + private final StateSpec> windowClosedSpec; + + @TimerId(STATUS_EMISSION_TIMER) + @SuppressWarnings("unused") + private final TimerSpec statusEmissionTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @TimerId(LARGE_BATCH_EMISSION_TIMER) + @SuppressWarnings("unused") + private final TimerSpec largeBatchEmissionTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + private final TupleTag> statusTupleTag; + private final Duration statusUpdateFrequency; + + private final TupleTag> mainOutputTupleTag; + private final TupleTag>>> + unprocessedEventsTupleTag; + private final boolean produceStatusUpdateOnEveryEvent; + + private final long maxNumberOfResultsToProduce; + + private Long numberOfResultsBeforeBundleStart; + + /** + * Stateful DoFn to do the bulk of processing. + * + * @param eventExaminer + * @param eventCoder + * @param stateCoder + * @param keyCoder + * @param mainOutputTupleTag + * @param statusTupleTag + * @param statusUpdateFrequency + * @param unprocessedEventTupleTag + * @param produceStatusUpdateOnEveryEvent + * @param maxNumberOfResultsToProduce + */ + OrderedProcessorDoFn( + EventExaminer eventExaminer, + Coder eventCoder, + Coder stateCoder, + Coder keyCoder, + TupleTag> mainOutputTupleTag, + TupleTag> statusTupleTag, + Duration statusUpdateFrequency, + TupleTag>>> + unprocessedEventTupleTag, + boolean produceStatusUpdateOnEveryEvent, + long maxNumberOfResultsToProduce) { + this.eventExaminer = eventExaminer; + this.bufferedEventsSpec = StateSpecs.orderedList(eventCoder); + this.mutableStateSpec = StateSpecs.value(stateCoder); + this.processingStateSpec = StateSpecs.value(ProcessingStateCoder.of(keyCoder)); + this.windowClosedSpec = StateSpecs.value(BooleanCoder.of()); + this.mainOutputTupleTag = mainOutputTupleTag; + this.statusTupleTag = statusTupleTag; + this.unprocessedEventsTupleTag = unprocessedEventTupleTag; + this.statusUpdateFrequency = statusUpdateFrequency; + this.produceStatusUpdateOnEveryEvent = produceStatusUpdateOnEveryEvent; + this.maxNumberOfResultsToProduce = maxNumberOfResultsToProduce; + } + + @StartBundle + public void onBundleStart() { + numberOfResultsBeforeBundleStart = null; + } + + @FinishBundle + public void onBundleFinish() { + // This might be necessary because this field is also used in a Timer + numberOfResultsBeforeBundleStart = null; + } + + @ProcessElement + public void processElement( + @StateId(BUFFERED_EVENTS) OrderedListState bufferedEventsState, + @AlwaysFetched @StateId(PROCESSING_STATE) + ValueState> processingStateState, + @StateId(MUTABLE_STATE) ValueState mutableStateState, + @TimerId(STATUS_EMISSION_TIMER) Timer statusEmissionTimer, + @TimerId(LARGE_BATCH_EMISSION_TIMER) Timer largeBatchEmissionTimer, + @Element KV> eventAndSequence, + MultiOutputReceiver outputReceiver, + BoundedWindow window) { + + EventKeyTypeT key = eventAndSequence.getKey(); + long sequence = eventAndSequence.getValue().getKey(); + EventTypeT event = eventAndSequence.getValue().getValue(); + + ProcessingState processingState = processingStateState.read(); + + if (processingState == null) { + // This is the first time we see this key/window pair + processingState = new ProcessingState<>(key); + if (statusUpdateFrequency != null) { + // Set up the timer to produce the status of the processing on a regular basis + statusEmissionTimer.offset(statusUpdateFrequency).setRelative(); + } + } + + if (numberOfResultsBeforeBundleStart == null) { + // Per key processing is synchronized by Beam. There is no need to have it here. + numberOfResultsBeforeBundleStart = processingState.getResultCount(); + } + + processingState.eventReceived(); + + StateTypeT state = + processNewEvent( + sequence, + event, + processingState, + mutableStateState, + bufferedEventsState, + outputReceiver); + + processBufferedEvents( + processingState, state, bufferedEventsState, outputReceiver, largeBatchEmissionTimer); + + saveStates( + processingStateState, + processingState, + mutableStateState, + state, + outputReceiver, + window.maxTimestamp()); + + checkIfProcessingIsCompleted(processingState); + } + + private boolean checkIfProcessingIsCompleted(ProcessingState processingState) { + boolean result = processingState.isProcessingCompleted(); + if (result) { + LOG.info("Processing for key '" + processingState.getKey() + "' is completed."); + } + return result; + } + + private void saveStates( + ValueState> processingStatusState, + ProcessingState processingStatus, + ValueState currentStateState, + StateTypeT state, + MultiOutputReceiver outputReceiver, + Instant windowTimestamp) { + // There is always a change to the processing status + processingStatusState.write(processingStatus); + + // Stored state may not have changes if the element was out of sequence. + if (state != null) { + currentStateState.write(state); + } + + if (produceStatusUpdateOnEveryEvent) { + // During pipeline draining the window timestamp is set to a large value in the future. + // Producing an event before that results in error, that's why this logic exist. + Instant statusTimestamp = windowTimestamp; + + emitProcessingStatus(processingStatus, outputReceiver, statusTimestamp); + } + } + + private void emitProcessingStatus( + ProcessingState processingState, + MultiOutputReceiver outputReceiver, + Instant statusTimestamp) { + outputReceiver + .get(statusTupleTag) + .outputWithTimestamp( + KV.of( + processingState.getKey(), + OrderedProcessingStatus.create( + processingState.getLastOutputSequence(), + processingState.getBufferedEventCount(), + processingState.getEarliestBufferedSequence(), + processingState.getLatestBufferedSequence(), + processingState.getEventsReceived(), + processingState.getResultCount(), + processingState.getDuplicates(), + processingState.isLastEventReceived())), + statusTimestamp); + } + + /** + * Process the just received event. + * + * @return newly created or updated State. If null is returned - the event wasn't processed. + */ + private StateTypeT processNewEvent( + long currentSequence, + EventTypeT currentEvent, + ProcessingState processingState, + ValueState currentStateState, + OrderedListState bufferedEventsState, + MultiOutputReceiver outputReceiver) { + if (currentSequence == Long.MAX_VALUE) { + // OrderedListState can't handle the timestamp based on MAX_VALUE. + // To avoid exceptions, we DLQ this event. + outputReceiver + .get(unprocessedEventsTupleTag) + .output( + KV.of( + processingState.getKey(), + KV.of( + currentSequence, + UnprocessedEvent.create( + currentEvent, Reason.sequence_id_outside_valid_range)))); + return null; + } + + if (processingState.hasAlreadyBeenProcessed(currentSequence)) { + outputReceiver + .get(unprocessedEventsTupleTag) + .output( + KV.of( + processingState.getKey(), + KV.of( + currentSequence, UnprocessedEvent.create(currentEvent, Reason.duplicate)))); + return null; + } + + StateTypeT state; + boolean thisIsTheLastEvent = eventExaminer.isLastEvent(currentSequence, currentEvent); + if (eventExaminer.isInitialEvent(currentSequence, currentEvent)) { + // First event of the key/window + // What if it's a duplicate event - it will reset everything. Shall we drop/DLQ anything + // that's before the processingState.lastOutputSequence? + state = eventExaminer.createStateOnInitialEvent(currentEvent); + + processingState.eventAccepted(currentSequence, thisIsTheLastEvent); + + ResultTypeT result = state.produceResult(); + if (result != null) { + outputReceiver.get(mainOutputTupleTag).output(KV.of(processingState.getKey(), result)); + processingState.resultProduced(); + } + + // Nothing else to do. We will attempt to process buffered events later. + return state; + } + + if (processingState.isNextEvent(currentSequence)) { + // Event matches expected sequence + state = currentStateState.read(); + + try { + state.mutate(currentEvent); + } catch (Exception e) { + outputReceiver + .get(unprocessedEventsTupleTag) + .output( + KV.of( + processingState.getKey(), + KV.of(currentSequence, UnprocessedEvent.create(currentEvent, e)))); + return null; + } + + ResultTypeT result = state.produceResult(); + if (result != null) { + outputReceiver.get(mainOutputTupleTag).output(KV.of(processingState.getKey(), result)); + processingState.resultProduced(); + } + processingState.eventAccepted(currentSequence, thisIsTheLastEvent); + + return state; + } + + // Event is not ready to be processed yet + Instant eventTimestamp = Instant.ofEpochMilli(currentSequence); + bufferedEventsState.add(TimestampedValue.of(currentEvent, eventTimestamp)); + processingState.eventBuffered(currentSequence, thisIsTheLastEvent); + + // This will signal that the state hasn't been mutated and we don't need to save it. + return null; + } + + /** Process buffered events. */ + private void processBufferedEvents( + ProcessingState processingState, + StateTypeT state, + OrderedListState bufferedEventsState, + MultiOutputReceiver outputReceiver, + Timer largeBatchEmissionTimer) { + if (state == null) { + // Only when the current event caused a state mutation and the state is passed to this + // method should we attempt to process buffered events + return; + } + + if (!processingState.readyToProcessBufferedEvents()) { + return; + } + + if (reachedMaxResultCountForBundle(processingState, largeBatchEmissionTimer)) { + // No point in trying to process buffered events + return; + } + + Instant startRange = Instant.ofEpochMilli(processingState.getEarliestBufferedSequence()); + Instant endRange = Instant.ofEpochMilli(processingState.getLatestBufferedSequence() + 1); + Instant endClearRange = null; + + // readRange is efficiently implemented and will bring records in batches + Iterable> events = + bufferedEventsState.readRange(startRange, endRange); + + Iterator> bufferedEventsIterator = events.iterator(); + while (bufferedEventsIterator.hasNext()) { + TimestampedValue timestampedEvent = bufferedEventsIterator.next(); + Instant eventTimestamp = timestampedEvent.getTimestamp(); + long eventSequence = eventTimestamp.getMillis(); + + EventTypeT bufferedEvent = timestampedEvent.getValue(); + if (processingState.checkForDuplicateBatchedEvent(eventSequence)) { + outputReceiver + .get(unprocessedEventsTupleTag) + .output( + KV.of( + processingState.getKey(), + KV.of( + eventSequence, + UnprocessedEvent.create(bufferedEvent, Reason.duplicate)))); + continue; + } + + if (eventSequence > processingState.getLastOutputSequence() + 1) { + processingState.foundSequenceGap(eventSequence); + // Records will be cleared up to this element + endClearRange = Instant.ofEpochMilli(eventSequence); + break; + } + + // This check needs to be done after we checked for sequence gap and before we + // attempt to process the next element which can result in a new result. + if (reachedMaxResultCountForBundle(processingState, largeBatchEmissionTimer)) { + endClearRange = Instant.ofEpochMilli(eventSequence); + break; + } + + try { + state.mutate(bufferedEvent); + } catch (Exception e) { + outputReceiver + .get(unprocessedEventsTupleTag) + .output( + KV.of( + processingState.getKey(), + KV.of(eventSequence, UnprocessedEvent.create(bufferedEvent, e)))); + // There is a chance that the next event will have the same sequence number and will + // process successfully. + continue; + } + + ResultTypeT result = state.produceResult(); + if (result != null) { + outputReceiver.get(mainOutputTupleTag).output(KV.of(processingState.getKey(), result)); + processingState.resultProduced(); + } + processingState.processedBufferedEvent(eventSequence); + // Remove this record also + endClearRange = Instant.ofEpochMilli(eventSequence + 1); + } + + bufferedEventsState.clearRange(startRange, endClearRange); + } + + private boolean reachedMaxResultCountForBundle( + ProcessingState processingState, Timer largeBatchEmissionTimer) { + boolean exceeded = + processingState.resultsProducedInBundle(numberOfResultsBeforeBundleStart) + >= maxNumberOfResultsToProduce; + if (exceeded) { + LOG.info( + "Setting the timer to output next batch of events for key '" + + processingState.getKey() + + "'"); + // See GroupIntoBatches for examples on how to hold the timestamp. + // TODO: test that on draining the pipeline all the results are still produced correctly. + // See: https://github.com/apache/beam/issues/30781 + largeBatchEmissionTimer.offset(Duration.millis(1)).setRelative(); + } + return exceeded; + } + + @OnTimer(LARGE_BATCH_EMISSION_TIMER) + public void onBatchEmission( + OnTimerContext context, + @StateId(BUFFERED_EVENTS) OrderedListState bufferedEventsState, + @AlwaysFetched @StateId(PROCESSING_STATE) + ValueState> processingStatusState, + @AlwaysFetched @StateId(MUTABLE_STATE) ValueState currentStateState, + @TimerId(LARGE_BATCH_EMISSION_TIMER) Timer largeBatchEmissionTimer, + MultiOutputReceiver outputReceiver) { + ProcessingState processingState = processingStatusState.read(); + if (processingState == null) { + LOG.warn("Processing state is empty. Ignore it if the pipeline is being cancelled."); + return; + } + StateTypeT state = currentStateState.read(); + if (state == null) { + LOG.warn("Mutable state is empty. Ignore it if the pipeline is being cancelled."); + return; + } + + LOG.debug("Starting to process batch for key '" + processingState.getKey() + "'"); + + this.numberOfResultsBeforeBundleStart = processingState.getResultCount(); + + processBufferedEvents( + processingState, state, bufferedEventsState, outputReceiver, largeBatchEmissionTimer); + + saveStates( + processingStatusState, + processingState, + currentStateState, + state, + outputReceiver, + // TODO: validate that this is correct. + context.window().maxTimestamp()); + + checkIfProcessingIsCompleted(processingState); + } + + @OnTimer(STATUS_EMISSION_TIMER) + @SuppressWarnings("unused") + public void onStatusEmission( + MultiOutputReceiver outputReceiver, + @TimerId(STATUS_EMISSION_TIMER) Timer statusEmissionTimer, + @StateId(WINDOW_CLOSED) ValueState windowClosedState, + @StateId(PROCESSING_STATE) + ValueState> processingStateState) { + + ProcessingState currentState = processingStateState.read(); + if (currentState == null) { + // This could happen if the state has been purged already during the draining. + // It means that there is nothing that we can do and we just need to return. + LOG.warn( + "Current processing state is null in onStatusEmission() - most likely the pipeline is shutting down."); + return; + } + + emitProcessingStatus(currentState, outputReceiver, Instant.now()); + + Boolean windowClosed = windowClosedState.read(); + if (!currentState.isProcessingCompleted() + // Stop producing statuses if we are finished for a particular key + && (windowClosed == null || !windowClosed)) { + statusEmissionTimer.offset(statusUpdateFrequency).setRelative(); + } + } + + @OnWindowExpiration + public void onWindowExpiration(@StateId(WINDOW_CLOSED) ValueState windowClosedState) { + windowClosedState.write(true); + } + } +} diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorResult.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorResult.java new file mode 100644 index 000000000000..f61df6254b25 --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorResult.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +/** + * The result of the ordered processing. Two PCollections are returned: + *
  • output - the key/value of the mutated states + *
  • processingStatuses - the key/value of the status of processing for a particular key + * + * @param + * @param + */ +public class OrderedEventProcessorResult implements POutput { + + private final PCollection> outputPCollection; + private final TupleTag> outputPCollectionTupleTag; + + private final PCollection> eventProcessingStatusPCollection; + private final TupleTag> eventProcessingStatusTupleTag; + + private final PCollection>>> + unprocessedEventPCollection; + private final TupleTag>>> unprocessedEventTupleTag; + + OrderedEventProcessorResult( + Pipeline pipeline, + PCollection> outputPCollection, + TupleTag> outputPCollectionTupleTag, + PCollection> eventProcessingStatusPCollection, + TupleTag> eventProcessingStatusTupleTag, + PCollection>>> unprocessedEventPCollection, + TupleTag>>> unprocessedEventTupleTag) { + + this.pipeline = pipeline; + this.outputPCollection = outputPCollection; + this.outputPCollectionTupleTag = outputPCollectionTupleTag; + this.eventProcessingStatusPCollection = eventProcessingStatusPCollection; + this.eventProcessingStatusTupleTag = eventProcessingStatusTupleTag; + this.unprocessedEventPCollection = unprocessedEventPCollection; + this.unprocessedEventTupleTag = unprocessedEventTupleTag; + } + + private final Pipeline pipeline; + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + @Override + public Map, PValue> expand() { + return ImmutableMap.of( + eventProcessingStatusTupleTag, + eventProcessingStatusPCollection, + outputPCollectionTupleTag, + outputPCollection, + unprocessedEventTupleTag, + unprocessedEvents()); + } + + @Override + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform transform) {} + + /** + * @return processing status for a particular key. The elements will have the timestamp of the + * instant the status was emitted. + */ + public PCollection> processingStatuses() { + return eventProcessingStatusPCollection; + } + + /** @return processed states keyed by the original key */ + public PCollection> output() { + return outputPCollection; + } + + public PCollection>>> unprocessedEvents() { + return unprocessedEventPCollection; + } +} diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java new file mode 100644 index 000000000000..444fdb118091 --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +import java.io.Serializable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.values.KV; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +/** + * Parent class for Ordered Processing configuration handlers. + * + * @param type of events to be processed + * @param type of keys which will be used to group the events + * @param type of internal State which will be used for processing + * @param type of the result of the processing which will be output + */ +public abstract class OrderedProcessingHandler< + EventT, KeyT, StateT extends MutableState, ResultT> + implements Serializable { + + private static final int DEFAULT_STATUS_UPDATE_FREQUENCY_SECONDS = 5; + private static final boolean DEFAULT_PRODUCE_STATUS_UPDATE_ON_EVERY_EVENT = false; + public static final int DEFAULT_MAX_ELEMENTS_TO_OUTPUT = 10_000; + + private final Class eventTClass; + private final Class keyTClass; + private final Class stateTClass; + private final Class resultTClass; + + private int maxOutputElementsPerBundle = DEFAULT_MAX_ELEMENTS_TO_OUTPUT; + private Duration statusUpdateFrequency = + Duration.standardSeconds(DEFAULT_STATUS_UPDATE_FREQUENCY_SECONDS); + private boolean produceStatusUpdateOnEveryEvent = DEFAULT_PRODUCE_STATUS_UPDATE_ON_EVERY_EVENT; + + /** + * Provide concrete classes which will be used by the ordered processing transform. + * + * @param eventTClass class of the events + * @param keyTClass class of the keys + * @param stateTClass class of the state + * @param resultTClass class of the results + */ + public OrderedProcessingHandler( + Class eventTClass, + Class keyTClass, + Class stateTClass, + Class resultTClass) { + this.eventTClass = eventTClass; + this.keyTClass = keyTClass; + this.stateTClass = stateTClass; + this.resultTClass = resultTClass; + } + + /** @return the event examiner instance which will be used by the transform. */ + public abstract @NonNull EventExaminer getEventExaminer(); + + /** + * Provide the event coder. + * + *

    The default implementation of the method will use the event coder from the input + * PCollection. If the input PCollection doesn't use KVCoder, it will attempt to get the coder + * from the pipeline's coder registry. + * + * @param pipeline of the transform + * @param inputCoder input coder of the transform + * @return event coder + * @throws CannotProvideCoderException if the method can't determine the coder based on the above + * algorithm. + */ + public @NonNull Coder getEventCoder( + Pipeline pipeline, Coder>> inputCoder) + throws CannotProvideCoderException { + if (KvCoder.class.isAssignableFrom(inputCoder.getClass())) { + Coder> valueCoder = + ((KvCoder>) inputCoder).getValueCoder(); + if (KV.class.isAssignableFrom(valueCoder.getClass())) { + return ((KvCoder) valueCoder).getValueCoder(); + } + } + return pipeline.getCoderRegistry().getCoder(eventTClass); + } + + /** + * Provide the state coder. + * + *

    The default implementation will attempt to get the coder from the pipeline's code registry. + * + * @param pipeline of the transform + * @return the state coder + * @throws CannotProvideCoderException + */ + public @NonNull Coder getStateCoder(Pipeline pipeline) + throws CannotProvideCoderException { + return pipeline.getCoderRegistry().getCoder(stateTClass); + } + + /** + * Provide the key coder. + * + *

    The default implementation of the method will use the event coder from the input + * PCollection. If the input PCollection doesn't use KVCoder, it will attempt to get the coder + * from the pipeline's coder registry. + * + * @param pipeline + * @param inputCoder + * @return + * @throws CannotProvideCoderException if the method can't determine the coder based on the above + * algorithm. + */ + public @NonNull Coder getKeyCoder( + Pipeline pipeline, Coder>> inputCoder) + throws CannotProvideCoderException { + if (KvCoder.class.isAssignableFrom(inputCoder.getClass())) { + return ((KvCoder>) inputCoder).getKeyCoder(); + } + return pipeline.getCoderRegistry().getCoder(keyTClass); + } + + /** + * Provide the result coder. + * + *

    The default implementation will attempt to get the coder from the pipeline's code registry. + * + * @param pipeline + * @return result coder + * @throws CannotProvideCoderException + */ + public @NonNull Coder getResultCoder(Pipeline pipeline) + throws CannotProvideCoderException { + return pipeline.getCoderRegistry().getCoder(resultTClass); + } + + /** + * Determines the frequency of emission of the {@link OrderedProcessingStatus} elements. + * + *

    Default is 5 seconds. + * + * @return the frequency of updates. If null is returned, no updates will be emitted on a + * scheduled basis. + */ + public @Nullable Duration getStatusUpdateFrequency() { + return statusUpdateFrequency; + } + + /** + * Changes the default status update frequency. Updates will be disabled if set to null. + * + * @param statusUpdateFrequency + */ + public void setStatusUpdateFrequency(Duration statusUpdateFrequency) { + this.statusUpdateFrequency = statusUpdateFrequency; + } + + /** + * Indicates if the status update needs to be sent after each event's processing. + * + *

    Default is false. + * + * @return + * @see OrderedProcessingHandler#getStatusUpdateFrequency() getStatusUpdateFrequency + * @see OrderedEventProcessorResult#processingStatuses() PCollection of processing statuses + */ + public boolean isProduceStatusUpdateOnEveryEvent() { + return produceStatusUpdateOnEveryEvent; + } + + /** + * Sets the indicator of whether the status notification needs to be produced on every event. + * + * @param value + */ + public void setProduceStatusUpdateOnEveryEvent(boolean value) { + this.produceStatusUpdateOnEveryEvent = value; + } + + /** + * Returns the maximum number of elements which will be output per each bundle. The default is + * 10,000 elements. + * + *

    This is used to limit the amount of data produced for each bundle - many runners have + * limitations on how much data can be output from a single bundle. If many events arrive out of + * sequence and are buffered then at some point a single event can cause processing of a large + * number of buffered events. + * + * @return + */ + public int getMaxOutputElementsPerBundle() { + return maxOutputElementsPerBundle; + } + + /** + * Overrides the default value. + * + * @param maxOutputElementsPerBundle + */ + public void setMaxOutputElementsPerBundle(int maxOutputElementsPerBundle) { + this.maxOutputElementsPerBundle = maxOutputElementsPerBundle; + } +} diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingStatus.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingStatus.java new file mode 100644 index 000000000000..6659bd2e2b92 --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingStatus.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +import com.google.auto.value.AutoValue; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.joda.time.Instant; + +/** Indicates the status of ordered processing for a particular key. */ +@AutoValue +@DefaultSchema(AutoValueSchema.class) +public abstract class OrderedProcessingStatus { + + public static OrderedProcessingStatus create( + Long lastOutputSequence, + long numberOfBufferedEvents, + Long earliestBufferedSequence, + Long latestBufferedSequence, + long numberOfReceivedEvents, + long resultCount, + long duplicateCount, + boolean lastEventReceived) { + return new AutoValue_OrderedProcessingStatus.Builder() + .setLastProcessedSequence(lastOutputSequence) + .setNumberOfBufferedEvents(numberOfBufferedEvents) + .setEarliestBufferedSequence(earliestBufferedSequence) + .setLatestBufferedSequence(latestBufferedSequence) + .setNumberOfReceivedEvents(numberOfReceivedEvents) + .setLastEventReceived(lastEventReceived) + .setDuplicateCount(duplicateCount) + .setResultCount(resultCount) + .setStatusDate(Instant.now()) + .build(); + } + + /** + * @return Last sequence processed. If null is returned - no elements for the given key and window + * have been processed yet. + */ + @Nullable + public abstract Long getLastProcessedSequence(); + + /** @return Number of events received out of sequence and buffered. */ + public abstract long getNumberOfBufferedEvents(); + + /** @return Earliest buffered sequence. If null is returned - there are no buffered events. */ + @Nullable + public abstract Long getEarliestBufferedSequence(); + + /** @return Latest buffered sequence. If null is returned - there are no buffered events. */ + @Nullable + public abstract Long getLatestBufferedSequence(); + + /** @return Total number of events received for the given key and window. */ + public abstract long getNumberOfReceivedEvents(); + + /** + * @return Number of duplicate events which were output in {@link + * OrderedEventProcessorResult#unprocessedEvents()} PCollection + */ + public abstract long getDuplicateCount(); + + /** @return Number of output results produced. */ + public abstract long getResultCount(); + + /** + * @return Indicator that the last event for the given key and window has been received. It + * doesn't necessarily mean that all the events for the given key and window have been + * processed. Use {@link OrderedProcessingStatus#getNumberOfBufferedEvents()} == 0 and this + * indicator as the sign that the processing is complete. + */ + public abstract boolean isLastEventReceived(); + + /** + * @return Timestamp of when the status was produced. It is not related to the event's timestamp. + */ + public abstract Instant getStatusDate(); + + @Override + public final boolean equals(@Nullable Object obj) { + if (obj == null) { + return false; + } + if (!OrderedProcessingStatus.class.isAssignableFrom(obj.getClass())) { + return false; + } + OrderedProcessingStatus that = (OrderedProcessingStatus) obj; + boolean result = + Objects.equals(this.getEarliestBufferedSequence(), that.getEarliestBufferedSequence()) + && Objects.equals(this.getLastProcessedSequence(), that.getLastProcessedSequence()) + && Objects.equals(this.getLatestBufferedSequence(), that.getLatestBufferedSequence()) + && this.getNumberOfBufferedEvents() == that.getNumberOfBufferedEvents() + && this.getDuplicateCount() == that.getDuplicateCount() + && this.getResultCount() == that.getResultCount() + && this.getNumberOfReceivedEvents() == that.getNumberOfReceivedEvents(); + return result; + } + + @Override + public final int hashCode() { + return Objects.hash( + this.getEarliestBufferedSequence(), + this.getLastProcessedSequence(), + this.getLatestBufferedSequence(), + this.getNumberOfBufferedEvents(), + this.getNumberOfReceivedEvents(), + this.getDuplicateCount(), + this.getResultCount()); + } + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setLastProcessedSequence(Long value); + + public abstract Builder setNumberOfBufferedEvents(long value); + + public abstract Builder setEarliestBufferedSequence(Long value); + + public abstract Builder setLatestBufferedSequence(Long value); + + public abstract Builder setNumberOfReceivedEvents(long value); + + public abstract Builder setDuplicateCount(long value); + + public abstract Builder setResultCount(long value); + + public abstract Builder setLastEventReceived(boolean value); + + public abstract Builder setStatusDate(Instant value); + + public abstract OrderedProcessingStatus build(); + } +} diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessingState.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessingState.java new file mode 100644 index 000000000000..4b591a37faab --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessingState.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.initialization.qual.Initialized; + +/** + * Class used to store the status of processing for a particular key. + * + * @param + */ +class ProcessingState { + + @Nullable private Long lastOutputSequence; + @Nullable private Long latestBufferedSequence; + @Nullable private Long earliestBufferedSequence; + private long bufferedEventCount; + + private boolean lastEventReceived; + + private long eventsReceived; + + private long duplicates; + + private long resultCount; + + private KeyT key; + + public ProcessingState(KeyT key) { + this.key = key; + this.bufferedEventCount = 0; + this.lastOutputSequence = null; + this.earliestBufferedSequence = null; + this.latestBufferedSequence = null; + } + + /** + * Only to be used by the coder. + * + * @param key + * @param lastOutputSequence + * @param earliestBufferedSequence + * @param latestBufferedSequence + * @param bufferedEventCount + */ + ProcessingState( + KeyT key, + @Nullable Long lastOutputSequence, + @Nullable Long earliestBufferedSequence, + @Nullable Long latestBufferedSequence, + long bufferedEventCount, + long eventsReceived, + long duplicates, + long resultCount, + boolean lastEventReceived) { + this(key); + this.lastOutputSequence = lastOutputSequence; + this.earliestBufferedSequence = earliestBufferedSequence; + this.latestBufferedSequence = latestBufferedSequence; + this.bufferedEventCount = bufferedEventCount; + this.eventsReceived = eventsReceived; + this.duplicates = duplicates; + this.resultCount = resultCount; + this.lastEventReceived = lastEventReceived; + } + + @Nullable + public Long getLastOutputSequence() { + return lastOutputSequence; + } + + @Nullable + public Long getLatestBufferedSequence() { + return latestBufferedSequence; + } + + @Nullable + public Long getEarliestBufferedSequence() { + return earliestBufferedSequence; + } + + public long getBufferedEventCount() { + return bufferedEventCount; + } + + public long getEventsReceived() { + return eventsReceived; + } + + public boolean isLastEventReceived() { + return lastEventReceived; + } + + public long getResultCount() { + return resultCount; + } + + public long getDuplicates() { + return duplicates; + } + + public KeyT getKey() { + return key; + } + + /** + * Current event matched the sequence and was processed. + * + * @param sequence + * @param lastEvent + */ + public void eventAccepted(long sequence, boolean lastEvent) { + this.lastOutputSequence = sequence; + setLastEventReceived(lastEvent); + } + + private void setLastEventReceived(boolean lastEvent) { + // Only one last event can be received. + this.lastEventReceived = this.lastEventReceived ? true : lastEvent; + } + + /** + * New event added to the buffer. + * + * @param sequenceNumber of the event + * @param isLastEvent + */ + void eventBuffered(long sequenceNumber, boolean isLastEvent) { + bufferedEventCount++; + latestBufferedSequence = + Math.max( + sequenceNumber, + latestBufferedSequence == null ? Long.MIN_VALUE : latestBufferedSequence); + earliestBufferedSequence = + Math.min( + sequenceNumber, + earliestBufferedSequence == null ? Long.MAX_VALUE : earliestBufferedSequence); + + setLastEventReceived(isLastEvent); + } + + /** + * An event was processed and removed from the buffer. + * + * @param sequence of the processed event + */ + public void processedBufferedEvent(long sequence) { + bufferedEventCount--; + lastOutputSequence = sequence; + + if (bufferedEventCount == 0) { + earliestBufferedSequence = latestBufferedSequence = null; + } else { + // We don't know for sure that it's the earliest record yet, but OrderedEventProcessor will + // read the next + // buffered event and call foundSequenceGap() and adjust this value. + earliestBufferedSequence = sequence + 1; + } + } + + /** + * A set of records was pulled from the buffer, but it turned out that the element is not + * sequential. + * + * @param newEarliestSequence + */ + public void foundSequenceGap(long newEarliestSequence) { + earliestBufferedSequence = newEarliestSequence; + } + + @Override + public boolean equals(@Nullable @Initialized Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ProcessingState)) { + return false; + } + ProcessingState that = (ProcessingState) o; + return bufferedEventCount == that.bufferedEventCount + && lastEventReceived == that.lastEventReceived + && eventsReceived == that.eventsReceived + && duplicates == that.duplicates + && Objects.equals(lastOutputSequence, that.lastOutputSequence) + && Objects.equals(latestBufferedSequence, that.latestBufferedSequence) + && Objects.equals(earliestBufferedSequence, that.earliestBufferedSequence) + && Objects.equals(key, that.key) + && resultCount == that.resultCount; + } + + @Override + public int hashCode() { + return Objects.hash( + lastOutputSequence, + latestBufferedSequence, + earliestBufferedSequence, + bufferedEventCount, + lastEventReceived, + eventsReceived, + duplicates, + resultCount, + key); + } + + public boolean isProcessingCompleted() { + return lastEventReceived && bufferedEventCount == 0; + } + + public void eventReceived() { + eventsReceived++; + } + + public boolean isNextEvent(long sequence) { + return lastOutputSequence != null && sequence == lastOutputSequence + 1; + } + + public boolean hasAlreadyBeenProcessed(long currentSequence) { + boolean result = lastOutputSequence != null && lastOutputSequence >= currentSequence; + if (result) { + duplicates++; + } + return result; + } + + public boolean checkForDuplicateBatchedEvent(long currentSequence) { + boolean result = lastOutputSequence != null && lastOutputSequence == currentSequence; + if (result) { + duplicates++; + if (--bufferedEventCount == 0) { + earliestBufferedSequence = latestBufferedSequence = null; + } + } + return result; + } + + public boolean readyToProcessBufferedEvents() { + return earliestBufferedSequence != null + && lastOutputSequence != null + && earliestBufferedSequence == lastOutputSequence + 1; + } + + public void resultProduced() { + resultCount++; + } + + public long resultsProducedInBundle(long numberOfResultsBeforeBundleStart) { + return resultCount - numberOfResultsBeforeBundleStart; + } + + /** + * Coder for the processing status. + * + * @param + */ + static class ProcessingStateCoder extends Coder> { + + private static final NullableCoder NULLABLE_LONG_CODER = + NullableCoder.of(VarLongCoder.of()); + private static final Coder LONG_CODER = VarLongCoder.of(); + private static final VarIntCoder INTEGER_CODER = VarIntCoder.of(); + private static final BooleanCoder BOOLEAN_CODER = BooleanCoder.of(); + + private Coder keyCoder; + + private ProcessingStateCoder(Coder keyCoder) { + this.keyCoder = keyCoder; + } + + public static ProcessingStateCoder of(Coder keyCoder) { + return new ProcessingStateCoder<>(keyCoder); + } + + @Override + public void encode(ProcessingState value, OutputStream outStream) throws IOException { + NULLABLE_LONG_CODER.encode(value.getLastOutputSequence(), outStream); + NULLABLE_LONG_CODER.encode(value.getEarliestBufferedSequence(), outStream); + NULLABLE_LONG_CODER.encode(value.getLatestBufferedSequence(), outStream); + LONG_CODER.encode(value.getBufferedEventCount(), outStream); + LONG_CODER.encode(value.getEventsReceived(), outStream); + LONG_CODER.encode(value.getDuplicates(), outStream); + LONG_CODER.encode(value.getResultCount(), outStream); + BOOLEAN_CODER.encode(value.isLastEventReceived(), outStream); + keyCoder.encode(value.getKey(), outStream); + } + + @Override + public ProcessingState decode(InputStream inStream) throws IOException { + Long lastOutputSequence = NULLABLE_LONG_CODER.decode(inStream); + Long earliestBufferedSequence = NULLABLE_LONG_CODER.decode(inStream); + Long latestBufferedSequence = NULLABLE_LONG_CODER.decode(inStream); + int bufferedRecordCount = INTEGER_CODER.decode(inStream); + long recordsReceivedCount = LONG_CODER.decode(inStream); + long duplicates = LONG_CODER.decode(inStream); + long resultCount = LONG_CODER.decode(inStream); + boolean isLastEventReceived = BOOLEAN_CODER.decode(inStream); + KeyT key = keyCoder.decode(inStream); + + return new ProcessingState<>( + key, + lastOutputSequence, + earliestBufferedSequence, + latestBufferedSequence, + bufferedRecordCount, + recordsReceivedCount, + duplicates, + resultCount, + isLastEventReceived); + } + + @Override + public List> getCoderArguments() { + return ImmutableList.of(); + } + + @Override + public void verifyDeterministic() {} + } +} diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java new file mode 100644 index 000000000000..2131ef384e22 --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.ByteCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.commons.lang3.exception.ExceptionUtils; + +/** + * Combines the source event which failed to process with the failure reason. + * + * @param + */ +@AutoValue +public abstract class UnprocessedEvent { + + /** + * Create new unprocessed event. + * + * @param event failed event + * @param reason for failure + * @param type of the event + * @return + */ + public static UnprocessedEvent create(EventT event, Reason reason) { + return new AutoValue_UnprocessedEvent<>(event, reason, null); + } + + /** + * Create new unprocessed event which failed due to an exception thrown. + * + * @param event which failed + * @param exception which caused the failure + * @param type of the event + * @return + */ + public static UnprocessedEvent create(EventT event, Exception exception) { + return new AutoValue_UnprocessedEvent<>( + event, Reason.exception_thrown, ExceptionUtils.getStackTrace(exception)); + } + + static UnprocessedEvent create( + EventT event, Reason reason, @Nullable String failureDetails) { + return new AutoValue_UnprocessedEvent<>(event, reason, failureDetails); + } + + public enum Reason { + duplicate, + buffered, + sequence_id_outside_valid_range, + exception_thrown + }; + + public abstract EventT getEvent(); + + public abstract Reason getReason(); + + public abstract @Nullable String getExplanation(); + + static class UnprocessedEventCoder extends Coder> { + + private final Coder eventCoder; + private final NullableCoder explanationCoder = NullableCoder.of(StringUtf8Coder.of()); + + UnprocessedEventCoder(Coder eventCoder) { + this.eventCoder = eventCoder; + } + + @Override + public void encode(UnprocessedEvent value, OutputStream outStream) throws IOException { + ByteCoder.of().encode((byte) value.getReason().ordinal(), outStream); + explanationCoder.encode(value.getExplanation(), outStream); + eventCoder.encode(value.getEvent(), outStream); + } + + @Override + public UnprocessedEvent decode(InputStream inputStream) throws IOException { + Reason reason = Reason.values()[ByteCoder.of().decode(inputStream)]; + String explanation = explanationCoder.decode(inputStream); + EventT event = eventCoder.decode(inputStream); + return UnprocessedEvent.create(event, reason, explanation); + } + + @Override + public List> getCoderArguments() { + return Arrays.asList(eventCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + verifyDeterministic( + this, "Unprocessed event coder requires deterministic event coder", eventCoder); + } + } +} diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/package-info.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/package-info.java new file mode 100644 index 000000000000..f9d7e3d67bff --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Provides a transform for ordered processing. + * + * @see org.apache.beam.sdk.extensions.ordered.OrderedEventProcessor + */ +package org.apache.beam.sdk.extensions.ordered; diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/Event.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/Event.java new file mode 100644 index 000000000000..3cf879d8239b --- /dev/null +++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/Event.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; + +/** + * Event class to be used in testing. + * + *

    The event simulate a string being emitted for a particular key, e.g., sensor id or customer + * id. + */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class Event implements Serializable { + + public static Event create(long sequence, String groupId, String value) { + return new AutoValue_Event(sequence, groupId, value); + } + + /** @return event sequence number */ + public abstract long getSequence(); + + /** @return the group id event is associated with */ + public abstract String getKey(); + + /** @return value of the event */ + public abstract String getValue(); +} diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java new file mode 100644 index 000000000000..6a24021ad667 --- /dev/null +++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java @@ -0,0 +1,906 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.extensions.ordered.UnprocessedEvent.Reason; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.SerializableMatcher; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Ordered Processing tests use the same testing scenario. Events are sent in or out of sequence. + * Each event is a string for a particular key. The output is a concatenation of all strings. + */ +@RunWith(JUnit4.class) +public class OrderedEventProcessorTest { + + public static final boolean LAST_EVENT_RECEIVED = true; + public static final int EMISSION_FREQUENCY_ON_EVERY_ELEMENT = 1; + public static final int INITIAL_SEQUENCE_OF_0 = 0; + public static final boolean DONT_PRODUCE_STATUS_ON_EVERY_EVENT = false; + public static final int LARGE_MAX_RESULTS_PER_OUTPUT = 1000; + public static final int EMISSION_FREQUENCY_ON_EVERY_OTHER_EVENT = 2; + public static final boolean PRODUCE_STATUS_ON_EVERY_EVENT = true; + public static final boolean STREAMING = true; + public static final boolean BATCH = false; + public static final Set>>> NO_EXPECTED_DLQ_EVENTS = + Collections.emptySet(); + @Rule public final transient TestPipeline streamingPipeline = TestPipeline.create(); + @Rule public final transient TestPipeline batchPipeline = TestPipeline.create(); + + static class MapEventsToKV extends DoFn>> { + + @ProcessElement + public void convert( + @Element Event event, OutputReceiver>> outputReceiver) { + outputReceiver.output(KV.of(event.getKey(), KV.of(event.getSequence(), event.getValue()))); + } + } + + static class MapStringBufferStateToString + extends DoFn, KV> { + + @ProcessElement + public void map( + @Element KV element, + OutputReceiver> outputReceiver) { + outputReceiver.output(KV.of(element.getKey(), element.getValue().toString())); + } + } + + @Test + public void testPerfectOrderingProcessing() throws CannotProvideCoderException { + Event[] events = { + Event.create(0, "id-1", "a"), + Event.create(1, "id-1", "b"), + Event.create(2, "id-1", "c"), + Event.create(3, "id-1", "d"), + Event.create(0, "id-2", "a"), + Event.create(1, "id-2", "b") + }; + + Collection> expectedStatuses = new ArrayList<>(); + expectedStatuses.add( + KV.of( + "id-1", + OrderedProcessingStatus.create( + 3L, + 0, + null, + null, + 4, + Arrays.stream(events).filter(e -> e.getKey().equals("id-1")).count(), + 0, + false))); + expectedStatuses.add( + KV.of( + "id-2", + OrderedProcessingStatus.create( + 1L, + 0, + null, + null, + 2, + Arrays.stream(events).filter(e -> e.getKey().equals("id-2")).count(), + 0, + false))); + + Collection> expectedOutput = new ArrayList<>(); + expectedOutput.add(KV.of("id-1", "a")); + expectedOutput.add(KV.of("id-1", "ab")); + expectedOutput.add(KV.of("id-1", "abc")); + expectedOutput.add(KV.of("id-1", "abcd")); + expectedOutput.add(KV.of("id-2", "a")); + expectedOutput.add(KV.of("id-2", "ab")); + + testProcessing( + events, + expectedStatuses, + expectedOutput, + EMISSION_FREQUENCY_ON_EVERY_ELEMENT, + INITIAL_SEQUENCE_OF_0, + LARGE_MAX_RESULTS_PER_OUTPUT, + DONT_PRODUCE_STATUS_ON_EVERY_EVENT); + } + + @Test + public void testOutOfSequenceProcessing() throws CannotProvideCoderException { + Event[] events = { + Event.create(2, "id-1", "c"), + Event.create(1, "id-1", "b"), + Event.create(0, "id-1", "a"), + Event.create(3, "id-1", "d"), + Event.create(1, "id-2", "b"), + Event.create(2, "id-2", "c"), + Event.create(4, "id-2", "e"), + Event.create(0, "id-2", "a"), + Event.create(3, "id-2", "d") + }; + + Collection> expectedStatuses = new ArrayList<>(); + expectedStatuses.add( + KV.of( + "id-1", + OrderedProcessingStatus.create( + 3L, + 0, + null, + null, + 4, + Arrays.stream(events).filter(e -> e.getKey().equals("id-1")).count(), + 0, + false))); + expectedStatuses.add( + KV.of( + "id-2", + OrderedProcessingStatus.create( + 4L, + 0, + null, + null, + 5, + Arrays.stream(events).filter(e -> e.getKey().equals("id-2")).count(), + 0, + false))); + + Collection> expectedOutput = new ArrayList<>(); + expectedOutput.add(KV.of("id-1", "a")); + expectedOutput.add(KV.of("id-1", "ab")); + expectedOutput.add(KV.of("id-1", "abc")); + expectedOutput.add(KV.of("id-1", "abcd")); + expectedOutput.add(KV.of("id-2", "a")); + expectedOutput.add(KV.of("id-2", "ab")); + expectedOutput.add(KV.of("id-2", "abc")); + expectedOutput.add(KV.of("id-2", "abcd")); + expectedOutput.add(KV.of("id-2", "abcde")); + + testProcessing( + events, + expectedStatuses, + expectedOutput, + EMISSION_FREQUENCY_ON_EVERY_ELEMENT, + INITIAL_SEQUENCE_OF_0, + LARGE_MAX_RESULTS_PER_OUTPUT, + DONT_PRODUCE_STATUS_ON_EVERY_EVENT); + } + + @Test + public void testUnfinishedProcessing() throws CannotProvideCoderException { + Event[] events = { + Event.create(2, "id-1", "c"), + // Excluded Event.create(1, "id-1", "b"), + Event.create(0, "id-1", "a"), + Event.create(3, "id-1", "d"), + Event.create(0, "id-2", "a"), + Event.create(1, "id-2", "b"), + }; + + Collection> expectedStatuses = new ArrayList<>(); + expectedStatuses.add( + KV.of("id-1", OrderedProcessingStatus.create(0L, 2, 2L, 3L, 3, 1L, 0, false))); + expectedStatuses.add( + KV.of("id-2", OrderedProcessingStatus.create(1L, 0, null, null, 2, 2L, 0, false))); + + Collection> expectedOutput = new ArrayList<>(); + expectedOutput.add(KV.of("id-1", "a")); + expectedOutput.add(KV.of("id-2", "a")); + expectedOutput.add(KV.of("id-2", "ab")); + + testProcessing(events, expectedStatuses, expectedOutput, 1, 0, 1000, false); + } + + @Test + public void testHandlingOfDuplicateSequences() throws CannotProvideCoderException { + Event[] events = { + Event.create(3, "id-1", "d"), + Event.create(2, "id-1", "c"), + // Duplicates to be buffered + Event.create(3, "id-1", "d"), + Event.create(3, "id-1", "d"), + Event.create(0, "id-1", "a"), + Event.create(1, "id-1", "b"), + + // Duplicates after the events are processed + Event.create(1, "id-1", "b"), + Event.create(3, "id-1", "d"), + }; + int resultCount = 4; + int duplicateCount = 4; + + Collection> expectedStatuses = new ArrayList<>(); + expectedStatuses.add( + KV.of( + "id-1", + OrderedProcessingStatus.create( + 3L, 0, null, null, events.length, resultCount, duplicateCount, false))); + + Collection> expectedOutput = new ArrayList<>(); + expectedOutput.add(KV.of("id-1", "a")); + expectedOutput.add(KV.of("id-1", "ab")); + expectedOutput.add(KV.of("id-1", "abc")); + expectedOutput.add(KV.of("id-1", "abcd")); + + Collection>>> duplicates = new ArrayList<>(); + duplicates.add(KV.of("id-1", KV.of(3L, UnprocessedEvent.create("d", Reason.duplicate)))); + duplicates.add(KV.of("id-1", KV.of(3L, UnprocessedEvent.create("d", Reason.duplicate)))); + duplicates.add(KV.of("id-1", KV.of(1L, UnprocessedEvent.create("b", Reason.duplicate)))); + duplicates.add(KV.of("id-1", KV.of(3L, UnprocessedEvent.create("d", Reason.duplicate)))); + + testProcessing( + events, + expectedStatuses, + expectedOutput, + duplicates, + EMISSION_FREQUENCY_ON_EVERY_ELEMENT, + INITIAL_SEQUENCE_OF_0, + LARGE_MAX_RESULTS_PER_OUTPUT, + DONT_PRODUCE_STATUS_ON_EVERY_EVENT); + } + + @Test + public void testHandlingOfCheckedExceptions() throws CannotProvideCoderException { + Event[] events = { + Event.create(0, "id-1", "a"), + Event.create(1, "id-1", "b"), + Event.create(2, "id-1", StringBuilderState.BAD_VALUE), + Event.create(3, "id-1", "c"), + }; + + Collection> expectedStatuses = new ArrayList<>(); + expectedStatuses.add( + KV.of("id-1", OrderedProcessingStatus.create(1L, 1, 3L, 3L, events.length, 2, 0, false))); + + Collection> expectedOutput = new ArrayList<>(); + expectedOutput.add(KV.of("id-1", "a")); + expectedOutput.add(KV.of("id-1", "ab")); + + Collection>>> failedEvents = new ArrayList<>(); + failedEvents.add( + KV.of( + "id-1", + KV.of( + 2L, + UnprocessedEvent.create(StringBuilderState.BAD_VALUE, Reason.exception_thrown)))); + + testProcessing( + events, + expectedStatuses, + expectedOutput, + failedEvents, + EMISSION_FREQUENCY_ON_EVERY_ELEMENT, + INITIAL_SEQUENCE_OF_0, + LARGE_MAX_RESULTS_PER_OUTPUT, + DONT_PRODUCE_STATUS_ON_EVERY_EVENT); + } + + @Test + public void testProcessingWithEveryOtherResultEmission() throws CannotProvideCoderException { + Event[] events = { + Event.create(2, "id-1", "c"), + Event.create(1, "id-1", "b"), + Event.create(0, "id-1", "a"), + Event.create(3, "id-1", "d"), + Event.create(0, "id-2", "a"), + Event.create(1, "id-2", "b"), + }; + + Collection> expectedStatuses = new ArrayList<>(); + expectedStatuses.add( + KV.of("id-1", OrderedProcessingStatus.create(3L, 0, null, null, 4, 2L, 0, false))); + expectedStatuses.add( + KV.of("id-2", OrderedProcessingStatus.create(1L, 0, null, null, 2, 1L, 0, false))); + + Collection> expectedOutput = new ArrayList<>(); + expectedOutput.add(KV.of("id-1", "a")); + // Skipped KV.of("id-1", "ab"), + expectedOutput.add(KV.of("id-1", "abc")); + // Skipped KV.of("id-1", "abcd"), + expectedOutput.add(KV.of("id-2", "a")); + // Skipped KV.of("id-2", "ab") + testProcessing( + events, + expectedStatuses, + expectedOutput, + EMISSION_FREQUENCY_ON_EVERY_OTHER_EVENT, + INITIAL_SEQUENCE_OF_0, + LARGE_MAX_RESULTS_PER_OUTPUT, + DONT_PRODUCE_STATUS_ON_EVERY_EVENT); + } + + @Test + public void testLargeBufferedOutputInTimer() throws CannotProvideCoderException { + int maxResultsPerOutput = 100; + + // Array of sequences starting with 2 and the last element - 1. + // Output will be buffered until the last event arrives + long[] sequences = new long[maxResultsPerOutput * 3]; + for (int i = 0; i < sequences.length - 1; i++) { + sequences[i] = i + 2L; + } + sequences[sequences.length - 1] = 1; + + List events = new ArrayList<>(sequences.length); + Collection> expectedOutput = new ArrayList<>(sequences.length); + Collection> expectedStatuses = + new ArrayList<>(sequences.length + 10); + + StringBuilder output = new StringBuilder(); + String outputPerElement = "."; + String key = "id-1"; + + int bufferedEventCount = 0; + + for (long sequence : sequences) { + ++bufferedEventCount; + + events.add(Event.create(sequence, key, outputPerElement)); + output.append(outputPerElement); + expectedOutput.add(KV.of(key, output.toString())); + + if (bufferedEventCount < sequences.length) { + // Last event will result in a batch of events being produced. That's why it's excluded + // here. + expectedStatuses.add( + KV.of( + key, + OrderedProcessingStatus.create( + null, bufferedEventCount, 2L, sequence, bufferedEventCount, 0L, 0, false))); + } + } + + // Statuses produced by the batched processing + for (int i = maxResultsPerOutput; i < sequences.length; i += maxResultsPerOutput) { + long lastOutputSequence = i; + expectedStatuses.add( + KV.of( + key, + OrderedProcessingStatus.create( + lastOutputSequence, + sequences.length - lastOutputSequence, + lastOutputSequence + 1, + (long) sequences.length, + sequences.length, + lastOutputSequence, + 0, + false))); + } + + // -- Final status - indicates that everything has been fully processed + expectedStatuses.add( + KV.of( + key, + OrderedProcessingStatus.create( + (long) sequences.length, + 0, + null, + null, + sequences.length, + sequences.length, + 0, + false))); + + testProcessing( + events.toArray(new Event[events.size()]), + expectedStatuses, + expectedOutput, + EMISSION_FREQUENCY_ON_EVERY_ELEMENT, + 1L /* This dataset assumes 1 as the starting sequence */, + maxResultsPerOutput, + PRODUCE_STATUS_ON_EVERY_EVENT); + } + + @Test + public void testSequenceGapProcessingInBufferedOutput() throws CannotProvideCoderException { + int maxResultsPerOutput = 3; + + long[] sequences = new long[] {2, 3, 7, 8, 9, 10, 1, 4, 5, 6}; + + List events = new ArrayList<>(sequences.length); + List> expectedOutput = new ArrayList<>(sequences.length); + + StringBuilder output = new StringBuilder(); + String outputPerElement = "."; + String key = "id-1"; + + for (long sequence : sequences) { + events.add(Event.create(sequence, key, outputPerElement)); + output.append(outputPerElement); + expectedOutput.add(KV.of(key, output.toString())); + } + + int numberOfReceivedEvents = 0; + Collection> expectedStatuses = new ArrayList<>(); + + // First elements are out-of-sequence and they just get buffered. Earliest and latest sequence + // numbers keep changing. + expectedStatuses.add( + KV.of( + key, + OrderedProcessingStatus.create( + null, 1, 2L, 2L, ++numberOfReceivedEvents, 0L, 0, false))); + expectedStatuses.add( + KV.of( + key, + OrderedProcessingStatus.create( + null, 2, 2L, 3L, ++numberOfReceivedEvents, 0L, 0, false))); + expectedStatuses.add( + KV.of( + key, + OrderedProcessingStatus.create( + null, 3, 2L, 7L, ++numberOfReceivedEvents, 0L, 0, false))); + expectedStatuses.add( + KV.of( + key, + OrderedProcessingStatus.create( + null, 4, 2L, 8L, ++numberOfReceivedEvents, 0L, 0, false))); + expectedStatuses.add( + KV.of( + key, + OrderedProcessingStatus.create( + null, 5, 2L, 9L, ++numberOfReceivedEvents, 0L, 0, false))); + expectedStatuses.add( + KV.of( + key, + OrderedProcessingStatus.create( + null, 6, 2L, 10L, ++numberOfReceivedEvents, 0L, 0, false))); + // --- 1 has appeared and caused the batch to be sent out. + expectedStatuses.add( + KV.of( + key, + OrderedProcessingStatus.create( + 3L, 4, 7L, 10L, ++numberOfReceivedEvents, 3L, 0, false))); + expectedStatuses.add( + KV.of( + key, + OrderedProcessingStatus.create( + 4L, 4, 7L, 10L, ++numberOfReceivedEvents, 4L, 0, false))); + expectedStatuses.add( + KV.of( + key, + OrderedProcessingStatus.create( + 5L, 4, 7L, 10L, ++numberOfReceivedEvents, 5L, 0, false))); + // --- 6 came and 6, 7, and 8 got output + expectedStatuses.add( + KV.of( + key, + OrderedProcessingStatus.create( + 8L, 2, 9L, 10L, ++numberOfReceivedEvents, 8L, 0, false))); + // Last timer run produces the final status. Number of received events doesn't + // increase, + // this is the result of a timer processing + expectedStatuses.add( + KV.of( + key, + OrderedProcessingStatus.create( + 10L, 0, null, null, numberOfReceivedEvents, 10L, 0, false))); + + testProcessing( + events.toArray(new Event[events.size()]), + expectedStatuses, + expectedOutput, + EMISSION_FREQUENCY_ON_EVERY_ELEMENT, + 1L /* This dataset assumes 1 as the starting sequence */, + maxResultsPerOutput, + PRODUCE_STATUS_ON_EVERY_EVENT); + } + + @Test + public void testHandlingOfMaxSequenceNumber() throws CannotProvideCoderException { + Event[] events = { + Event.create(0, "id-1", "a"), + Event.create(1, "id-1", "b"), + Event.create(Long.MAX_VALUE, "id-1", "c") + }; + + Collection> expectedStatuses = new ArrayList<>(); + expectedStatuses.add( + KV.of("id-1", OrderedProcessingStatus.create(1L, 0, null, null, 3, 2, 0, false))); + + Collection> expectedOutput = new ArrayList<>(); + expectedOutput.add(KV.of("id-1", "a")); + expectedOutput.add(KV.of("id-1", "ab")); + + Collection>>> unprocessedEvents = + new ArrayList<>(); + unprocessedEvents.add( + KV.of( + "id-1", + KV.of( + Long.MAX_VALUE, + UnprocessedEvent.create("c", Reason.sequence_id_outside_valid_range)))); + + testProcessing( + events, + expectedStatuses, + expectedOutput, + unprocessedEvents, + EMISSION_FREQUENCY_ON_EVERY_ELEMENT, + INITIAL_SEQUENCE_OF_0, + LARGE_MAX_RESULTS_PER_OUTPUT, + DONT_PRODUCE_STATUS_ON_EVERY_EVENT); + } + + @Test + public void testProcessingOfTheLastInput() throws CannotProvideCoderException { + Event[] events = { + Event.create(0, "id-1", "a"), + Event.create(1, "id-1", "b"), + Event.create(2, "id-1", StringEventExaminer.LAST_INPUT) + }; + + Collection> expectedStatuses = new ArrayList<>(); + expectedStatuses.add( + KV.of( + "id-1", + OrderedProcessingStatus.create( + 2L, 0, null, null, events.length, events.length, 0, LAST_EVENT_RECEIVED))); + + Collection> expectedOutput = new ArrayList<>(); + expectedOutput.add(KV.of("id-1", "a")); + expectedOutput.add(KV.of("id-1", "ab")); + expectedOutput.add(KV.of("id-1", "ab" + StringEventExaminer.LAST_INPUT)); + + testProcessing( + events, + expectedStatuses, + expectedOutput, + EMISSION_FREQUENCY_ON_EVERY_ELEMENT, + INITIAL_SEQUENCE_OF_0, + LARGE_MAX_RESULTS_PER_OUTPUT, + DONT_PRODUCE_STATUS_ON_EVERY_EVENT); + } + + @Test + public void testWindowedProcessing() throws CannotProvideCoderException { + + Instant base = new Instant(0); + TestStream values = + TestStream.create(streamingPipeline.getCoderRegistry().getCoder(Event.class)) + .advanceWatermarkTo(base) + .addElements( + // Start of first window + TimestampedValue.of( + Event.create(0, "id-1", "a"), base.plus(Duration.standardSeconds(1))), + TimestampedValue.of( + Event.create(1, "id-1", "b"), base.plus(Duration.standardSeconds(2))), + TimestampedValue.of( + Event.create(0, "id-2", "x"), base.plus(Duration.standardSeconds(1))), + TimestampedValue.of( + Event.create(1, "id-2", "y"), base.plus(Duration.standardSeconds(2))), + TimestampedValue.of( + Event.create(2, "id-2", "z"), base.plus(Duration.standardSeconds(2))), + + // Start of second window. Numbering must start with 0 again. + TimestampedValue.of( + Event.create(0, "id-1", "c"), base.plus(Duration.standardSeconds(10))), + TimestampedValue.of( + Event.create(1, "id-1", "d"), base.plus(Duration.standardSeconds(11)))) + .advanceWatermarkToInfinity(); + + Pipeline pipeline = streamingPipeline; + + PCollection rawInput = pipeline.apply("Create Streaming Events", values); + PCollection>> input = + rawInput.apply("To KV", ParDo.of(new MapEventsToKV())); + + input = input.apply("Window input", Window.into(FixedWindows.of(Duration.standardSeconds(5)))); + + StringBufferOrderedProcessingHandler handler = + new StringBufferOrderedProcessingHandler( + EMISSION_FREQUENCY_ON_EVERY_ELEMENT, INITIAL_SEQUENCE_OF_0); + handler.setMaxOutputElementsPerBundle(LARGE_MAX_RESULTS_PER_OUTPUT); + handler.setStatusUpdateFrequency(null); + handler.setProduceStatusUpdateOnEveryEvent(true); + + OrderedEventProcessor orderedEventProcessor = + OrderedEventProcessor.create(handler); + + OrderedEventProcessorResult processingResult = + input.apply("Process Events", orderedEventProcessor); + + IntervalWindow window1 = new IntervalWindow(base, base.plus(Duration.standardSeconds(5))); + PAssert.that("Output matches in window 1", processingResult.output()) + .inWindow(window1) + .containsInAnyOrder( + KV.of("id-1", "a"), + KV.of("id-1", "ab"), + KV.of("id-2", "x"), + KV.of("id-2", "xy"), + KV.of("id-2", "xyz")); + + IntervalWindow window2 = + new IntervalWindow( + base.plus(Duration.standardSeconds(10)), base.plus(Duration.standardSeconds(15))); + PAssert.that("Output matches in window 2", processingResult.output()) + .inWindow(window2) + .containsInAnyOrder(KV.of("id-1", "c"), KV.of("id-1", "cd")); + + PAssert.that("Statuses match in window 1", processingResult.processingStatuses()) + .inWindow(window1) + .containsInAnyOrder( + KV.of("id-1", OrderedProcessingStatus.create(0L, 0, null, null, 1, 1, 0, false)), + KV.of("id-1", OrderedProcessingStatus.create(1L, 0, null, null, 2, 2, 0, false)), + KV.of("id-2", OrderedProcessingStatus.create(0L, 0, null, null, 1, 1, 0, false)), + KV.of("id-2", OrderedProcessingStatus.create(1L, 0, null, null, 2, 2, 0, false)), + KV.of("id-2", OrderedProcessingStatus.create(2L, 0, null, null, 3, 3, 0, false))); + + PAssert.that("Statuses match in window 2", processingResult.processingStatuses()) + .inWindow(window2) + .containsInAnyOrder( + KV.of("id-1", OrderedProcessingStatus.create(0L, 0, null, null, 1, 1, 0, false)), + KV.of("id-1", OrderedProcessingStatus.create(1L, 0, null, null, 2, 2, 0, false))); + + PAssert.that("Unprocessed events match", processingResult.unprocessedEvents()) + .containsInAnyOrder(NO_EXPECTED_DLQ_EVENTS); + + pipeline.run(); + } + + private void testProcessing( + Event[] events, + Collection> expectedStatuses, + Collection> expectedOutput, + int emissionFrequency, + long initialSequence, + int maxResultsPerOutput, + boolean produceStatusOnEveryEvent) + throws CannotProvideCoderException { + testProcessing( + events, + expectedStatuses, + expectedOutput, + NO_EXPECTED_DLQ_EVENTS, + emissionFrequency, + initialSequence, + maxResultsPerOutput, + produceStatusOnEveryEvent); + } + + private void testProcessing( + Event[] events, + Collection> expectedStatuses, + Collection> expectedOutput, + Collection>>> expectedUnprocessedEvents, + int emissionFrequency, + long initialSequence, + int maxResultsPerOutput, + boolean produceStatusOnEveryEvent) + throws CannotProvideCoderException { + doTest( + events, + expectedStatuses, + expectedOutput, + expectedUnprocessedEvents, + emissionFrequency, + initialSequence, + maxResultsPerOutput, + produceStatusOnEveryEvent, + STREAMING); + doTest( + events, + expectedStatuses, + expectedOutput, + expectedUnprocessedEvents, + emissionFrequency, + initialSequence, + maxResultsPerOutput, + produceStatusOnEveryEvent, + BATCH); + } + + /** + * The majority of the tests use this method. Testing is done in the global window. + * + * @param events + * @param expectedStatuses + * @param expectedOutput + * @param expectedUnprocessedEvents + * @param emissionFrequency + * @param initialSequence + * @param maxResultsPerOutput + * @param produceStatusOnEveryEvent + * @param streaming + * @throws @UnknownKeyFor @NonNull @Initialized CannotProvideCoderException + */ + private void doTest( + Event[] events, + Collection> expectedStatuses, + Collection> expectedOutput, + Collection>>> expectedUnprocessedEvents, + int emissionFrequency, + long initialSequence, + int maxResultsPerOutput, + boolean produceStatusOnEveryEvent, + boolean streaming) + throws @UnknownKeyFor @NonNull @Initialized CannotProvideCoderException { + + Pipeline pipeline = streaming ? streamingPipeline : batchPipeline; + + PCollection rawInput = + streaming + ? createStreamingPCollection(pipeline, events) + : createBatchPCollection(pipeline, events); + PCollection>> input = + rawInput.apply("To KV", ParDo.of(new MapEventsToKV())); + + StringBufferOrderedProcessingHandler handler = + new StringBufferOrderedProcessingHandler(emissionFrequency, initialSequence); + handler.setMaxOutputElementsPerBundle(maxResultsPerOutput); + if (produceStatusOnEveryEvent) { + handler.setProduceStatusUpdateOnEveryEvent(true); + // This disables status updates emitted on timers. + handler.setStatusUpdateFrequency(null); + } else { + handler.setStatusUpdateFrequency( + streaming ? Duration.standardMinutes(5) : Duration.standardSeconds(1)); + } + OrderedEventProcessor orderedEventProcessor = + OrderedEventProcessor.create(handler); + + OrderedEventProcessorResult processingResult = + input.apply("Process Events", orderedEventProcessor); + + PAssert.that("Output matches", processingResult.output()).containsInAnyOrder(expectedOutput); + + if (streaming) { + // Only in streaming the events will arrive in a pre-determined order and the statuses + // will be deterministic. In batch pipelines events can be processed in any order, + // so we skip status verification and rely on the output and unprocessed event matches. + PAssert.that("Statuses match", processingResult.processingStatuses()) + .containsInAnyOrder(expectedStatuses); + } + + // This is a temporary workaround until PAssert changes. + boolean unprocessedEventsHaveExceptionStackTrace = false; + for (KV>> event : expectedUnprocessedEvents) { + if (event.getValue().getValue().getReason() == Reason.exception_thrown) { + unprocessedEventsHaveExceptionStackTrace = true; + break; + } + } + + if (unprocessedEventsHaveExceptionStackTrace) { + PAssert.thatSingleton( + "Unprocessed event count", + processingResult + .unprocessedEvents() + .apply( + "Window", + Window.>>>into( + new GlobalWindows()) + .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())) + .discardingFiredPanes()) + .apply("Count", Count.globally())) + .isEqualTo((long) expectedUnprocessedEvents.size()); + } else { + PAssert.that("Unprocessed events match", processingResult.unprocessedEvents()) + .containsInAnyOrder(expectedUnprocessedEvents); + } + pipeline.run(); + } + + private @UnknownKeyFor @NonNull @Initialized PCollection createBatchPCollection( + Pipeline pipeline, Event[] events) { + return pipeline + .apply("Create Batch Events", Create.of(Arrays.asList(events))) + .apply("Reshuffle", Reshuffle.viaRandomKey()); + } + + private @UnknownKeyFor @NonNull @Initialized PCollection createStreamingPCollection( + Pipeline pipeline, Event[] events) + throws @UnknownKeyFor @NonNull @Initialized CannotProvideCoderException { + Instant now = Instant.now().minus(Duration.standardMinutes(20)); + TestStream.Builder messageFlow = + TestStream.create(pipeline.getCoderRegistry().getCoder(Event.class)) + .advanceWatermarkTo(now); + + int delayInMilliseconds = 0; + for (Event e : events) { + messageFlow = + messageFlow + .advanceWatermarkTo(now.plus(Duration.millis(++delayInMilliseconds))) + .addElements(e); + } + + // Needed to force the processing time based timers. + messageFlow = messageFlow.advanceProcessingTime(Duration.standardMinutes(15)); + return pipeline.apply("Create Streaming Events", messageFlow.advanceWatermarkToInfinity()); + } + + /** + * Unprocessed event's explanation contains stacktraces which makes tests very brittle because it + * requires hardcoding the line numbers in the code. We use this matcher to only compare on the + * first line of the explanation. + */ + static class UnprocessedEventMatcher + extends BaseMatcher>>> + implements SerializableMatcher>>> { + + private KV>> element; + + public UnprocessedEventMatcher(KV>> element) { + this.element = element; + } + + @Override + public boolean matches(Object actual) { + KV>> toMatch = + (KV>>) actual; + + UnprocessedEvent originalEvent = element.getValue().getValue(); + UnprocessedEvent eventToMatch = toMatch.getValue().getValue(); + + return element.getKey().equals(toMatch.getKey()) + && element.getValue().getKey().equals(toMatch.getValue().getKey()) + && originalEvent.getEvent().equals(eventToMatch.getEvent()) + && originalEvent.getReason() == eventToMatch.getReason() + && normalizeExplanation(originalEvent.getExplanation()) + .equals(normalizeExplanation(eventToMatch.getExplanation())); + } + + @Override + public void describeTo(Description description) { + description.appendText("Just some text..."); + } + + static String normalizeExplanation(String value) { + if (value == null) { + return ""; + } + String firstLine = value.split("\n", 1)[0]; + if (firstLine.contains("Exception")) { + return firstLine; + } + return value; + } + } +} diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBufferOrderedProcessingHandler.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBufferOrderedProcessingHandler.java new file mode 100644 index 000000000000..72f3a3cf21b6 --- /dev/null +++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBufferOrderedProcessingHandler.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +import org.checkerframework.checker.nullness.qual.NonNull; + +/** + * Ordered processing handler used for testing. + * + *

    It uses all the defaults of the parent class. + */ +public class StringBufferOrderedProcessingHandler + extends OrderedProcessingHandler { + + private final EventExaminer eventExaminer; + + public StringBufferOrderedProcessingHandler(int emissionFrequency, long initialSequence) { + super(String.class, String.class, StringBuilderState.class, String.class); + this.eventExaminer = new StringEventExaminer(initialSequence, emissionFrequency); + } + + @Override + @NonNull + public EventExaminer getEventExaminer() { + return eventExaminer; + } +} diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBuilderState.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBuilderState.java new file mode 100644 index 000000000000..c88730aa8f0a --- /dev/null +++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBuilderState.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.Objects; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.extensions.ordered.StringBuilderState.StringBuilderStateCoder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +/** + * State used for processing test events. Uses StringBuilder to accumulate the output. + * + *

    Can be configured to produce output with certain frequency. + */ +@DefaultCoder(StringBuilderStateCoder.class) +class StringBuilderState implements MutableState { + + public static final String BAD_VALUE = "throw exception if you see me"; + + private int emissionFrequency = 1; + private long currentlyEmittedElementNumber; + + private final StringBuilder state = new StringBuilder(); + + StringBuilderState(String initialEvent, int emissionFrequency) { + this(initialEvent, emissionFrequency, 0L); + } + + StringBuilderState( + String initialEvent, int emissionFrequency, long currentlyEmittedElementNumber) { + this.emissionFrequency = emissionFrequency; + this.currentlyEmittedElementNumber = currentlyEmittedElementNumber; + try { + mutate(initialEvent); + } catch (Exception e) { + // this shouldn't happen because the input should be pre-validated. + throw new RuntimeException(e); + } + } + + @Override + public void mutate(String event) throws Exception { + if (event.equals(BAD_VALUE)) { + throw new Exception("Validation failed"); + } + state.append(event); + } + + @Override + public String produceResult() { + return currentlyEmittedElementNumber++ % emissionFrequency == 0 ? state.toString() : null; + } + + @Override + public String toString() { + return state.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof StringBuilderState)) { + return false; + } + StringBuilderState that = (StringBuilderState) o; + return emissionFrequency == that.emissionFrequency + && currentlyEmittedElementNumber == that.currentlyEmittedElementNumber + && state.toString().equals(that.state.toString()); + } + + @Override + public int hashCode() { + return Objects.hash(state); + } + + /** Coder for the StringBuilderState. */ + static class StringBuilderStateCoder extends Coder { + + private static final Coder STRING_CODER = StringUtf8Coder.of(); + private static final Coder LONG_CODER = VarLongCoder.of(); + private static final Coder INT_CODER = VarIntCoder.of(); + + @Override + public void encode( + StringBuilderState value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream) + throws IOException { + INT_CODER.encode(value.emissionFrequency, outStream); + LONG_CODER.encode(value.currentlyEmittedElementNumber, outStream); + STRING_CODER.encode(value.state.toString(), outStream); + } + + @Override + public StringBuilderState decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) + throws IOException { + int emissionFrequency = INT_CODER.decode(inStream); + long currentlyEmittedElementNumber = LONG_CODER.decode(inStream); + String decoded = STRING_CODER.decode(inStream); + StringBuilderState result = + new StringBuilderState(decoded, emissionFrequency, currentlyEmittedElementNumber); + return result; + } + + @Override + public @UnknownKeyFor @NonNull @Initialized List< + ? extends + @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized ?>> + getCoderArguments() { + return ImmutableList.of(); + } + + @Override + public void verifyDeterministic() {} + + @Override + public boolean consistentWithEquals() { + return true; + } + + @Override + public @UnknownKeyFor @NonNull @Initialized Object structuralValue(StringBuilderState value) { + return super.structuralValue(value); + } + } +} diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringEventExaminer.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringEventExaminer.java new file mode 100644 index 000000000000..7cf9a6e70572 --- /dev/null +++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringEventExaminer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +/** Test event examiner. */ +class StringEventExaminer implements EventExaminer { + + public static final String LAST_INPUT = "z"; + private final long initialSequence; + private final int emissionFrequency; + + public StringEventExaminer(long initialSequence, int emissionFrequency) { + this.initialSequence = initialSequence; + this.emissionFrequency = emissionFrequency; + } + + @Override + public boolean isInitialEvent(long sequenceNumber, String input) { + return sequenceNumber == initialSequence; + } + + @Override + public StringBuilderState createStateOnInitialEvent(String input) { + return new StringBuilderState(input, emissionFrequency); + } + + @Override + public boolean isLastEvent(long sequenceNumber, String input) { + return input.equals(LAST_INPUT); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 417fc87baa2d..ec11fd32fdd3 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -184,6 +184,7 @@ include(":sdks:java:extensions:google-cloud-platform-core") include(":sdks:java:extensions:jackson") include(":sdks:java:extensions:join-library") include(":sdks:java:extensions:ml") +include(":sdks:java:extensions:ordered") include(":sdks:java:extensions:protobuf") include(":sdks:java:extensions:python") include(":sdks:java:extensions:sbe")