Skip to content

Commit

Permalink
Add sliding window assigner
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <daichen@amazon.com>
  • Loading branch information
dai-chen committed Oct 21, 2022
1 parent b2c51c4 commit 81ba090
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
@Data
public class Window {

/** Start timestamp (inclusive) of the time window. */
private final long startTime;

/** End timestamp (exclusive) of the time window. */
private final long endTime;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.assigner;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.opensearch.sql.planner.streaming.windowing.Window;

/**
* A sliding window assigner assigns multiple overlapped window per event timestamp.
* The overlap size is determined by the given slide interval.
*/
public class SlidingWindowAssigner implements WindowAssigner {

/** Window size in millisecond. */
private final long windowSize;

/** Slide size in millisecond. */
private final long slideSize;

/**
* Create sliding window assigner with the given window and slide size.
*/
public SlidingWindowAssigner(long windowSize, long slideSize) {
Preconditions.checkArgument(windowSize > 0,
"Window size [%s] must be positive number", windowSize);
Preconditions.checkArgument(slideSize > 0,
"Slide size [%s] must be positive number", slideSize);
this.windowSize = windowSize;
this.slideSize = slideSize;
}

@Override
public List<Window> assign(long timestamp) {
List<Window> windows = new ArrayList<>();

// Assign window from the last start time to first until given timestamp outside current window
long startTime = timestamp - timestamp % slideSize;
for (Window win = window(startTime); win.maxTimestamp() >= timestamp; win = window(startTime)) {
windows.add(win);
startTime -= slideSize;
}

// Reverse the window list for easy read and test
Collections.reverse(windows);
return windows;
}

private Window window(long startTime) {
return new Window(startTime, startTime + windowSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,29 @@

package org.opensearch.sql.planner.streaming.windowing.assigner;

import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.planner.streaming.windowing.Window;

/**
* A tumbling window assigner assigns a single window per event timestamp
* without overlap.
*/
@RequiredArgsConstructor
public class TumblingWindowAssigner implements WindowAssigner {

/** Window size in millisecond. */
private final long windowSize;

/**
* Create tumbling window assigner with the given window size.
*/
public TumblingWindowAssigner(long windowSize) {
Preconditions.checkArgument(windowSize > 0,
"Window size [%s] must be positive number", windowSize);
this.windowSize = windowSize;
}

@Override
public List<Window> assign(long timestamp) {
long startTime = timestamp - timestamp % windowSize;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.streaming.windowing.assigner;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.List;
import org.junit.jupiter.api.Test;
import org.opensearch.sql.planner.streaming.windowing.Window;

class SlidingWindowAssignerTest {

@Test
void testAssignWindows() {
long windowSize = 1000;
long slideSize = 500;
SlidingWindowAssigner assigner = new SlidingWindowAssigner(windowSize, slideSize);

assertEquals(
List.of(
new Window(0, 1000),
new Window(500, 1500)),
assigner.assign(500));

assertEquals(
List.of(
new Window(0, 1000),
new Window(500, 1500)),
assigner.assign(999));

assertEquals(
List.of(
new Window(500, 1500),
new Window(1000, 2000)),
assigner.assign(1000));
}

@Test
void testConstructWithIllegalArguments() {
IllegalArgumentException error1 = assertThrows(IllegalArgumentException.class,
() -> new SlidingWindowAssigner(-1, 100));
assertEquals("Window size [-1] must be positive number", error1.getMessage());

IllegalArgumentException error2 = assertThrows(IllegalArgumentException.class,
() -> new SlidingWindowAssigner(1000, 0));
assertEquals("Slide size [0] must be positive number", error2.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.planner.streaming.windowing.assigner;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.Collections;
import org.junit.jupiter.api.Test;
Expand All @@ -14,18 +15,25 @@
class TumblingWindowAssignerTest {

@Test
void testAssign() {
void testAssignWindow() {
long windowSize = 1000;
TumblingWindowAssigner assigner = new TumblingWindowAssigner(windowSize);

assertEquals(
Collections.singletonList(new Window(0, windowSize)),
Collections.singletonList(new Window(0, 1000)),
assigner.assign(500));
assertEquals(
Collections.singletonList(new Window(1000, 1000 + windowSize)),
Collections.singletonList(new Window(1000, 2000)),
assigner.assign(1999));
assertEquals(
Collections.singletonList(new Window(2000, 2000 + windowSize)),
Collections.singletonList(new Window(2000, 3000)),
assigner.assign(2000));
}

@Test
void testConstructWithIllegalWindowSize() {
IllegalArgumentException error = assertThrows(IllegalArgumentException.class,
() -> new TumblingWindowAssigner(-1));
assertEquals("Window size [-1] must be positive number", error.getMessage());
}
}

0 comments on commit 81ba090

Please sign in to comment.