diff --git a/common/src/main/java/bisq/common/util/Utilities.java b/common/src/main/java/bisq/common/util/Utilities.java index 72a9ffe94f5..18e03137f75 100644 --- a/common/src/main/java/bisq/common/util/Utilities.java +++ b/common/src/main/java/bisq/common/util/Utilities.java @@ -63,7 +63,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; @@ -139,14 +138,12 @@ private static ThreadPoolExecutor getThreadPoolExecutor(String name, public static ExecutorService newCachedThreadPool(int maximumPoolSize, long keepAliveTime, - TimeUnit timeUnit, - RejectedExecutionHandler rejectedExecutionHandler) { + TimeUnit timeUnit) { return new ThreadPoolExecutor(0, maximumPoolSize, keepAliveTime, timeUnit, - new SynchronousQueue<>(), - rejectedExecutionHandler); + new SynchronousQueue<>()); } @SuppressWarnings("SameParameterValue") diff --git a/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java b/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java index 92e110c9c19..2df00fc94ff 100644 --- a/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java +++ b/core/src/main/java/bisq/core/dao/monitoring/network/StateNetworkService.java @@ -145,7 +145,7 @@ public void sendGetStateHashesResponse(Connection connection, int nonce, List { public NetworkNodeProvider(NetworkProtoResolver networkProtoResolver, BridgeAddressProvider bridgeAddressProvider, @Nullable NetworkFilter networkFilter, + @Named(Config.MAX_CONNECTIONS) int maxConnections, @Named(Config.USE_LOCALHOST_FOR_P2P) boolean useLocalhostForP2P, @Named(Config.NODE_PORT) int port, @Named(Config.TOR_DIR) File torDir, @@ -56,7 +57,7 @@ public NetworkNodeProvider(NetworkProtoResolver networkProtoResolver, @Named(Config.TOR_STREAM_ISOLATION) boolean streamIsolation, @Named(Config.TOR_CONTROL_USE_SAFE_COOKIE_AUTH) boolean useSafeCookieAuthentication) { if (useLocalhostForP2P) { - networkNode = new LocalhostNetworkNode(port, networkProtoResolver, networkFilter); + networkNode = new LocalhostNetworkNode(port, networkProtoResolver, networkFilter, maxConnections); } else { TorMode torMode = getTorMode(bridgeAddressProvider, torDir, @@ -66,7 +67,7 @@ public NetworkNodeProvider(NetworkProtoResolver networkProtoResolver, password, cookieFile, useSafeCookieAuthentication); - networkNode = new TorNetworkNode(port, networkProtoResolver, streamIsolation, torMode, networkFilter); + networkNode = new TorNetworkNode(port, networkProtoResolver, streamIsolation, torMode, networkFilter, maxConnections); } } diff --git a/p2p/src/main/java/bisq/network/p2p/network/LocalhostNetworkNode.java b/p2p/src/main/java/bisq/network/p2p/network/LocalhostNetworkNode.java index 97fce6e14f8..e0251eccfc3 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/LocalhostNetworkNode.java +++ b/p2p/src/main/java/bisq/network/p2p/network/LocalhostNetworkNode.java @@ -56,8 +56,9 @@ public static void setSimulateTorDelayHiddenService(int simulateTorDelayHiddenSe public LocalhostNetworkNode(int port, NetworkProtoResolver networkProtoResolver, - @Nullable NetworkFilter networkFilter) { - super(port, networkProtoResolver, networkFilter); + @Nullable NetworkFilter networkFilter, + int maxConnections) { + super(port, networkProtoResolver, networkFilter, maxConnections); } @Override diff --git a/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java b/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java index d24bb03a164..aa898efbb59 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java +++ b/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java @@ -72,6 +72,7 @@ public abstract class NetworkNode implements MessageListener { private final NetworkProtoResolver networkProtoResolver; @Nullable private final NetworkFilter networkFilter; + private final int maxConnections; private final CopyOnWriteArraySet inBoundConnections = new CopyOnWriteArraySet<>(); private final CopyOnWriteArraySet messageListeners = new CopyOnWriteArraySet<>(); @@ -92,10 +93,12 @@ public abstract class NetworkNode implements MessageListener { NetworkNode(int servicePort, NetworkProtoResolver networkProtoResolver, - @Nullable NetworkFilter networkFilter) { + @Nullable NetworkFilter networkFilter, + int maxConnections) { this.servicePort = servicePort; this.networkProtoResolver = networkProtoResolver; this.networkFilter = networkFilter; + this.maxConnections = maxConnections; } /////////////////////////////////////////////////////////////////////////////////////////// @@ -436,8 +439,9 @@ public void removeMessageListener(MessageListener messageListener) { /////////////////////////////////////////////////////////////////////////////////////////// void createExecutorService() { - if (executorService == null) - executorService = Utilities.getListeningExecutorService("NetworkNode-" + servicePort, 15, 30, 60); + if (executorService == null) { + executorService = MoreExecutors.listeningDecorator(Utilities.newCachedThreadPool(maxConnections * 4, 3, TimeUnit.MINUTES)); + } } void startServer(ServerSocket serverSocket) { diff --git a/p2p/src/main/java/bisq/network/p2p/network/TorNetworkNode.java b/p2p/src/main/java/bisq/network/p2p/network/TorNetworkNode.java index d070304e774..14fe3a0f7b6 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/TorNetworkNode.java +++ b/p2p/src/main/java/bisq/network/p2p/network/TorNetworkNode.java @@ -78,8 +78,9 @@ public TorNetworkNode(int servicePort, NetworkProtoResolver networkProtoResolver, boolean useStreamIsolation, TorMode torMode, - @Nullable NetworkFilter networkFilter) { - super(servicePort, networkProtoResolver, networkFilter); + @Nullable NetworkFilter networkFilter, + int maxConnections) { + super(servicePort, networkProtoResolver, networkFilter, maxConnections); this.torMode = torMode; this.streamIsolation = useStreamIsolation; createExecutorService(); diff --git a/p2p/src/test/java/bisq/network/p2p/network/LocalhostNetworkNodeTest.java b/p2p/src/test/java/bisq/network/p2p/network/LocalhostNetworkNodeTest.java index 8315332c34f..e44c6da3117 100644 --- a/p2p/src/test/java/bisq/network/p2p/network/LocalhostNetworkNodeTest.java +++ b/p2p/src/test/java/bisq/network/p2p/network/LocalhostNetworkNodeTest.java @@ -41,7 +41,7 @@ public class LocalhostNetworkNodeTest { @Test public void testMessage() throws InterruptedException, IOException { CountDownLatch msgLatch = new CountDownLatch(2); - LocalhostNetworkNode node1 = new LocalhostNetworkNode(9001, TestUtils.getNetworkProtoResolver(), null); + LocalhostNetworkNode node1 = new LocalhostNetworkNode(9001, TestUtils.getNetworkProtoResolver(), null, 12); node1.addMessageListener((message, connection) -> { log.debug("onMessage node1 " + message); msgLatch.countDown(); @@ -69,7 +69,7 @@ public void onRequestCustomBridges() { } }); - LocalhostNetworkNode node2 = new LocalhostNetworkNode(9002, TestUtils.getNetworkProtoResolver(), null); + LocalhostNetworkNode node2 = new LocalhostNetworkNode(9002, TestUtils.getNetworkProtoResolver(), null, 12); node2.addMessageListener((message, connection) -> { log.debug("onMessage node2 " + message); msgLatch.countDown(); diff --git a/p2p/src/test/java/bisq/network/p2p/network/TorNetworkNodeTest.java b/p2p/src/test/java/bisq/network/p2p/network/TorNetworkNodeTest.java index 33bc3d79126..ef20da5d42d 100644 --- a/p2p/src/test/java/bisq/network/p2p/network/TorNetworkNodeTest.java +++ b/p2p/src/test/java/bisq/network/p2p/network/TorNetworkNodeTest.java @@ -56,7 +56,7 @@ public void testTorNodeBeforeSecondReady() throws InterruptedException, IOExcept latch = new CountDownLatch(1); int port = 9001; TorNetworkNode node1 = new TorNetworkNode(port, TestUtils.getNetworkProtoResolver(), false, - new NewTor(new File("torNode_" + port), null, "", this::getBridgeAddresses), null); + new NewTor(new File("torNode_" + port), null, "", this::getBridgeAddresses), null, 12); node1.start(new SetupListener() { @Override public void onTorNodeReady() { @@ -83,7 +83,7 @@ public void onRequestCustomBridges() { latch = new CountDownLatch(1); int port2 = 9002; TorNetworkNode node2 = new TorNetworkNode(port2, TestUtils.getNetworkProtoResolver(), false, - new NewTor(new File("torNode_" + port), null, "", this::getBridgeAddresses), null); + new NewTor(new File("torNode_" + port), null, "", this::getBridgeAddresses), null, 12); node2.start(new SetupListener() { @Override public void onTorNodeReady() { @@ -141,7 +141,7 @@ public void testTorNodeAfterBothReady() throws InterruptedException, IOException latch = new CountDownLatch(2); int port = 9001; TorNetworkNode node1 = new TorNetworkNode(port, TestUtils.getNetworkProtoResolver(), false, - new NewTor(new File("torNode_" + port), null, "", this::getBridgeAddresses), null); + new NewTor(new File("torNode_" + port), null, "", this::getBridgeAddresses), null, 12); node1.start(new SetupListener() { @Override public void onTorNodeReady() { @@ -167,7 +167,7 @@ public void onRequestCustomBridges() { int port2 = 9002; TorNetworkNode node2 = new TorNetworkNode(port2, TestUtils.getNetworkProtoResolver(), false, - new NewTor(new File("torNode_" + port), null, "", this::getBridgeAddresses), null); + new NewTor(new File("torNode_" + port), null, "", this::getBridgeAddresses), null, 12); node2.start(new SetupListener() { @Override public void onTorNodeReady() { diff --git a/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java b/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java index 59a8135bb88..6e9a9e0335e 100644 --- a/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java +++ b/seednode/src/main/java/bisq/seednode/reporting/SeedNodeReportingService.java @@ -54,6 +54,7 @@ import java.util.LinkedList; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -115,10 +116,9 @@ public SeedNodeReportingService(P2PService p2PService, this.maxConnections = maxConnections; this.seedNodeReportingServerUrl = seedNodeReportingServerUrl; - executor = Utilities.newCachedThreadPool(5, - 8, - TimeUnit.MINUTES, - (runnable, executor) -> log.error("Execution was rejected. We skip the {} task.", runnable.toString())); + // The pool size must be larger as the expected parallel sends because HttpClient use it + // internally for asynchronous and dependent tasks. + executor = Utilities.newCachedThreadPool(20, 8, TimeUnit.MINUTES); httpClient = HttpClient.newBuilder().executor(executor).build(); heartBeatTimer = UserThread.runPeriodically(this::sendHeartBeat, HEART_BEAT_DELAY_SEC); @@ -249,7 +249,6 @@ private void sendReportingItems(ReportingItems reportingItems) { .uri(URI.create(seedNodeReportingServerUrl)) .POST(HttpRequest.BodyPublishers.ofByteArray(protoMessageAsBytes)) .header("User-Agent", getMyAddress()) - .header("Connection", "keep-alive") .build(); httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()).whenComplete((response, throwable) -> { if (throwable != null) { @@ -258,9 +257,10 @@ private void sendReportingItems(ReportingItems reportingItems) { log.error("Response error message: {}", response); } }); + } catch (RejectedExecutionException t) { + log.warn("Did not send reportingItems {} because of RejectedExecutionException {}", reportingItems, t.toString()); } catch (Throwable t) { - // RejectedExecutionException is thrown if we exceed our pool size. - log.error("Did not send reportingItems {} because of exception {}", reportingItems, t.toString()); + log.warn("Did not send reportingItems {} because of exception {}", reportingItems, t.toString()); } }