Skip to content

Commit

Permalink
Add window and tumbling window assigner
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <daichen@amazon.com>
  • Loading branch information
dai-chen committed Oct 21, 2022
1 parent 3a9d217 commit b2c51c4
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing;

import lombok.Data;

/**
* A time window is a window of time interval with inclusive start time and exclusive end time.
*/
@Data
public class Window {

private final long startTime;

private final long endTime;

/**
* Return the maximum timestamp (inclusive) of the window.
*/
public long maxTimestamp() {
return endTime - 1;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.assigner;

import java.util.Collections;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.planner.streaming.windowing.Window;

/**
* A tumbling window assigner assigns a single window per event timestamp
* without overlap.
*/
@RequiredArgsConstructor
public class TumblingWindowAssigner implements WindowAssigner {

/** Window size in millisecond. */
private final long windowSize;

@Override
public List<Window> assign(long timestamp) {
long startTime = timestamp - timestamp % windowSize;
return Collections.singletonList(new Window(startTime, startTime + windowSize));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.assigner;

import java.util.List;
import org.opensearch.sql.planner.streaming.windowing.Window;

/**
* A window assigner assigns zero or more window to an event timestamp
* based on different windowing approach.
*/
public interface WindowAssigner {

/**
* Return window(s) assigned to the timestamp.
* @param timestamp given event timestamp
* @return windows assigned
*/
List<Window> assign(long timestamp);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

class WindowTest {

@Test
void test() {
Window window = new Window(1000, 2000);
assertEquals(1000, window.getStartTime());
assertEquals(2000, window.getEndTime());
assertEquals(1999, window.maxTimestamp());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.assigner;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Collections;
import org.junit.jupiter.api.Test;
import org.opensearch.sql.planner.streaming.windowing.Window;

class TumblingWindowAssignerTest {

@Test
void testAssign() {
long windowSize = 1000;
TumblingWindowAssigner assigner = new TumblingWindowAssigner(windowSize);

assertEquals(
Collections.singletonList(new Window(0, windowSize)),
assigner.assign(500));
assertEquals(
Collections.singletonList(new Window(1000, 1000 + windowSize)),
assigner.assign(1999));
assertEquals(
Collections.singletonList(new Window(2000, 2000 + windowSize)),
assigner.assign(2000));
}
}

0 comments on commit b2c51c4

Please sign in to comment.