diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 47de571672..09a0705e0e 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -587,7 +587,7 @@ public Builder withTimeReceived(final Instant timeReceived) { * @return returns the builder * @since 2.10 */ - protected Builder withEventHandle(final EventHandle eventHandle) { + public Builder withEventHandle(final EventHandle eventHandle) { this.eventHandle = eventHandle; return this; } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java index 551aed3d01..784758fa95 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java @@ -33,6 +33,15 @@ public interface Processor, OutputRecord extends R */ void prepareForShutdown(); + /** + * @since 2.11 + * Indicates if the processor holds the events or not + * Holding events indicates that the events are not ready to be released. + */ + default boolean holdsEvents() { + return false; + } + /** * @since 1.2 * Returns true if the Processor's internal state is safe to be shutdown. diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java new file mode 100644 index 0000000000..2fec941c4f --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java @@ -0,0 +1,23 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.processor; + +import org.junit.jupiter.api.Test; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ProcessorTest { + + @Test + public void testDefault() { + Processor processor = mock(Processor.class); + when(processor.holdsEvents()).thenCallRealMethod(); + assertThat(processor.holdsEvents(), equalTo(false)); + } +} + diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java index b5d3f812cf..f1a247b717 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java @@ -347,7 +347,7 @@ private void shutdownExecutorService(final ExecutorService executorService, fina * @param records records that needs to published to each sink * @return List of Future, each future for each sink */ - List> publishToSinks(final Collection records) { + public List> publishToSinks(final Collection records) { final int sinksSize = sinks.size(); final List> sinkFutures = new ArrayList<>(sinksSize); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java 
b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java index e313430b49..8fb314fd83 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java @@ -137,7 +137,8 @@ private void doRun() { try { records = processor.execute(records); - if (inputEvents != null) { + // acknowledge missing events only if the processor is not holding events + if (!processor.holdsEvents() && inputEvents != null) { processAcknowledgements(inputEvents, records); } } catch (final Exception e) { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/ProcessWorkerTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/ProcessWorkerTest.java index 230bb32780..1eb28f991b 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/ProcessWorkerTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/ProcessWorkerTest.java @@ -6,6 +6,8 @@ import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.atLeast; import org.opensearch.dataprepper.core.pipeline.common.FutureHelper; import org.opensearch.dataprepper.core.pipeline.common.FutureHelperResult; import org.opensearch.dataprepper.model.CheckpointState; @@ -132,6 +134,61 @@ void testProcessWorkerHappyPathWithAcknowledgments() { } } + @Test + void testProcessWorkerWithProcessorsNotHoldingEvents() { + DefaultEventHandle eventHandle = mock(DefaultEventHandle.class); + Event event = mock(Event.class); + Record record = mock(Record.class); + when(eventHandle.release(true)).thenReturn(true); + lenient().when(event.getEventHandle()).thenReturn(eventHandle); + when(record.getData()).thenReturn(event); + final List records = List.of(record); + final CheckpointState checkpointState = mock(CheckpointState.class); + final Map.Entry readResult = Map.entry(records, checkpointState); + when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult); + + final Processor processor1 = mock(Processor.class); + when(processor1.holdsEvents()).thenReturn(false); + when(processor1.execute(records)).thenReturn(List.of()); + when(processor1.isReadyForShutdown()).thenReturn(true); + processors = List.of(processor1); + when(source.areAcknowledgementsEnabled()).thenReturn(true); + + final ProcessWorker processWorker = createObjectUnderTest(); + + processWorker.run(); + + verify(eventHandle, atLeast(1)).release(true); + } + + + @Test + void testProcessWorkerWithProcessorsHoldingEvents() { + EventHandle eventHandle = mock(EventHandle.class); + Event event = mock(Event.class); + Record record = mock(Record.class); + lenient().when(event.getEventHandle()).thenReturn(eventHandle); + when(record.getData()).thenReturn(event); + final List records = List.of(record); + final CheckpointState checkpointState = mock(CheckpointState.class); + final Map.Entry readResult = Map.entry(records, checkpointState); + when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult); + + final Processor processor1 = mock(Processor.class); + when(processor1.holdsEvents()).thenReturn(true); + when(processor1.execute(records)).thenReturn(List.of()); + when(processor1.isReadyForShutdown()).thenReturn(true); + + processors = List.of(processor1); + 
when(source.areAcknowledgementsEnabled()).thenReturn(true); + + final ProcessWorker processWorker = createObjectUnderTest(); + + processWorker.run(); + + verify(eventHandle, never()).release(true); + } + @Test void testProcessWorkerWithProcessorThrowingExceptionIsCaughtProperly() { diff --git a/data-prepper-plugins/aggregate-processor/build.gradle b/data-prepper-plugins/aggregate-processor/build.gradle index 9a3eb4551a..bc2f398b4b 100644 --- a/data-prepper-plugins/aggregate-processor/build.gradle +++ b/data-prepper-plugins/aggregate-processor/build.gradle @@ -14,6 +14,7 @@ dependencies { implementation project(':data-prepper-expression') implementation project(':data-prepper-plugins:otel-proto-common') implementation project(':data-prepper-plugins:otel-metrics-raw-processor') + testImplementation project(':data-prepper-core') implementation libs.guava.core implementation libs.commons.lang3 implementation libs.opentelemetry.proto diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java index ae798af032..541cd15d3d 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.processor.aggregate; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; import java.util.Collections; @@ -29,6 +30,14 @@ default AggregateActionResponse handleEvent(final Event event, final AggregateAc return AggregateActionResponse.fromEvent(event); } + /** + * indicates if the action holds the events or not + * + */ + default boolean holdsEvents() { + return false; + } + /** * Concludes a group of Events * @@ -38,6 +47,12 @@ default AggregateActionResponse handleEvent(final Event event, final AggregateAc * @since 1.3 */ default AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) { + if (aggregateActionInput != null) { + EventHandle eventHandle = aggregateActionInput.getEventHandle(); + if (eventHandle != null) { + eventHandle.release(true); + } + } return new AggregateActionOutput(Collections.emptyList()); } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionInput.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionInput.java index 0bec0b2350..cd7b47d66e 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionInput.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionInput.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.processor.aggregate; +import org.opensearch.dataprepper.model.event.EventHandle; + import java.util.Map; import java.util.function.Function; import java.time.Duration; @@ -28,6 +30,12 @@ public interface AggregateActionInput { */ Map getIdentificationKeys(); + /** + * @return returns eventHandle held by the instance + * @since 2.11 + */ + EventHandle getEventHandle(); + /** * 
Sets custom shouldConclude function * diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroup.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroup.java index 09e0e97223..14adde221f 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroup.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroup.java @@ -5,6 +5,11 @@ package org.opensearch.dataprepper.plugins.processor.aggregate; +import org.opensearch.dataprepper.model.event.AggregateEventHandle; +import org.opensearch.dataprepper.model.event.InternalEventHandle; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.Event; + import java.time.Duration; import java.time.Instant; import java.util.function.Function; @@ -19,6 +24,7 @@ class AggregateGroup implements AggregateActionInput { private final Lock handleEventForGroupLock; private final Map identificationKeys; private Function customShouldConclude; + private EventHandle eventHandle; AggregateGroup(final Map identificationKeys) { this.groupState = new DefaultGroupState(); @@ -26,6 +32,19 @@ class AggregateGroup implements AggregateActionInput { this.groupStart = Instant.now(); this.concludeGroupLock = new ReentrantLock(); this.handleEventForGroupLock = new ReentrantLock(); + this.eventHandle = new AggregateEventHandle(Instant.now()); + } + + @Override + public EventHandle getEventHandle() { + return eventHandle; + } + + public void attachToEventAcknowledgementSet(Event event) { + InternalEventHandle internalEventHandle; + EventHandle handle = event.getEventHandle(); + internalEventHandle = (InternalEventHandle)(handle); + internalEventHandle.addEventHandle(eventHandle); } public GroupState getGroupState() { @@ -63,5 +82,6 @@ boolean shouldConcludeGroup(final Duration groupDuration) { void resetGroup() { groupStart = Instant.now(); groupState.clear(); + this.eventHandle = new AggregateEventHandle(groupStart); } } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java index 9d271aa40b..dedf1edde0 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java @@ -26,6 +26,7 @@ AggregateGroup getAggregateGroup(final IdentificationKeysHasher.IdentificationKe return allGroups.computeIfAbsent(identificationKeysMap, (hash) -> new AggregateGroup(identificationKeysMap.getKeyMap())); } + List> getGroupsToConclude(final boolean forceConclude) { final List> groupsToConclude = new ArrayList<>(); for (final Map.Entry groupEntry : allGroups.entrySet()) { diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java index 2b19e98516..616c3c5ea8 100644 --- 
a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java @@ -93,6 +93,12 @@ private AggregateAction loadAggregateAction(final PluginFactory pluginFactory) { return pluginFactory.loadPlugin(AggregateAction.class, actionPluginSetting); } + AggregateGroup getAggregateGroupForEvent(final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap, final Event event) { + AggregateGroup aggregateGroup = aggregateGroupManager.getAggregateGroup(identificationKeysMap); + aggregateGroup.attachToEventAcknowledgementSet(event); + return aggregateGroup; + } + @Override public Collection> doExecute(Collection> records) { final List> recordsOut = new LinkedList<>(); @@ -124,7 +130,7 @@ public Collection> doExecute(Collection> records) { continue; } final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event); - final AggregateGroup aggregateGroupForEvent = aggregateGroupManager.getAggregateGroup(identificationKeysMap); + final AggregateGroup aggregateGroupForEvent = getAggregateGroupForEvent(identificationKeysMap, event); final AggregateActionResponse handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent); @@ -149,6 +155,11 @@ public Collection> doExecute(Collection> records) { return recordsOut; } + @Override + public boolean holdsEvents() { + return aggregateAction.holdsEvents(); + } + public static long getTimeNanos(final Instant time) { final long NANO_MULTIPLIER = 1_000 * 1_000 * 1_000; long currentTimeNanos = time.getEpochSecond() * NANO_MULTIPLIER + time.getNano(); diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateAction.java index ece5212ac4..0d930b94ba 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateAction.java @@ -84,6 +84,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA final Event event = JacksonEvent.builder() .withEventType(EVENT_TYPE) .withData(aggregateActionInput.getGroupState()) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(); return new AggregateActionOutput(List.of(event)); } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java index 8b67ca64cd..16bbf39c31 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java @@ -146,6 +146,7 @@ public AggregateActionOutput concludeGroup(final 
AggregateActionInput aggregateA event = JacksonEvent.builder() .withEventType(EVENT_TYPE) .withData(groupState) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(); } else { Integer countValue = (Integer)groupState.get(countKey); @@ -168,6 +169,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA .withValue((double)countValue) .withExemplars(List.of(exemplar)) .withAttributes(attr) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(false); event = (Event)sum; } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java index 22cfa7efb7..ac1a59a712 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java @@ -225,6 +225,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA event = JacksonEvent.builder() .withEventType(EVENT_TYPE) .withData(groupState) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(); } else { List explicitBoundsList = new ArrayList(); @@ -262,6 +263,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA .withExplicitBoundsList(explicitBoundsList) .withExemplars(exemplarList) .withAttributes(attr) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(false); event = (Event)histogram; } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateAction.java index b243dd5ef0..9b27a49dee 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateAction.java @@ -47,4 +47,5 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct } return AggregateActionResponse.nullEventResponse(); } + } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PutAllAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PutAllAggregateAction.java index 78debabb35..54e0e2c72c 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PutAllAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PutAllAggregateAction.java @@ -41,6 +41,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA final Event event = JacksonEvent.builder() .withEventType(EVENT_TYPE) .withData(aggregateActionInput.getGroupState()) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(); return new 
AggregateActionOutput(List.of(event)); diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java index 3ea0d0b8af..ce8131b95c 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java @@ -42,4 +42,5 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct } return new AggregateActionResponse(event); } + } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateAction.java index 26b245da73..fc347e0105 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateAction.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput; @@ -15,8 +16,9 @@ import org.opensearch.dataprepper.plugins.processor.aggregate.GroupState; import org.opensearch.dataprepper.expression.ExpressionEvaluator; -import java.util.List; import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Random; import java.time.Duration; import java.time.Instant; @@ -70,14 +72,27 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct return AggregateActionResponse.nullEventResponse(); } + @Override + public boolean holdsEvents() { + return true; + } + @Override public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) { GroupState groupState = aggregateActionInput.getGroupState(); int randomInt = random.nextInt(100); + aggregateActionInput.getEventHandle().release(true); if (((groupState.containsKey(ERROR_STATUS_KEY) && (Boolean)groupState.get(ERROR_STATUS_KEY) == true)) || (randomInt < percent)) { return new AggregateActionOutput((List)groupState.getOrDefault(EVENTS_KEY, List.of())); } - return new AggregateActionOutput(List.of()); + List events = (List)groupState.getOrDefault(EVENTS_KEY, List.of()); + for (final Event event : events) { + EventHandle eventHandle = event.getEventHandle(); + if (eventHandle != null) { + eventHandle.release(true); + } + } + return new AggregateActionOutput(Collections.emptyList()); } } diff --git 
a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionTestUtils.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionTestUtils.java index 21e49e05be..b46d2bdaab 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionTestUtils.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionTestUtils.java @@ -5,6 +5,11 @@ package org.opensearch.dataprepper.plugins.processor.aggregate; +import org.opensearch.dataprepper.model.event.AggregateEventHandle; +import org.opensearch.dataprepper.model.event.EventHandle; + +import java.time.Duration; +import java.time.Instant; import java.util.Map; import java.util.HashMap; import java.time.Duration; @@ -15,10 +20,12 @@ public static class TestAggregateActionInput implements AggregateActionInput { private final GroupState groupState; private final Map identificationKeys; private Function customShouldConclude; + private EventHandle eventHandle; public TestAggregateActionInput(Map identificationKeys) { this.groupState = new AggregateActionTestUtils.TestGroupState(); this.identificationKeys = identificationKeys; + this.eventHandle = new AggregateEventHandle(Instant.now()); } @Override @@ -31,6 +38,11 @@ public GroupState getGroupState() { return groupState; } + @Override + public EventHandle getEventHandle() { + return eventHandle; + } + @Override public Map getIdentificationKeys() { return identificationKeys; diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java index a7608decec..0a5dbd6117 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java @@ -11,9 +11,14 @@ import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.AggregateEventHandle; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSet; +import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSetMetrics; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.extension.ExtendWith; @@ -52,6 +57,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.ThreadLocalRandom; @@ -105,6 +111,7 @@ public class 
AggregateProcessorIT { private Collection> eventBatch; private ConcurrentLinkedQueue> aggregatedResult; private Set> uniqueEventMaps; + private Set eventHandles; @Mock private PluginFactory pluginFactory; @@ -114,6 +121,7 @@ public class AggregateProcessorIT { @BeforeEach void setup() { + eventHandles = new HashSet<>(); aggregatedResult = new ConcurrentLinkedQueue<>(); uniqueEventMaps = new HashSet<>(); @@ -186,7 +194,7 @@ void aggregateWithConcludingGroupsOnceReturnsExpectedResult() throws Interrupted final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); for (int i = 0; i < NUM_THREADS; i++) { executorService.execute(() -> { @@ -213,7 +221,7 @@ void aggregateWithConcludingGroupsOnceReturnsExpectedResult() throws Interrupted } } - @RepeatedTest(value = 2) + @RepeatedTest(value = 1) void aggregateWithPutAllActionAndCondition() throws InterruptedException { aggregateAction = new PutAllAggregateAction(); when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) @@ -222,9 +230,12 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException { when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); when(aggregateProcessorConfig.getWhenCondition()).thenReturn(condition); when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true); + final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + AcknowledgementSet acknowledgementSet = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics)); int count = 0; for (Record record: eventBatch) { Event event = record.getData(); + acknowledgementSet.add(event.getEventHandle()); boolean value = (count % 2 == 0) ? 
true : false; when(expressionEvaluator.evaluateConditional(condition, event)).thenReturn(value); if (!value) { @@ -238,8 +249,9 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); + final List> allRecordsOut = new ArrayList<>(); for (int i = 0; i < NUM_THREADS; i++) { executorService.execute(() -> { final List> recordsOut = (List>) objectUnderTest.doExecute(eventBatch); @@ -247,6 +259,7 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException { final Map map = record.getData().toMap(); aggregatedResult.add(map); } + allRecordsOut.addAll(recordsOut); countDownLatch.countDown(); }); } @@ -259,6 +272,11 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException { for (final Map uniqueEventMap : uniqueEventMaps) { assertThat(aggregatedResult, hasItem(uniqueEventMap)); } + for (Record record: allRecordsOut) { + EventHandle eventHandle = record.getData().getEventHandle(); + assertTrue(eventHandle instanceof AggregateEventHandle); + assertTrue(((AggregateEventHandle)eventHandle).hasAcknowledgementSet()); + } } @ParameterizedTest @@ -276,7 +294,7 @@ void aggregateWithPercentSamplerAction(double testPercent) throws InterruptedExc final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); AtomicInteger allowedEventsCount = new AtomicInteger(0); for (int i = 0; i < NUM_THREADS; i++) { @@ -309,7 +327,7 @@ void aggregateWithRateLimiterAction() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); for (int i = 0; i < NUM_THREADS; i++) { executorService.execute(() -> { @@ -344,7 +362,7 @@ void aggregateWithRateLimiterActionNoDrops() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); for (int i = 0; i < NUM_THREADS; i++) { executorService.execute(() -> { @@ -364,8 +382,8 @@ void aggregateWithRateLimiterActionNoDrops() throws InterruptedException { assertThat(aggregatedResult.size(), equalTo(NUM_THREADS * NUM_EVENTS_PER_BATCH)); } - @RepeatedTest(value = 2) - void aggregateWithCountAggregateAction() throws InterruptedException, NoSuchFieldException, IllegalAccessException { + @RepeatedTest(value = 1) + void aggregateWithCountAggregateAction() throws InterruptedException, NoSuchFieldException, IllegalAccessException { CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig(); setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW); aggregateAction = new CountAggregateAction(countAggregateActionConfig); @@ -373,6 +391,12 @@ void aggregateWithCountAggregateAction() throws InterruptedException, NoSuchFiel .thenReturn(aggregateAction);
when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); eventBatch = getBatchOfEvents(true); + final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + AcknowledgementSet acknowledgementSet = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics)); + for (Record record: eventBatch) { + Event event = record.getData(); + acknowledgementSet.add(event.getEventHandle()); + } final AggregateProcessor objectUnderTest = createObjectUnderTest(); @@ -385,7 +409,7 @@ void aggregateWithCountAggregateAction() throws InterruptedException, NoSuchFiel countDownLatch.countDown(); }); } - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); @@ -399,10 +423,13 @@ void aggregateWithCountAggregateAction() throws InterruptedException, NoSuchFiel final Record record = (Record)results.toArray()[0]; expectedEventMap.forEach((k, v) -> assertThat(record.getData().toMap(), hasEntry(k,v))); assertThat(record.getData().toMap(), hasKey(DEFAULT_START_TIME_KEY)); + EventHandle eventHandle = record.getData().getEventHandle(); + assertTrue(eventHandle instanceof AggregateEventHandle); + assertTrue(((AggregateEventHandle)eventHandle).hasAcknowledgementSet()); } @RepeatedTest(value = 2) - void aggregateWithCountAggregateActionWithCondition() throws InterruptedException, NoSuchFieldException, IllegalAccessException { + void aggregateWithCountAggregateActionWithCondition() throws InterruptedException, NoSuchFieldException, IllegalAccessException { CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig(); setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW); aggregateAction = new CountAggregateAction(countAggregateActionConfig); @@ -414,16 +441,19 @@ void aggregateWithCountAggregateActionWithCondition() throws InterruptedExceptio when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true); int count = 0; eventBatch = getBatchOfEvents(true); + + final AggregateProcessor objectUnderTest = createObjectUnderTest(); + + final ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS); + final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + AcknowledgementSet acknowledgementSet = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics)); for (Record record: eventBatch) { Event event = record.getData(); + acknowledgementSet.add(event.getEventHandle()); boolean value = (count % 2 == 0) ?
true : false; when(expressionEvaluator.evaluateConditional(condition, event)).thenReturn(value); count++; } - - final AggregateProcessor objectUnderTest = createObjectUnderTest(); - - final ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS); final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); for (int i = 0; i < NUM_THREADS; i++) { @@ -432,7 +462,7 @@ void aggregateWithCountAggregateActionWithCondition() throws InterruptedExceptio countDownLatch.countDown(); }); } - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); @@ -446,6 +476,9 @@ void aggregateWithCountAggregateActionWithCondition() throws InterruptedExceptio final Record record = (Record)results.toArray()[0]; expectedEventMap.forEach((k, v) -> assertThat(record.getData().toMap(), hasEntry(k,v))); assertThat(record.getData().toMap(), hasKey(DEFAULT_START_TIME_KEY)); + EventHandle eventHandle = record.getData().getEventHandle(); + assertTrue(eventHandle instanceof AggregateEventHandle); + assertTrue(((AggregateEventHandle)eventHandle).hasAcknowledgementSet()); } @RepeatedTest(value = 2) @@ -460,6 +493,12 @@ void aggregateWithCountAggregateActionWithUnaggregatedEvents() throws Interrupte .thenReturn(aggregateAction); when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); eventBatch = getBatchOfEvents(true); + final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + AcknowledgementSet acknowledgementSet = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics)); + for (Record record: eventBatch) { + Event event = record.getData(); + acknowledgementSet.add(event.getEventHandle()); + } final AggregateProcessor objectUnderTest = createObjectUnderTest(); @@ -474,7 +513,7 @@ void aggregateWithCountAggregateActionWithUnaggregatedEvents() throws Interrupte }); } // wait longer so that the raw events are processed. 
- Thread.sleep(2*GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(2*GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); @@ -489,6 +528,9 @@ void aggregateWithCountAggregateActionWithUnaggregatedEvents() throws Interrupte assertTrue(record.getData().getMetadata().hasTags(List.of(tag))); expectedEventMap.forEach((k, v) -> assertThat(record.getData().toMap(), hasEntry(k,v))); assertThat(record.getData().toMap(), hasKey(DEFAULT_START_TIME_KEY)); + EventHandle eventHandle = record.getData().getEventHandle(); + assertTrue(eventHandle instanceof AggregateEventHandle); + assertTrue(((AggregateEventHandle)eventHandle).hasAcknowledgementSet()); } @@ -518,10 +560,13 @@ void aggregateWithHistogramAggregateAction() throws InterruptedException, NoSuch .thenReturn(aggregateAction); when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); eventBatch = getBatchOfEvents(true); + final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + AcknowledgementSet acknowledgementSet = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics)); for (final Record record : eventBatch) { final double value = ThreadLocalRandom.current().nextDouble(TEST_VALUE_RANGE_MIN-TEST_VALUE_RANGE_STEP, TEST_VALUE_RANGE_MAX+TEST_VALUE_RANGE_STEP); Event event = record.getData(); event.put(testKey, value); + acknowledgementSet.add(event.getEventHandle()); } final AggregateProcessor objectUnderTest = createObjectUnderTest(); @@ -556,6 +601,9 @@ void aggregateWithHistogramAggregateAction() throws InterruptedException, NoSuch for (int i = 0; i < testBuckets.size(); i++) { assertThat(testBuckets.get(i).doubleValue(), equalTo(bucketsInResult.get(i))); } + EventHandle eventHandle = record.getData().getEventHandle(); + assertTrue(eventHandle instanceof AggregateEventHandle); + assertTrue(((AggregateEventHandle)eventHandle).hasAcknowledgementSet()); } @ParameterizedTest @@ -581,7 +629,7 @@ void aggregateWithTailSamplerAction(final int testPercent) throws InterruptedExc final int numberOfSpans = 5; eventBatch = getBatchOfEventsForTailSampling(numberOfErrorTraces, numberOfSpans); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); for (int i = 0; i < NUM_THREADS; i++) { @@ -590,7 +638,7 @@ void aggregateWithTailSamplerAction(final int testPercent) throws InterruptedExc countDownLatch.countDown(); }); } - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); List errorEventList = eventBatch.stream().map(Record::getData).filter(event -> { diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorITWithAcks.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorITWithAcks.java new file mode 100644 index 0000000000..1534c65de3 --- /dev/null +++ 
b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorITWithAcks.java @@ -0,0 +1,621 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.aggregate; + +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; +import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.event.AggregateEventHandle; +import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.core.pipeline.common.FutureHelper; +import org.opensearch.dataprepper.core.pipeline.Pipeline; +import org.opensearch.dataprepper.core.pipeline.common.FutureHelperResult; +import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSet; +import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSetMetrics; +import org.opensearch.dataprepper.core.pipeline.ProcessWorker; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.AppendAggregateAction; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.AppendAggregateActionConfig; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.CountAggregateAction; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.CountAggregateActionConfig; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.HistogramAggregateAction; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.HistogramAggregateActionConfig; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.RateLimiterMode; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.RateLimiterAggregateAction; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.RateLimiterAggregateActionConfig; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.PercentSamplerAggregateAction; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.PercentSamplerAggregateActionConfig; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.TailSamplerAggregateActionConfig; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.TailSamplerAggregateAction; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.RemoveDuplicatesAggregateAction; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.PutAllAggregateAction; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.OutputFormat; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.Source; + +import static org.awaitility.Awaitility.await; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; 
+import static org.hamcrest.MatcherAssert.assertThat; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThan; + + +import org.apache.commons.lang3.RandomStringUtils; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Executors; + + +public class AggregateProcessorITWithAcks { + private static final int testValue = 1; + private static final int GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE = 2; + private static final int NUM_UNIQUE_EVENTS_PER_BATCH = 8; + private static final int NUM_EVENTS_PER_BATCH = 5; + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(5); + + @Mock + private Pipeline pipeline; + @Mock + private Buffer buffer; + @Mock + private Source source; + @Mock + private PluginFactory pluginFactory; + @Mock + private ExpressionEvaluator expressionEvaluator; + @Mock + private CheckpointState checkpointState; + @Mock + private PluginModel actionConfiguration; + @Mock + private AggregateProcessorConfig aggregateProcessorConfig; + private int callCount; + private boolean aggregatedResultReceived; + List> records; + private String testKey; + + private PluginMetrics pluginMetrics; + private List processors; + private List> sinkFutures; + AcknowledgementSet acknowledgementSet; + ScheduledExecutorService scheduledExecutorService; + List> aggregatedResults; + + @BeforeEach + void setup() { + testKey = UUID.randomUUID().toString(); + pluginMetrics = PluginMetrics.fromNames(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + scheduledExecutorService = Executors.newScheduledThreadPool(3); + acknowledgementSet = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics)); + final List identificationKeys = new ArrayList<>(); + identificationKeys.add("firstRandomNumber"); + identificationKeys.add("secondRandomNumber"); + identificationKeys.add("thirdRandomNumber"); + callCount = 0; + aggregatedResultReceived = false; + aggregatedResults = new ArrayList<>(); + + pipeline = mock(Pipeline.class); + source = mock(Source.class); + buffer = mock(Buffer.class); + processors = List.of(); + aggregateProcessorConfig = mock(AggregateProcessorConfig.class); + actionConfiguration = mock(PluginModel.class); + pluginFactory = mock(PluginFactory.class); + expressionEvaluator = mock(ExpressionEvaluator.class); + when(pipeline.isStopRequested()).thenReturn(false).thenReturn(true); + when(source.areAcknowledgementsEnabled()).thenReturn(true); + when(pipeline.getSource()).thenReturn(source); + when(buffer.isEmpty()).thenReturn(true); + when(pipeline.getPeerForwarderDrainTimeout()).thenReturn(Duration.ofMillis(100)); + when(pipeline.getReadBatchTimeoutInMillis()).thenReturn(500); + when(aggregateProcessorConfig.getOutputUnaggregatedEvents()).thenReturn(false); + 
when(aggregateProcessorConfig.getIdentificationKeys()).thenReturn(identificationKeys); + when(aggregateProcessorConfig.getWhenCondition()).thenReturn(null); + + records = getRecords(testKey, testValue, acknowledgementSet); + acknowledgementSet.complete(); + checkpointState = mock(CheckpointState.class); + final Map.Entry readResult = Map.entry(records, checkpointState); + + when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenAnswer(a -> { + if (callCount == 0) { + callCount++; + return Map.entry(records, checkpointState); + } else { + return Map.entry(List.of(), checkpointState); + } + }); + + final Future sinkFuture = mock(Future.class); + sinkFutures = List.of(sinkFuture); + doAnswer( a -> { + List> receivedRecords = (List>)a.getArgument(0); + if (receivedRecords.size() > 0) { + aggregatedResults = receivedRecords; + for (Record record: receivedRecords) { + if (record.getData().getEventHandle() instanceof AggregateEventHandle) { + aggregatedResultReceived = true; + } + record.getData().getEventHandle().release(true); + } + } + + return sinkFutures; + }).when(pipeline).publishToSinks(any()); + when(aggregateProcessorConfig.getAggregateAction()).thenReturn(actionConfiguration); + when(actionConfiguration.getPluginName()).thenReturn(UUID.randomUUID().toString()); + when(actionConfiguration.getPluginSettings()).thenReturn(Collections.emptyMap()); + } + + @Test + public void testHistogramAggregation() throws Exception { + HistogramAggregateActionConfig histogramAggregateActionConfig = new HistogramAggregateActionConfig(); + setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "outputFormat", OutputFormat.RAW); + setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "key", testKey); + final String testKeyPrefix = RandomStringUtils.randomAlphabetic(4)+"_"; + setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "generatedKeyPrefix", testKeyPrefix); + final String testUnits = RandomStringUtils.randomAlphabetic(3); + setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "units", testUnits); + setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "recordMinMax", true); + List testBuckets = new ArrayList(); + testBuckets.add(10.0); + setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "buckets", testBuckets); + AggregateAction aggregateAction = new HistogramAggregateAction(histogramAggregateActionConfig); + when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofMillis(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); + when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) + .thenReturn(aggregateAction); + final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator); + processors = List.of(processor); + + final FutureHelperResult futureHelperResult = mock(FutureHelperResult.class); + when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList()); + try (final MockedStatic futureHelperMockedStatic = mockStatic(FutureHelper.class)) { + futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) + .thenReturn(futureHelperResult); + + final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline); + + processWorker.run(); + } + await().atMost(TEST_TIMEOUT) + .untilAsserted(() -> { + assertTrue(aggregatedResultReceived); + assertThat(aggregatedResults.size(), equalTo(1)); + 
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone()); + }); + } + + @Test + public void testPercentSamplerAggregation() throws Exception { + double testPercent = 50.0; + PercentSamplerAggregateActionConfig percentSamplerAggregateActionConfig = new PercentSamplerAggregateActionConfig(); + setField(PercentSamplerAggregateActionConfig.class, percentSamplerAggregateActionConfig, "percent", testPercent); + AggregateAction aggregateAction = new PercentSamplerAggregateAction(percentSamplerAggregateActionConfig); + when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) + .thenReturn(aggregateAction); + when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); + + final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator); + processors = List.of(processor); + final FutureHelperResult futureHelperResult = mock(FutureHelperResult.class); + when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList()); + try (final MockedStatic futureHelperMockedStatic = mockStatic(FutureHelper.class)) { + futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) + .thenReturn(futureHelperResult); + + final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline); + + processWorker.run(); + } + await().atMost(TEST_TIMEOUT) + .untilAsserted(() -> { + assertFalse(aggregatedResultReceived); + assertThat(aggregatedResults.size(), greaterThanOrEqualTo(1)); + assertThat(aggregatedResults.size(), lessThan(5)); + assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone()); + }); + } + + + @Test + public void testPutAllAggregation() throws Exception { + AggregateAction aggregateAction = new PutAllAggregateAction(); + when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) + .thenReturn(aggregateAction); + when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); + when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) + .thenReturn(aggregateAction); + when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); + final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator); + processors = List.of(processor); + final FutureHelperResult futureHelperResult = mock(FutureHelperResult.class); + when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList()); + try (final MockedStatic futureHelperMockedStatic = mockStatic(FutureHelper.class)) { + futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) + .thenReturn(futureHelperResult); + + final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline); + + processWorker.run(); + } + await().atMost(TEST_TIMEOUT) + .untilAsserted(() -> { + assertTrue(aggregatedResultReceived); + assertThat(aggregatedResults.size(), equalTo(1)); + assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone()); + }); + } + + + @Test + public void testRateLimiterDropAggregation() throws Exception { + RateLimiterAggregateActionConfig rateLimiterAggregateActionConfig = mock(RateLimiterAggregateActionConfig.class); + final int eventsPerSecond = 1; + when(rateLimiterAggregateActionConfig.getEventsPerSecond()).thenReturn(eventsPerSecond); 
+        when(rateLimiterAggregateActionConfig.getWhenExceeds()).thenReturn(RateLimiterMode.DROP);
+        AggregateAction aggregateAction = new RateLimiterAggregateAction(rateLimiterAggregateActionConfig);
+        when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class)))
+                .thenReturn(aggregateAction);
+        when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
+        final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
+        processors = List.of(processor);
+        final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
+        when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());
+        try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
+            futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
+                    .thenReturn(futureHelperResult);
+
+            final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline);
+
+            processWorker.run();
+        }
+        await().atMost(TEST_TIMEOUT)
+                .untilAsserted(() -> {
+                    assertFalse(aggregatedResultReceived);
+                    assertThat(aggregatedResults.size(), equalTo(1));
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
+                });
+    }
+
+    @Test
+    public void testRemoveDuplicatesAggregation() {
+        AggregateAction aggregateAction = new RemoveDuplicatesAggregateAction();
+        when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class)))
+                .thenReturn(aggregateAction);
+        when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
+        final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
+        processors = List.of(processor);
+        final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
+        when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());
+        try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
+            futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
+                    .thenReturn(futureHelperResult);
+
+            final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline);
+
+            processWorker.run();
+        }
+        await().atMost(TEST_TIMEOUT)
+                .untilAsserted(() -> {
+                    assertFalse(aggregatedResultReceived);
+                    assertThat(aggregatedResults.size(), equalTo(1));
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
+                });
+    }
+
+    @Test
+    public void testRateLimiterNoDropAggregation() throws Exception {
+        RateLimiterAggregateActionConfig rateLimiterAggregateActionConfig = mock(RateLimiterAggregateActionConfig.class);
+        final int eventsPerSecond = 50;
+        when(rateLimiterAggregateActionConfig.getEventsPerSecond()).thenReturn(eventsPerSecond);
+        when(rateLimiterAggregateActionConfig.getWhenExceeds()).thenReturn(RateLimiterMode.BLOCK);
+        AggregateAction aggregateAction = new RateLimiterAggregateAction(rateLimiterAggregateActionConfig);
+        when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class)))
+                .thenReturn(aggregateAction);
+        when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
+        final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
+        processors = List.of(processor);
+        final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
+        when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());
+        try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
+            futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
+                    .thenReturn(futureHelperResult);
+
+            final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline);
+
+            processWorker.run();
+        }
+        await().atMost(TEST_TIMEOUT)
+                .untilAsserted(() -> {
+                    assertFalse(aggregatedResultReceived);
+                    assertThat(aggregatedResults.size(), equalTo(5));
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
+                });
+    }
+
+
+    @Test
+    public void testRateLimiterNoDropAggregationWithMultipleAcknowledgementSets() throws Exception {
+        AcknowledgementSet acknowledgementSet2 = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics));
+        AcknowledgementSet acknowledgementSet3 = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics));
+        final List<Record<Event>> records2 = getRecords(testKey, 1, acknowledgementSet2);
+        acknowledgementSet2.complete();
+        final List<Record<Event>> records3 = getRecords(testKey, 1, acknowledgementSet3);
+        acknowledgementSet3.complete();
+        when(aggregateProcessorConfig.getIdentificationKeys()).thenReturn(List.of(testKey));
+
+        RateLimiterAggregateActionConfig rateLimiterAggregateActionConfig = mock(RateLimiterAggregateActionConfig.class);
+        final int eventsPerSecond = 50;
+        when(rateLimiterAggregateActionConfig.getEventsPerSecond()).thenReturn(eventsPerSecond);
+        when(rateLimiterAggregateActionConfig.getWhenExceeds()).thenReturn(RateLimiterMode.BLOCK);
+        AggregateAction aggregateAction = new RateLimiterAggregateAction(rateLimiterAggregateActionConfig);
+        when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
+        when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class)))
+                .thenReturn(aggregateAction);
+        buffer = mock(Buffer.class);
+        when(buffer.isEmpty()).thenReturn(true);
+        when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenAnswer(a -> {
+            if (callCount == 0) {
+                callCount++;
+                return Map.entry(records, checkpointState);
+            } else if (callCount == 1) {
+                callCount++;
+                return Map.entry(records2, checkpointState);
+            } else if (callCount == 2) {
+                callCount++;
+                return Map.entry(records3, checkpointState);
+            } else {
+                return Map.entry(List.of(), checkpointState);
+            }
+        });
+
+        final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
+        processors = List.of(processor);
+        final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
+        when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());
+        try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
+            futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
+                    .thenReturn(futureHelperResult);
+
+            final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline);
+
+            processWorker.run();
+        }
+        await().atMost(TEST_TIMEOUT)
+                .untilAsserted(() -> {
+                    assertFalse(aggregatedResultReceived);
+                    assertThat(aggregatedResults.size(), equalTo(5));
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
+                });
+    }
+
+
+    @Test
+    public void testCountAggregationWithMultipleAcknowledgementSets() throws Exception {
+        AcknowledgementSet acknowledgementSet2 = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics));
+        AcknowledgementSet acknowledgementSet3 = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics));
+        final List<Record<Event>> records2 = getRecords(testKey, 1, acknowledgementSet2);
+        acknowledgementSet2.complete();
+        final List<Record<Event>> records3 = getRecords(testKey, 1, acknowledgementSet3);
+        acknowledgementSet3.complete();
+        when(aggregateProcessorConfig.getIdentificationKeys()).thenReturn(List.of(testKey));
+
+        CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig();
+        setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW);
+        AggregateAction aggregateAction = new CountAggregateAction(countAggregateActionConfig);
+        when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
+        when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class)))
+                .thenReturn(aggregateAction);
+        callCount = 0;
+        buffer = mock(Buffer.class);
+        when(buffer.isEmpty()).thenReturn(true);
+        when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenAnswer(a -> {
+            if (callCount == 0) {
+                callCount++;
+                return Map.entry(records, checkpointState);
+            } else if (callCount == 1) {
+                callCount++;
+                return Map.entry(records2, checkpointState);
+            } else if (callCount == 2) {
+                callCount++;
+                return Map.entry(records3, checkpointState);
+            } else {
+                return Map.entry(List.of(), checkpointState);
+            }
+        });
+        final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
+        processors = List.of(processor);
+
+        final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
+        when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());
+        try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
+            futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
+                    .thenReturn(futureHelperResult);
+
+            final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline);
+
+            processWorker.run();
+        }
+        await().atMost(TEST_TIMEOUT)
+                .untilAsserted(() -> {
+                    assertTrue(aggregatedResultReceived);
+                    assertThat(aggregatedResults.size(), equalTo(1));
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet2).isDone());
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet3).isDone());
+                });
+    }
+
+    @Test
+    public void testCountAggregation() throws Exception {
+        CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig();
+        setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW);
+        AggregateAction aggregateAction = new CountAggregateAction(countAggregateActionConfig);
+        when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofMillis(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
+        when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class)))
+                .thenReturn(aggregateAction);
+        final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
+        processors = List.of(processor);
+
+        final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
+        when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());
+        try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
+            futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
+                    .thenReturn(futureHelperResult);
+
+            final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline);
+
+            processWorker.run();
+        }
+        await().atMost(TEST_TIMEOUT)
+                .untilAsserted(() -> {
+                    assertTrue(aggregatedResultReceived);
+                    assertThat(aggregatedResults.size(), equalTo(1));
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
+                });
+    }
+
+    @Test
+    public void testTailSamplerAggregationWithNoErrors() throws Exception {
+        TailSamplerAggregateActionConfig tailSamplerAggregateActionConfig = mock(TailSamplerAggregateActionConfig.class);
+        final Duration testWaitPeriod = Duration.ofMillis(1);
+        final String testCondition = "/status == 2";
+        when(tailSamplerAggregateActionConfig.getPercent()).thenReturn(100);
+        when(tailSamplerAggregateActionConfig.getWaitPeriod()).thenReturn(testWaitPeriod);
+        when(tailSamplerAggregateActionConfig.getCondition()).thenReturn(testCondition);
+        when(expressionEvaluator.evaluateConditional(eq(testCondition), any(Event.class))).thenReturn(false);
+        AggregateAction aggregateAction = new TailSamplerAggregateAction(tailSamplerAggregateActionConfig, expressionEvaluator);
+        when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))).thenReturn(aggregateAction);
+        when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
+        final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
+        processors = List.of(processor);
+
+        final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
+        when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());
+        try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
+            futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
+                    .thenReturn(futureHelperResult);
+
+            final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline);
+
+            processWorker.run();
+        }
+        await().atMost(TEST_TIMEOUT)
+                .untilAsserted(() -> {
+                    assertFalse(aggregatedResultReceived);
+                    assertThat(aggregatedResults.size(), equalTo(5));
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
+                });
+    }
+
+
+
+    @Test
+    public void testTailSamplerAggregation() throws Exception {
+        TailSamplerAggregateActionConfig tailSamplerAggregateActionConfig = mock(TailSamplerAggregateActionConfig.class);
+        final Duration testWaitPeriod = Duration.ofMillis(1);
+        final String testCondition = "/status == 2";
+        when(tailSamplerAggregateActionConfig.getPercent()).thenReturn(50);
+        when(tailSamplerAggregateActionConfig.getWaitPeriod()).thenReturn(testWaitPeriod);
+        when(tailSamplerAggregateActionConfig.getCondition()).thenReturn(testCondition);
+        when(expressionEvaluator.evaluateConditional(eq(testCondition), any(Event.class))).thenReturn(true);
+        AggregateAction aggregateAction = new TailSamplerAggregateAction(tailSamplerAggregateActionConfig, expressionEvaluator);
+        when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))).thenReturn(aggregateAction);
+        when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
+        final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
+        processors = List.of(processor);
+
+        final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
+        when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());
+        try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
+            futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
+                    .thenReturn(futureHelperResult);
+
+            final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline);
+
+            processWorker.run();
+        }
+        await().atMost(TEST_TIMEOUT)
+                .untilAsserted(() -> {
+                    assertFalse(aggregatedResultReceived);
+                    assertThat(aggregatedResults.size(), equalTo(5));
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
+                });
+    }
+
+    @Test
+    public void testAppendAggregation() throws Exception {
+        AppendAggregateActionConfig appendAggregateActionConfig = mock(AppendAggregateActionConfig.class);
+        when(appendAggregateActionConfig.getKeysToAppend()).thenReturn(List.of(testKey));
+        AggregateAction aggregateAction = new AppendAggregateAction(appendAggregateActionConfig);
+        when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))).thenReturn(aggregateAction);
+        when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
+        final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
+        processors = List.of(processor);
+
+        final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
+        when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());
+        try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
+            futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
+                    .thenReturn(futureHelperResult);
+
+            final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline);
+
+            processWorker.run();
+        }
+        await().atMost(TEST_TIMEOUT)
+                .untilAsserted(() -> {
+                    assertTrue(aggregatedResultReceived);
+                    assertThat(aggregatedResults.size(), equalTo(1));
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
+                });
+    }
+
+
+    private List<Record<Event>> getRecords(String key, int value, AcknowledgementSet ackSet) {
+        final List<Record<Event>> events = new ArrayList<>();
+        final Map eventMap = Map.of(key, value);
+
+        for (int i = 0; i < NUM_EVENTS_PER_BATCH; i++) {
+            final Event event = JacksonEvent.builder()
+                    .withEventType("event")
+                    .withData(eventMap)
+                    .build();
+            events.add(new Record<>(event));
+            ackSet.add(event);
+        }
+        return events;
+    }
+
+}
+
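The tests above drive AggregateProcessor through a real ProcessWorker with end-to-end acknowledgements enabled; they rely on the new Processor.holdsEvents() contract from the Processor and ProcessWorker changes earlier in this diff, so that input events missing from a processor's output are not auto-acknowledged while the processor still owns them. Below is a minimal sketch of a custom processor using that contract; the class name, flush threshold, and buffering behavior are illustrative assumptions and are not part of this change.

    // Hypothetical example, not part of this PR: a processor that retains events
    // across execute() calls and therefore reports holdsEvents() == true.
    package org.opensearch.dataprepper.example;

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    import org.opensearch.dataprepper.model.event.Event;
    import org.opensearch.dataprepper.model.processor.Processor;
    import org.opensearch.dataprepper.model.record.Record;

    public class HoldingProcessor implements Processor<Record<Event>, Record<Event>> {
        // Assumed flush threshold, for illustration only.
        private static final int FLUSH_SIZE = 10;

        private final List<Record<Event>> held = new ArrayList<>();

        @Override
        public Collection<Record<Event>> execute(final Collection<Record<Event>> records) {
            // Retain incoming events instead of emitting them immediately.
            held.addAll(records);
            if (held.size() < FLUSH_SIZE) {
                // Nothing is emitted yet; because holdsEvents() is true, ProcessWorker
                // does not treat these inputs as dropped and does not release their handles.
                return List.of();
            }
            final List<Record<Event>> out = new ArrayList<>(held);
            held.clear();
            return out;
        }

        @Override
        public boolean holdsEvents() {
            // Signals that events absent from the output are still owned by this processor.
            return true;
        }

        @Override
        public void prepareForShutdown() {
        }

        @Override
        public boolean isReadyForShutdown() {
            return held.isEmpty();
        }

        @Override
        public void shutdown() {
            held.clear();
        }
    }

With holdsEvents() returning true, ProcessWorker skips its acknowledgement bookkeeping for the retained inputs, and the processor itself remains responsible for the held event handles until it eventually emits or drops the events. The AggregateProcessor tests above exercise the same contract, asserting that the acknowledgement sets still complete even though events are held across batches.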