diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java index efe80922dfa8c..5a3332042938d 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java @@ -24,6 +24,7 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertSame; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import lombok.Cleanup; @@ -185,6 +186,7 @@ public void testUserExceptionThrowingAsyncFunction() throws Exception { @Test public void testAsyncFunctionMaxPending() throws Exception { + CountDownLatch count = new CountDownLatch(1); InstanceConfig instanceConfig = new InstanceConfig(); int pendingQueueSize = 3; instanceConfig.setMaxPendingAsyncRequests(pendingQueueSize); @@ -196,7 +198,7 @@ public void testAsyncFunctionMaxPending() throws Exception { CompletableFuture result = new CompletableFuture<>(); executor.submit(() -> { try { - Thread.sleep(500); + count.await(); result.complete(String.format("%s-lambda", input)); } catch (Exception e) { result.completeExceptionally(e); @@ -222,8 +224,13 @@ public void testAsyncFunctionMaxPending() throws Exception { // no space left assertEquals(0, instance.getPendingAsyncRequests().remainingCapacity()); + AsyncFuncRequest[] asyncFuncRequests = new AsyncFuncRequest[3]; for (int i = 0; i < 3; i++) { - AsyncFuncRequest request = instance.getPendingAsyncRequests().poll(); + asyncFuncRequests[i] = instance.getPendingAsyncRequests().poll(); + } + + count.countDown(); + for (AsyncFuncRequest request : asyncFuncRequests) { Assert.assertEquals(request.getProcessResult().get(), testString + "-lambda"); }