From d9056250c61206baa3c63bd525fbb723f2efaec3 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Tue, 28 Feb 2017 13:43:39 -0800 Subject: [PATCH] fix Intermittent test failures in ProxyPublishConsumeTest.socketTest --- .../proxy/ProxyAuthenticationTest.java | 17 ++++++++++++--- .../proxy/ProxyPublishConsumeTest.java | 21 +++++++++++++++---- .../proxy/ProxyPublishConsumeTls.java | 17 ++++++++++++--- .../ProxyPublishConsumeWithoutZKTest.java | 18 +++++++++++++--- 4 files changed, 60 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthenticationTest.java index 5fb752f039031..cb5f947bd1aa2 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyAuthenticationTest.java @@ -16,10 +16,12 @@ */ package com.yahoo.pulsar.websocket.proxy; +import static java.util.concurrent.Executors.newFixedThreadPool; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import java.net.URI; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -110,12 +112,21 @@ public void socketTest() throws InterruptedException { } catch (Throwable t) { log.error(t.getMessage()); } finally { + ExecutorService executor = newFixedThreadPool(1); try { - consumeClient.stop(); - produceClient.stop(); + executor.submit(() -> { + try { + consumeClient.stop(); + produceClient.stop(); + log.info("proxy clients are stopped successfully"); + } catch (Exception e) { + log.error(e.getMessage()); + } + }).get(2, TimeUnit.SECONDS); } catch (Exception e) { - log.error(e.getMessage()); + log.error("failed to close clients ", e); } + executor.shutdownNow(); } } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 881c66f4b5bc9..cd6d7f084b3d7 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -19,7 +19,11 @@ import static org.mockito.Mockito.spy; import java.net.URI; +import static java.util.concurrent.Executors.newFixedThreadPool; + +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.test.PortManager; import org.eclipse.jetty.websocket.api.Session; @@ -71,7 +75,7 @@ protected void cleanup() throws Exception { log.info("Finished Cleaning Up Test setup"); } - @Test(timeOut=30000) + @Test(timeOut=10000) public void socketTest() throws Exception { URI consumeUri = URI.create(CONSUME_URI); URI produceUri = URI.create(PRODUCE_URI); @@ -101,12 +105,21 @@ public void socketTest() throws Exception { Assert.assertTrue(produceSocket.getBuffer().size() > 0); Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer()); } finally { + ExecutorService executor = newFixedThreadPool(1); try { - consumeClient.stop(); - produceClient.stop(); + executor.submit(() -> { + try { + consumeClient.stop(); + produceClient.stop(); + log.info("proxy clients are stopped successfully"); + } catch (Exception e) { + log.error(e.getMessage()); + } + }).get(2, TimeUnit.SECONDS); } catch (Exception e) { - log.error(e.getMessage()); + log.error("failed to close clients ", e); } + executor.shutdownNow(); } } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTls.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTls.java index 404d9c8d69830..b79022c987488 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTls.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeTls.java @@ -15,6 +15,7 @@ */ package com.yahoo.pulsar.websocket.proxy; +import static java.util.concurrent.Executors.newFixedThreadPool; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; @@ -22,6 +23,7 @@ import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -129,12 +131,21 @@ public void socketTest() throws InterruptedException, NoSuchAlgorithmException, } catch (Throwable t) { log.error(t.getMessage()); } finally { + ExecutorService executor = newFixedThreadPool(1); try { - consumeClient.stop(); - produceClient.stop(); + executor.submit(() -> { + try { + consumeClient.stop(); + produceClient.stop(); + log.info("proxy clients are stopped successfully"); + } catch (Exception e) { + log.error(e.getMessage()); + } + }).get(2, TimeUnit.SECONDS); } catch (Exception e) { - log.error(e.getMessage()); + log.error("failed to close clients ", e); } + executor.shutdownNow(); } } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java index 791c53b31ae6a..43e3088f78e80 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java @@ -15,11 +15,14 @@ */ package com.yahoo.pulsar.websocket.proxy; +import static java.util.concurrent.Executors.newFixedThreadPool; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import java.net.URI; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.test.PortManager; import org.eclipse.jetty.websocket.api.Session; @@ -100,12 +103,21 @@ public void socketTest() throws Exception { Assert.assertTrue(produceSocket.getBuffer().size() > 0); Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer()); } finally { + ExecutorService executor = newFixedThreadPool(1); try { - consumeClient.stop(); - produceClient.stop(); + executor.submit(() -> { + try { + consumeClient.stop(); + produceClient.stop(); + log.info("proxy clients are stopped successfully"); + } catch (Exception e) { + log.error(e.getMessage()); + } + }).get(2, TimeUnit.SECONDS); } catch (Exception e) { - log.error(e.getMessage()); + log.error("failed to close clients ", e); } + executor.shutdownNow(); } }