Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored replicator test with backlog quota interactions #773

Merged
merged 1 commit into from
Sep 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,13 @@ public PulsarStats(PulsarService pulsar) {

@Override
public void close() {
ReferenceCountUtil.safeRelease(topicStatsBuf);
ReferenceCountUtil.safeRelease(tempTopicStatsBuf);
bufferLock.writeLock().lock();
try {
ReferenceCountUtil.safeRelease(topicStatsBuf);
ReferenceCountUtil.safeRelease(tempTopicStatsBuf);
} finally {
bufferLock.writeLock().unlock();
}
}

public ClusterReplicationMetrics getClusterReplicationMetrics() {
Expand Down Expand Up @@ -193,7 +198,7 @@ public void recordTopicLoadTimeValue(String topic, long topicLoadLatencyMs) {
log.warn("Exception while recording topic load time for topic {}, {}", topic, ex.getMessage());
}
}

public void recordZkLatencyTimeValue(EventType eventType, long latencyMs) {
try {
if (EventType.write.equals(eventType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,58 +690,58 @@ public void testReplicatorProducerClosing() throws Exception {
* @throws Exception
*/

@Test(enabled = true, priority = -1)
@Test(timeOut = 60000, enabled = true, priority = -1)
public void testResumptionAfterBacklogRelaxed() throws Exception {

List<RetentionPolicy> policies = Lists.newArrayList();
policies.add(RetentionPolicy.producer_exception);
policies.add(RetentionPolicy.producer_request_hold);

for (RetentionPolicy policy : policies) {
// Use 1Mb quota by default
admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1 * 1024 * 1024, policy));
Thread.sleep(200);

DestinationName dest = DestinationName.get(String.format("persistent://pulsar/global/ns1/%s", policy));
DestinationName dest = DestinationName
.get(String.format("persistent://pulsar/global/ns1/%s-%d", policy, System.currentTimeMillis()));

// Producer on r1
MessageProducer producer1 = new MessageProducer(url1, dest);

// Consumer on r1
MessageConsumer consumer1 = new MessageConsumer(url1, dest);

// Consumer on r2
MessageConsumer consumer2 = new MessageConsumer(url2, dest);

// Replicator for r1 -> r2
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString());
Replicator replicator = topic.getPersistentReplicator("r2");

// Restrict backlog quota limit to 1
admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1, policy));

// Produce a message to r1, then it will be replicated to r2 and fulfill the backlog.
// Produce 1 message in r1. This message will be replicated immediately into r2 and it will become part of local backlog
producer1.produce(1);
consumer1.receive(1);
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

// Produce 9 messages to r1, then it will be pended because of the backlog limit excess
producer1.produce(9);
consumer1.receive(9);
Thread.sleep(1000L);
assertEquals(replicator.getStats().replicationBacklog, 9);
Thread.sleep(500);

// Restrict backlog quota limit to 1 byte to stop replication
admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1, policy));

// Relax backlog quota limit to 1G
admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1024 * 1024 * 1024, policy));
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

// The messages should be replicated to r2
assertEquals(replicator.getStats().replicationBacklog, 0);

// Next message will not be replicated, because r2 has reached the quota
producer1.produce(1);

Thread.sleep(500);

assertEquals(replicator.getStats().replicationBacklog, 1);

// Consumer will now drain 1 message and the replication backlog will be cleared
consumer2.receive(1);
consumer2.receive(9);
if (!consumer2.drained()) {
throw new Exception("consumer2 - unexpected message in queue");
}

// Wait until the 2nd message got delivered to consumer
consumer2.receive(1);

assertEquals(replicator.getStats().replicationBacklog, 0);

producer1.close();
consumer1.close();
consumer2.close();
}
}
Expand Down