Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Further improvements to server starts #4787

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0a2911b
Fix logger name in CipherUtil
kwvanderlinde May 16, 2024
adab639
Check for a running server before assigning one.
kwvanderlinde May 21, 2024
24a2344
Set topology types on initial basic campaign
kwvanderlinde May 16, 2024
4e77223
Threading improvements for connections
kwvanderlinde May 18, 2024
62fa69f
Clean up disconnect logic for sockets and WebRTC
kwvanderlinde May 20, 2024
96ab80e
Factor message routing out of Server into Router
kwvanderlinde May 19, 2024
2547238
Now pull message handlers and handshakes out of AbstractServer
kwvanderlinde May 17, 2024
9ab3acc
Do not require handshakes for local connections
kwvanderlinde May 18, 2024
e1268d0
Bring back PersonalServerPlayerDatabase
kwvanderlinde May 17, 2024
081027c
Consolidate different client cases in MapTool
kwvanderlinde May 17, 2024
b0a71b5
Merge MapToolServerConnection into MapToolServer
kwvanderlinde May 18, 2024
33cc04f
Be more careful about calling setCampaign()
kwvanderlinde May 17, 2024
8bda988
Avoid transmitting the campaign to the local client on server start.
kwvanderlinde May 20, 2024
b502b9d
Consolidate MapToolClient constructors
kwvanderlinde May 20, 2024
9885e95
Disconnect handler should present messages on the Swing thread
kwvanderlinde May 20, 2024
e777afe
In MapToolClient, no need for check for isLocalServer before stopping…
kwvanderlinde May 20, 2024
c572bc4
Move some more server responsibilities into MapToolServer
kwvanderlinde May 20, 2024
7b837fd
MapToolServer now has states: New, Started, Stopped
kwvanderlinde May 20, 2024
4f5b47f
Do not null out the server field on disconnect, just stop it.
kwvanderlinde May 20, 2024
ef9c916
Rely on MapToolClient states to remove expectDisconnected()
kwvanderlinde May 21, 2024
286c3e7
MapTool.startPersonalServer() is now a thin wrapper around MapTool.st…
kwvanderlinde May 21, 2024
e347d07
Be a bit more particular about connection lifecycle
kwvanderlinde May 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions clientserver/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ apply from: rootProject.file('buildSrc/shared.gradle')

