Skip to content

Commit

Permalink
JAVA-3077: ListenersIT intermittently failing with: Wanted but not in…
Browse files Browse the repository at this point in the history
…voked: schemaListener1.onSessionReady (#1670)

ListenersIT.java:
- Add 500ms wait on SchemaListener#onSessionReady call verification
- Add latch to wait for MySchemaChangeListener#onSessionReady
- Add some comments around which listeners/methods need synchronizations and why/not
  • Loading branch information
hhughes authored Jul 20, 2023
1 parent 9d891b1 commit e8b25ab
Showing 1 changed file with 21 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import com.datastax.oss.driver.api.core.CqlSession;
Expand All @@ -33,9 +34,12 @@
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
import com.datastax.oss.driver.api.testinfra.simulacron.SimulacronRule;
import com.datastax.oss.driver.categories.ParallelizableTests;
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.Uninterruptibles;
import com.datastax.oss.simulacron.common.cluster.ClusterSpec;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -90,6 +94,9 @@ public void should_inject_session_in_listeners() throws Exception {
.build())
.build()) {

// These NodeStateListeners are wrapped with SafeInitNodeStateListener which delays #onUp
// callbacks until #onSessionReady is called, these will all happen during session
// initialization
InOrder inOrder1 = inOrder(nodeListener1);
inOrder1.verify(nodeListener1).onSessionReady(session);
inOrder1.verify(nodeListener1).onUp(nodeCaptor1.capture());
Expand All @@ -104,20 +111,29 @@ public void should_inject_session_in_listeners() throws Exception {
assertThat(nodeCaptor2.getValue().getEndPoint())
.isEqualTo(SIMULACRON_RULE.getContactPoints().iterator().next());

verify(schemaListener1).onSessionReady(session);
verify(schemaListener2).onSessionReady(session);
// SchemaChangeListener#onSessionReady is called asynchronously from AdminExecutor so we may
// have to wait a little
verify(schemaListener1, timeout(500).times(1)).onSessionReady(session);
verify(schemaListener2, timeout(500).times(1)).onSessionReady(session);

// Request tracker #onSessionReady is called synchronously during session initialization
verify(requestTracker1).onSessionReady(session);
verify(requestTracker2).onSessionReady(session);

assertThat(MyNodeStateListener.onSessionReadyCalled).isTrue();
assertThat(MyNodeStateListener.onUpCalled).isTrue();

assertThat(MySchemaChangeListener.onSessionReadyCalled).isTrue();
// SchemaChangeListener#onSessionReady is called asynchronously from AdminExecutor so we may
// have to wait a little
assertThat(
Uninterruptibles.awaitUninterruptibly(
MySchemaChangeListener.onSessionReadyLatch, 500, TimeUnit.MILLISECONDS))
.isTrue();

assertThat(MyRequestTracker.onSessionReadyCalled).isTrue();
}

// CqlSession#close waits for all listener close methods to be called
verify(nodeListener1).close();
verify(nodeListener2).close();

Expand Down Expand Up @@ -163,14 +179,14 @@ public void close() {

public static class MySchemaChangeListener extends SchemaChangeListenerBase {

private static volatile boolean onSessionReadyCalled = false;
private static CountDownLatch onSessionReadyLatch = new CountDownLatch(1);
private static volatile boolean closeCalled = false;

public MySchemaChangeListener(@SuppressWarnings("unused") DriverContext ignored) {}

@Override
public void onSessionReady(@NonNull Session session) {
onSessionReadyCalled = true;
onSessionReadyLatch.countDown();
}

@Override
Expand Down

0 comments on commit e8b25ab

Please sign in to comment.