From 6ddadec7a53e5898d2d40d578749d35dbca65f63 Mon Sep 17 00:00:00 2001 From: Saranya Krishnakumar Date: Thu, 1 Feb 2024 11:25:11 -0800 Subject: [PATCH] Allow dynamic update of read and write limits set for Servers --- src/main/asciidoc/net.adoc | 12 +++ src/main/java/examples/NetExamples.java | 35 +++++++ .../java/io/vertx/core/http/HttpServer.java | 9 ++ .../java/io/vertx/core/net/NetServer.java | 8 ++ .../vertx/core/net/TrafficShapingOptions.java | 4 +- .../io/vertx/core/net/impl/TCPServerBase.java | 31 +++++- .../core/http/HttpBandwidthLimitingTest.java | 94 +++++++++++++++--- .../core/net/NetBandwidthLimitingTest.java | 97 +++++++++++++++++++ 8 files changed, 273 insertions(+), 17 deletions(-) diff --git a/src/main/asciidoc/net.adoc b/src/main/asciidoc/net.adoc index 160d4b5d1b1..9ba3c24b88f 100644 --- a/src/main/asciidoc/net.adoc +++ b/src/main/asciidoc/net.adoc @@ -365,6 +365,18 @@ through {@link io.vertx.core.net.NetServerOptions} and for HttpServer it can be {@link examples.NetExamples#configureTrafficShapingForHttpServer} ---- +These traffic shaping options can also be dynamically updated after server start. + +[source,$lang] +---- +{@link examples.NetExamples#dynamicallyUpdateTrafficShapingForNetServer} +---- + +[source,$lang] +---- +{@link examples.NetExamples#dynamicallyUpdateTrafficShapingForHttpServer} +---- + [[ssl]] === Configuring servers and clients to work with SSL/TLS diff --git a/src/main/java/examples/NetExamples.java b/src/main/java/examples/NetExamples.java index d4e9b5091d9..8451b101337 100755 --- a/src/main/java/examples/NetExamples.java +++ b/src/main/java/examples/NetExamples.java @@ -721,6 +721,23 @@ public void configureTrafficShapingForNetServer(Vertx vertx) { NetServer server = vertx.createNetServer(options); } + public void dynamicallyUpdateTrafficShapingForNetServer(Vertx vertx) { + NetServerOptions options = new NetServerOptions() + .setHost("localhost") + .setPort(1234) + .setTrafficShapingOptions(new TrafficShapingOptions() + .setInboundGlobalBandwidth(64 * 1024) + .setOutboundGlobalBandwidth(128 * 1024)); + NetServer server = vertx.createNetServer(options); + TrafficShapingOptions update = new TrafficShapingOptions() + .setInboundGlobalBandwidth(2 * 64 * 1024) // twice + .setOutboundGlobalBandwidth(128 * 1024); // unchanged + server + .listen(1234, "localhost") + // wait until traffic shaping handler is created for updates + .onSuccess(v -> server.updateTrafficShapingOptions(update)); + } + public void configureTrafficShapingForHttpServer(Vertx vertx) { HttpServerOptions options = new HttpServerOptions() .setHost("localhost") @@ -731,4 +748,22 @@ public void configureTrafficShapingForHttpServer(Vertx vertx) { HttpServer server = vertx.createHttpServer(options); } + + + public void dynamicallyUpdateTrafficShapingForHttpServer(Vertx vertx) { + HttpServerOptions options = new HttpServerOptions() + .setHost("localhost") + .setPort(1234) + .setTrafficShapingOptions(new TrafficShapingOptions() + .setInboundGlobalBandwidth(64 * 1024) + .setOutboundGlobalBandwidth(128 * 1024)); + HttpServer server = vertx.createHttpServer(options); + TrafficShapingOptions update = new TrafficShapingOptions() + .setInboundGlobalBandwidth(2 * 64 * 1024) // twice + .setOutboundGlobalBandwidth(128 * 1024); // unchanged + server + .listen(1234, "localhost") + // wait until traffic shaping handler is created for updates + .onSuccess(v -> server.updateTrafficShapingOptions(update)); + } } diff --git a/src/main/java/io/vertx/core/http/HttpServer.java b/src/main/java/io/vertx/core/http/HttpServer.java index 1a28fa44f2e..5bcffc57e51 100644 --- a/src/main/java/io/vertx/core/http/HttpServer.java +++ b/src/main/java/io/vertx/core/http/HttpServer.java @@ -21,6 +21,7 @@ import io.vertx.core.metrics.Measured; import io.vertx.core.net.SSLOptions; import io.vertx.core.net.SocketAddress; +import io.vertx.core.net.TrafficShapingOptions; import io.vertx.core.net.impl.SocketAddressImpl; import io.vertx.core.streams.ReadStream; @@ -182,6 +183,14 @@ default void updateSSLOptions(SSLOptions options, boolean force, Handler updateSSLOptions(SSLOptions options, boolean force) { } } + public void updateTrafficShapingOptions(TrafficShapingOptions options) { + if (options == null) { + throw new IllegalArgumentException("Invalid null value passed for traffic shaping options update"); + } + if (trafficShapingHandler == null) { + throw new IllegalStateException("Unable to update traffic shaping options because the server was not configured " + + "to use traffic shaping during startup"); + } + TCPServerBase server = actualServer; + if (server != null && server != this) { + server.updateTrafficShapingOptions(options); + } else { + long checkIntervalForStatsInMillis = options.getCheckIntervalForStatsTimeUnit().toMillis(options.getCheckIntervalForStats()); + trafficShapingHandler.configure(options.getOutboundGlobalBandwidth(), options.getInboundGlobalBandwidth(), checkIntervalForStatsInMillis); + + if (options.getPeakOutboundGlobalBandwidth() != 0) { + trafficShapingHandler.setMaxGlobalWriteSize(options.getPeakOutboundGlobalBandwidth()); + } + if (options.getMaxDelayToWait() != 0) { + long maxDelayToWaitInMillis = options.getMaxDelayToWaitTimeUnit().toMillis(options.getMaxDelayToWait()); + trafficShapingHandler.setMaxWriteDelay(maxDelayToWaitInMillis); + } + } + } + public Future bind(SocketAddress address) { ContextInternal listenContext = vertx.getOrCreateContext(); return listen(address, listenContext).map(this); diff --git a/src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java b/src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java index 94cba5649d6..f8f9fb21331 100644 --- a/src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java +++ b/src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java @@ -55,22 +55,27 @@ public static Iterable data() { Function http1ServerFactory = (v) -> Providers.http1Server(v, INBOUND_LIMIT, OUTBOUND_LIMIT); Function http2ServerFactory = (v) -> Providers.http2Server(v, INBOUND_LIMIT, OUTBOUND_LIMIT); + Function http1NonTrafficShapedServerFactory = (v) -> Providers.http1Server(v, 0, 0); + Function http2NonTrafficShapedServerFactory = (v) -> Providers.http1Server(v, 0, 0); Function http1ClientFactory = (v) -> v.createHttpClient(); Function http2ClientFactory = (v) -> v.createHttpClient(createHttp2ClientOptions()); return Arrays.asList(new Object[][] { - { 1.1, http1ServerFactory, http1ClientFactory }, - { 2.0, http2ServerFactory, http2ClientFactory } + { 1.1, http1ServerFactory, http1ClientFactory, http1NonTrafficShapedServerFactory }, + { 2.0, http2ServerFactory, http2ClientFactory, http2NonTrafficShapedServerFactory } }); } private Function serverFactory; private Function clientFactory; + private Function nonTrafficShapedServerFactory; public HttpBandwidthLimitingTest(double protoVersion, Function serverFactory, - Function clientFactory) { + Function clientFactory, + Function nonTrafficShapedServerFactory) { this.serverFactory = serverFactory; this.clientFactory = clientFactory; + this.nonTrafficShapedServerFactory = nonTrafficShapedServerFactory; } @Before @@ -199,6 +204,63 @@ public void start(Promise startPromise) { Assert.assertTrue(elapsedMillis > expectedTimeMillis(totalReceivedLength.get(), OUTBOUND_LIMIT)); // because there are simultaneous 2 requests } + @Test + public void testDynamicOutboundRateUpdate() throws Exception { + Buffer expectedBuffer = TestUtils.randomBuffer(TEST_CONTENT_SIZE); + + HttpServer testServer = serverFactory.apply(vertx); + testServer.requestHandler(HANDLERS.bufferRead(expectedBuffer)); + startServer(testServer); + + // update outbound rate to twice the limit + TrafficShapingOptions trafficOptions = new TrafficShapingOptions() + .setInboundGlobalBandwidth(INBOUND_LIMIT) // unchanged + .setOutboundGlobalBandwidth(2 * OUTBOUND_LIMIT); + testServer.updateTrafficShapingOptions(trafficOptions); + + long startTime = System.nanoTime(); + HttpClient testClient = clientFactory.apply(vertx); + read(expectedBuffer, testServer, testClient); + await(); + long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); + + Assert.assertTrue(elapsedMillis < expectedUpperBoundTimeMillis(TEST_CONTENT_SIZE, OUTBOUND_LIMIT)); + } + + @Test + public void testDynamicInboundRateUpdate() throws Exception { + Buffer expectedBuffer = TestUtils.randomBuffer((TEST_CONTENT_SIZE)); + + HttpServer testServer = serverFactory.apply(vertx); + testServer.requestHandler(HANDLERS.bufferWrite(expectedBuffer)); + startServer(testServer); + + // update inbound rate to twice the limit + TrafficShapingOptions trafficOptions = new TrafficShapingOptions() + .setOutboundGlobalBandwidth(OUTBOUND_LIMIT) // unchanged + .setInboundGlobalBandwidth(2 * INBOUND_LIMIT); + testServer.updateTrafficShapingOptions(trafficOptions); + + long startTime = System.nanoTime(); + HttpClient testClient = clientFactory.apply(vertx); + write(expectedBuffer, testServer, testClient); + await(); + long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); + + Assert.assertTrue(elapsedMillis < expectedUpperBoundTimeMillis(TEST_CONTENT_SIZE, INBOUND_LIMIT)); + } + + @Test(expected = IllegalStateException.class) + public void testRateUpdateWhenServerStartedWithoutTrafficShaping() { + HttpServer testServer = nonTrafficShapedServerFactory.apply(vertx); + + // update inbound rate to twice the limit + TrafficShapingOptions trafficOptions = new TrafficShapingOptions() + .setOutboundGlobalBandwidth(OUTBOUND_LIMIT) + .setInboundGlobalBandwidth(2 * INBOUND_LIMIT); + testServer.updateTrafficShapingOptions(trafficOptions); + } + /** * The throttling takes a while to kick in so the expected time cannot be strict especially * for small data sizes in these tests. @@ -211,6 +273,10 @@ private long expectedTimeMillis(long size, int rate) { return (long) (TimeUnit.MILLISECONDS.convert(( size / rate), TimeUnit.SECONDS) * 0.5); // multiplied by 0.5 to be more tolerant of time pauses during CI runs } + private long expectedUpperBoundTimeMillis(long size, int rate) { + return TimeUnit.MILLISECONDS.convert(( size / rate), TimeUnit.SECONDS); // Since existing rate will be upperbound, runs should complete by this time + } + private void read(Buffer expected, HttpServer server, HttpClient client) { client.request(HttpMethod.GET, server.actualPort(), DEFAULT_HTTP_HOST,"/buffer-read") .compose(req -> req.send() @@ -280,19 +346,25 @@ static class Providers { private static HttpServer http1Server(Vertx vertx, int inboundLimit, int outboundLimit) { HttpServerOptions options = new HttpServerOptions() .setHost(DEFAULT_HTTP_HOST) - .setPort(DEFAULT_HTTP_PORT) - .setTrafficShapingOptions(new TrafficShapingOptions() - .setInboundGlobalBandwidth(inboundLimit) - .setOutboundGlobalBandwidth(outboundLimit)); + .setPort(DEFAULT_HTTP_PORT); + + if (inboundLimit != 0 || outboundLimit != 0) { + options.setTrafficShapingOptions(new TrafficShapingOptions() + .setInboundGlobalBandwidth(inboundLimit) + .setOutboundGlobalBandwidth(outboundLimit)); + } return vertx.createHttpServer(options); } private static HttpServer http2Server(Vertx vertx, int inboundLimit, int outboundLimit) { - HttpServerOptions options = createHttp2ServerOptions(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST) - .setTrafficShapingOptions(new TrafficShapingOptions() - .setInboundGlobalBandwidth(inboundLimit) - .setOutboundGlobalBandwidth(outboundLimit)); + HttpServerOptions options = createHttp2ServerOptions(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST); + + if (inboundLimit != 0 || outboundLimit != 0) { + options.setTrafficShapingOptions(new TrafficShapingOptions() + .setInboundGlobalBandwidth(inboundLimit) + .setOutboundGlobalBandwidth(outboundLimit)); + } return vertx.createHttpServer(options); } diff --git a/src/test/java/io/vertx/core/net/NetBandwidthLimitingTest.java b/src/test/java/io/vertx/core/net/NetBandwidthLimitingTest.java index f7c6b763db8..d1864a28ae8 100644 --- a/src/test/java/io/vertx/core/net/NetBandwidthLimitingTest.java +++ b/src/test/java/io/vertx/core/net/NetBandwidthLimitingTest.java @@ -249,6 +249,96 @@ public void start(Promise startPromise) { assertTimeTakenFallsInRange(expectedTimeInMillis, elapsedMillis); } + @Test + public void testDynamicInboundRateUpdate() { + long startTime = System.nanoTime(); + + Buffer expected = TestUtils.randomBuffer(64 * 1024 * 4); + Buffer received = Buffer.buffer(); + NetServer server = netServer(vertx); + + server.connectHandler(sock -> { + sock.handler(buff -> { + received.appendBuffer(buff); + if (received.length() == expected.length()) { + assertEquals(expected, received); + long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); + assertTrue(elapsedMillis < expectedUpperBoundTimeMillis(received.length(), INBOUND_LIMIT)); + testComplete(); + } + }); + // Send some data to the client to trigger the buffer write + sock.write("foo"); + }); + Future result = server.listen(testAddress); + + // update rate + TrafficShapingOptions trafficOptions = new TrafficShapingOptions() + .setOutboundGlobalBandwidth(OUTBOUND_LIMIT) // unchanged + .setInboundGlobalBandwidth(2 * INBOUND_LIMIT); + server.updateTrafficShapingOptions(trafficOptions); + + result.onComplete(onSuccess(resp -> { + Future clientConnect = client.connect(testAddress); + clientConnect.onComplete(onSuccess(sock -> { + sock.handler(buf -> { + sock.write(expected); + }); + })); + })); + await(); + } + + @Test + public void testDynamicOutboundRateUpdate() { + long startTime = System.nanoTime(); + + Buffer expected = TestUtils.randomBuffer(64 * 1024 * 4); + Buffer received = Buffer.buffer(); + NetServer server = netServer(vertx); + server.connectHandler(sock -> { + sock.handler(buf -> { + sock.write(expected); + }); + }); + Future result = server.listen(testAddress); + + // update rate + TrafficShapingOptions trafficOptions = new TrafficShapingOptions() + .setInboundGlobalBandwidth(INBOUND_LIMIT) // unchanged + .setOutboundGlobalBandwidth(2 * OUTBOUND_LIMIT); + server.updateTrafficShapingOptions(trafficOptions); + + result.onComplete(onSuccess(resp -> { + Future clientConnect = client.connect(testAddress); + clientConnect.onComplete(onSuccess(sock -> { + sock.handler(buff -> { + received.appendBuffer(buff); + if (received.length() == expected.length()) { + assertEquals(expected, received); + long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); + assertTrue(elapsedMillis < expectedUpperBoundTimeMillis(received.length(), OUTBOUND_LIMIT)); + testComplete(); + } + }); + sock.write("foo"); + })); + })); + await(); + } + + @Test(expected = IllegalStateException.class) + public void testRateUpdateWhenServerStartedWithoutTrafficShaping() { + NetServerOptions options = new NetServerOptions().setHost(DEFAULT_HOST).setPort(DEFAULT_PORT); + NetServer testServer = vertx.createNetServer(options); + + // update inbound rate to twice the limit + TrafficShapingOptions trafficOptions = new TrafficShapingOptions() + .setOutboundGlobalBandwidth(OUTBOUND_LIMIT) + .setInboundGlobalBandwidth(2 * INBOUND_LIMIT); + testServer.updateTrafficShapingOptions(trafficOptions); + } + /** * Calculate time taken for transfer given bandwidth limit set. * @@ -260,6 +350,13 @@ private long expectedTimeMillis(int size, int rate) { return TimeUnit.MILLISECONDS.convert((size / rate), TimeUnit.SECONDS); } + /** + * Upperbound time taken is calculated with old rate limit before update. Hence time taken should be less than this value. + */ + private long expectedUpperBoundTimeMillis(int size, int rate) { + return TimeUnit.MILLISECONDS.convert((size / rate), TimeUnit.SECONDS); + } + /** * The throttling takes a while to kick in so the expected time cannot be strict especially * for small data sizes in these tests.