Skip to content

Commit

Permalink
Merge pull request #132 from jamezp/LOGMGR-170
Browse files Browse the repository at this point in the history
[LOGMGR-170] Ensure the reconnect thread exits if it was interrupted.
  • Loading branch information
jamezp authored Aug 15, 2017
2 parents bbb9d22 + 5e64342 commit fbd552b
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 15 deletions.
22 changes: 16 additions & 6 deletions src/main/java/org/jboss/logmanager/handlers/TcpOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
*
* @author <a href="mailto:jperkins@redhat.com">James R. Perkins</a>
*/
@SuppressWarnings({"unused", "WeakerAccess"})
public class TcpOutputStream extends OutputStream implements FlushableCloseable {
private static final long retryTimeout = 5L;
private static final long maxRetryTimeout = 40L;
Expand Down Expand Up @@ -340,15 +341,21 @@ private class RetryConnector implements Runnable {

@Override
public void run() {
if (socketFactory != null) {
boolean connected = false;
while (socketFactory != null && !connected) {
Socket socket = null;
boolean connected = true;
try {
socket = socketFactory.createSocket(address, port);
synchronized (outputLock) {
TcpOutputStream.this.socket = socket;
TcpOutputStream.this.connected = connected;
reconnectThread = null;
// Unlikely but if we've been interrupted due to a close, we should shutdown
if (Thread.currentThread().isInterrupted()) {
safeClose(socket);
break;
} else {
TcpOutputStream.this.socket = socket;
TcpOutputStream.this.connected = true;
connected = true;
}
}
} catch (IOException e) {
connected = false;
Expand All @@ -363,8 +370,11 @@ public void run() {
try {
TimeUnit.SECONDS.sleep(Math.min(timeout, maxRetryTimeout));
} catch (InterruptedException ignore) {
synchronized (outputLock) {
TcpOutputStream.this.connected = false;
}
break;
}
run();
} finally {
// It's possible the thread was interrupted, if we're not connected we should clean up the socket
if (!connected) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,14 @@ static SimpleServer createUdpServer(final int port) throws IOException {
return server;
}

String poll() throws InterruptedException {
String timeoutPoll() throws InterruptedException {
return data.poll(10, TimeUnit.SECONDS);
}

String poll() throws InterruptedException {
return data.poll();
}

String peek() {
return data.peek();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.jboss.logmanager.ExtLogRecord;
import org.jboss.logmanager.formatters.PatternFormatter;
Expand Down Expand Up @@ -33,7 +35,7 @@ public void testTcpConnection() throws Exception {
) {
final ExtLogRecord record = createLogRecord("Test TCP handler");
handler.doPublish(record);
final String msg = server.poll();
final String msg = server.timeoutPoll();
Assert.assertNotNull(msg);
Assert.assertEquals("Test TCP handler", msg);
}
Expand All @@ -47,7 +49,7 @@ public void testTlsConnection() throws Exception {
) {
final ExtLogRecord record = createLogRecord("Test TLS handler");
handler.doPublish(record);
final String msg = server.poll();
final String msg = server.timeoutPoll();
Assert.assertNotNull(msg);
Assert.assertEquals("Test TLS handler", msg);
}
Expand All @@ -61,7 +63,7 @@ public void testUdpConnection() throws Exception {
) {
final ExtLogRecord record = createLogRecord("Test UDP handler");
handler.doPublish(record);
final String msg = server.poll();
final String msg = server.timeoutPoll();
Assert.assertNotNull(msg);
Assert.assertEquals("Test UDP handler", msg);
}
Expand All @@ -74,17 +76,17 @@ public void testTcpPortChange() throws Exception {
SimpleServer server2 = SimpleServer.createTcpServer(altPort);
SocketHandler handler = createHandler(Protocol.TCP)
) {
ExtLogRecord record = createLogRecord("Test TCP handler " + port);
ExtLogRecord record = createLogRecord("Test TCP handler " + port);
handler.doPublish(record);
String msg = server1.poll();
String msg = server1.timeoutPoll();
Assert.assertNotNull(msg);
Assert.assertEquals("Test TCP handler " + port, msg);

// Change the port on the handler which should close the first connection and open a new one
handler.setPort(altPort);
record = createLogRecord("Test TCP handler " + altPort);
handler.doPublish(record);
msg = server2.poll();
msg = server2.timeoutPoll();
Assert.assertNotNull(msg);
Assert.assertEquals("Test TCP handler " + altPort, msg);

Expand All @@ -100,7 +102,7 @@ public void testProtocolChange() throws Exception {
try (SimpleServer server = SimpleServer.createTcpServer(port)) {
final ExtLogRecord record = createLogRecord("Test TCP handler");
handler.doPublish(record);
final String msg = server.poll();
final String msg = server.timeoutPoll();
Assert.assertNotNull(msg);
Assert.assertEquals("Test TCP handler", msg);
}
Expand All @@ -111,13 +113,51 @@ public void testProtocolChange() throws Exception {
try (SimpleServer server = SimpleServer.createTlsServer(port)) {
final ExtLogRecord record = createLogRecord("Test TLS handler");
handler.doPublish(record);
final String msg = server.poll();
final String msg = server.timeoutPoll();
Assert.assertNotNull(msg);
Assert.assertEquals("Test TLS handler", msg);
}
}
}

@Test
public void testTcpReconnect() throws Exception {
try (SocketHandler handler = createHandler(Protocol.TCP)) {

// Publish a record to a running server
try (
SimpleServer server = SimpleServer.createTcpServer(port)
) {
final ExtLogRecord record = createLogRecord("Test TCP handler");
handler.doPublish(record);
final String msg = server.timeoutPoll();
Assert.assertNotNull(msg);
Assert.assertEquals("Test TCP handler", msg);
}

// Publish a record to a down server, this likely won't put the handler in an error state yet. However once
// we restart the server and loop the first socket should fail before a reconnect is attempted.
final ExtLogRecord record = createLogRecord("Test TCP handler");
handler.doPublish(record);
try (
SimpleServer server = SimpleServer.createTcpServer(port)
) {
// Keep writing a record until a successful record is published or a timeout occurs
final String msg = timeout(() -> {
final ExtLogRecord r = createLogRecord("Test TCP handler");
handler.doPublish(r);
try {
return server.poll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, 10);
Assert.assertNotNull(msg);
Assert.assertEquals("Test TCP handler", msg);
}
}
}

private SocketHandler createHandler(final Protocol protocol) throws UnsupportedEncodingException {
final SocketHandler handler = new SocketHandler(protocol, address, port);
handler.setAutoFlush(true);
Expand All @@ -126,4 +166,22 @@ private SocketHandler createHandler(final Protocol protocol) throws UnsupportedE

return handler;
}

private static <R> R timeout(final Supplier<R> supplier, final int timeout) throws InterruptedException {
R value = null;
long t = timeout * 1000;
final long sleep = 100L;
while (t > 0) {
final long before = System.currentTimeMillis();
value = supplier.get();
if (value != null) {
break;
}
t -= (System.currentTimeMillis() - before);
TimeUnit.MILLISECONDS.sleep(sleep);
t -= sleep;
}
Assert.assertFalse(String.format("Failed to get value in %d seconds.", timeout), (t <= 0));
return value;
}
}

0 comments on commit fbd552b

Please sign in to comment.