Skip to content

Commit

Permalink
fix Intermittent test failures in ProxyPublishConsumeTest.socketTest (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored Mar 1, 2017
1 parent d89563c commit 8ec61b2
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
*/
package com.yahoo.pulsar.websocket.proxy;

import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -110,12 +112,21 @@ public void socketTest() throws InterruptedException {
} catch (Throwable t) {
log.error(t.getMessage());
} finally {
ExecutorService executor = newFixedThreadPool(1);
try {
consumeClient.stop();
produceClient.stop();
executor.submit(() -> {
try {
consumeClient.stop();
produceClient.stop();
log.info("proxy clients are stopped successfully");
} catch (Exception e) {
log.error(e.getMessage());
}
}).get(2, TimeUnit.SECONDS);
} catch (Exception e) {
log.error(e.getMessage());
log.error("failed to close clients ", e);
}
executor.shutdownNow();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
import static org.mockito.Mockito.spy;

import java.net.URI;
import static java.util.concurrent.Executors.newFixedThreadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.test.PortManager;
import org.eclipse.jetty.websocket.api.Session;
Expand Down Expand Up @@ -71,7 +75,7 @@ protected void cleanup() throws Exception {
log.info("Finished Cleaning Up Test setup");
}

@Test(timeOut=30000)
@Test(timeOut=10000)
public void socketTest() throws Exception {
URI consumeUri = URI.create(CONSUME_URI);
URI produceUri = URI.create(PRODUCE_URI);
Expand Down Expand Up @@ -101,12 +105,21 @@ public void socketTest() throws Exception {
Assert.assertTrue(produceSocket.getBuffer().size() > 0);
Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
} finally {
ExecutorService executor = newFixedThreadPool(1);
try {
consumeClient.stop();
produceClient.stop();
executor.submit(() -> {
try {
consumeClient.stop();
produceClient.stop();
log.info("proxy clients are stopped successfully");
} catch (Exception e) {
log.error(e.getMessage());
}
}).get(2, TimeUnit.SECONDS);
} catch (Exception e) {
log.error(e.getMessage());
log.error("failed to close clients ", e);
}
executor.shutdownNow();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/
package com.yahoo.pulsar.websocket.proxy;

import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.net.URI;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -129,12 +131,21 @@ public void socketTest() throws InterruptedException, NoSuchAlgorithmException,
} catch (Throwable t) {
log.error(t.getMessage());
} finally {
ExecutorService executor = newFixedThreadPool(1);
try {
consumeClient.stop();
produceClient.stop();
executor.submit(() -> {
try {
consumeClient.stop();
produceClient.stop();
log.info("proxy clients are stopped successfully");
} catch (Exception e) {
log.error(e.getMessage());
}
}).get(2, TimeUnit.SECONDS);
} catch (Exception e) {
log.error(e.getMessage());
log.error("failed to close clients ", e);
}
executor.shutdownNow();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
*/
package com.yahoo.pulsar.websocket.proxy;

import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.test.PortManager;
import org.eclipse.jetty.websocket.api.Session;
Expand Down Expand Up @@ -100,12 +103,21 @@ public void socketTest() throws Exception {
Assert.assertTrue(produceSocket.getBuffer().size() > 0);
Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
} finally {
ExecutorService executor = newFixedThreadPool(1);
try {
consumeClient.stop();
produceClient.stop();
executor.submit(() -> {
try {
consumeClient.stop();
produceClient.stop();
log.info("proxy clients are stopped successfully");
} catch (Exception e) {
log.error(e.getMessage());
}
}).get(2, TimeUnit.SECONDS);
} catch (Exception e) {
log.error(e.getMessage());
log.error("failed to close clients ", e);
}
executor.shutdownNow();
}
}

Expand Down

0 comments on commit 8ec61b2

Please sign in to comment.