Skip to content

Commit

Permalink
fix Intermittent test failure
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Mar 6, 2017
1 parent 3345d24 commit c7dcdfd
Showing 1 changed file with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,15 @@ public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exceptio
stopBroker();
conf.setMaxConcurrentLookupRequest(1);
startBroker();
// wait for consumer to reconnect
Thread.sleep(3000);

// wait strategically for all consumers to reconnect
for (int i = 0; i < 5; i++) {
if (!areAllConsumersConnected(consumers)) {
Thread.sleep(1000 + (i * 500));
} else {
break;
}
}

int totalConnectedConsumers = 0;
for (int i = 0; i < consumers.size(); i++) {
Expand All @@ -265,6 +272,15 @@ public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exceptio
pulsarClient.close();
}

private boolean areAllConsumersConnected(List<Consumer> consumers) {
for (int i = 0; i < consumers.size(); i++) {
if (!((ConsumerImpl) consumers.get(i)).isConnected()) {
return false;
}
}
return true;
}

private void upsertLookupPermits(int permits) throws Exception {
Map<String, String> throttlingMap = Maps.newHashMap();
throttlingMap.put("maxConcurrentLookupRequest", Integer.toString(permits));
Expand Down

0 comments on commit c7dcdfd

Please sign in to comment.