diff --git a/src/main/java/org/jboss/logmanager/handlers/TcpOutputStream.java b/src/main/java/org/jboss/logmanager/handlers/TcpOutputStream.java
index 69c40abe..ce4250ca 100644
--- a/src/main/java/org/jboss/logmanager/handlers/TcpOutputStream.java
+++ b/src/main/java/org/jboss/logmanager/handlers/TcpOutputStream.java
@@ -46,6 +46,7 @@
*
* @author James R. Perkins
*/
+@SuppressWarnings({"unused", "WeakerAccess"})
public class TcpOutputStream extends OutputStream implements FlushableCloseable {
private static final long retryTimeout = 5L;
private static final long maxRetryTimeout = 40L;
@@ -340,15 +341,21 @@ private class RetryConnector implements Runnable {
@Override
public void run() {
- if (socketFactory != null) {
+ boolean connected = false;
+ while (socketFactory != null && !connected) {
Socket socket = null;
- boolean connected = true;
try {
socket = socketFactory.createSocket(address, port);
synchronized (outputLock) {
- TcpOutputStream.this.socket = socket;
- TcpOutputStream.this.connected = connected;
- reconnectThread = null;
+ // Unlikely but if we've been interrupted due to a close, we should shutdown
+ if (Thread.currentThread().isInterrupted()) {
+ safeClose(socket);
+ break;
+ } else {
+ TcpOutputStream.this.socket = socket;
+ TcpOutputStream.this.connected = true;
+ connected = true;
+ }
}
} catch (IOException e) {
connected = false;
@@ -363,8 +370,11 @@ public void run() {
try {
TimeUnit.SECONDS.sleep(Math.min(timeout, maxRetryTimeout));
} catch (InterruptedException ignore) {
+ synchronized (outputLock) {
+ TcpOutputStream.this.connected = false;
+ }
+ break;
}
- run();
} finally {
// It's possible the thread was interrupted, if we're not connected we should clean up the socket
if (!connected) {
diff --git a/src/test/java/org/jboss/logmanager/handlers/SimpleServer.java b/src/test/java/org/jboss/logmanager/handlers/SimpleServer.java
index 69bd315e..66989068 100644
--- a/src/test/java/org/jboss/logmanager/handlers/SimpleServer.java
+++ b/src/test/java/org/jboss/logmanager/handlers/SimpleServer.java
@@ -55,10 +55,14 @@ static SimpleServer createUdpServer(final int port) throws IOException {
return server;
}
- String poll() throws InterruptedException {
+ String timeoutPoll() throws InterruptedException {
return data.poll(10, TimeUnit.SECONDS);
}
+ String poll() throws InterruptedException {
+ return data.poll();
+ }
+
String peek() {
return data.peek();
}
diff --git a/src/test/java/org/jboss/logmanager/handlers/SocketHandlerTests.java b/src/test/java/org/jboss/logmanager/handlers/SocketHandlerTests.java
index 606d53b6..9a2eea01 100644
--- a/src/test/java/org/jboss/logmanager/handlers/SocketHandlerTests.java
+++ b/src/test/java/org/jboss/logmanager/handlers/SocketHandlerTests.java
@@ -3,6 +3,8 @@
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.jboss.logmanager.ExtLogRecord;
import org.jboss.logmanager.formatters.PatternFormatter;
@@ -33,7 +35,7 @@ public void testTcpConnection() throws Exception {
) {
final ExtLogRecord record = createLogRecord("Test TCP handler");
handler.doPublish(record);
- final String msg = server.poll();
+ final String msg = server.timeoutPoll();
Assert.assertNotNull(msg);
Assert.assertEquals("Test TCP handler", msg);
}
@@ -47,7 +49,7 @@ public void testTlsConnection() throws Exception {
) {
final ExtLogRecord record = createLogRecord("Test TLS handler");
handler.doPublish(record);
- final String msg = server.poll();
+ final String msg = server.timeoutPoll();
Assert.assertNotNull(msg);
Assert.assertEquals("Test TLS handler", msg);
}
@@ -61,7 +63,7 @@ public void testUdpConnection() throws Exception {
) {
final ExtLogRecord record = createLogRecord("Test UDP handler");
handler.doPublish(record);
- final String msg = server.poll();
+ final String msg = server.timeoutPoll();
Assert.assertNotNull(msg);
Assert.assertEquals("Test UDP handler", msg);
}
@@ -74,9 +76,9 @@ public void testTcpPortChange() throws Exception {
SimpleServer server2 = SimpleServer.createTcpServer(altPort);
SocketHandler handler = createHandler(Protocol.TCP)
) {
- ExtLogRecord record = createLogRecord("Test TCP handler " + port);
+ ExtLogRecord record = createLogRecord("Test TCP handler " + port);
handler.doPublish(record);
- String msg = server1.poll();
+ String msg = server1.timeoutPoll();
Assert.assertNotNull(msg);
Assert.assertEquals("Test TCP handler " + port, msg);
@@ -84,7 +86,7 @@ public void testTcpPortChange() throws Exception {
handler.setPort(altPort);
record = createLogRecord("Test TCP handler " + altPort);
handler.doPublish(record);
- msg = server2.poll();
+ msg = server2.timeoutPoll();
Assert.assertNotNull(msg);
Assert.assertEquals("Test TCP handler " + altPort, msg);
@@ -100,7 +102,7 @@ public void testProtocolChange() throws Exception {
try (SimpleServer server = SimpleServer.createTcpServer(port)) {
final ExtLogRecord record = createLogRecord("Test TCP handler");
handler.doPublish(record);
- final String msg = server.poll();
+ final String msg = server.timeoutPoll();
Assert.assertNotNull(msg);
Assert.assertEquals("Test TCP handler", msg);
}
@@ -111,13 +113,51 @@ public void testProtocolChange() throws Exception {
try (SimpleServer server = SimpleServer.createTlsServer(port)) {
final ExtLogRecord record = createLogRecord("Test TLS handler");
handler.doPublish(record);
- final String msg = server.poll();
+ final String msg = server.timeoutPoll();
Assert.assertNotNull(msg);
Assert.assertEquals("Test TLS handler", msg);
}
}
}
+ @Test
+ public void testTcpReconnect() throws Exception {
+ try (SocketHandler handler = createHandler(Protocol.TCP)) {
+
+ // Publish a record to a running server
+ try (
+ SimpleServer server = SimpleServer.createTcpServer(port)
+ ) {
+ final ExtLogRecord record = createLogRecord("Test TCP handler");
+ handler.doPublish(record);
+ final String msg = server.timeoutPoll();
+ Assert.assertNotNull(msg);
+ Assert.assertEquals("Test TCP handler", msg);
+ }
+
+ // Publish a record to a down server, this likely won't put the handler in an error state yet. However once
+ // we restart the server and loop the first socket should fail before a reconnect is attempted.
+ final ExtLogRecord record = createLogRecord("Test TCP handler");
+ handler.doPublish(record);
+ try (
+ SimpleServer server = SimpleServer.createTcpServer(port)
+ ) {
+ // Keep writing a record until a successful record is published or a timeout occurs
+ final String msg = timeout(() -> {
+ final ExtLogRecord r = createLogRecord("Test TCP handler");
+ handler.doPublish(r);
+ try {
+ return server.poll();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }, 10);
+ Assert.assertNotNull(msg);
+ Assert.assertEquals("Test TCP handler", msg);
+ }
+ }
+ }
+
private SocketHandler createHandler(final Protocol protocol) throws UnsupportedEncodingException {
final SocketHandler handler = new SocketHandler(protocol, address, port);
handler.setAutoFlush(true);
@@ -126,4 +166,22 @@ private SocketHandler createHandler(final Protocol protocol) throws UnsupportedE
return handler;
}
+
+ private static R timeout(final Supplier supplier, final int timeout) throws InterruptedException {
+ R value = null;
+ long t = timeout * 1000;
+ final long sleep = 100L;
+ while (t > 0) {
+ final long before = System.currentTimeMillis();
+ value = supplier.get();
+ if (value != null) {
+ break;
+ }
+ t -= (System.currentTimeMillis() - before);
+ TimeUnit.MILLISECONDS.sleep(sleep);
+ t -= sleep;
+ }
+ Assert.assertFalse(String.format("Failed to get value in %d seconds.", timeout), (t <= 0));
+ return value;
+ }
}