diff --git a/src/main/java/io/nats/client/Connection.java b/src/main/java/io/nats/client/Connection.java index b63a9dfbc..5dea8b1a3 100644 --- a/src/main/java/io/nats/client/Connection.java +++ b/src/main/java/io/nats/client/Connection.java @@ -547,11 +547,21 @@ enum Status { /** * Forces reconnect behavior. Stops the current connection including the reading and writing, * copies already queued outgoing messages, and then begins the reconnect logic. + * Does not flush. Does not force close the connection. See {@link #forceReconnect(ForceReconnectOptions)}. * @throws IOException the forceReconnect fails * @throws InterruptedException the connection is not connected */ void forceReconnect() throws IOException, InterruptedException; + /** + * Forces reconnect behavior. Stops the current connection including the reading and writing, + * copies already queued outgoing messages, and then begins the reconnect logic. + * @param options options for how the forceReconnect works + * @throws IOException the forceReconnect fails + * @throws InterruptedException the connection is not connected + */ + void forceReconnect(ForceReconnectOptions options) throws IOException, InterruptedException; + /** * Calculates the round trip time between this client and the server. * @return the RTT as a duration diff --git a/src/main/java/io/nats/client/ForceReconnectOptions.java b/src/main/java/io/nats/client/ForceReconnectOptions.java new file mode 100644 index 000000000..f0eb40344 --- /dev/null +++ b/src/main/java/io/nats/client/ForceReconnectOptions.java @@ -0,0 +1,106 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client; + +import java.time.Duration; + +/** + * The PublishOptions class specifies the options for publishing with JetStream enabled servers. + * Options are created using a {@link ForceReconnectOptions.Builder Builder}. + */ +public class ForceReconnectOptions { + + public static final ForceReconnectOptions FORCE_CLOSE_INSTANCE = ForceReconnectOptions.builder().forceClose().build(); + + private final boolean forceClose; + private final Duration flushWait; + + private ForceReconnectOptions(Builder b) { + this.forceClose = b.forceClose; + this.flushWait = b.flushWait; + } + + public boolean isForceClose() { + return forceClose; + } + + public boolean isFlush() { + return flushWait != null; + } + + public Duration getFlushWait() { + return flushWait; + } + + /** + * Creates a builder for the options. + * @return the builder + */ + public static Builder builder() { + return new Builder(); + } + + /** + * ForceReconnectOptions are created using a Builder. + */ + public static class Builder { + boolean forceClose = false; + Duration flushWait; + + /** + * Constructs a new Builder with the default values. + */ + public Builder() {} + + public Builder forceClose() { + this.forceClose = true; + return this; + } + + /** + * @param flushWait if supplied and at least 1 millisecond, the forceReconnect will try to + * flush before closing for the specified wait time. Flush happens before close + * so not affected by forceClose option + * @return the builder + */ + public Builder flush(Duration flushWait) { + this.flushWait = flushWait == null || flushWait.toMillis() < 1 ? null : flushWait; + return this; + } + + /** + * @param flushWaitMillis if supplied and at least 1 millisecond, the forceReconnect will try to + * flush before closing for the specified wait time. Flush happens before close + * so not affected by forceClose option + * @return the builder + */ + public Builder flush(long flushWaitMillis) { + if (flushWaitMillis > 0) { + this.flushWait = Duration.ofMillis(flushWaitMillis); + } + else { + this.flushWait = null; + } + return this; + } + + /** + * Builds the ForceReconnectOptions. + * @return ForceReconnectOptions + */ + public ForceReconnectOptions build() { + return new ForceReconnectOptions(this); + } + } +} diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index 414852b6d..5704171cc 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -1567,7 +1567,7 @@ public Builder proxy(Proxy proxy) { * @return the Builder for chaining */ public Builder dataPortType(String dataPortClassName) { - this.dataPortType = dataPortClassName; + this.dataPortType = dataPortClassName == null ? DEFAULT_DATA_PORT_TYPE : dataPortClassName; return this; } diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index fd9fbd2d1..8d8fd44f5 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -271,10 +271,15 @@ else if (trace) { @Override public void forceReconnect() throws IOException, InterruptedException { + forceReconnect(null); + } + + @Override + public void forceReconnect(ForceReconnectOptions options) throws IOException, InterruptedException { if (!tryingToConnect.get()) { try { tryingToConnect.set(true); - forceReconnectImpl(); + forceReconnectImpl(options); } finally { tryingToConnect.set(false); @@ -282,7 +287,16 @@ public void forceReconnect() throws IOException, InterruptedException { } } - void forceReconnectImpl() throws IOException, InterruptedException { + void forceReconnectImpl(ForceReconnectOptions options) throws InterruptedException { + if (options != null && options.getFlushWait() != null) { + try { + flush(options.getFlushWait()); + } + catch (TimeoutException e) { + // ignore, don't care, too bad; + } + } + closeSocketLock.lock(); try { updateStatus(Status.DISCONNECTED); @@ -299,7 +313,12 @@ void forceReconnectImpl() throws IOException, InterruptedException { dataPort = null; executor.submit(() -> { try { - closeMe.forceClose(); + if (options != null && options.isForceClose()) { + closeMe.forceClose(); + } + else { + closeMe.close(); + } } catch (IOException ignore) {} }); diff --git a/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java b/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java index c7ae9b492..84746a7f5 100644 --- a/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java +++ b/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java @@ -13,6 +13,7 @@ package io.nats.client.impl; +import io.nats.client.ForceReconnectOptions; import io.nats.client.Options; import io.nats.client.support.NatsUri; @@ -39,10 +40,10 @@ public void run() { writeWatcherTimer.cancel(); // we don't need to repeat this connection.executeCallback((c, el) -> el.socketWriteTimeout(c)); try { - connection.forceReconnect(); + connection.forceReconnect(ForceReconnectOptions.FORCE_CLOSE_INSTANCE); } catch (IOException e) { - // retry maybe? forceReconnect + // retry maybe? } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/src/test/java/io/nats/client/impl/ForceReconnectQueueCheckDataPort.java b/src/test/java/io/nats/client/impl/ForceReconnectQueueCheckDataPort.java new file mode 100644 index 000000000..0944b9d80 --- /dev/null +++ b/src/test/java/io/nats/client/impl/ForceReconnectQueueCheckDataPort.java @@ -0,0 +1,35 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io.nats.client.impl; + +import java.io.IOException; + +public class ForceReconnectQueueCheckDataPort extends SocketDataPort { + public static String WRITE_CHECK; + public static long DELAY; + + @Override + public void write(byte[] src, int toWrite) throws IOException { + String s = new String(src, 0, Math.min(7, toWrite)); + if (s.startsWith(WRITE_CHECK)) { + try { + Thread.sleep(DELAY); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + super.write(src, toWrite); + } +} diff --git a/src/test/java/io/nats/client/impl/ReconnectTests.java b/src/test/java/io/nats/client/impl/ReconnectTests.java index eb7eb0a66..2e7352a44 100644 --- a/src/test/java/io/nats/client/impl/ReconnectTests.java +++ b/src/test/java/io/nats/client/impl/ReconnectTests.java @@ -16,7 +16,6 @@ import io.nats.client.*; import io.nats.client.ConnectionListener.Events; import io.nats.client.api.ServerInfo; -import nats.io.NatsRunnerUtils; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -33,6 +32,7 @@ import java.util.function.BiConsumer; import java.util.function.Function; +import static io.nats.client.NatsTestServer.getNatsLocalhostUri; import static io.nats.client.support.NatsConstants.OUTPUT_QUEUE_IS_FULL; import static io.nats.client.utils.TestBase.*; import static org.junit.jupiter.api.Assertions.*; @@ -671,7 +671,7 @@ public void testReconnectWait() throws Exception { @Test public void testReconnectOnConnect() throws Exception { int port = NatsTestServer.nextPort(); - Options options = Options.builder().server(NatsTestServer.getNatsLocalhostUri(port)).build(); + Options options = Options.builder().server(getNatsLocalhostUri(port)).build(); CountDownLatch latch = new CountDownLatch(1); AtomicReference testConn = new AtomicReference<>(); @@ -756,6 +756,125 @@ private static void _testForceReconnect(Connection nc0, ListenerForTesting liste assertTrue(listener.getConnectionEvents().contains(Events.RECONNECTED)); } + @Test + public void testForceReconnectQueueBehaviorCheck() throws Exception { + runInJsCluster((nc0, nc1, nc2) -> { + int pubCount = 1000000; + int subscribeTime = 4000; + int flushWait = 2500; + int port = nc0.getServerInfo().getPort(); + + ForceReconnectQueueCheckDataPort.DELAY = 75; + + String subject = subject(); + ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject; + _testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, false, 0); + + subject = subject(); + ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject; + _testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, false, flushWait); + + subject = subject(); + ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject; + _testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, true, 0); + + subject = subject(); + ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject; + _testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, true, flushWait); + }); + } + + private static void _testForceReconnectQueueCheck(String subject, int pubCount, int subscribeTime, int port, boolean forceClose, int flushWait) throws InterruptedException { + ReconnectQueueCheckSubscriber subscriber = new ReconnectQueueCheckSubscriber(subject, pubCount, port); + Thread tsub = new Thread(subscriber); + tsub.start(); + + ForceReconnectOptions.Builder froBuilder = ForceReconnectOptions.builder(); + if (flushWait > 0) { + froBuilder.flush(flushWait); + } + if (forceClose) { + froBuilder.forceClose(); + } + + Options options = Options.builder() + .server(getNatsLocalhostUri(port)) + .dataPortType(ForceReconnectQueueCheckDataPort.class.getCanonicalName()) + .build(); + + try (Connection nc = Nats.connect(options)) { + for (int x = 1; x <= pubCount; x++) { + nc.publish(subject, (x + "").getBytes()); + } + + nc.forceReconnect(froBuilder.build()); + + long maxTime = subscribeTime; + while (!subscriber.subscriberDone.get() && maxTime > 0) { + //noinspection BusyWait + Thread.sleep(50); + maxTime -= 50; + } + } + catch (Exception e) { + e.printStackTrace(); + } + + subscriber.subscriberDone.set(true); + tsub.join(); + + if (flushWait > 0) { + assertEquals(pubCount, subscriber.lastNotSkipped); + } + } + + static class ReconnectQueueCheckSubscriber implements Runnable { + final AtomicBoolean subscriberDone; + final String subject; + final int pubCount; + final int port; + boolean completed; + int lastNotSkipped; + int firstAfterSkip; + + public ReconnectQueueCheckSubscriber(String subject, int pubCount, int port) { + this.subscriberDone = new AtomicBoolean(false); + this.subject = subject; + this.pubCount = pubCount; + this.port = port; + lastNotSkipped = 0; + firstAfterSkip = -1; + completed = false; + } + + @Override + public void run() { + Options options = Options.builder().server(getNatsLocalhostUri(port)).build(); + try (Connection nc = Nats.connect(options)) { + Subscription sub = nc.subscribe(subject); + while (!subscriberDone.get()) { + Message m = sub.nextMessage(100); + if (m != null) { + String next = "" + (lastNotSkipped + 1); + String md = new String(m.getData()); + if (md.equals(next)) { + if (++lastNotSkipped >= pubCount) { + completed = true; + subscriberDone.set(true); + } + } + else { + firstAfterSkip = Integer.parseInt(md); + subscriberDone.set(true); + } + } + } + } + catch (Exception e) { + e.printStackTrace(); + } + } + } @Test public void testSocketDataPortTimeout() throws Exception { @@ -774,8 +893,8 @@ public void testSocketDataPortTimeout() throws Exception { int port2 = nc2.getServerInfo().getPort(); String[] servers = new String[]{ - NatsRunnerUtils.getNatsLocalhostUri(port1), - NatsRunnerUtils.getNatsLocalhostUri(port2) + getNatsLocalhostUri(port1), + getNatsLocalhostUri(port2) }; Connection nc = standardConnection(builder.servers(servers).build()); String subject = subject();