From d4b6965348dea9ea002c063428992932769b101c Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Tue, 13 Dec 2022 16:25:39 -0500 Subject: [PATCH 1/5] Use networkNode.sendMessage instead of connection.sendMessage as the call on connection is blocking. This was likely a major bug for seed nodes that at sending hash responses the main thread got blocked. Signed-off-by: HenrikJannsen --- .../bisq/core/dao/monitoring/network/StateNetworkService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java b/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java index 92e110c9c19..2df00fc94ff 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java @@ -145,7 +145,7 @@ public void sendGetStateHashesResponse(Connection connection, int nonce, List Date: Tue, 13 Dec 2022 16:35:16 -0500 Subject: [PATCH 2/5] Add maxConnection parameter to be used for setting the core pool size of the executor in NetworkNode. We use double the maxConnection size for the core size and 4x for the max pool size. Signed-off-by: HenrikJannsen --- .../java/bisq/network/p2p/NetworkNodeProvider.java | 5 +++-- .../bisq/network/p2p/network/LocalhostNetworkNode.java | 5 +++-- .../java/bisq/network/p2p/network/NetworkNode.java | 10 ++++++++-- .../java/bisq/network/p2p/network/TorNetworkNode.java | 5 +++-- .../network/p2p/network/LocalhostNetworkNodeTest.java | 4 ++-- .../bisq/network/p2p/network/TorNetworkNodeTest.java | 8 ++++---- 6 files changed, 23 insertions(+), 14 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/NetworkNodeProvider.java b/p2p/src/main/java/bisq/network/p2p/NetworkNodeProvider.java index 03652b970f5..81c41078095 100644 --- a/p2p/src/main/java/bisq/network/p2p/NetworkNodeProvider.java +++ b/p2p/src/main/java/bisq/network/p2p/NetworkNodeProvider.java @@ -45,6 +45,7 @@ public class NetworkNodeProvider implements Provider { public NetworkNodeProvider(NetworkProtoResolver networkProtoResolver, BridgeAddressProvider bridgeAddressProvider, @Nullable NetworkFilter networkFilter, + @Named(Config.MAX_CONNECTIONS) int maxConnections, @Named(Config.USE_LOCALHOST_FOR_P2P) boolean useLocalhostForP2P, @Named(Config.NODE_PORT) int port, @Named(Config.TOR_DIR) File torDir, @@ -56,7 +57,7 @@ public NetworkNodeProvider(NetworkProtoResolver networkProtoResolver, @Named(Config.TOR_STREAM_ISOLATION) boolean streamIsolation, @Named(Config.TOR_CONTROL_USE_SAFE_COOKIE_AUTH) boolean useSafeCookieAuthentication) { if (useLocalhostForP2P) { - networkNode = new LocalhostNetworkNode(port, networkProtoResolver, networkFilter); + networkNode = new LocalhostNetworkNode(port, networkProtoResolver, networkFilter, maxConnections); } else { TorMode torMode = getTorMode(bridgeAddressProvider, torDir, @@ -66,7 +67,7 @@ public NetworkNodeProvider(NetworkProtoResolver networkProtoResolver, password, cookieFile, useSafeCookieAuthentication); - networkNode = new TorNetworkNode(port, networkProtoResolver, streamIsolation, torMode, networkFilter); + networkNode = new TorNetworkNode(port, networkProtoResolver, streamIsolation, torMode, networkFilter, maxConnections); } } diff --git a/p2p/src/main/java/bisq/network/p2p/network/LocalhostNetworkNode.java b/p2p/src/main/java/bisq/network/p2p/network/LocalhostNetworkNode.java index 97fce6e14f8..e0251eccfc3 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/LocalhostNetworkNode.java +++ b/p2p/src/main/java/bisq/network/p2p/network/LocalhostNetworkNode.java @@ -56,8 +56,9 @@ public static void setSimulateTorDelayHiddenService(int simulateTorDelayHiddenSe public LocalhostNetworkNode(int port, NetworkProtoResolver networkProtoResolver, - @Nullable NetworkFilter networkFilter) { - super(port, networkProtoResolver, networkFilter); + @Nullable NetworkFilter networkFilter, + int maxConnections) { + super(port, networkProtoResolver, networkFilter, maxConnections); } @Override diff --git a/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java b/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java index d24bb03a164..7687104bdac 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java +++ b/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java @@ -72,6 +72,7 @@ public abstract class NetworkNode implements MessageListener { private final NetworkProtoResolver networkProtoResolver; @Nullable private final NetworkFilter networkFilter; + private final int maxConnections; private final CopyOnWriteArraySet inBoundConnections = new CopyOnWriteArraySet<>(); private final CopyOnWriteArraySet messageListeners = new CopyOnWriteArraySet<>(); @@ -92,10 +93,12 @@ public abstract class NetworkNode implements MessageListener { NetworkNode(int servicePort, NetworkProtoResolver networkProtoResolver, - @Nullable NetworkFilter networkFilter) { + @Nullable NetworkFilter networkFilter, + int maxConnections) { this.servicePort = servicePort; this.networkProtoResolver = networkProtoResolver; this.networkFilter = networkFilter; + this.maxConnections = maxConnections; } /////////////////////////////////////////////////////////////////////////////////////////// @@ -437,7 +440,10 @@ public void removeMessageListener(MessageListener messageListener) { void createExecutorService() { if (executorService == null) - executorService = Utilities.getListeningExecutorService("NetworkNode-" + servicePort, 15, 30, 60); + executorService = Utilities.getListeningExecutorService("NetworkNode-" + servicePort, + maxConnections * 2, + maxConnections * 4, + 60); } void startServer(ServerSocket serverSocket) { diff --git a/p2p/src/main/java/bisq/network/p2p/network/TorNetworkNode.java b/p2p/src/main/java/bisq/network/p2p/network/TorNetworkNode.java index d070304e774..14fe3a0f7b6 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/TorNetworkNode.java +++ b/p2p/src/main/java/bisq/network/p2p/network/TorNetworkNode.java @@ -78,8 +78,9 @@ public TorNetworkNode(int servicePort, NetworkProtoResolver networkProtoResolver, boolean useStreamIsolation, TorMode torMode, - @Nullable NetworkFilter networkFilter) { - super(servicePort, networkProtoResolver, networkFilter); + @Nullable NetworkFilter networkFilter, + int maxConnections) { + super(servicePort, networkProtoResolver, networkFilter, maxConnections); this.torMode = torMode; this.streamIsolation = useStreamIsolation; createExecutorService(); diff --git a/p2p/src/test/java/bisq/network/p2p/network/LocalhostNetworkNodeTest.java b/p2p/src/test/java/bisq/network/p2p/network/LocalhostNetworkNodeTest.java index 8315332c34f..e44c6da3117 100644 --- a/p2p/src/test/java/bisq/network/p2p/network/LocalhostNetworkNodeTest.java +++ b/p2p/src/test/java/bisq/network/p2p/network/LocalhostNetworkNodeTest.java @@ -41,7 +41,7 @@ public class LocalhostNetworkNodeTest { @Test public void testMessage() throws InterruptedException, IOException { CountDownLatch msgLatch = new CountDownLatch(2); - LocalhostNetworkNode node1 = new LocalhostNetworkNode(9001, TestUtils.getNetworkProtoResolver(), null); + LocalhostNetworkNode node1 = new LocalhostNetworkNode(9001, TestUtils.getNetworkProtoResolver(), null, 12); node1.addMessageListener((message, connection) -> { log.debug("onMessage node1 " + message); msgLatch.countDown(); @@ -69,7 +69,7 @@ public void onRequestCustomBridges() { } }); - LocalhostNetworkNode node2 = new LocalhostNetworkNode(9002, TestUtils.getNetworkProtoResolver(), null); + LocalhostNetworkNode node2 = new LocalhostNetworkNode(9002, TestUtils.getNetworkProtoResolver(), null, 12); node2.addMessageListener((message, connection) -> { log.debug("onMessage node2 " + message); msgLatch.countDown(); diff --git a/p2p/src/test/java/bisq/network/p2p/network/TorNetworkNodeTest.java b/p2p/src/test/java/bisq/network/p2p/network/TorNetworkNodeTest.java index 33bc3d79126..ef20da5d42d 100644 --- a/p2p/src/test/java/bisq/network/p2p/network/TorNetworkNodeTest.java +++ b/p2p/src/test/java/bisq/network/p2p/network/TorNetworkNodeTest.java @@ -56,7 +56,7 @@ public void testTorNodeBeforeSecondReady() throws InterruptedException, IOExcept latch = new CountDownLatch(1); int port = 9001; TorNetworkNode node1 = new TorNetworkNode(port, TestUtils.getNetworkProtoResolver(), false, - new NewTor(new File("torNode_" + port), null, "", this::getBridgeAddresses), null); + new NewTor(new File("torNode_" + port), null, "", this::getBridgeAddresses), null, 12); node1.start(new SetupListener() { @Override public void onTorNodeReady() { @@ -83,7 +83,7 @@ public void onRequestCustomBridges() { latch = new CountDownLatch(1); int port2 = 9002; TorNetworkNode node2 = new TorNetworkNode(port2, TestUtils.getNetworkProtoResolver(), false, - new NewTor(new File("torNode_" + port), null, "", this::getBridgeAddresses), null); + new NewTor(new File("torNode_" + port), null, "", this::getBridgeAddresses), null, 12); node2.start(new SetupListener() { @Override public void onTorNodeReady() { @@ -141,7 +141,7 @@ public void testTorNodeAfterBothReady() throws InterruptedException, IOException latch = new CountDownLatch(2); int port = 9001; TorNetworkNode node1 = new TorNetworkNode(port, TestUtils.getNetworkProtoResolver(), false, - new NewTor(new File("torNode_" + port), null, "", this::getBridgeAddresses), null); + new NewTor(new File("torNode_" + port), null, "", this::getBridgeAddresses), null, 12); node1.start(new SetupListener() { @Override public void onTorNodeReady() { @@ -167,7 +167,7 @@ public void onRequestCustomBridges() { int port2 = 9002; TorNetworkNode node2 = new TorNetworkNode(port2, TestUtils.getNetworkProtoResolver(), false, - new NewTor(new File("torNode_" + port), null, "", this::getBridgeAddresses), null); + new NewTor(new File("torNode_" + port), null, "", this::getBridgeAddresses), null, 12); node2.start(new SetupListener() { @Override public void onTorNodeReady() { From 84584d38ac49654c63650e15725b7f0641ff44b0 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Mon, 12 Dec 2022 20:23:32 -0500 Subject: [PATCH 3/5] The connection:keep-alive header is restricted in java and would require a jvm option to allow it. Signed-off-by: HenrikJannsen --- .../java/bisq/seednode/reporting/SeedNodeReportingService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java b/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java index 59a8135bb88..415ad6a3db8 100644 --- a/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java +++ b/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java @@ -249,7 +249,6 @@ private void sendReportingItems(ReportingItems reportingItems) { .uri(URI.create(seedNodeReportingServerUrl)) .POST(HttpRequest.BodyPublishers.ofByteArray(protoMessageAsBytes)) .header("User-Agent", getMyAddress()) - .header("Connection", "keep-alive") .build(); httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()).whenComplete((response, throwable) -> { if (throwable != null) { From 071a3524435e33217f18c553162e4422519f272e Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Tue, 13 Dec 2022 16:42:02 -0500 Subject: [PATCH 4/5] Remove rejectedExecutionHandler at newCachedThreadPool. It is more convenient to handle the RejectedExecutionException in the calling code to get more context for error logging. Signed-off-by: HenrikJannsen --- .../src/main/java/bisq/common/util/Utilities.java | 7 ++----- .../reporting/SeedNodeReportingService.java | 13 +++++++------ 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/common/src/main/java/bisq/common/util/Utilities.java b/common/src/main/java/bisq/common/util/Utilities.java index 72a9ffe94f5..18e03137f75 100644 --- a/common/src/main/java/bisq/common/util/Utilities.java +++ b/common/src/main/java/bisq/common/util/Utilities.java @@ -63,7 +63,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; @@ -139,14 +138,12 @@ private static ThreadPoolExecutor getThreadPoolExecutor(String name, public static ExecutorService newCachedThreadPool(int maximumPoolSize, long keepAliveTime, - TimeUnit timeUnit, - RejectedExecutionHandler rejectedExecutionHandler) { + TimeUnit timeUnit) { return new ThreadPoolExecutor(0, maximumPoolSize, keepAliveTime, timeUnit, - new SynchronousQueue<>(), - rejectedExecutionHandler); + new SynchronousQueue<>()); } @SuppressWarnings("SameParameterValue") diff --git a/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java b/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java index 415ad6a3db8..6e9a9e0335e 100644 --- a/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java +++ b/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java @@ -54,6 +54,7 @@ import java.util.LinkedList; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -115,10 +116,9 @@ public SeedNodeReportingService(P2PService p2PService, this.maxConnections = maxConnections; this.seedNodeReportingServerUrl = seedNodeReportingServerUrl; - executor = Utilities.newCachedThreadPool(5, - 8, - TimeUnit.MINUTES, - (runnable, executor) -> log.error("Execution was rejected. We skip the {} task.", runnable.toString())); + // The pool size must be larger as the expected parallel sends because HttpClient use it + // internally for asynchronous and dependent tasks. + executor = Utilities.newCachedThreadPool(20, 8, TimeUnit.MINUTES); httpClient = HttpClient.newBuilder().executor(executor).build(); heartBeatTimer = UserThread.runPeriodically(this::sendHeartBeat, HEART_BEAT_DELAY_SEC); @@ -257,9 +257,10 @@ private void sendReportingItems(ReportingItems reportingItems) { log.error("Response error message: {}", response); } }); + } catch (RejectedExecutionException t) { + log.warn("Did not send reportingItems {} because of RejectedExecutionException {}", reportingItems, t.toString()); } catch (Throwable t) { - // RejectedExecutionException is thrown if we exceed our pool size. - log.error("Did not send reportingItems {} because of exception {}", reportingItems, t.toString()); + log.warn("Did not send reportingItems {} because of exception {}", reportingItems, t.toString()); } } From f90c4b8f6b10bbbbed37cffadb141b1f35d4eea8 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Tue, 13 Dec 2022 17:33:16 -0500 Subject: [PATCH 5/5] Use cachedThreadPool which uses SynchronousQueue instead of ArrayBlockingQueue. When ArrayBlockingQueue is used (as in case of using Utilities.getListeningExecutorService) the maxPoolSize has no effect. The pool creates never more threads than the core pool size. Thus we have been limited to 15 threads for message sending and connection creation. This was likely a reason why seed nodes are not accepting new connections if the pool is exhausted. Slow message send can block a thread for 1-3 minutes. Signed-off-by: HenrikJannsen --- .../main/java/bisq/network/p2p/network/NetworkNode.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java b/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java index 7687104bdac..aa898efbb59 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java +++ b/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java @@ -439,11 +439,9 @@ public void removeMessageListener(MessageListener messageListener) { /////////////////////////////////////////////////////////////////////////////////////////// void createExecutorService() { - if (executorService == null) - executorService = Utilities.getListeningExecutorService("NetworkNode-" + servicePort, - maxConnections * 2, - maxConnections * 4, - 60); + if (executorService == null) { + executorService = MoreExecutors.listeningDecorator(Utilities.newCachedThreadPool(maxConnections * 4, 3, TimeUnit.MINUTES)); + } } void startServer(ServerSocket serverSocket) {