Skip to content

Commit

Permalink
merge: #11311
Browse files Browse the repository at this point in the history
11311: [Backport stable/8.1] Close channels on Timeout r=Zelldon a=backport-action

# Description
Backport of #11307 to `stable/8.1`.

relates to zeebe-io/zeebe-chaos#294 zeebe-io/zeebe-chaos#294

Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and ChrisKujawa authored Dec 21, 2022
2 parents aea7e81 + 135d274 commit 84581b8
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.atomix.utils.concurrent.OrderedFuture;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.util.StringUtil;
import io.camunda.zeebe.util.VisibleForTesting;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -130,9 +131,28 @@ public NettyMessagingService(
this.advertisedAddress = advertisedAddress;
this.protocolVersion = protocolVersion;
this.config = config;
this.channelPool = new ChannelPool(this::openChannel, config.getConnectionPoolSize());

openFutures = new CopyOnWriteArrayList<>();
initAddresses(config);
}

@VisibleForTesting
// duplicated for tests - to inject channel pool
NettyMessagingService(
final String cluster,
final Address advertisedAddress,
final MessagingConfig config,
final ProtocolVersion protocolVersion,
final Function<Function<Address, CompletableFuture<Channel>>, ChannelPool>
channelPoolFactor) {
preamble = cluster.hashCode();
this.advertisedAddress = advertisedAddress;
this.protocolVersion = protocolVersion;
this.config = config;
this.channelPool = channelPoolFactor.apply(this::openChannel);

openFutures = new CopyOnWriteArrayList<>();
channelPool = new ChannelPool(this::openChannel, config.getConnectionPoolSize());
initAddresses(config);
}

