Skip to content

Commit

Permalink
Unack message tracker for pre-fetched messages (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Oct 17, 2016
1 parent 1833296 commit 1e9fa9f
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public Object[][] batchMessageDelayMsProvider() {

@DataProvider(name = "batch_with_timeout")
public Object[][] ackTimeoutSecProvider() {
return new Object[][] { { 0, 0 }, { 0, 1 }, { 1000, 0 }, { 1000, 1 } };
return new Object[][] { { 0, 0 }, { 0, 2 }, { 1000, 0 }, { 1000, 2 } };
}

@Test(dataProvider = "batch_with_timeout")
Expand Down Expand Up @@ -117,8 +117,6 @@ public void testSyncProducerAndConsumer(int batchMessageDelayMs, int ackTimeoutS
public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, int ackTimeoutSec) throws Exception {
log.info("-- Starting {} test --", methodName);
ConsumerConfiguration conf = new ConsumerConfiguration();
// Cumulative Ack-counter works if ackTimeOutTimer-task is enabled
boolean isAckTimeoutTaskEnabledForCumulativeAck = ackTimeoutSec > 0;
conf.setSubscriptionType(SubscriptionType.Exclusive);
if (ackTimeoutSec > 0) {
conf.setAckTimeout(ackTimeoutSec, TimeUnit.SECONDS);
Expand All @@ -137,7 +135,7 @@ public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, int ackTimeout
Producer producer = pulsarClient.createProducer("persistent://my-property/tp1/my-ns/my-topic2", producerConf);
List<Future<MessageId>> futures = Lists.newArrayList();

int numMessages = 5001;
int numMessages = 50;
// Asynchronously produce messages
for (int i = 0; i < numMessages; i++) {
final String message = "my-message-" + i;
Expand All @@ -155,9 +153,6 @@ public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, int ackTimeout
for (int i = 0; i < numMessages; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
if (i % 1000 == 0) {
log.info("Received message: [{}]", receivedMessage);
}
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
Expand All @@ -169,7 +164,7 @@ public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, int ackTimeout
Thread.sleep(2000);
consumer.close();
producer.close();
validatingLogInfo(consumer, producer, isAckTimeoutTaskEnabledForCumulativeAck);
validatingLogInfo(consumer, producer, batchMessageDelayMs == 0 && ackTimeoutSec > 0);
log.info("-- Exiting {} test --", methodName);
}

Expand All @@ -178,8 +173,6 @@ public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int batchMessageDelayMs,
throws Exception {
log.info("-- Starting {} test --", methodName);
ConsumerConfiguration conf = new ConsumerConfiguration();
// Cumulative Ack-counter works if ackTimeOutTimer-task is enabled
boolean isAckTimeoutTaskEnabledForCumulativeAck = ackTimeoutSec > 0;
conf.setSubscriptionType(SubscriptionType.Exclusive);
if (ackTimeoutSec > 0) {
conf.setAckTimeout(ackTimeoutSec, TimeUnit.SECONDS);
Expand Down Expand Up @@ -230,7 +223,7 @@ public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int batchMessageDelayMs,
Thread.sleep(5000);
consumer.close();
producer.close();
validatingLogInfo(consumer, producer, isAckTimeoutTaskEnabledForCumulativeAck);
validatingLogInfo(consumer, producer, batchMessageDelayMs == 0 && ackTimeoutSec > 0);
log.info("-- Exiting {} test --", methodName);
}

Expand Down Expand Up @@ -331,7 +324,8 @@ public void testSendTimeout(int batchMessageDelayMs) throws Exception {
log.info("-- Exiting {} test --", methodName);
}

public void validatingLogInfo(Consumer consumer, Producer producer, boolean verifyAckCount) throws InterruptedException {
public void validatingLogInfo(Consumer consumer, Producer producer, boolean verifyAckCount)
throws InterruptedException {
// Waiting for recording last stat info
Thread.sleep(1000);
ConsumerStats cStat = consumer.getStats();
Expand All @@ -340,7 +334,7 @@ public void validatingLogInfo(Consumer consumer, Producer producer, boolean veri
assertEquals(pStat.getTotalBytesSent(), cStat.getTotalBytesReceived());
assertEquals(pStat.getTotalMsgsSent(), pStat.getTotalAcksReceived());
if (verifyAckCount) {
assertEquals(cStat.getTotalMsgsReceived(), cStat.getTotalAcksSent());
assertEquals(cStat.getTotalMsgsReceived(), cStat.getTotalAcksSent());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import com.yahoo.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.client.api.SubscriptionType;
import com.yahoo.pulsar.client.impl.ConsumerBase;
import com.yahoo.pulsar.client.impl.MessageIdImpl;

public class UnAcknowledgedMessagesTimeoutTest extends BrokerTestBase {
private static final long testTimeout = 90000; // 1.5 min
Expand Down Expand Up @@ -87,7 +85,7 @@ public void testExclusiveSingleAckedNormalTopic() throws Exception {
log.info("Consumer received : " + new String(message.getData()));
message = consumer.receive(500, TimeUnit.MILLISECONDS);
}
long size = ((ConsumerBase) consumer).getUnAckedMessageTracker().size();
long size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size();
log.info(key + " Unacked Message Tracker size is " + size);
assertEquals(size, totalMessages / 2);

Expand All @@ -108,7 +106,7 @@ public void testExclusiveSingleAckedNormalTopic() throws Exception {
message = consumer.receive(500, TimeUnit.MILLISECONDS);
} while (message != null);

size = ((ConsumerBase) consumer).getUnAckedMessageTracker().size();
size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size();
log.info(key + " Unacked Message Tracker size is " + size);
assertEquals(size, 0);
assertEquals(hSet.size(), totalMessages);
Expand Down Expand Up @@ -148,12 +146,12 @@ public void testExclusiveCumulativeAckedNormalTopic() throws Exception {
log.info("Message ID details " + ((MessageIdImpl) message.getMessageId()).toString());
message = consumer.receive(500, TimeUnit.MILLISECONDS);
}
long size = ((ConsumerBase) consumer).getUnAckedMessageTracker().size();
long size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size();
assertEquals(size, totalMessages);
log.info("Comulative Ack sent for " + new String(lastMessage.getData()));
log.info("Message ID details " + ((MessageIdImpl) lastMessage.getMessageId()).toString());
consumer.acknowledgeCumulative(lastMessage);
size = ((ConsumerBase) consumer).getUnAckedMessageTracker().size();
size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size();
assertEquals(size, 0);
message = consumer.receive((int) (2 * ackTimeOutMillis), TimeUnit.MILLISECONDS);
assertEquals(message, null);
Expand Down Expand Up @@ -362,7 +360,7 @@ public void testCheckUnAcknowledgedMessageTimer() throws PulsarClientException,
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setReceiverQueueSize(7);
conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
ConsumerBase consumer = (ConsumerBase) pulsarClient.subscribe(topicName, subscriptionName, conf);
ConsumerImpl consumer = (ConsumerImpl) pulsarClient.subscribe(topicName, subscriptionName, conf);

// 3. producer publish messages
for (int i = 0; i < totalMessages; i++) {
Expand All @@ -371,25 +369,21 @@ public void testCheckUnAcknowledgedMessageTimer() throws PulsarClientException,
producer.send(message.getBytes());
}

// 4. Consumer receives the message
assertEquals(consumer.getUnAckedMessageTracker().size(), 0);
consumer.receive();
assertEquals(consumer.getUnAckedMessageTracker().size(), 1);
Thread.sleep(ackTimeOutMillis);
consumer.receive();
assertEquals(consumer.getUnAckedMessageTracker().size(), 2);
Thread.sleep((long) (ackTimeOutMillis * 1.1));
// Timeout should have been triggered here

// 5. Trying to receive messages again
assertEquals(consumer.getUnAckedMessageTracker().size(), 0);
Message message1 = consumer.receive();
for (int i = 0; i < totalMessages - 1; i++) {
Message msg = consumer.receive();
consumer.acknowledge(msg);
}

assertEquals(consumer.getUnAckedMessageTracker().size(), 1);
Thread.sleep(ackTimeOutMillis);
Message message2 = consumer.receive();
assertEquals(consumer.getUnAckedMessageTracker().size(), 2);
consumer.acknowledge(message1);
consumer.acknowledge(message2);

Message msg = consumer.receive();
consumer.acknowledge(msg);
assertEquals(consumer.getUnAckedMessageTracker().size(), 0);

Thread.sleep((long) (ackTimeOutMillis * 1.1));

assertEquals(consumer.getUnAckedMessageTracker().size(), 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ enum ConsumerType {
protected final ExecutorService listenerExecutor;
final BlockingQueue<Message> incomingMessages;
protected final ConcurrentLinkedQueue<CompletableFuture<Message>> pendingReceives;
protected final UnAckedMessageTracker unAckedMessageTracker;

protected ConsumerBase(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture, boolean useGrowableQueue) {
Expand All @@ -72,17 +71,6 @@ protected ConsumerBase(PulsarClientImpl client, String topic, String subscriptio
}
this.listenerExecutor = listenerExecutor;
this.pendingReceives = Queues.newConcurrentLinkedQueue();
if (conf.getAckTimeoutMillis() != 0) {
this.unAckedMessageTracker = new UnAckedMessageTracker();
this.unAckedMessageTracker.start(client, this, conf.getAckTimeoutMillis());
} else {
this.unAckedMessageTracker = null;
}

}

public UnAckedMessageTracker getUnAckedMessageTracker() {
return unAckedMessageTracker;
}

@Override
Expand Down
Loading

0 comments on commit 1e9fa9f

Please sign in to comment.