diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/ClientErrorsTest.java index 8fc988af9a460..73752a3668772 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/ClientErrorsTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/ClientErrorsTest.java @@ -21,6 +21,7 @@ import static org.testng.Assert.fail; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -587,7 +588,8 @@ public void testFlowSendWhenPartitionedSubscribeCompletes() throws Exception { client.close(); } - @Test + // Run this test multiple times to reproduce race conditions on reconnection logic + @Test(invocationCount = 100) public void testProducerReconnect() throws Exception { AtomicInteger numOfConnections = new AtomicInteger(); AtomicReference channelCtx = new AtomicReference<>(); @@ -614,13 +616,12 @@ public void testProducerReconnect() throws Exception { Producer producer = client.createProducer("persistent://prop/use/ns/t1"); // close the cnx after creating the producer - channelCtx.get().channel().close(); - Thread.sleep(300); + channelCtx.get().channel().close().get(); producer.send(new byte[0]); assertEquals(msgSent.get(), true); - assertEquals(numOfConnections.get(), 3); + assertTrue(numOfConnections.get() >= 3); mockBrokerService.resetHandleConnect(); mockBrokerService.resetHandleProducer(); diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConnectionPool.java index 9a868f8d65879..5741a0e27225e 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConnectionPool.java @@ -36,6 +36,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -140,12 +141,23 @@ private CompletableFuture createConnection(InetSocketAddress address, future.channel().closeFuture().addListener(v -> { // Remove connection from pool when it gets closed + if (log.isDebugEnabled()) { + log.debug("Removing closed connection from pool: {}", v); + } cleanupConnection(address, connectionKey, cnxFuture); }); // We are connected to broker, but need to wait until the connect/connected handshake is // complete final ClientCnx cnx = (ClientCnx) future.channel().pipeline().get("handler"); + if (!future.channel().isActive() || cnx == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] Connection was already closed by the time we got notified", future.channel()); + } + cnxFuture.completeExceptionally(new ChannelException("Connection already closed")); + return; + } + cnx.connectionFuture().thenRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] Connection handshake completed", cnx.channel()); @@ -158,7 +170,6 @@ private CompletableFuture createConnection(InetSocketAddress address, cnx.ctx().close(); return null; }); - }); return cnxFuture; @@ -176,7 +187,7 @@ private void cleanupConnection(InetSocketAddress address, int connectionKey, map.remove(connectionKey, connectionFuture); } } - + public static int signSafeMod(long dividend, int divisor) { int mod = (int) (dividend % (long) divisor); if (mod < 0) { diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java index e5da39985aef6..161220666f177 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ProducerImpl.java @@ -245,12 +245,15 @@ public void sendAsync(Message message, SendCallback callback) { op.setBatchSizeByte(payload.readableBytes()); pendingMessages.put(op); + // Read the connection before validating if it's still connected, so that we avoid reading a null + // value + ClientCnx cnx = cnx(); if (isConnected()) { // If we do have a connection, the message is sent immediately, otherwise we'll try again once a // new // connection is established cmd.retain(); - cnx().ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx(), op)); + cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op)); stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte); } else { if (log.isDebugEnabled()) { @@ -510,9 +513,10 @@ void ackReceived(ClientCnx cnx, long sequenceId, long ledgerId, long entryId) { * Checks message checksum to retry if message was corrupted while sending to broker. Recomputes checksum of the * message header-payload again. *
    - *
  • if matches with existing checksum: it means message was corrupt while sending to broker. So, resend message
  • - *
  • if doesn't match with existing checksum: it means message is already corrupt and can't retry again. So, fail - * send-message by failing callback
  • + *
  • if matches with existing checksum: it means message was corrupt while sending to broker. So, resend + * message
  • + *
  • if doesn't match with existing checksum: it means message is already corrupt and can't retry again. + * So, fail send-message by failing callback
  • *
* * @param cnx @@ -536,8 +540,8 @@ protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId) op.callback.sendComplete( new PulsarClientException.ChecksumException("Checksum failded on corrupt message")); } catch (Throwable t) { - log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName, - sequenceId, t); + log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, + producerName, sequenceId, t); } ReferenceCountUtil.safeRelease(op.cmd); op.recycle(); @@ -548,7 +552,7 @@ protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId) producerName, sequenceId); } } - + } else { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Corrupt message is already timed out {}", topic, producerName, sequenceId); @@ -558,7 +562,7 @@ protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId) // as msg is not corrupted : let producer resend pending-messages again including checksum failed message resendMessages(cnx); } - + /** * Computes checksum again and verifies it against existing checksum. If checksum doesn't match it means that * message is corrupt. @@ -799,7 +803,7 @@ private void resendMessages(ClientCnx cnx) { final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion(); for (OpSendMsg op : pendingMessages) { - + if (stripChecksum) { stripChecksum(op); } @@ -824,7 +828,7 @@ private void resendMessages(ClientCnx cnx) { } /** - * Strips checksum from {@link OpSendMsg} command if present else ignore it. + * Strips checksum from {@link OpSendMsg} command if present else ignore it. * * @param op */ @@ -875,7 +879,7 @@ private void stripChecksum(OpSendMsg op) { public int brokerChecksumSupportedVersion() { return ProtocolVersion.v6.getNumber(); } - + @Override String getHandlerName() { return producerName; @@ -1058,8 +1062,9 @@ private void batchMessageAndSend() { /** * Casts input cmd to {@link DoubleByteBuf} * - * Incase if leak-detection level is enabled: pulsar instruments {@link DoubleByteBuf} into LeakAwareByteBuf (type of {@link io.netty.buffer.WrappedByteBuf}) - * So, this method casts input cmd to {@link DoubleByteBuf} else retrieves it from LeakAwareByteBuf. + * Incase if leak-detection level is enabled: pulsar instruments {@link DoubleByteBuf} into LeakAwareByteBuf (type + * of {@link io.netty.buffer.WrappedByteBuf}) So, this method casts input cmd to {@link DoubleByteBuf} else + * retrieves it from LeakAwareByteBuf. * * @param cmd * @return DoubleByteBuf or null in case failed to cast input {@link ByteBuf} @@ -1067,18 +1072,17 @@ private void batchMessageAndSend() { private DoubleByteBuf getDoubleByteBuf(ByteBuf cmd) { DoubleByteBuf msg = null; if (cmd instanceof DoubleByteBuf) { - msg = (DoubleByteBuf) cmd; + msg = (DoubleByteBuf) cmd; } else { try { - msg = (DoubleByteBuf) cmd.unwrap(); + msg = (DoubleByteBuf) cmd.unwrap(); } catch (Exception e) { - log.error("[{}] Failed while casting {} into DoubleByteBuf", producerName, cmd.getClass().getName(), - e); + log.error("[{}] Failed while casting {} into DoubleByteBuf", producerName, cmd.getClass().getName(), e); } } return msg; } - + public long getDelayInMillis() { OpSendMsg firstMsg = pendingMessages.peek(); if (firstMsg != null) { @@ -1099,7 +1103,8 @@ public int getPendingQueueSize() { return pendingMessages.size(); } - private static DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault()); + private static DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS") + .withZone(ZoneId.systemDefault()); private PulsarApi.CompressionType convertCompressionType(CompressionType compressionType) { switch (compressionType) {