Expand Down Expand Up @@ -497,35 +517,47 @@ private <T> CompletableFuture<T> executeOnPooledConnection(
* @param type the message type to map to the connection
* @param callback the callback to execute
* @param executor an executor on which to complete the callback future
* @param future the future to be completed once the callback future is complete
* @param responseFuture the future to be completed once the callback future is complete
* @param <T> the callback response type
*/
private <T> void executeOnPooledConnection(
final Address address,
final String type,
final Function<ClientConnection, CompletableFuture<T>> callback,
final Executor executor,
final CompletableFuture<T> future) {
final CompletableFuture<T> responseFuture) {
if (address.equals(advertisedAddress)) {
callback
.apply(localConnection)
.whenComplete(
(result, error) -> {
if (error == null) {
executor.execute(() -> future.complete(result));
executor.execute(() -> responseFuture.complete(result));
} else {
executor.execute(() -> future.completeExceptionally(error));
executor.execute(() -> responseFuture.completeExceptionally(error));
}
});
return;
}

openFutures.add(future);
openFutures.add(responseFuture);
channelPool
.getChannel(address, type)
.whenComplete(
(channel, channelError) -> {
if (channelError == null) {
responseFuture.whenComplete(
(response, error) -> {
if (error instanceof TimeoutException) {
// response future has been completed exceptionally by our
// timeout check, we will not receive any response on this channel
// if we talk with the wrong IP or node
// See https://github.com/zeebe-io/zeebe-chaos/issues/294
// In order to no longer reuse a maybe broken/outdated channel we close it
// On next request a new channel will be created
channel.close();
}
});
final ClientConnection connection = getOrCreateClientConnection(channel);
callback
.apply(connection)
Expand All @@ -534,8 +566,8 @@ private <T> void executeOnPooledConnection(
if (sendError == null) {
executor.execute(
() -> {
future.complete(result);
openFutures.remove(future);
responseFuture.complete(result);
openFutures.remove(responseFuture);
});
} else {
final Throwable cause = Throwables.getRootCause(sendError);
Expand All @@ -553,16 +585,16 @@ private <T> void executeOnPooledConnection(
}
executor.execute(
() -> {
future.completeExceptionally(sendError);
openFutures.remove(future);
responseFuture.completeExceptionally(sendError);
openFutures.remove(responseFuture);
});
}
});
} else {
executor.execute(
() -> {
future.completeExceptionally(channelError);
openFutures.remove(future);
responseFuture.completeExceptionally(channelError);
openFutures.remove(responseFuture);
});
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.slf4j.LoggerFactory.getLogger;

import com.google.common.util.concurrent.MoreExecutors;
Expand All @@ -32,6 +35,7 @@
import io.atomix.cluster.messaging.MessagingException;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import io.netty.channel.Channel;
import java.net.ConnectException;
import java.time.Duration;
import java.util.Arrays;
Expand All @@ -47,9 +51,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
Expand Down Expand Up @@ -582,6 +588,105 @@ public void testCompletableRemoteHandlerFailureEmptyStringValue() {
.withMessage(expectedException.getMessage());
}

@Test
public void shouldCloseChannelAfterTimeout() {
// given
final var subject = nextSubject();
final var expectedException = new TimeoutException();

final AtomicInteger channelsOpen = new AtomicInteger(0);
final AtomicReference<Channel> channelRef = new AtomicReference<>();
final ManagedMessagingService nettyWithOwnPool =
createMessagingServiceWithSpiedPool(channelsOpen, channelRef);

netty2.registerHandler(
subject, (address, bytes) -> new CompletableFuture<>()); // never complete this future

// when
final CompletableFuture<byte[]> response =
nettyWithOwnPool.sendAndReceive(address2, subject, "fail".getBytes());

// then
assertThat(response)
.failsWithin(Duration.ofSeconds(15))
.withThrowableOfType(ExecutionException.class)
.havingRootCause()
.isInstanceOf(expectedException.getClass())
.withMessageContaining("timed out in");
// closing causes an CloseException which causes another close
verify(channelRef.get(), timeout(1000).atLeast(1)).close();
}

@Test
public void shouldCreateNewChannelOnNewRequestAfterTimeout() {
// given
final var subject = nextSubject();
final var expectedException = new TimeoutException();

final AtomicInteger channelsOpen = new AtomicInteger(0);
final AtomicReference<Channel> channelRef = new AtomicReference<>();
final ManagedMessagingService nettyWithOwnPool =
createMessagingServiceWithSpiedPool(channelsOpen, channelRef);

netty2.registerHandler(
subject, (address, bytes) -> new CompletableFuture<>()); // never complete this future
final CompletableFuture<byte[]> response =
nettyWithOwnPool.sendAndReceive(address2, subject, "fail".getBytes());
assertThat(response)
.failsWithin(Duration.ofSeconds(15))
.withThrowableOfType(ExecutionException.class)
.havingRootCause()
.isInstanceOf(expectedException.getClass())
.withMessageContaining("timed out in");

// when
nettyWithOwnPool.sendAndReceive(address2, subject, "fail".getBytes());

// then
// closing causes an CloseException which causes another close
verify(channelRef.get(), timeout(1000).atLeast(1)).close();
Awaitility.await("channels should be recreated").until(channelsOpen::get, c -> c == 2);
}

private static ManagedMessagingService createMessagingServiceWithSpiedPool(
final AtomicInteger channelsOpen, final AtomicReference<Channel> channelRef) {
final var otherAddress = Address.from(SocketUtil.getNextAddress().getPort());
final var config = new MessagingConfig();
return (ManagedMessagingService)
new NettyMessagingService(
"test",
otherAddress,
config,
ProtocolVersion.V2,
(channelFactory) ->
// returning channel pool - to create spied channels
new ChannelPool(
(addr) -> {
// channel factory
channelsOpen.incrementAndGet();
// we have to return another future so our spy is used in the messaging
// service
final CompletableFuture<Channel> channelSpyFuture =
new CompletableFuture<>();
final CompletableFuture<Channel> channelCreationFuture =
channelFactory.apply(addr);
channelCreationFuture.whenComplete(
(channel, err) -> {
if (err == null) {
final Channel spy = spy(channel);
channelRef.set(spy);
channelSpyFuture.complete(spy);
} else {
channelSpyFuture.completeExceptionally(err);
}
});
return channelSpyFuture;
},
config.getConnectionPoolSize()))
.start()
.join();
}

@Test
public void testNoRemoteHandlerException() {
// given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* comprehensive tests. In general, you should try to avoid this, but at times there is no other
* way.
*/
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD})
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
@Retention(RetentionPolicy.SOURCE)
@Documented
public @interface VisibleForTesting {}

0 comments on commit 84581b8

Please sign in to comment.