From 5e6434238ae92defc47813abc23ec7947c6833a3 Mon Sep 17 00:00:00 2001 From: James Perkins Date: Mon, 14 Aug 2017 16:12:09 -0700 Subject: [PATCH] [LOGMGR-170] Ensure the reconnect thread exits if it was interrupted. --- .../logmanager/handlers/TcpOutputStream.java | 22 ++++-- .../logmanager/handlers/SimpleServer.java | 6 +- .../handlers/SocketHandlerTests.java | 74 +++++++++++++++++-- 3 files changed, 87 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/jboss/logmanager/handlers/TcpOutputStream.java b/src/main/java/org/jboss/logmanager/handlers/TcpOutputStream.java index 69c40abe..ce4250ca 100644 --- a/src/main/java/org/jboss/logmanager/handlers/TcpOutputStream.java +++ b/src/main/java/org/jboss/logmanager/handlers/TcpOutputStream.java @@ -46,6 +46,7 @@ * * @author James R. Perkins */ +@SuppressWarnings({"unused", "WeakerAccess"}) public class TcpOutputStream extends OutputStream implements FlushableCloseable { private static final long retryTimeout = 5L; private static final long maxRetryTimeout = 40L; @@ -340,15 +341,21 @@ private class RetryConnector implements Runnable { @Override public void run() { - if (socketFactory != null) { + boolean connected = false; + while (socketFactory != null && !connected) { Socket socket = null; - boolean connected = true; try { socket = socketFactory.createSocket(address, port); synchronized (outputLock) { - TcpOutputStream.this.socket = socket; - TcpOutputStream.this.connected = connected; - reconnectThread = null; + // Unlikely but if we've been interrupted due to a close, we should shutdown + if (Thread.currentThread().isInterrupted()) { + safeClose(socket); + break; + } else { + TcpOutputStream.this.socket = socket; + TcpOutputStream.this.connected = true; + connected = true; + } } } catch (IOException e) { connected = false; @@ -363,8 +370,11 @@ public void run() { try { TimeUnit.SECONDS.sleep(Math.min(timeout, maxRetryTimeout)); } catch (InterruptedException ignore) { + synchronized (outputLock) { + TcpOutputStream.this.connected = false; + } + break; } - run(); } finally { // It's possible the thread was interrupted, if we're not connected we should clean up the socket if (!connected) { diff --git a/src/test/java/org/jboss/logmanager/handlers/SimpleServer.java b/src/test/java/org/jboss/logmanager/handlers/SimpleServer.java index 69bd315e..66989068 100644 --- a/src/test/java/org/jboss/logmanager/handlers/SimpleServer.java +++ b/src/test/java/org/jboss/logmanager/handlers/SimpleServer.java @@ -55,10 +55,14 @@ static SimpleServer createUdpServer(final int port) throws IOException { return server; } - String poll() throws InterruptedException { + String timeoutPoll() throws InterruptedException { return data.poll(10, TimeUnit.SECONDS); } + String poll() throws InterruptedException { + return data.poll(); + } + String peek() { return data.peek(); } diff --git a/src/test/java/org/jboss/logmanager/handlers/SocketHandlerTests.java b/src/test/java/org/jboss/logmanager/handlers/SocketHandlerTests.java index 606d53b6..9a2eea01 100644 --- a/src/test/java/org/jboss/logmanager/handlers/SocketHandlerTests.java +++ b/src/test/java/org/jboss/logmanager/handlers/SocketHandlerTests.java @@ -3,6 +3,8 @@ import java.io.UnsupportedEncodingException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.jboss.logmanager.ExtLogRecord; import org.jboss.logmanager.formatters.PatternFormatter; @@ -33,7 +35,7 @@ public void testTcpConnection() throws Exception { ) { final ExtLogRecord record = createLogRecord("Test TCP handler"); handler.doPublish(record); - final String msg = server.poll(); + final String msg = server.timeoutPoll(); Assert.assertNotNull(msg); Assert.assertEquals("Test TCP handler", msg); } @@ -47,7 +49,7 @@ public void testTlsConnection() throws Exception { ) { final ExtLogRecord record = createLogRecord("Test TLS handler"); handler.doPublish(record); - final String msg = server.poll(); + final String msg = server.timeoutPoll(); Assert.assertNotNull(msg); Assert.assertEquals("Test TLS handler", msg); } @@ -61,7 +63,7 @@ public void testUdpConnection() throws Exception { ) { final ExtLogRecord record = createLogRecord("Test UDP handler"); handler.doPublish(record); - final String msg = server.poll(); + final String msg = server.timeoutPoll(); Assert.assertNotNull(msg); Assert.assertEquals("Test UDP handler", msg); } @@ -74,9 +76,9 @@ public void testTcpPortChange() throws Exception { SimpleServer server2 = SimpleServer.createTcpServer(altPort); SocketHandler handler = createHandler(Protocol.TCP) ) { - ExtLogRecord record = createLogRecord("Test TCP handler " + port); + ExtLogRecord record = createLogRecord("Test TCP handler " + port); handler.doPublish(record); - String msg = server1.poll(); + String msg = server1.timeoutPoll(); Assert.assertNotNull(msg); Assert.assertEquals("Test TCP handler " + port, msg); @@ -84,7 +86,7 @@ public void testTcpPortChange() throws Exception { handler.setPort(altPort); record = createLogRecord("Test TCP handler " + altPort); handler.doPublish(record); - msg = server2.poll(); + msg = server2.timeoutPoll(); Assert.assertNotNull(msg); Assert.assertEquals("Test TCP handler " + altPort, msg); @@ -100,7 +102,7 @@ public void testProtocolChange() throws Exception { try (SimpleServer server = SimpleServer.createTcpServer(port)) { final ExtLogRecord record = createLogRecord("Test TCP handler"); handler.doPublish(record); - final String msg = server.poll(); + final String msg = server.timeoutPoll(); Assert.assertNotNull(msg); Assert.assertEquals("Test TCP handler", msg); } @@ -111,13 +113,51 @@ public void testProtocolChange() throws Exception { try (SimpleServer server = SimpleServer.createTlsServer(port)) { final ExtLogRecord record = createLogRecord("Test TLS handler"); handler.doPublish(record); - final String msg = server.poll(); + final String msg = server.timeoutPoll(); Assert.assertNotNull(msg); Assert.assertEquals("Test TLS handler", msg); } } } + @Test + public void testTcpReconnect() throws Exception { + try (SocketHandler handler = createHandler(Protocol.TCP)) { + + // Publish a record to a running server + try ( + SimpleServer server = SimpleServer.createTcpServer(port) + ) { + final ExtLogRecord record = createLogRecord("Test TCP handler"); + handler.doPublish(record); + final String msg = server.timeoutPoll(); + Assert.assertNotNull(msg); + Assert.assertEquals("Test TCP handler", msg); + } + + // Publish a record to a down server, this likely won't put the handler in an error state yet. However once + // we restart the server and loop the first socket should fail before a reconnect is attempted. + final ExtLogRecord record = createLogRecord("Test TCP handler"); + handler.doPublish(record); + try ( + SimpleServer server = SimpleServer.createTcpServer(port) + ) { + // Keep writing a record until a successful record is published or a timeout occurs + final String msg = timeout(() -> { + final ExtLogRecord r = createLogRecord("Test TCP handler"); + handler.doPublish(r); + try { + return server.poll(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, 10); + Assert.assertNotNull(msg); + Assert.assertEquals("Test TCP handler", msg); + } + } + } + private SocketHandler createHandler(final Protocol protocol) throws UnsupportedEncodingException { final SocketHandler handler = new SocketHandler(protocol, address, port); handler.setAutoFlush(true); @@ -126,4 +166,22 @@ private SocketHandler createHandler(final Protocol protocol) throws UnsupportedE return handler; } + + private static R timeout(final Supplier supplier, final int timeout) throws InterruptedException { + R value = null; + long t = timeout * 1000; + final long sleep = 100L; + while (t > 0) { + final long before = System.currentTimeMillis(); + value = supplier.get(); + if (value != null) { + break; + } + t -= (System.currentTimeMillis() - before); + TimeUnit.MILLISECONDS.sleep(sleep); + t -= sleep; + } + Assert.assertFalse(String.format("Failed to get value in %d seconds.", timeout), (t <= 0)); + return value; + } }