Skip to content

Commit

Permalink
Merge pull request #4787 from kwvanderlinde/bugfix/4542-immediate-ser…
Browse files Browse the repository at this point in the history
…ver-start

Further improvements to server starts
  • Loading branch information
cwisniew authored May 22, 2024
2 parents b7117da + e347d07 commit 3951887
Show file tree
Hide file tree
Showing 35 changed files with 1,256 additions and 1,050 deletions.
14 changes: 14 additions & 0 deletions clientserver/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ apply from: rootProject.file('buildSrc/shared.gradle')

// In this section you declare the dependencies for your production and test code
dependencies {
implementation group: 'com.google.code.findbugs', name: 'jsr305', version: '3.0.2'

implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.22.1'
implementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.22.1'
implementation group: 'org.apache.logging.log4j', name: 'log4j-1.2-api', version: '2.20.0' // Bridges v1 to v2 for other code in other libs
Expand All @@ -30,4 +32,16 @@ dependencies {
// compression of messages between client and server
implementation 'org.apache.commons:commons-compress:1.26.0'
implementation 'com.github.luben:zstd-jni:1.5.5-11'

testRuntimeOnly("org.junit.platform:junit-platform-launcher")
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.2'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.2'
testImplementation 'org.junit.jupiter:junit-jupiter-params:5.10.2'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.2'
// For mocking features during unit tests
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.10.0'
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@

import java.io.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import net.rptools.clientserver.ActivityListener;
import net.rptools.clientserver.simple.DisconnectHandler;
import net.rptools.clientserver.simple.MessageHandler;
Expand All @@ -34,26 +33,26 @@
public abstract class AbstractConnection implements Connection {
private static final Logger log = LogManager.getLogger(AbstractConnection.class);

private final Map<Object, List<byte[]>> outQueueMap = new HashMap<>();
private final List<List<byte[]>> outQueueList = new LinkedList<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final BlockingQueue<byte[]> outQueue = new LinkedBlockingQueue<>();

private final List<DisconnectHandler> disconnectHandlers = new CopyOnWriteArrayList<>();
private final List<ActivityListener> listeners = new CopyOnWriteArrayList<>();
private final List<MessageHandler> messageHandlers = new CopyOnWriteArrayList<>();

private List<byte[]> getOutQueue(Object channel) {
// Ordinarily I would synchronize this method, but I imagine the channels will be initialized
// once
// at the beginning of execution. Thus get(channel) will only return once right at the
// beginning
// no sense incurring the cost of synchronizing the method on the class for that.
List<byte[]> queue = outQueueMap.get(channel);
if (queue == null) {
queue = Collections.synchronizedList(new ArrayList<byte[]>());
outQueueMap.put(channel, queue);
@Override
public final void close() {
if (closed.compareAndSet(false, true)) {
onClose();
}
return queue;
}

protected final boolean isClosed() {
return closed.get();
}

protected abstract void onClose();

private byte[] compress(byte[] message) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(message.length);
Expand All @@ -79,30 +78,17 @@ private byte[] inflate(byte[] compressedMessage) {
}
}

protected synchronized void addMessage(Object channel, byte[] message) {
List<byte[]> queue = getOutQueue(channel);
queue.add(compress(message));
// Queue up for sending
outQueueList.add(queue);
}

protected synchronized boolean hasMoreMessages() {
return !outQueueList.isEmpty();
protected void addMessage(Object channel, byte[] message) {
outQueue.add(compress(message));
}

protected synchronized byte[] nextMessage() {
if (!hasMoreMessages()) {
protected byte[] nextMessage() {
try {
// Bit paranoid, but don't wait forever for a message - that can perpetually block the thread.
return outQueue.poll(10, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
return null;
}
List<byte[]> queue = outQueueList.remove(0);

if (queue.isEmpty()) return null;

byte[] message = queue.remove(0);
if (!queue.isEmpty()) {
outQueueList.add(queue);
}
return message;
}

public final void addMessageHandler(MessageHandler handler) {
Expand All @@ -113,19 +99,20 @@ public final void removeMessageHandler(MessageHandler handler) {
messageHandlers.remove(handler);
}

private void dispatchMessage(String id, byte[] message) {
if (messageHandlers.size() == 0) {
log.warn("message received but not messageHandlers registered.");
protected void dispatchMessage(byte[] message) {
var id = getId();
if (messageHandlers.isEmpty()) {
log.warn("message received but not messageHandlers registered for {}.", id);
}

for (MessageHandler handler : messageHandlers) {
handler.handleMessage(id, message);
}
}

protected final void dispatchCompressedMessage(String id, byte[] compressedMessage) {
protected final void dispatchCompressedMessage(byte[] compressedMessage) {
var message = inflate(compressedMessage);
dispatchMessage(id, message);
dispatchMessage(message);
}

protected final void writeMessage(OutputStream out, byte[] message) throws IOException {
Expand Down Expand Up @@ -239,7 +226,7 @@ public final void removeActivityListener(ActivityListener listener) {
listeners.remove(listener);
}

private void notifyListeners(
protected void notifyListeners(
ActivityListener.Direction direction,
ActivityListener.State state,
int totalTransferSize,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* This software Copyright by the RPTools.net development team, and
* licensed under the Affero GPL Version 3 or, at your option, any later
* version.
*
* MapTool Source Code is distributed in the hope that it will be
* useful, but WITHOUT ANY WARRANTY; without even the implied warranty
* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public
* License * along with this source Code. If not, please visit
* <http://www.gnu.org/licenses/> and specifically the Affero license
* text at <http://www.gnu.org/licenses/agpl.html>.
*/
package net.rptools.clientserver.simple.connection;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class DirectConnection extends AbstractConnection {
private static final Logger log = LogManager.getLogger(DirectConnection.class);

public record Pair(DirectConnection clientSide, DirectConnection serverSide) {}

public static Pair create(String id) {
var closed = new AtomicBoolean(false);
var clientToServerQueue = new ArrayBlockingQueue<byte[]>(128);
var serverToClientQueue = new ArrayBlockingQueue<byte[]>(128);

var clientSide =
new DirectConnection(closed, id + "-client", clientToServerQueue, serverToClientQueue);
var serverSide =
new DirectConnection(closed, id + "-server", serverToClientQueue, clientToServerQueue);

return new Pair(clientSide, serverSide);
}

private final AtomicBoolean sharedClosedFlag;
private final String id;
private final BlockingQueue<byte[]> writeQueue;
private final ReceiveThread receiveThread;

private DirectConnection(
AtomicBoolean sharedClosedFlag,
String id,
BlockingQueue<byte[]> writeQueue,
BlockingQueue<byte[]> readQueue) {
this.sharedClosedFlag = sharedClosedFlag;
this.id = id;
this.writeQueue = writeQueue;
this.receiveThread = new ReceiveThread(readQueue);
}

@Override
public void open() {
receiveThread.start();
}

@Override
protected void onClose() {
// Tell the other end about our closure.
sharedClosedFlag.set(true);
receiveThread.interrupt();
}

@Override
public void sendMessage(Object channel, byte[] message) {
if (message.length == 0) {
return;
}

boolean written = false;
while (!written) {
try {
// Set a timeout so we have a chance to escape in case we weren't notified properly.
written = writeQueue.offer(message, 10, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Just try again.
}
}
}

@Override
public boolean isAlive() {
return !sharedClosedFlag.get();
}

@Override
public String getId() {
return id;
}

@Override
public String getError() {
return null;
}

private final class ReceiveThread extends Thread {
private final BlockingQueue<byte[]> readQueue;

public ReceiveThread(BlockingQueue<byte[]> readQueue) {
super("DirectConnection.ReceiveThread");
this.readQueue = readQueue;
}

@Override
public void run() {
try {
while (!DirectConnection.this.isClosed() && DirectConnection.this.isAlive()) {
try {
// Set a timeout so we have a chance to escape in case we weren't notified properly.
byte[] message;
try {
message = readQueue.poll(10, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Just try again.
continue;
}

if (message != null) {
DirectConnection.this.dispatchMessage(message);
}
} catch (Throwable t) {
// don't let anything kill this thread via exception
log.error("Unexpected error in receive thread", t);
}
}
} finally {
DirectConnection.this.close();
fireDisconnect();
}
}
}
}
Loading

0 comments on commit 3951887

Please sign in to comment.