Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-10: Removing cluster from topic name #1150

Merged
merged 15 commits into from
Feb 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ public void start() throws PulsarServerException {

this.webService = new WebService(this);
this.webService.addRestResources("/", "org.apache.pulsar.broker.web", false);
this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin", true);
this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true);
this.webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true);
this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true);

this.webService.addServlet("/metrics",
Expand Down Expand Up @@ -462,8 +463,7 @@ public void loadNamespaceDestinations(NamespaceBundle bundle) {
List<CompletableFuture<Topic>> persistentTopics = Lists.newArrayList();
long topicLoadStart = System.nanoTime();

for (String topic : getNamespaceService().getListOfDestinations(nsName.getProperty(), nsName.getCluster(),
nsName.getLocalName())) {
for (String topic : getNamespaceService().getListOfDestinations(nsName)) {
try {
DestinationName dn = DestinationName.get(topic);
if (bundle.includes(dn)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
Expand Down Expand Up @@ -189,11 +190,22 @@ public void validatePoliciesReadOnlyAccess() {
protected List<String> getListOfNamespaces(String property) throws Exception {
List<String> namespaces = Lists.newArrayList();

for (String cluster : globalZk().getChildren(path(POLICIES, property), false)) {
// this will return a cluster in v1 and a namespace in v2
for (String clusterOrNamespace : globalZk().getChildren(path(POLICIES, property), false)) {
// Then get the list of namespaces
try {
for (String namespace : globalZk().getChildren(path(POLICIES, property, cluster), false)) {
namespaces.add(String.format("%s/%s/%s", property, cluster, namespace));
final List<String> children = globalZk().getChildren(path(POLICIES, property, clusterOrNamespace), false);
if (children == null || children.isEmpty()) {
String namespace = NamespaceName.get(property, clusterOrNamespace).toString();
// if the length is 0 then this is probably a leftover cluster from namespace created
// with the v1 admin format (prop/cluster/ns) and then deleted, so no need to add it to the list
if (globalZk().getData(path(POLICIES, namespace), false, null).length != 0) {
namespaces.add(namespace);
}
} else {
children.forEach(ns -> {
namespaces.add(NamespaceName.get(property, clusterOrNamespace, ns).toString());
});
}
} catch (KeeperException.NoNodeException e) {
// A cluster was deleted between the 2 getChildren() calls, ignoring
Expand All @@ -204,7 +216,56 @@ protected List<String> getListOfNamespaces(String property) throws Exception {
return namespaces;
}


protected NamespaceName namespaceName;

protected void validateNamespaceName(String property, String namespace) {
try {
this.namespaceName = NamespaceName.get(property, namespace);
} catch (IllegalArgumentException e) {
log.warn("[{}] Failed to create namespace with invalid name {}", clientAppId(), namespace, e);
throw new RestException(Status.PRECONDITION_FAILED, "Namespace name is not valid");
}
}

@Deprecated
protected void validateNamespaceName(String property, String cluster, String namespace) {
try {
this.namespaceName = NamespaceName.get(property, cluster, namespace);
} catch (IllegalArgumentException e) {
log.warn("[{}] Failed to create namespace with invalid name {}", clientAppId(), namespace, e);
throw new RestException(Status.PRECONDITION_FAILED, "Namespace name is not valid");
}
}

protected DestinationName destinationName;

protected void validateDestinationName(String property, String namespace, String encodedTopic) {
String topic = Codec.decode(encodedTopic);
try {
this.namespaceName = NamespaceName.get(property, namespace);
this.destinationName = DestinationName.get(domain(), namespaceName, topic);
} catch (IllegalArgumentException e) {
log.warn("[{}] Failed to validate topic name {}://{}/{}/{}", clientAppId(), domain(), property, namespace,
topic, e);
throw new RestException(Status.PRECONDITION_FAILED, "Topic name is not valid");
}

this.destinationName = DestinationName.get(domain(), namespaceName, topic);
}

@Deprecated
protected void validateDestinationName(String property, String cluster, String namespace, String encodedTopic) {
String topic = Codec.decode(encodedTopic);
try {
this.namespaceName = NamespaceName.get(property, cluster, namespace);
this.destinationName = DestinationName.get(domain(), namespaceName, topic);
} catch (IllegalArgumentException e) {
log.warn("[{}] Failed to validate topic name {}://{}/{}/{}/{}", clientAppId(), domain(), property, cluster,
namespace, topic, e);
throw new RestException(Status.PRECONDITION_FAILED, "Topic name is not valid");
}
}

/**
* Redirect the call to the specified broker
*
Expand All @@ -227,20 +288,20 @@ protected void validateBrokerName(String broker) throws MalformedURLException {
}
}

protected Policies getNamespacePolicies(String property, String cluster, String namespace) {
protected Policies getNamespacePolicies(NamespaceName namespaceName) {
try {
Policies policies = policiesCache().get(AdminResource.path(POLICIES, property, cluster, namespace))
Policies policies = policiesCache().get(AdminResource.path(POLICIES, namespaceName.toString()))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(NamespaceName.get(property, cluster, namespace));
.getBundles(namespaceName);
BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles);
policies.bundles = bundleData != null ? bundleData : policies.bundles;
return policies;
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to get namespace policies {}/{}/{}", clientAppId(), property, cluster, namespace, e);
log.error("[{}] Failed to get namespace policies {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
}
Expand All @@ -249,35 +310,35 @@ public static ObjectMapper jsonMapper() {
return ObjectMapperFactory.getThreadLocal();
}

ZooKeeperDataCache<PropertyAdmin> propertiesCache() {
public ZooKeeperDataCache<PropertyAdmin> propertiesCache() {
return pulsar().getConfigurationCache().propertiesCache();
}

ZooKeeperDataCache<Policies> policiesCache() {
protected ZooKeeperDataCache<Policies> policiesCache() {
return pulsar().getConfigurationCache().policiesCache();
}

ZooKeeperDataCache<LocalPolicies> localPoliciesCache() {
protected ZooKeeperDataCache<LocalPolicies> localPoliciesCache() {
return pulsar().getLocalZkCacheService().policiesCache();
}

ZooKeeperDataCache<ClusterData> clustersCache() {
protected ZooKeeperDataCache<ClusterData> clustersCache() {
return pulsar().getConfigurationCache().clustersCache();
}

ZooKeeperChildrenCache managedLedgerListCache() {
protected ZooKeeperChildrenCache managedLedgerListCache() {
return pulsar().getLocalZkCacheService().managedLedgerListCache();
}

Set<String> clusters() {
protected Set<String> clusters() {
try {
return pulsar().getConfigurationCache().clustersListCache().get();
} catch (Exception e) {
throw new RestException(e);
}
}

ZooKeeperChildrenCache clustersListCache() {
protected ZooKeeperChildrenCache clustersListCache() {
return pulsar().getConfigurationCache().clustersListCache();
}

Expand All @@ -297,32 +358,30 @@ protected ZooKeeperChildrenCache failureDomainListCache() {
return pulsar().getConfigurationCache().failureDomainListCache();
}

protected PartitionedTopicMetadata getPartitionedTopicMetadata(String property, String cluster, String namespace,
String destination, boolean authoritative) {
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
validateClusterOwnership(dn.getCluster());
protected PartitionedTopicMetadata getPartitionedTopicMetadata(DestinationName destinationName,
boolean authoritative) {
validateClusterOwnership(destinationName.getCluster());
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
validateGlobalNamespaceOwnership(dn.getNamespaceObject());
validateGlobalNamespaceOwnership(destinationName.getNamespaceObject());

try {
checkConnect(dn);
checkConnect(destinationName);
} catch (WebApplicationException e) {
validateAdminAccessOnProperty(dn.getProperty());
validateAdminAccessOnProperty(destinationName.getProperty());
} catch (Exception e) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing lookup. destination={}, role={}. Error: {}", destination,
log.warn("Unexpected error while authorizing lookup. destination={}, role={}. Error: {}", destinationName,
clientAppId(), e.getMessage(), e);
throw new RestException(e);
}

String path = path(PARTITIONED_TOPIC_PATH_ZNODE, property, cluster, namespace, domain(),
dn.getEncodedLocalName());
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(), destinationName.getEncodedLocalName());
PartitionedTopicMetadata partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path);

if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId(), dn,
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId(), destinationName,
partitionMetadata.partitions);
}
return partitionMetadata;
Expand All @@ -339,8 +398,8 @@ protected static PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarSe
}
}

protected static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(PulsarService pulsar,
String path) {
protected static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(
PulsarService pulsar, String path) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
// gets the number of partitions from the zk cache
Expand Down Expand Up @@ -375,4 +434,22 @@ protected void validateClusterExists(String cluster) {
throw new RestException(e);
}
}

protected Policies getNamespacePolicies(String property, String cluster, String namespace) {
try {
Policies policies = policiesCache().get(AdminResource.path(POLICIES, property, cluster, namespace))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(NamespaceName.get(property, cluster, namespace));
BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles);
policies.bundles = bundleData != null ? bundleData : policies.bundles;
return policies;
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to get namespace policies {}/{}/{}", clientAppId(), property, cluster, namespace, e);
throw new RestException(e);
}
}
}
Loading