diff --git a/client/rest/src/test/java/org/opensearch/client/RestClientSingleHostIntegTests.java b/client/rest/src/test/java/org/opensearch/client/RestClientSingleHostIntegTests.java index beee1c5ca21a0..3d6700118ea57 100644 --- a/client/rest/src/test/java/org/opensearch/client/RestClientSingleHostIntegTests.java +++ b/client/rest/src/test/java/org/opensearch/client/RestClientSingleHostIntegTests.java @@ -48,6 +48,7 @@ import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.hc.core5.http.io.entity.StringEntity; import org.apache.hc.core5.http.message.BasicHeader; @@ -73,6 +74,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; +import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -298,37 +300,70 @@ public void testRequestResetAndAbort() throws Exception { httpGet.reset(); assertFalse(httpGet.isAborted()); - Future future = client.execute(getRequestProducer(httpGet, httpHost), getResponseConsumer(), null); - httpGet.setDependency((org.apache.hc.core5.concurrent.Cancellable) future); - httpGet.abort(); + final Phaser phaser = new Phaser(2); + phaser.register(); try { - future.get(); - fail("expected cancellation exception"); - } catch (CancellationException e) { - // expected + Future future = client.execute( + getRequestProducer(httpGet, httpHost), + getResponseConsumer(phaser), + null + ); + httpGet.setDependency((org.apache.hc.core5.concurrent.Cancellable) future); + httpGet.abort(); + + try { + phaser.arriveAndDeregister(); + future.get(); + fail("expected cancellation exception"); + } catch (CancellationException e) { + // expected + } + assertTrue(future.isCancelled()); + } finally { + // Forcing termination since the AsyncResponseConsumer may not be reached, + // the request is aborted right before + phaser.forceTermination(); } - assertTrue(future.isCancelled()); } { - httpGet.reset(); - Future future = client.execute(getRequestProducer(httpGet, httpHost), getResponseConsumer(), null); - assertFalse(httpGet.isAborted()); - httpGet.setDependency((org.apache.hc.core5.concurrent.Cancellable) future); - httpGet.abort(); - assertTrue(httpGet.isAborted()); + final Phaser phaser = new Phaser(2); + phaser.register(); + try { - assertTrue(future.isCancelled()); - future.get(); - throw new AssertionError("exception should have been thrown"); - } catch (CancellationException e) { - // expected + httpGet.reset(); + Future future = client.execute( + getRequestProducer(httpGet, httpHost), + getResponseConsumer(phaser), + null + ); + assertFalse(httpGet.isAborted()); + httpGet.setDependency((org.apache.hc.core5.concurrent.Cancellable) future); + httpGet.abort(); + assertTrue(httpGet.isAborted()); + try { + phaser.arriveAndDeregister(); + assertTrue(future.isCancelled()); + future.get(); + throw new AssertionError("exception should have been thrown"); + } catch (CancellationException e) { + // expected + } + } finally { + // Forcing termination since the AsyncResponseConsumer may not be reached, + // the request is aborted right before + phaser.forceTermination(); } } { httpGet.reset(); assertFalse(httpGet.isAborted()); - Future future = client.execute(getRequestProducer(httpGet, httpHost), getResponseConsumer(), null); + final Phaser phaser = new Phaser(0); + Future future = client.execute( + getRequestProducer(httpGet, httpHost), + getResponseConsumer(phaser), + null + ); assertFalse(httpGet.isAborted()); assertEquals(200, future.get().getCode()); assertFalse(future.isCancelled()); @@ -554,8 +589,15 @@ private Response bodyTest(RestClient restClient, String method, int statusCode, return esResponse; } - private AsyncResponseConsumer getResponseConsumer() { - return new HeapBufferedAsyncResponseConsumer(1024); + private AsyncResponseConsumer getResponseConsumer(Phaser phaser) { + phaser.register(); + return new HeapBufferedAsyncResponseConsumer(1024) { + @Override + protected ClassicHttpResponse buildResult(HttpResponse response, byte[] entity, ContentType contentType) { + phaser.arriveAndAwaitAdvance(); + return super.buildResult(response, entity, contentType); + } + }; } private HttpUriRequestProducer getRequestProducer(HttpUriRequestBase request, HttpHost host) {