diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/Window.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/Window.java index f391af453f..2a85ea391c 100644 --- a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/Window.java +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/Window.java @@ -13,8 +13,10 @@ @Data public class Window { + /** Start timestamp (inclusive) of the time window. */ private final long startTime; + /** End timestamp (exclusive) of the time window. */ private final long endTime; /** diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/SlidingWindowAssigner.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/SlidingWindowAssigner.java new file mode 100644 index 0000000000..c87647ce2d --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/SlidingWindowAssigner.java @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.assigner; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.opensearch.sql.planner.streaming.windowing.Window; + +/** + * A sliding window assigner assigns multiple overlapped window per event timestamp. + * The overlap size is determined by the given slide interval. + */ +public class SlidingWindowAssigner implements WindowAssigner { + + /** Window size in millisecond. */ + private final long windowSize; + + /** Slide size in millisecond. */ + private final long slideSize; + + /** + * Create sliding window assigner with the given window and slide size. + */ + public SlidingWindowAssigner(long windowSize, long slideSize) { + Preconditions.checkArgument(windowSize > 0, + "Window size [%s] must be positive number", windowSize); + Preconditions.checkArgument(slideSize > 0, + "Slide size [%s] must be positive number", slideSize); + this.windowSize = windowSize; + this.slideSize = slideSize; + } + + @Override + public List assign(long timestamp) { + List windows = new ArrayList<>(); + + // Assign window from the last start time to first until given timestamp outside current window + long startTime = timestamp - timestamp % slideSize; + for (Window win = window(startTime); win.maxTimestamp() >= timestamp; win = window(startTime)) { + windows.add(win); + startTime -= slideSize; + } + + // Reverse the window list for easy read and test + Collections.reverse(windows); + return windows; + } + + private Window window(long startTime) { + return new Window(startTime, startTime + windowSize); + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssigner.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssigner.java index 400812823e..d20c7ccd2b 100644 --- a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssigner.java +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssigner.java @@ -5,21 +5,29 @@ package org.opensearch.sql.planner.streaming.windowing.assigner; +import com.google.common.base.Preconditions; import java.util.Collections; import java.util.List; -import lombok.RequiredArgsConstructor; import org.opensearch.sql.planner.streaming.windowing.Window; /** * A tumbling window assigner assigns a single window per event timestamp * without overlap. */ -@RequiredArgsConstructor public class TumblingWindowAssigner implements WindowAssigner { /** Window size in millisecond. */ private final long windowSize; + /** + * Create tumbling window assigner with the given window size. + */ + public TumblingWindowAssigner(long windowSize) { + Preconditions.checkArgument(windowSize > 0, + "Window size [%s] must be positive number", windowSize); + this.windowSize = windowSize; + } + @Override public List assign(long timestamp) { long startTime = timestamp - timestamp % windowSize; diff --git a/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/SlidingWindowAssignerTest.java b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/SlidingWindowAssignerTest.java new file mode 100644 index 0000000000..fd69065742 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/SlidingWindowAssignerTest.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.assigner; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.List; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.planner.streaming.windowing.Window; + +class SlidingWindowAssignerTest { + + @Test + void testAssignWindows() { + long windowSize = 1000; + long slideSize = 500; + SlidingWindowAssigner assigner = new SlidingWindowAssigner(windowSize, slideSize); + + assertEquals( + List.of( + new Window(0, 1000), + new Window(500, 1500)), + assigner.assign(500)); + + assertEquals( + List.of( + new Window(0, 1000), + new Window(500, 1500)), + assigner.assign(999)); + + assertEquals( + List.of( + new Window(500, 1500), + new Window(1000, 2000)), + assigner.assign(1000)); + } + + @Test + void testConstructWithIllegalArguments() { + IllegalArgumentException error1 = assertThrows(IllegalArgumentException.class, + () -> new SlidingWindowAssigner(-1, 100)); + assertEquals("Window size [-1] must be positive number", error1.getMessage()); + + IllegalArgumentException error2 = assertThrows(IllegalArgumentException.class, + () -> new SlidingWindowAssigner(1000, 0)); + assertEquals("Slide size [0] must be positive number", error2.getMessage()); + } +} \ No newline at end of file diff --git a/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssignerTest.java b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssignerTest.java index c727eee151..4c98c40f7a 100644 --- a/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssignerTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssignerTest.java @@ -6,6 +6,7 @@ package org.opensearch.sql.planner.streaming.windowing.assigner; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.Collections; import org.junit.jupiter.api.Test; @@ -14,18 +15,25 @@ class TumblingWindowAssignerTest { @Test - void testAssign() { + void testAssignWindow() { long windowSize = 1000; TumblingWindowAssigner assigner = new TumblingWindowAssigner(windowSize); assertEquals( - Collections.singletonList(new Window(0, windowSize)), + Collections.singletonList(new Window(0, 1000)), assigner.assign(500)); assertEquals( - Collections.singletonList(new Window(1000, 1000 + windowSize)), + Collections.singletonList(new Window(1000, 2000)), assigner.assign(1999)); assertEquals( - Collections.singletonList(new Window(2000, 2000 + windowSize)), + Collections.singletonList(new Window(2000, 3000)), assigner.assign(2000)); } + + @Test + void testConstructWithIllegalWindowSize() { + IllegalArgumentException error = assertThrows(IllegalArgumentException.class, + () -> new TumblingWindowAssigner(-1)); + assertEquals("Window size [-1] must be positive number", error.getMessage()); + } } \ No newline at end of file