Skip to content
This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Commit

Permalink
Merge pull request #146 from kennknowles/backport-pr-38
Browse files Browse the repository at this point in the history
  • Loading branch information
lukecwik committed Mar 11, 2016
2 parents 69593df + 5fbe84d commit 05755bb
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@
*/
public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
private final WindowFn<Object, W> windowFn;

@Nullable
private Map<W, Set<W>> activeWindowToStateAddressWindows;
private final Map<W, Set<W>> activeWindowToStateAddressWindows;

/**
* As above, but only for EPHEMERAL windows. Does not need to be persisted.
Expand All @@ -94,16 +92,14 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
* MERGED. Otherwise W1 is EPHEMERAL.
* </ul>
*/
@Nullable
private Map<W, W> windowToActiveWindow;
private final Map<W, W> windowToActiveWindow;

/**
* Deep clone of {@link #activeWindowToStateAddressWindows} as of last commit.
*
* <p>Used to avoid writing to state if no changes have been made during the work unit.
*/
@Nullable
private Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows;

/**
* Handle representing our state in the backend.
Expand Down Expand Up @@ -195,7 +191,12 @@ public void addActive(W window) {

@Override
public void remove(W window) {
for (W stateAddressWindow : activeWindowToStateAddressWindows.get(window)) {
Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
if (stateAddressWindows == null) {
// Window is no longer active.
return;
}
for (W stateAddressWindow : stateAddressWindows) {
windowToActiveWindow.remove(stateAddressWindow);
}
activeWindowToStateAddressWindows.remove(window);
Expand Down Expand Up @@ -522,7 +523,7 @@ private static <W> Map<W, Set<W>> emptyIfNull(@Nullable Map<W, Set<W>> multimap)
private static <W> Map<W, Set<W>> deepCopy(Map<W, Set<W>> multimap) {
Map<W, Set<W>> newMultimap = new HashMap<>();
for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
newMultimap.put(entry.getKey(), new LinkedHashSet<W>(entry.getValue()));
newMultimap.put(entry.getKey(), new LinkedHashSet<>(entry.getValue()));
}
return newMultimap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ public void onTimer(TimerData timer) throws Exception {
// - The trigger may implement isClosed as constant false.
// - If the window function does not support windowing then all windows will be considered
// active.
// So we must combine the above.
// So we must take conjunction of activeWindows and triggerRunner state.
boolean windowIsActive =
activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state());

Expand Down Expand Up @@ -602,7 +602,8 @@ private void clearAllState(
boolean windowIsActive)
throws Exception {
if (windowIsActive) {
// Since window is still active the trigger has not closed.
// Since both the window is in the active window set AND the trigger was not yet closed,
// it is possible we still have state.
reduceFn.clearState(renamedContext);
watermarkHold.clearHolds(renamedContext);
nonEmptyPanes.clearPane(renamedContext.state());
Expand All @@ -622,7 +623,10 @@ private void clearAllState(
}
}
paneInfoTracker.clear(directContext.state());
activeWindows.remove(directContext.window());
if (activeWindows.isActive(directContext.window())) {
// Don't need to track address state windows anymore.
activeWindows.remove(directContext.window());
}
// We'll never need to test for the trigger being closed again.
triggerRunner.clearFinished(directContext.state());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,43 @@ public void testMergeBeforeFinalizing() throws Exception {
equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
}

/**
* It is possible for a session window's trigger to be closed at the point at which
* the (merged) session window is garbage collected. Make sure we don't accidentally
* assume the window is still active.
*/
@Test
public void testMergingWithCloseBeforeGC() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
ClosingBehavior.FIRE_IF_NON_EMPTY);

// Two elements in two overlapping session windows.
tester.injectElements(
TimestampedValue.of(1, new Instant(1)), // in [1, 11)
TimestampedValue.of(10, new Instant(10))); // in [10, 20)

// Close the trigger, but the gargbage collection timer is still pending.
when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
triggerShouldFinish(mockTrigger);
tester.advanceInputWatermark(new Instant(30));

// Now the garbage collection timer will fire, finding the trigger already closed.
tester.advanceInputWatermark(new Instant(100));

List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
assertThat(output.size(), equalTo(1));
assertThat(output.get(0),
isSingleWindowedValue(containsInAnyOrder(1, 10),
1, // timestamp
1, // window start
20)); // window end
assertThat(
output.get(0).getPane(),
equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
}

/**
* Tests that when data is assigned to multiple windows but some of those windows have
* had their triggers finish, then the data is dropped and counted accurately.
Expand Down

0 comments on commit 05755bb

Please sign in to comment.