Skip to content

Commit

Permalink
[fix][broker] monitor the load store isConnected and fix testIsolatio…
Browse files Browse the repository at this point in the history
…nPolicy
  • Loading branch information
heesung-sn committed Dec 20, 2023
1 parent b944f10 commit 1d25504
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -887,19 +887,40 @@ private void monitor() {
// Monitor role
// Periodically check the role in case ZK watcher fails.
var isChannelOwner = serviceUnitStateChannel.isChannelOwner();

if (isChannelOwner) {
if (role != Leader) {
log.warn("Current role:{} does not match with the channel ownership:{}. "
+ "Playing the leader role.", role, isChannelOwner);
playLeader();
} else {
if (!topBundlesLoadDataStore.isConnected()) {
log.warn("Leader's topBundlesLoadDataStore is disconnected. Restarting it.");
topBundlesLoadDataStore.init();
}
if (!brokerLoadDataStore.isConnected()) {
log.warn("Leader's brokerLoadDataStore is disconnected. Restarting it.");
brokerLoadDataStore.init();
}
}
} else {
if (role != Follower) {
log.warn("Current role:{} does not match with the channel ownership:{}. "
+ "Playing the follower role.", role, isChannelOwner);
playFollower();
} else {
if (!topBundlesLoadDataStore.isConnected()) {
log.warn("Follower's topBundlesLoadDataStore is disconnected. Restarting it.");
topBundlesLoadDataStore.close();
topBundlesLoadDataStore.startProducer();
}
if (!brokerLoadDataStore.isConnected()) {
log.warn("Follower's brokerLoadDataStore is disconnected. Restarting it.");
brokerLoadDataStore.init();
}
}
}

} catch (Throwable e) {
log.error("Failed to get the channel ownership.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,9 @@ public interface LoadDataStore<T> extends Closeable {
*/
void startProducer() throws LoadDataStoreException;

/**
* Check if this store is connected.
*/
boolean isConnected();

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,17 @@ public void startProducer() throws LoadDataStoreException {
}
}

@Override
public boolean isConnected() {
if (producer != null) {
return producer.isConnected();
}

// TODO: Currently, table view does not expose isConnected.
// Consider adding tableview.isConnected() in the future
return tableView != null;
}

@Override
public void close() throws IOException {
if (producer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public void startTableView() throws LoadDataStoreException {
public void startProducer() throws LoadDataStoreException {

}

@Override
public boolean isConnected() {
return true;
}
};
configuration.setPreferLaterVersions(true);
doReturn(configuration).when(mockContext).brokerConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,11 @@ public void startTableView() throws LoadDataStoreException {
public void startProducer() throws LoadDataStoreException {

}

@Override
public boolean isConnected() {
return true;
}
};

var topBundleLoadDataStore = new LoadDataStore<TopBundlesLoadData>() {
Expand Down Expand Up @@ -470,6 +475,11 @@ public void startTableView() throws LoadDataStoreException {
public void startProducer() throws LoadDataStoreException {

}

@Override
public boolean isConnected() {
return true;
}
};

BrokerRegistry brokerRegistry = mock(BrokerRegistry.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ public void startTableView() throws LoadDataStoreException {
public void startProducer() throws LoadDataStoreException {

}

@Override
public boolean isConnected() {
return true;
}
};

doReturn(conf).when(ctx).brokerConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,11 @@ public void testStopBroker() throws Exception {
}
}

String broker1 = admin.lookups().lookupTopic(topicName);
Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
String broker1 = admin.lookups().lookupTopic(topicName);
assertNotEquals(broker1, broker);
});

assertNotEquals(broker1, broker);
}

@Test(timeOut = 40 * 1000)
Expand Down Expand Up @@ -308,7 +310,7 @@ public void testAntiaffinityPolicy() throws PulsarAdminException {
assertEquals(result.size(), NUM_BROKERS);
}

@Test(timeOut = 40 * 1000)
@Test(timeOut = 40 * 1000, invocationCount = 30)
public void testIsolationPolicy() throws Exception {
final String namespaceIsolationPolicyName = "my-isolation-policy";
final String isolationEnabledNameSpace = DEFAULT_TENANT + "/my-isolation-policy" + nsSuffix;
Expand Down Expand Up @@ -351,7 +353,12 @@ public void testIsolationPolicy() throws Exception {
//expected when retried
}

String broker = admin.lookups().lookupTopic(topic);
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
String broker = admin.lookups().lookupTopic(topic);
// This isolated topic should be assigned to the primary broker, broker-0
assertEquals(extractBrokerIndex(broker), 0);
});


for (BrokerContainer container : pulsarCluster.getBrokers()) {
String name = container.getHostName();
Expand All @@ -360,28 +367,29 @@ public void testIsolationPolicy() throws Exception {
}
}

assertEquals(extractBrokerIndex(broker), 0);

broker = admin.lookups().lookupTopic(topic);

final String brokerName = broker;
retryStrategically((test) -> extractBrokerIndex(brokerName) == 1, 100, 200);
assertEquals(extractBrokerIndex(broker), 1);
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
String broker = admin.lookups().lookupTopic(topic);
// This isolated topic should be assigned to the secondary broker, broker-1
assertEquals(extractBrokerIndex(broker), 1);
});

for (BrokerContainer container : pulsarCluster.getBrokers()) {
String name = container.getHostName();
if (name.contains("1")) {
container.stop();
}
}
try {
admin.lookups().lookupTopic(topic);
fail();
} catch (Exception ex) {
log.error("Failed to lookup topic: ", ex);
assertThat(ex.getMessage()).containsAnyOf("Failed to look up a broker",
"Failed to select the new owner broker for bundle");
}

Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
// This isolated topic cannot be assigned to the remaining broker, broker-2
try {
String broker = admin.lookups().lookupTopic(topic);
log.warn("looked up broker {}. retrying...", broker);
} catch (Exception ex) {
assertThat(ex.getMessage()).containsAnyOf("Failed to look up a broker",
"Failed to select the new owner broker for bundle");
}
});
}

private String getBrokerUrl(int index) {
Expand Down

0 comments on commit 1d25504

Please sign in to comment.