Skip to content

Commit

Permalink
fix: wait for processing start when increasing time
Browse files Browse the repository at this point in the history
When we increase the time it takes some time before the engine has started processing again, and before it is writing new records to the stream. If we immediately wait for an idle state after increasting the time it will immediately say that it is idle, even though any potential timers have not yet been triggered.
By waiting for up to 1 second to see if the engine has started processing again we can make sure the timers have been triggered. After this we still need to wait until the engine is idle again, to not modify the existing functionality.
  • Loading branch information
remcowesterhoud committed Feb 28, 2022
1 parent d8af22b commit a5cd3f2
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -94,7 +95,7 @@ void testHasMessageStartEventNotBeenCorrelated() {
}

@Test
void testHasExpired() {
void testHasExpired() throws InterruptedException, TimeoutException {
// given
Utilities.deployProcess(client, ProcessPackMessageEvent.RESOURCE_NAME);
final Duration timeToLive = Duration.ofDays(1);
Expand Down Expand Up @@ -260,7 +261,7 @@ void testHasExpiredFailure() {
}

@Test
void testHasNotExpiredFailure() {
void testHasNotExpiredFailure() throws InterruptedException, TimeoutException {
// given
Utilities.deployProcess(client, ProcessPackMessageEvent.RESOURCE_NAME);
final Duration timeToLive = Duration.ofDays(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.camunda.zeebe.process.test.qa.util.Utilities.ProcessPackTimerStartEvent;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

Expand All @@ -23,7 +24,7 @@ class ProcessEventInspectionsTest {
private InMemoryEngine engine;

@Test
void testFindFirstProcessInstance() {
void testFindFirstProcessInstance() throws InterruptedException, TimeoutException {
// given
final DeploymentEvent deploymentEvent =
Utilities.deployProcess(client, ProcessPackTimerStartEvent.RESOURCE_NAME);
Expand All @@ -43,7 +44,7 @@ void testFindFirstProcessInstance() {
}

@Test
void testFindLastProcessInstance() {
void testFindLastProcessInstance() throws InterruptedException, TimeoutException {
// given
final DeploymentEvent deploymentEvent =
Utilities.deployProcess(client, ProcessPackTimerStartEvent.RESOURCE_NAME);
Expand All @@ -63,7 +64,7 @@ void testFindLastProcessInstance() {
}

@Test
void testFindFirstProcessInstance_wrongTimer() {
void testFindFirstProcessInstance_wrongTimer() throws InterruptedException, TimeoutException {
// given
Utilities.deployProcess(client, ProcessPackTimerStartEvent.RESOURCE_NAME);

Expand All @@ -79,7 +80,7 @@ void testFindFirstProcessInstance_wrongTimer() {
}

@Test
void testFindProcessInstance_highIndex() {
void testFindProcessInstance_highIndex() throws InterruptedException, TimeoutException {
// given
Utilities.deployProcess(client, ProcessPackTimerStartEvent.RESOURCE_NAME);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -94,7 +95,7 @@ void testHasMessageStartEventNotBeenCorrelated() {
}

@Test
void testHasExpired() {
void testHasExpired() throws InterruptedException, TimeoutException {
// given
Utilities.deployProcess(client, ProcessPackMessageEvent.RESOURCE_NAME);
final Duration timeToLive = Duration.ofDays(1);
Expand Down Expand Up @@ -260,7 +261,7 @@ void testHasExpiredFailure() {
}

@Test
void testHasNotExpiredFailure() {
void testHasNotExpiredFailure() throws InterruptedException, TimeoutException {
// given
Utilities.deployProcess(client, ProcessPackMessageEvent.RESOURCE_NAME);
final Duration timeToLive = Duration.ofDays(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.camunda.zeebe.process.test.qa.util.Utilities.ProcessPackTimerStartEvent;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

Expand All @@ -23,7 +24,7 @@ class ProcessEventInspectionsTest {
private InMemoryEngine engine;

@Test
void testFindFirstProcessInstance() {
void testFindFirstProcessInstance() throws InterruptedException, TimeoutException {
// given
final DeploymentEvent deploymentEvent =
Utilities.deployProcess(client, ProcessPackTimerStartEvent.RESOURCE_NAME);
Expand All @@ -43,7 +44,7 @@ void testFindFirstProcessInstance() {
}

@Test
void testFindLastProcessInstance() {
void testFindLastProcessInstance() throws InterruptedException, TimeoutException {
// given
final DeploymentEvent deploymentEvent =
Utilities.deployProcess(client, ProcessPackTimerStartEvent.RESOURCE_NAME);
Expand All @@ -63,7 +64,7 @@ void testFindLastProcessInstance() {
}

@Test
void testFindFirstProcessInstance_wrongTimer() {
void testFindFirstProcessInstance_wrongTimer() throws InterruptedException, TimeoutException {
// given
Utilities.deployProcess(client, ProcessPackTimerStartEvent.RESOURCE_NAME);

Expand All @@ -79,7 +80,7 @@ void testFindFirstProcessInstance_wrongTimer() {
}

@Test
void testFindProcessInstance_highIndex() {
void testFindProcessInstance_highIndex() throws InterruptedException, TimeoutException {
// given
Utilities.deployProcess(client, ProcessPackTimerStartEvent.RESOURCE_NAME);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

public class Utilities {
Expand Down Expand Up @@ -149,14 +150,10 @@ public static PublishMessageResponse sendMessage(
return response;
}

public static void increaseTime(final InMemoryEngine engine, final Duration duration) {
public static void increaseTime(final InMemoryEngine engine, final Duration duration)
throws InterruptedException, TimeoutException {
engine.increaseTime(duration);
try {
// we need to wait some physical time so that InMemoryEngine has a chance to fire the timers
Thread.sleep(100);
} catch (InterruptedException e) {
// do nothing
}
engine.waitForProcessingState(Duration.ofSeconds(1));
waitForIdleState(engine);
}

Expand Down

0 comments on commit a5cd3f2

Please sign in to comment.