From 9b30517ebeed818e57d86dbe20e3cbcee7e07b89 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Wed, 8 Nov 2023 11:19:02 +0100 Subject: [PATCH 1/4] feat: add HTTP connection interceptor which TTL configuration --- .../rest/ConnectionClosingInterceptor.java | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 client-core/src/main/java/com/influxdb/rest/ConnectionClosingInterceptor.java diff --git a/client-core/src/main/java/com/influxdb/rest/ConnectionClosingInterceptor.java b/client-core/src/main/java/com/influxdb/rest/ConnectionClosingInterceptor.java new file mode 100644 index 0000000000..a23047738e --- /dev/null +++ b/client-core/src/main/java/com/influxdb/rest/ConnectionClosingInterceptor.java @@ -0,0 +1,96 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.rest; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.logging.Logger; +import javax.annotation.Nonnull; + +import okhttp3.Call; +import okhttp3.Connection; +import okhttp3.EventListener; +import okhttp3.Interceptor; +import okhttp3.Response; +import okhttp3.internal.connection.RealConnection; + +/** + * This interceptor closes connections that exceed a specified maximum age. + * It's beneficial for scenarios where your application requires establishing new connections to the same host after a predetermined interval. + * This interceptor is most effective in applications that use a single connection, meaning requests are not made in parallel. + *

+ * Caution is advised, as setting a very short interval can lead to performance issues because establishing new connections is a resource-intensive operation. + */ +public class ConnectionClosingInterceptor extends EventListener implements Interceptor { + + private static final Logger LOG = Logger.getLogger(ConnectionClosingInterceptor.class.getName()); + + private final ConcurrentMap connectionTimes = new ConcurrentHashMap<>(); + private final long connectionMaxAgeMillis; + + /** + * Create a new interceptor that will close connections older than the given max age. + * + * @param connectionMaxAge the max age of connections, the precision is milliseconds + */ + public ConnectionClosingInterceptor(@Nonnull final Duration connectionMaxAge) { + this.connectionMaxAgeMillis = connectionMaxAge.toMillis(); + } + + @Override + @Nonnull + public Response intercept(@Nonnull final Chain chain) throws IOException { + Connection connection = chain.connection(); + + // + // If the connection is old, mark it to not be reused. + // + if (connection != null && isConnectionOld(connection)) { + if (connection instanceof RealConnection) { + LOG.fine("Marking connection to not be reused: " + connection); + ((RealConnection) connection).noNewExchanges$okhttp(); + } else { + LOG.warning("Unable to mark connection to not be reused: " + connection); + } + } + + return chain.proceed(chain.request()); + } + + @Override + public void connectionAcquired(@Nonnull final Call call, @Nonnull final Connection connection) { + connectionTimes.putIfAbsent(connection, System.currentTimeMillis()); + } + + /** + * Check if the connection is older than the max age. + * + * @param connection the connection to check + * @return true if the connection is older than the max age + */ + private boolean isConnectionOld(@Nonnull final Connection connection) { + Long time = connectionTimes.get(connection); + return (time != null && (System.currentTimeMillis() - time) > connectionMaxAgeMillis); + } +} From 7641ad101f6ca758065dda0b4513b61eb2799253 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Wed, 8 Nov 2023 11:25:05 +0100 Subject: [PATCH 2/4] fix: code style --- .../influxdb/rest/ConnectionClosingInterceptor.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/client-core/src/main/java/com/influxdb/rest/ConnectionClosingInterceptor.java b/client-core/src/main/java/com/influxdb/rest/ConnectionClosingInterceptor.java index a23047738e..0d4af8f871 100644 --- a/client-core/src/main/java/com/influxdb/rest/ConnectionClosingInterceptor.java +++ b/client-core/src/main/java/com/influxdb/rest/ConnectionClosingInterceptor.java @@ -36,11 +36,13 @@ import okhttp3.internal.connection.RealConnection; /** - * This interceptor closes connections that exceed a specified maximum age. - * It's beneficial for scenarios where your application requires establishing new connections to the same host after a predetermined interval. - * This interceptor is most effective in applications that use a single connection, meaning requests are not made in parallel. + * This interceptor closes connections that exceed a specified maximum lifetime age (TTL). It's beneficial for + * scenarios where your application requires establishing new connections to the same host after a predetermined + * interval. This interceptor is most effective in applications that use a single connection, meaning requests + * are not made in parallel. *

- * Caution is advised, as setting a very short interval can lead to performance issues because establishing new connections is a resource-intensive operation. + * Caution is advised, as setting a very short interval can lead to performance issues because + * establishing new connections is a resource-intensive operation. */ public class ConnectionClosingInterceptor extends EventListener implements Interceptor { From c3c6279a4a1550a1d9861f7997cca5e1cef42275 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Fri, 10 Nov 2023 12:54:27 +0100 Subject: [PATCH 3/4] chore: add integration test --- .../rest/ConnectionClosingInterceptor.java | 7 +- .../rest/ITConnectionClosingInterceptor.java | 143 ++++++++++++++++++ 2 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 client-core/src/test/java/com/influxdb/rest/ITConnectionClosingInterceptor.java diff --git a/client-core/src/main/java/com/influxdb/rest/ConnectionClosingInterceptor.java b/client-core/src/main/java/com/influxdb/rest/ConnectionClosingInterceptor.java index 0d4af8f871..e23e8d7ff2 100644 --- a/client-core/src/main/java/com/influxdb/rest/ConnectionClosingInterceptor.java +++ b/client-core/src/main/java/com/influxdb/rest/ConnectionClosingInterceptor.java @@ -72,6 +72,7 @@ public Response intercept(@Nonnull final Chain chain) throws IOException { if (connection instanceof RealConnection) { LOG.fine("Marking connection to not be reused: " + connection); ((RealConnection) connection).noNewExchanges$okhttp(); + connectionTimes.remove(connection); } else { LOG.warning("Unable to mark connection to not be reused: " + connection); } @@ -93,6 +94,10 @@ public void connectionAcquired(@Nonnull final Call call, @Nonnull final Connecti */ private boolean isConnectionOld(@Nonnull final Connection connection) { Long time = connectionTimes.get(connection); - return (time != null && (System.currentTimeMillis() - time) > connectionMaxAgeMillis); + if (time == null) { + return false; + } + long age = System.currentTimeMillis() - time; + return age > connectionMaxAgeMillis; } } diff --git a/client-core/src/test/java/com/influxdb/rest/ITConnectionClosingInterceptor.java b/client-core/src/test/java/com/influxdb/rest/ITConnectionClosingInterceptor.java new file mode 100644 index 0000000000..141c9e609a --- /dev/null +++ b/client-core/src/test/java/com/influxdb/rest/ITConnectionClosingInterceptor.java @@ -0,0 +1,143 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.rest; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.logging.Logger; +import javax.annotation.Nonnull; + +import okhttp3.Call; +import okhttp3.Connection; +import okhttp3.EventListener; +import okhttp3.OkHttpClient; +import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.Response; +import org.assertj.core.api.Assertions; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.influxdb.test.AbstractMockServerTest; + +class ITConnectionClosingInterceptor extends AbstractMockServerTest { + + private static final Logger LOG = Logger.getLogger(ITConnectionClosingInterceptor.class.getName()); + + private String url; + private OkHttpClient client; + private ConnectionsListener connectionsListener; + + @BeforeEach + void setUp() { + connectionsListener = new ConnectionsListener(); + url = startMockServer(); + } + + @AfterEach + void tearDown() { + client.connectionPool().evictAll(); + client.dispatcher().executorService().shutdown(); + } + + @Test + public void withoutTTLonConnection() throws Exception { + + client = new OkHttpClient.Builder() + .eventListener(connectionsListener) + .build(); + + callApi(5, 3); + + Assertions.assertThat(connectionsListener.connections).hasSize(1); + Assertions.assertThat(client.connectionPool().connectionCount()).isEqualTo(1); + } + + @Test + public void withTTLonConnection() throws Exception { + + // Use connection TTL of 2 second + ConnectionClosingInterceptor interceptor = new ConnectionClosingInterceptor(Duration.ofSeconds(2)) { + + @Override + public void connectionAcquired(@NotNull Call call, @NotNull Connection connection) { + super.connectionAcquired(call, connection); + + // count the number of connections, the okhttp client can have only one listener => we have to use this + connectionsListener.connections.add(connection); + } + }; + + client = new OkHttpClient.Builder() + .addNetworkInterceptor(interceptor) + .eventListener(interceptor) + .protocols(Collections.singletonList(Protocol.HTTP_1_1)) + .build(); + + callApi(5, 3); + + Assertions.assertThat(connectionsListener.connections).hasSize(3); + Assertions.assertThat(client.connectionPool().connectionCount()).isEqualTo(1); + } + + /** + * Call API by specified times. + * + * @param times the number of times to call API + * @param sleepSeconds the number of seconds to sleep between calls + * @throws IOException if an error occurs + */ + private void callApi(final int times, final int sleepSeconds) throws Exception { + for (int i = 0; i < times; i++) { + mockServer.enqueue(createResponse("")); + + Request request = new Request.Builder() + .url(url) + .build(); + + LOG.info(String.format("Calling API %d", i)); + try (Response response = client.newCall(request).execute()) { + Assertions.assertThat(response.isSuccessful()).isTrue(); + } + + LOG.info(String.format("Sleeping %d seconds; connection counts: %d", sleepSeconds, connectionsListener.connections.size())); + Thread.sleep(sleepSeconds * 1000L); + } + } + + /** + * Event listener that store acquired connections. + */ + private static class ConnectionsListener extends EventListener { + private final Set connections = new HashSet<>(); + + @Override + public void connectionAcquired(@Nonnull final Call call, @Nonnull final Connection connection) { + connections.add(connection); + } + } +} From 964e31661ee26a487797645325a1c0363b3d3a5d Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Tue, 5 Dec 2023 15:06:18 +0100 Subject: [PATCH 4/4] docs: update CHANGELOG.md --- CHANGELOG.md | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 87708d6c4c..59df6b2138 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,76 @@ ## 6.12.0 [unreleased] +### Features +1. [#643](https://github.com/influxdata/influxdb-client-java/pull/643): `ConnectionClosingInterceptor` interceptor closes connections that exceed +a specified maximum lifetime age (TTL). It's beneficial for scenarios where your application requires establishing new connections to the same host after +a predetermined interval. + +The connection to the InfluxDB Enterprise with the `ConnectionClosingInterceptor` can be configured as follows: +```java +package example; + +import java.time.Duration; +import java.util.Collections; + +import okhttp3.OkHttpClient; +import okhttp3.Protocol; + +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.InfluxDBClientFactory; +import com.influxdb.client.InfluxDBClientOptions; +import com.influxdb.client.domain.WriteConsistency; +import com.influxdb.rest.ConnectionClosingInterceptor; + +public class InfluxQLExample { + + public static void main(final String[] args) throws InterruptedException { + + // + // Credentials to connect to InfluxDB Enterprise + // + String url = "https://localhost:8086"; + String username = "admin"; + String password = "password"; + String database = "database"; + WriteConsistency consistency = WriteConsistency.ALL; + + // + // Configure underlying HTTP client + // + OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder() + .protocols(Collections.singletonList(Protocol.HTTP_1_1)); + + // + // Use new Connection TTL feature + // + Duration connectionMaxAge = Duration.ofMinutes(1); + ConnectionClosingInterceptor interceptor = new ConnectionClosingInterceptor(connectionMaxAge); + okHttpClientBuilder + .addNetworkInterceptor(interceptor) + .eventListenerFactory(call -> interceptor); + + // + // Configure InfluxDB client + // + InfluxDBClientOptions.Builder optionsBuilder = InfluxDBClientOptions.builder() + .url(url) + .org("-") + .authenticateToken(String.format("%s:%s", username, password).toCharArray()) + .bucket(String.format("%s/%s", database, "")) + .consistency(consistency) + .okHttpClient(okHttpClientBuilder); + + // + // Create client and write data + // + try (InfluxDBClient client = InfluxDBClientFactory.create(optionsBuilder.build())) { + + // ... + } + } +} +``` + ## 6.11.0 [2023-12-05] ### Features