From b2c51c49173183287a50169116d6f4105ac2c0cb Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 21 Oct 2022 14:13:09 -0700 Subject: [PATCH] Add window and tumbling window assigner Signed-off-by: Chen Dai --- .../planner/streaming/windowing/Window.java | 26 ++++++++++++++++ .../assigner/TumblingWindowAssigner.java | 28 +++++++++++++++++ .../windowing/assigner/WindowAssigner.java | 24 ++++++++++++++ .../streaming/windowing/WindowTest.java | 21 +++++++++++++ .../assigner/TumblingWindowAssignerTest.java | 31 +++++++++++++++++++ 5 files changed, 130 insertions(+) create mode 100644 core/src/main/java/org/opensearch/sql/planner/streaming/windowing/Window.java create mode 100644 core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssigner.java create mode 100644 core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/WindowAssigner.java create mode 100644 core/src/test/java/org/opensearch/sql/planner/streaming/windowing/WindowTest.java create mode 100644 core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssignerTest.java diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/Window.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/Window.java new file mode 100644 index 0000000000..f391af453f --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/Window.java @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing; + +import lombok.Data; + +/** + * A time window is a window of time interval with inclusive start time and exclusive end time. + */ +@Data +public class Window { + + private final long startTime; + + private final long endTime; + + /** + * Return the maximum timestamp (inclusive) of the window. + */ + public long maxTimestamp() { + return endTime - 1; + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssigner.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssigner.java new file mode 100644 index 0000000000..400812823e --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssigner.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.assigner; + +import java.util.Collections; +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.planner.streaming.windowing.Window; + +/** + * A tumbling window assigner assigns a single window per event timestamp + * without overlap. + */ +@RequiredArgsConstructor +public class TumblingWindowAssigner implements WindowAssigner { + + /** Window size in millisecond. */ + private final long windowSize; + + @Override + public List assign(long timestamp) { + long startTime = timestamp - timestamp % windowSize; + return Collections.singletonList(new Window(startTime, startTime + windowSize)); + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/WindowAssigner.java b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/WindowAssigner.java new file mode 100644 index 0000000000..dac882c5ff --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/streaming/windowing/assigner/WindowAssigner.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.assigner; + +import java.util.List; +import org.opensearch.sql.planner.streaming.windowing.Window; + +/** + * A window assigner assigns zero or more window to an event timestamp + * based on different windowing approach. + */ +public interface WindowAssigner { + + /** + * Return window(s) assigned to the timestamp. + * @param timestamp given event timestamp + * @return windows assigned + */ + List assign(long timestamp); + +} diff --git a/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/WindowTest.java b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/WindowTest.java new file mode 100644 index 0000000000..9b9aafa933 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/WindowTest.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +class WindowTest { + + @Test + void test() { + Window window = new Window(1000, 2000); + assertEquals(1000, window.getStartTime()); + assertEquals(2000, window.getEndTime()); + assertEquals(1999, window.maxTimestamp()); + } +} \ No newline at end of file diff --git a/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssignerTest.java b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssignerTest.java new file mode 100644 index 0000000000..c727eee151 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/streaming/windowing/assigner/TumblingWindowAssignerTest.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.streaming.windowing.assigner; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Collections; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.planner.streaming.windowing.Window; + +class TumblingWindowAssignerTest { + + @Test + void testAssign() { + long windowSize = 1000; + TumblingWindowAssigner assigner = new TumblingWindowAssigner(windowSize); + + assertEquals( + Collections.singletonList(new Window(0, windowSize)), + assigner.assign(500)); + assertEquals( + Collections.singletonList(new Window(1000, 1000 + windowSize)), + assigner.assign(1999)); + assertEquals( + Collections.singletonList(new Window(2000, 2000 + windowSize)), + assigner.assign(2000)); + } +} \ No newline at end of file