Skip to content

Commit

Permalink
refactor: process review comments
Browse files Browse the repository at this point in the history
Renamed runProcessingCallbacks() to notifyProcessingCallbacks()

Refactored the EngineStateMonitorTest
- Extracted some shared functionality to a beforeEach method
- Changed the structure of the tests to be more clear what it does. Given a callback, when the engine is busy/idle and we register a callback, then the callback should be notified.
- Explicitly change the to an idle state in the idle state tests (even if we are already idle)
  • Loading branch information
remcowesterhoud committed Mar 2, 2022
1 parent e69b281 commit 1c7b853
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ private void checkEngineStateAndNotifyCallbacks() {
scheduleIdleStateNotification();
} else {
cancelIdleStateNotification();
runProcessingCallbacks();
notifyProcessingCallbacks();
}
}
}

private void runProcessingCallbacks() {
private void notifyProcessingCallbacks() {
synchronized (processingCallbacks) {
processingCallbacks.forEach(Runnable::run);
processingCallbacks.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,32 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class EngineStateMonitorTest {

private InMemoryLogStorage logStorage;
private TestLogStreamReader logStreamReader;
private EngineStateMonitor monitor;

@BeforeEach
void beforeEach() {
logStorage = mock(InMemoryLogStorage.class);
logStreamReader = new TestLogStreamReader();
monitor = new EngineStateMonitor(logStorage, logStreamReader);
}

@Test
void testCallbackIsCalledWhenEngineIsAlreadyIdle()
void testOnIdleCallbackIsCalledWhenEngineIsAlreadyIdle()
throws ExecutionException, InterruptedException, TimeoutException {
// given
final InMemoryLogStorage logStorage = mock(InMemoryLogStorage.class);
final TestLogStreamReader logStreamReader = new TestLogStreamReader();
final EngineStateMonitor monitor = new EngineStateMonitor(logStorage, logStreamReader);

// when
final CompletableFuture<Void> callbackFuture = new CompletableFuture<>();
final Runnable callback = () -> callbackFuture.complete(null);

// when
changeToIdleState(monitor, logStreamReader, false);
monitor.addOnIdleCallback(callback);

// then
Expand All @@ -33,99 +44,92 @@ void testCallbackIsCalledWhenEngineIsAlreadyIdle()
}

@Test
void testCallbackIsCalledWhenIdleStateIsReached()
void testOnIdleCallbackIsCalledWhenIdleStateIsReached()
throws ExecutionException, InterruptedException, TimeoutException {
// given
final InMemoryLogStorage logStorage = mock(InMemoryLogStorage.class);
final TestLogStreamReader logStreamReader = new TestLogStreamReader();
final EngineStateMonitor monitor = new EngineStateMonitor(logStorage, logStreamReader);
changeToBusyState(monitor, logStreamReader);

// when
final CompletableFuture<Void> callbackFuture = new CompletableFuture<>();
final Runnable callback = () -> callbackFuture.complete(null);

// when
changeToBusyState(monitor, logStreamReader, false);
monitor.addOnIdleCallback(callback);

// then
callbackFuture.get(1L, TimeUnit.SECONDS);
callbackFuture.get(50L, TimeUnit.MILLISECONDS);
assertThat(callbackFuture).isCompleted();
}

@Test
void testCallbackIsCalledWhenEngineIsAlreadyBusy()
void testOnProcessingCallbackIsCalledWhenEngineIsAlreadyBusy()
throws ExecutionException, InterruptedException, TimeoutException {
// given
final InMemoryLogStorage logStorage = mock(InMemoryLogStorage.class);
final TestLogStreamReader logStreamReader = new TestLogStreamReader();
final EngineStateMonitor monitor = new EngineStateMonitor(logStorage, logStreamReader);
changeToBusyState(monitor, logStreamReader);
// We need to set the position of the reader high enough the so hasNext() resolves to false. If
// we don't do this adding the callback will make the engine idle because of the
// forwardToLastEvent() method in the EngineStateMonitor, and the callback will never get called
logStreamReader.setPosition(100L);

// when
final CompletableFuture<Void> callbackFuture = new CompletableFuture<>();
final Runnable callback = () -> callbackFuture.complete(null);

// when
changeToBusyState(monitor, logStreamReader, true);
monitor.addOnProcessingCallback(callback);

// then
callbackFuture.get(1L, TimeUnit.SECONDS);
callbackFuture.get(50L, TimeUnit.MILLISECONDS);
assertThat(callbackFuture).isCompleted();
}

@Test
void testCallbackIsCalledWhenBusyStateIsReached()
void testOnProcessingCallbackIsCalledWhenBusyStateIsReached()
throws ExecutionException, InterruptedException, TimeoutException {
// given
final InMemoryLogStorage logStorage = mock(InMemoryLogStorage.class);
final TestLogStreamReader logStreamReader = new TestLogStreamReader();
final EngineStateMonitor monitor = new EngineStateMonitor(logStorage, logStreamReader);

final CompletableFuture<Void> callbackFuture = new CompletableFuture<>();
final Runnable callback = () -> callbackFuture.complete(null);
monitor.addOnProcessingCallback(callback);

// when
changeToBusyState(monitor, logStreamReader);
changeToBusyState(monitor, logStreamReader, true);
monitor.addOnProcessingCallback(callback);

// then
callbackFuture.get(1L, TimeUnit.SECONDS);
callbackFuture.get(50L, TimeUnit.MILLISECONDS);
assertThat(callbackFuture).isCompleted();
}

private void changeToIdleState(
final EngineStateMonitor monitor, final TestLogStreamReader reader) {
final EngineStateMonitor monitor, final TestLogStreamReader reader, final boolean lockState) {
// We use onCommit here because it is an easy way to trigger the EngineStateMonitor to check the
// engine state and trigger the callbacks
reader.setStateLocked(lockState);
reader.setPosition(reader.getLastEventPosition());
monitor.onCommit();
}

private void changeToBusyState(
final EngineStateMonitor monitor, final TestLogStreamReader reader) {
final EngineStateMonitor monitor, final TestLogStreamReader reader, final boolean lockState) {
// We use onReplayed here because it is an easy way to update the lastProcessedEventPosition of
// the EngineStateMonitor
reader.setStateLocked(lockState);
final long position = reader.getLastEventPosition() + 1;
monitor.onReplayed(position, -1L);
reader.setLastEventPosition(position);
}

private class TestLogStreamReader implements LogStreamReader {

private boolean stateLocked = false;
private long position = 0L;
private long lastEventPosition = 0L;

void setStateLocked(final boolean stateLocked) {
this.stateLocked = stateLocked;
}

long getLastEventPosition() {
return lastEventPosition;
return lastEventPosition;
}

void setLastEventPosition(final long position) {
lastEventPosition = position;
lastEventPosition = position;
}

void setPosition(final long position) {
this.position = position;
this.position = position;
}

@Override
Expand Down Expand Up @@ -161,7 +165,7 @@ public void close() {}

@Override
public boolean hasNext() {
return position < lastEventPosition;
return !stateLocked && position < lastEventPosition;
}

@Override
Expand Down

0 comments on commit 1c7b853

Please sign in to comment.