Skip to content

Commit

Permalink
Fixed race condition on client reconnection logic (#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Feb 17, 2017
1 parent e43b665 commit 956430d
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.testng.Assert.fail;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -587,7 +588,8 @@ public void testFlowSendWhenPartitionedSubscribeCompletes() throws Exception {
client.close();
}

@Test
// Run this test multiple times to reproduce race conditions on reconnection logic
@Test(invocationCount = 100)
public void testProducerReconnect() throws Exception {
AtomicInteger numOfConnections = new AtomicInteger();
AtomicReference<ChannelHandlerContext> channelCtx = new AtomicReference<>();
Expand All @@ -614,13 +616,12 @@ public void testProducerReconnect() throws Exception {
Producer producer = client.createProducer("persistent://prop/use/ns/t1");

// close the cnx after creating the producer
channelCtx.get().channel().close();
Thread.sleep(300);
channelCtx.get().channel().close().get();

producer.send(new byte[0]);

assertEquals(msgSent.get(), true);
assertEquals(numOfConnections.get(), 3);
assertTrue(numOfConnections.get() >= 3);

mockBrokerService.resetHandleConnect();
mockBrokerService.resetHandleProducer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -140,12 +141,23 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress address,

future.channel().closeFuture().addListener(v -> {
// Remove connection from pool when it gets closed
if (log.isDebugEnabled()) {
log.debug("Removing closed connection from pool: {}", v);
}
cleanupConnection(address, connectionKey, cnxFuture);
});

// We are connected to broker, but need to wait until the connect/connected handshake is
// complete
final ClientCnx cnx = (ClientCnx) future.channel().pipeline().get("handler");
if (!future.channel().isActive() || cnx == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Connection was already closed by the time we got notified", future.channel());
}
cnxFuture.completeExceptionally(new ChannelException("Connection already closed"));
return;
}

cnx.connectionFuture().thenRun(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Connection handshake completed", cnx.channel());
Expand All @@ -158,7 +170,6 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress address,
cnx.ctx().close();
return null;
});

});

return cnxFuture;
Expand All @@ -176,7 +187,7 @@ private void cleanupConnection(InetSocketAddress address, int connectionKey,
map.remove(connectionKey, connectionFuture);
}
}

public static int signSafeMod(long dividend, int divisor) {
int mod = (int) (dividend % (long) divisor);
if (mod < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,15 @@ public void sendAsync(Message message, SendCallback callback) {
op.setBatchSizeByte(payload.readableBytes());
pendingMessages.put(op);

// Read the connection before validating if it's still connected, so that we avoid reading a null
// value
ClientCnx cnx = cnx();
if (isConnected()) {
// If we do have a connection, the message is sent immediately, otherwise we'll try again once a
// new
// connection is established
cmd.retain();
cnx().ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx(), op));
cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
} else {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -510,9 +513,10 @@ void ackReceived(ClientCnx cnx, long sequenceId, long ledgerId, long entryId) {
* Checks message checksum to retry if message was corrupted while sending to broker. Recomputes checksum of the
* message header-payload again.
* <ul>
* <li><b>if matches with existing checksum</b>: it means message was corrupt while sending to broker. So, resend message</li>
* <li><b>if doesn't match with existing checksum</b>: it means message is already corrupt and can't retry again. So, fail
* send-message by failing callback</li>
* <li><b>if matches with existing checksum</b>: it means message was corrupt while sending to broker. So, resend
* message</li>
* <li><b>if doesn't match with existing checksum</b>: it means message is already corrupt and can't retry again.
* So, fail send-message by failing callback</li>
* </ul>
*
* @param cnx
Expand All @@ -536,8 +540,8 @@ protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId)
op.callback.sendComplete(
new PulsarClientException.ChecksumException("Checksum failded on corrupt message"));
} catch (Throwable t) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName,
sequenceId, t);
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic,
producerName, sequenceId, t);
}
ReferenceCountUtil.safeRelease(op.cmd);
op.recycle();
Expand All @@ -548,7 +552,7 @@ protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId)
producerName, sequenceId);
}
}

} else {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Corrupt message is already timed out {}", topic, producerName, sequenceId);
Expand All @@ -558,7 +562,7 @@ protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId)
// as msg is not corrupted : let producer resend pending-messages again including checksum failed message
resendMessages(cnx);
}

/**
* Computes checksum again and verifies it against existing checksum. If checksum doesn't match it means that
* message is corrupt.
Expand Down Expand Up @@ -799,7 +803,7 @@ private void resendMessages(ClientCnx cnx) {

final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion();
for (OpSendMsg op : pendingMessages) {

if (stripChecksum) {
stripChecksum(op);
}
Expand All @@ -824,7 +828,7 @@ private void resendMessages(ClientCnx cnx) {
}

/**
* Strips checksum from {@link OpSendMsg} command if present else ignore it.
* Strips checksum from {@link OpSendMsg} command if present else ignore it.
*
* @param op
*/
Expand Down Expand Up @@ -875,7 +879,7 @@ private void stripChecksum(OpSendMsg op) {
public int brokerChecksumSupportedVersion() {
return ProtocolVersion.v6.getNumber();
}

@Override
String getHandlerName() {
return producerName;
Expand Down Expand Up @@ -1058,27 +1062,27 @@ private void batchMessageAndSend() {
/**
* Casts input cmd to {@link DoubleByteBuf}
*
* Incase if leak-detection level is enabled: pulsar instruments {@link DoubleByteBuf} into LeakAwareByteBuf (type of {@link io.netty.buffer.WrappedByteBuf})
* So, this method casts input cmd to {@link DoubleByteBuf} else retrieves it from LeakAwareByteBuf.
* Incase if leak-detection level is enabled: pulsar instruments {@link DoubleByteBuf} into LeakAwareByteBuf (type
* of {@link io.netty.buffer.WrappedByteBuf}) So, this method casts input cmd to {@link DoubleByteBuf} else
* retrieves it from LeakAwareByteBuf.
*
* @param cmd
* @return DoubleByteBuf or null in case failed to cast input {@link ByteBuf}
*/
private DoubleByteBuf getDoubleByteBuf(ByteBuf cmd) {
DoubleByteBuf msg = null;
if (cmd instanceof DoubleByteBuf) {
msg = (DoubleByteBuf) cmd;
msg = (DoubleByteBuf) cmd;
} else {
try {
msg = (DoubleByteBuf) cmd.unwrap();
msg = (DoubleByteBuf) cmd.unwrap();
} catch (Exception e) {
log.error("[{}] Failed while casting {} into DoubleByteBuf", producerName, cmd.getClass().getName(),
e);
log.error("[{}] Failed while casting {} into DoubleByteBuf", producerName, cmd.getClass().getName(), e);
}
}
return msg;
}

public long getDelayInMillis() {
OpSendMsg firstMsg = pendingMessages.peek();
if (firstMsg != null) {
Expand All @@ -1099,7 +1103,8 @@ public int getPendingQueueSize() {
return pendingMessages.size();
}

private static DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault());
private static DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
.withZone(ZoneId.systemDefault());

private PulsarApi.CompressionType convertCompressionType(CompressionType compressionType) {
switch (compressionType) {
Expand Down

0 comments on commit 956430d

Please sign in to comment.