Skip to content

Commit

Permalink
Clusters list API return clusters with local flag
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Jun 16, 2023
1 parent e8fe2ca commit 72a98bc
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
Expand Down Expand Up @@ -83,13 +84,14 @@ public class ClustersBase extends AdminResource {
@ApiResponse(code = 200, message = "Return a list of clusters."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public void getClusters(@Suspended AsyncResponse asyncResponse) {
public void getClusters(@QueryParam("verbose") @DefaultValue("false") boolean verbose,
@Suspended AsyncResponse asyncResponse) {
clusterResources().listAsync()
.thenApply(clusters -> clusters.stream()
// Remove "global" cluster from returned list
.filter(cluster -> !Constants.GLOBAL_CLUSTER.equals(cluster))
.map(cluster -> pulsar().getConfig().getClusterName().equalsIgnoreCase(cluster)
? cluster + "(local)" : cluster
? (verbose ? (cluster + "(local)") : cluster) : cluster
).collect(Collectors.toSet()))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,15 @@ public void internalConfigurationRetroCompatibility() throws Exception {
@Test
@SuppressWarnings("unchecked")
public void clusters() throws Exception {
assertEquals(asyncRequests(ctx -> clusters.getClusters(ctx)), new HashSet<>());
assertEquals(asyncRequests(ctx -> clusters.getClusters(false, ctx)), new HashSet<>());
verify(clusters, never()).validateSuperUserAccessAsync();

asyncRequests(ctx -> clusters.createCluster(ctx,
"use", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build()));
// ensure to read from ZooKeeper directly
//clusters.clustersListCache().clear();
assertEquals(asyncRequests(ctx -> clusters.getClusters(ctx)), Set.of("use"));
assertEquals(asyncRequests(ctx -> clusters.getClusters(false, ctx)), Set.of("use"));
assertEquals(asyncRequests(ctx -> clusters.getClusters(true, ctx)), Set.of("use(local)"));

// Check creating existing cluster
try {
Expand Down Expand Up @@ -324,7 +325,7 @@ public void clusters() throws Exception {
clusters.getNamespaceIsolationPolicies(ctx, "use"))).isEmpty());

asyncRequests(ctx -> clusters.deleteCluster(ctx, "use"));
assertEquals(asyncRequests(ctx -> clusters.getClusters(ctx)), new HashSet<>());
assertEquals(asyncRequests(ctx -> clusters.getClusters(false, ctx)), new HashSet<>());

try {
asyncRequests(ctx -> clusters.getCluster(ctx, "use"));
Expand Down Expand Up @@ -361,7 +362,7 @@ public void clusters() throws Exception {
clusterCache.invalidateAll();
store.invalidateAll();
try {
asyncRequests(ctx -> clusters.getClusters(ctx));
asyncRequests(ctx -> clusters.getClusters(false, ctx));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ public interface Clusters {
* @throws PulsarAdminException
* Unexpected error
*/
List<String> getClusters() throws PulsarAdminException;
default List<String> getClusters() throws PulsarAdminException{
return getClusters(false);
}

/**
* Get the list of clusters asynchronously.
Expand All @@ -67,7 +69,42 @@ public interface Clusters {
* </pre>
*
*/
CompletableFuture<List<String>> getClustersAsync();
default CompletableFuture<List<String>> getClustersAsync() {
return getClustersAsync(false);
}

/**
* Get the list of clusters.
* <p/>
* Get the list of all the Pulsar clusters.
* <p/>
* Response Example:
*
* <pre>
* <code>["c1", "c2", "c3"]</code>
* </pre>
* @param verbose Verbose information output of the clusters
*
* @throws NotAuthorizedException
* Don't have admin permission
* @throws PulsarAdminException
* Unexpected error
*/
List<String> getClusters(boolean verbose) throws PulsarAdminException;

/**
* Get the list of clusters asynchronously.
* <p/>
* Get the list of all the Pulsar clusters.
* <p/>
* Response Example:
*
* <pre>
* <code>["c1", "c2", "c3"]</code>
* </pre>
*
*/
CompletableFuture<List<String>> getClustersAsync(boolean verbose);

/**
* Get the configuration data for the specified cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ public CompletableFuture<List<String>> getClustersAsync() {
return asyncGetRequest(path, new FutureCallback<List<String>>(){});
}

@Override
public List<String> getClusters(boolean verbose) throws PulsarAdminException {
return sync(() -> getClustersAsync(verbose));
}

@Override
public CompletableFuture<List<String>> getClustersAsync(boolean verbose) {
WebTarget path = adminClusters.queryParam("verbose", verbose);
return asyncGetRequest(path, new FutureCallback<List<String>>(){});
}

@Override
public ClusterData getCluster(String cluster) throws PulsarAdminException {
return sync(() -> getClusterAsync(cluster));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ public class CmdClusters extends CmdBase {

@Parameters(commandDescription = "List the existing clusters")
private class List extends CliCommand {

@Parameter(names = { "-v", "--verbose" },
description = "Verbose information output of the clusters", required = false)
private boolean verbose = false;
void run() throws PulsarAdminException {
print(getAdmin().clusters().getClusters());
print(getAdmin().clusters().getClusters(verbose));
}
}

Expand Down

0 comments on commit 72a98bc

Please sign in to comment.