// In this section you declare the dependencies for your production and test code
dependencies {
implementation group: 'com.google.code.findbugs', name: 'jsr305', version: '3.0.2'

implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.22.1'
implementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.22.1'
implementation group: 'org.apache.logging.log4j', name: 'log4j-1.2-api', version: '2.20.0' // Bridges v1 to v2 for other code in other libs
Expand All @@ -30,4 +32,16 @@ dependencies {
// compression of messages between client and server
implementation 'org.apache.commons:commons-compress:1.26.0'
implementation 'com.github.luben:zstd-jni:1.5.5-11'

testRuntimeOnly("org.junit.platform:junit-platform-launcher")
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.2'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.2'
testImplementation 'org.junit.jupiter:junit-jupiter-params:5.10.2'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.2'
// For mocking features during unit tests
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.10.0'
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@

import java.io.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import net.rptools.clientserver.ActivityListener;
import net.rptools.clientserver.simple.DisconnectHandler;
import net.rptools.clientserver.simple.MessageHandler;
Expand All @@ -34,26 +33,26 @@
public abstract class AbstractConnection implements Connection {
private static final Logger log = LogManager.getLogger(AbstractConnection.class);

private final Map<Object, List<byte[]>> outQueueMap = new HashMap<>();
private final List<List<byte[]>> outQueueList = new LinkedList<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final BlockingQueue<byte[]> outQueue = new LinkedBlockingQueue<>();

private final List<DisconnectHandler> disconnectHandlers = new CopyOnWriteArrayList<>();
private final List<ActivityListener> listeners = new CopyOnWriteArrayList<>();
private final List<MessageHandler> messageHandlers = new CopyOnWriteArrayList<>();

private List<byte[]> getOutQueue(Object channel) {
// Ordinarily I would synchronize this method, but I imagine the channels will be initialized
// once
// at the beginning of execution. Thus get(channel) will only return once right at the
// beginning
// no sense incurring the cost of synchronizing the method on the class for that.
List<byte[]> queue = outQueueMap.get(channel);
if (queue == null) {
queue = Collections.synchronizedList(new ArrayList<byte[]>());
outQueueMap.put(channel, queue);
@Override
public final void close() {
if (closed.compareAndSet(false, true)) {
onClose();
}
return queue;
}

protected final boolean isClosed() {
return closed.get();
}

protected abstract void onClose();

private byte[] compress(byte[] message) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(message.length);
Expand All @@ -79,30 +78,17 @@ private byte[] inflate(byte[] compressedMessage) {
}
}

protected synchronized void addMessage(Object channel, byte[] message) {
List<byte[]> queue = getOutQueue(channel);
queue.add(compress(message));
// Queue up for sending
outQueueList.add(queue);
}

protected synchronized boolean hasMoreMessages() {
return !outQueueList.isEmpty();
protected void addMessage(Object channel, byte[] message) {
outQueue.add(compress(message));
}

protected synchronized byte[] nextMessage() {
if (!hasMoreMessages()) {
protected byte[] nextMessage() {
try {
// Bit paranoid, but don't wait forever for a message - that can perpetually block the thread.
return outQueue.poll(10, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
return null;
}
List<byte[]> queue = outQueueList.remove(0);

if (queue.isEmpty()) return null;

byte[] message = queue.remove(0);
if (!queue.isEmpty()) {
outQueueList.add(queue);
}
return message;
}

public final void addMessageHandler(MessageHandler handler) {
Expand All @@ -113,19 +99,20 @@ public final void removeMessageHandler(MessageHandler handler) {
messageHandlers.remove(handler);
}

private void dispatchMessage(String id, byte[] message) {
if (messageHandlers.size() == 0) {
log.warn("message received but not messageHandlers registered.");
protected void dispatchMessage(byte[] message) {
var id = getId();
if (messageHandlers.isEmpty()) {
log.warn("message received but not messageHandlers registered for {}.", id);
}

for (MessageHandler handler : messageHandlers) {
handler.handleMessage(id, message);
}
}

protected final void dispatchCompressedMessage(String id, byte[] compressedMessage) {
protected final void dispatchCompressedMessage(byte[] compressedMessage) {
var message = inflate(compressedMessage);
dispatchMessage(id, message);
dispatchMessage(message);
}

protected final void writeMessage(OutputStream out, byte[] message) throws IOException {
Expand Down Expand Up @@ -239,7 +226,7 @@ public final void removeActivityListener(ActivityListener listener) {
listeners.remove(listener);
}

private void notifyListeners(
protected void notifyListeners(
ActivityListener.Direction direction,
ActivityListener.State state,
int totalTransferSize,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* This software Copyright by the RPTools.net development team, and
* licensed under the Affero GPL Version 3 or, at your option, any later
* version.
*
* MapTool Source Code is distributed in the hope that it will be
* useful, but WITHOUT ANY WARRANTY; without even the implied warranty
* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public
* License * along with this source Code. If not, please visit
* <http://www.gnu.org/licenses/> and specifically the Affero license
* text at <http://www.gnu.org/licenses/agpl.html>.
*/
package net.rptools.clientserver.simple.connection;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class DirectConnection extends AbstractConnection {
private static final Logger log = LogManager.getLogger(DirectConnection.class);

public record Pair(DirectConnection clientSide, DirectConnection serverSide) {}

public static Pair create(String id) {
var closed = new AtomicBoolean(false);
var clientToServerQueue = new ArrayBlockingQueue<byte[]>(128);
var serverToClientQueue = new ArrayBlockingQueue<byte[]>(128);

var clientSide =
new DirectConnection(closed, id + "-client", clientToServerQueue, serverToClientQueue);
var serverSide =
new DirectConnection(closed, id + "-server", serverToClientQueue, clientToServerQueue);

return new Pair(clientSide, serverSide);
}

private final AtomicBoolean sharedClosedFlag;
private final String id;
private final BlockingQueue<byte[]> writeQueue;
private final ReceiveThread receiveThread;

private DirectConnection(
AtomicBoolean sharedClosedFlag,
String id,
BlockingQueue<byte[]> writeQueue,
BlockingQueue<byte[]> readQueue) {
this.sharedClosedFlag = sharedClosedFlag;
this.id = id;
this.writeQueue = writeQueue;
this.receiveThread = new ReceiveThread(readQueue);
}

@Override
public void open() {
receiveThread.start();
}

@Override
protected void onClose() {
// Tell the other end about our closure.
sharedClosedFlag.set(true);
receiveThread.interrupt();
}

@Override
public void sendMessage(Object channel, byte[] message) {
if (message.length == 0) {
return;
}

boolean written = false;
while (!written) {
try {
// Set a timeout so we have a chance to escape in case we weren't notified properly.
written = writeQueue.offer(message, 10, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Just try again.
}
}
}

@Override
public boolean isAlive() {
return !sharedClosedFlag.get();
}

@Override
public String getId() {
return id;
}

@Override
public String getError() {
return null;
}

private final class ReceiveThread extends Thread {
private final BlockingQueue<byte[]> readQueue;

public ReceiveThread(BlockingQueue<byte[]> readQueue) {
super("DirectConnection.ReceiveThread");
this.readQueue = readQueue;
}

@Override
public void run() {
try {
while (!DirectConnection.this.isClosed() && DirectConnection.this.isAlive()) {
try {
// Set a timeout so we have a chance to escape in case we weren't notified properly.
byte[] message;
try {
message = readQueue.poll(10, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Just try again.
continue;
}

if (message != null) {
DirectConnection.this.dispatchMessage(message);
}
} catch (Throwable t) {
// don't let anything kill this thread via exception
log.error("Unexpected error in receive thread", t);
}
}
} finally {
DirectConnection.this.close();
fireDisconnect();
}
}
}
}
Loading
Loading