Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-2: Introduce non-persistent topics #538

Merged
merged 11 commits into from
Jul 28, 2017
46 changes: 29 additions & 17 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,6 @@ statusFilePath=
# use only brokers running the latest software version (to minimize impact to bundles)
preferLaterVersions=false

### --- Authentication --- ###

# Enable TLS
tlsEnabled=false

# Path for the TLS certificate file
tlsCertificateFilePath=

# Path for the TLS private key file
tlsKeyFilePath=

# Path for the trusted TLS certificate file
tlsTrustCertsFilePath=

# Accept untrusted TLS certificate from client
tlsAllowInsecureConnection=false

# Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending
# messages to consumer once, this limit reaches until consumer starts acknowledging messages back.
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
Expand Down Expand Up @@ -129,6 +112,35 @@ maxConcurrentLookupRequest=10000
# Max number of concurrent topic loading request broker allows to control number of zk-operations
maxConcurrentTopicLoadRequest=5000

# Max concurrent non-persistent message can be processed per connection
maxConcurrentNonPersistentMessagePerConnection=1000

# Number of worker threads to serve non-persistent topic
numWorkerThreadsForNonPersistentTopic=8

# Enable broker to load persistent topics
enablePersistentTopics=true

# Enable broker to load non-persistent topics
enableNonPersistentTopics=true

### --- Authentication --- ###

# Enable TLS
tlsEnabled=false

# Path for the TLS certificate file
tlsCertificateFilePath=

# Path for the TLS private key file
tlsKeyFilePath=

# Path for the trusted TLS certificate file
tlsTrustCertsFilePath=

# Accept untrusted TLS certificate from client
tlsAllowInsecureConnection=false

### --- Authentication --- ###

# Enable authentication
Expand Down
18 changes: 18 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,24 @@ maxUnackedMessagesPerBroker=0
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=10000

# Max number of concurrent topic loading request broker allows to control number of zk-operations
maxConcurrentTopicLoadRequest=5000

# Max concurrent non-persistent message can be processed per connection
maxConcurrentNonPersistentMessagePerConnection=1000

# Number of worker threads to serve non-persistent topic
numWorkerThreadsForNonPersistentTopic=8

# Enable broker to load persistent topics
enablePersistentTopics=true

# Enable broker to load non-persistent topics
enableNonPersistentTopics=true

### --- Authentication --- ###

# Enable authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;

import io.netty.buffer.ByteBuf;

public class EntryCacheManager {

private final long maxSize;
Expand Down Expand Up @@ -236,5 +238,9 @@ public int compareTo(EntryCache other) {

}

public static Entry create(long ledgerId, long entryId, ByteBuf data) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change still needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, for non-persistent topics we want to create Entry with specific ledgerId-entryId and payload so, it can be pass to other entities (eg: dispatcher, consumer) as those entities APIs requires a Entry wrapper to pass payload/msgId information.

return EntryImpl.create(ledgerId, entryId, data);
}

private static final Logger log = LoggerFactory.getLogger(EntryCacheManager.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Max number of concurrent topic loading request broker allows to control number of zk-operations
@FieldContext(dynamic = true)
private int maxConcurrentTopicLoadRequest = 5000;
// Max concurrent non-persistent message can be processed per connection
private int maxConcurrentNonPersistentMessagePerConnection = 1000;
// Number of worker threads to serve non-persistent topic
private int numWorkerThreadsForNonPersistentTopic = 8;
// Enable broker to load persistent topics
private boolean enablePersistentTopics = true;
// Enable broker to load non-persistent topics
private boolean enableNonPersistentTopics = true;

/***** --- TLS --- ****/
// Enable TLS
Expand Down Expand Up @@ -507,6 +515,38 @@ public void setMaxConcurrentTopicLoadRequest(int maxConcurrentTopicLoadRequest)
this.maxConcurrentTopicLoadRequest = maxConcurrentTopicLoadRequest;
}

public int getMaxConcurrentNonPersistentMessagePerConnection() {
return maxConcurrentNonPersistentMessagePerConnection;
}

public void setMaxConcurrentNonPersistentMessagePerConnection(int maxConcurrentNonPersistentMessagePerConnection) {
this.maxConcurrentNonPersistentMessagePerConnection = maxConcurrentNonPersistentMessagePerConnection;
}

public int getNumWorkerThreadsForNonPersistentTopic() {
return numWorkerThreadsForNonPersistentTopic;
}

public void setNumWorkerThreadsForNonPersistentTopic(int numWorkerThreadsForNonPersistentTopic) {
this.numWorkerThreadsForNonPersistentTopic = numWorkerThreadsForNonPersistentTopic;
}

public boolean isEnablePersistentTopics() {
return enablePersistentTopics;
}

public void setEnablePersistentTopics(boolean enablePersistentTopics) {
this.enablePersistentTopics = enablePersistentTopics;
}

public boolean isEnableNonPersistentTopics() {
return enableNonPersistentTopics;
}

public void setEnableNonPersistentTopics(boolean enableNonPersistentTopics) {
this.enableNonPersistentTopics = enableNonPersistentTopics;
}

public boolean isTlsEnabled() {
return tlsEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class LocalBrokerData extends JSONWritable implements ServiceLookupData {
private final String webServiceUrlTls;
private final String pulsarServiceUrl;
private final String pulsarServiceUrlTls;
private boolean persistentTopicsEnabled=true;
private boolean nonPersistentTopicsEnabled=true;

// Most recently available system resource usage.
private ResourceUsage cpu;
Expand Down Expand Up @@ -370,5 +372,21 @@ public String getPulsarServiceUrl() {
public String getPulsarServiceUrlTls() {
return pulsarServiceUrlTls;
}

public boolean isPersistentTopicsEnabled() {
return persistentTopicsEnabled;
}

public void setPersistentTopicsEnabled(boolean persistentTopicsEnabled) {
this.persistentTopicsEnabled = persistentTopicsEnabled;
}

public boolean isNonPersistentTopicsEnabled() {
return nonPersistentTopicsEnabled;
}

public void setNonPersistentTopicsEnabled(boolean nonPersistentTopicsEnabled) {
this.nonPersistentTopicsEnabled = nonPersistentTopicsEnabled;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.net.URI;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import javax.servlet.ServletContext;
import javax.ws.rs.WebApplicationException;
Expand All @@ -36,17 +37,21 @@
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.DestinationDomain;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode;
Expand All @@ -64,6 +69,7 @@ public abstract class AdminResource extends PulsarWebResource {
private static final Logger log = LoggerFactory.getLogger(AdminResource.class);
private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
public static final String LOAD_SHEDDING_UNLOAD_DISABLED_FLAG_PATH = "/admin/flags/load-shedding-unload-disabled";
public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";

protected ZooKeeper globalZk() {
return pulsar().getGlobalZkCache().getZooKeeper();
Expand Down Expand Up @@ -97,12 +103,10 @@ protected void zkCreateOptimistic(String path, byte[] content) throws Exception
* Get the domain of the destination (whether it's queue or topic)
*/
protected String domain() {
if (uri.getPath().startsWith("queues/")) {
return "queue";
} else if (uri.getPath().startsWith("topics/")) {
return "topic";
} else if (uri.getPath().startsWith("persistent/")) {
if (uri.getPath().startsWith("persistent/")) {
return "persistent";
} else if (uri.getPath().startsWith("non-persistent/")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side note: should we remove queue and topic here?

return "non-persistent";
} else {
throw new RestException(Status.INTERNAL_SERVER_ERROR, "domain() invoked from wrong resource");
}
Expand Down Expand Up @@ -281,4 +285,69 @@ protected ZooKeeperDataCache<NamespaceIsolationPolicies> namespaceIsolationPolic
return pulsar().getConfigurationCache().namespaceIsolationPoliciesCache();
}

protected PartitionedTopicMetadata getPartitionedTopicMetadata(String property, String cluster, String namespace,
String destination, boolean authoritative) {
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
validateClusterOwnership(dn.getCluster());

try {
checkConnect(dn);
} catch (WebApplicationException e) {
validateAdminAccessOnProperty(dn.getProperty());
} catch (Exception e) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing lookup. destination={}, role={}. Error: {}", destination,
clientAppId(), e.getMessage(), e);
throw new RestException(e);
}

String path = path(PARTITIONED_TOPIC_PATH_ZNODE, property, cluster, namespace, domain(),
dn.getEncodedLocalName());
PartitionedTopicMetadata partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path);

if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId(), dn,
partitionMetadata.partitions);
}
return partitionMetadata;
}

protected static PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarService pulsar, String path) {
try {
return fetchPartitionedTopicMetadataAsync(pulsar, path).get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e;
}
throw new RestException(e);
}
}

protected static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(PulsarService pulsar,
String path) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
// gets the number of partitions from the zk cache
pulsar.getGlobalZkCache().getDataAsync(path, new Deserializer<PartitionedTopicMetadata>() {
@Override
public PartitionedTopicMetadata deserialize(String key, byte[] content) throws Exception {
return jsonMapper().readValue(content, PartitionedTopicMetadata.class);
}
}).thenAccept(metadata -> {
// if the partitioned topic is not found in zk, then the topic is not partitioned
if (metadata.isPresent()) {
metadataFuture.complete(metadata.get());
} else {
metadataFuture.complete(new PartitionedTopicMetadata());
}
}).exceptionally(ex -> {
metadataFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
metadataFuture.completeExceptionally(e);
}
return metadataFuture;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import java.net.URI;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -50,8 +49,9 @@

import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand All @@ -64,12 +64,12 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
Expand Down Expand Up @@ -1273,20 +1273,24 @@ public void unsubscribeNamespaceBundle(@PathParam("property") String property, @

private void clearBacklog(NamespaceName nsName, String bundleRange, String subscription) {
try {
List<PersistentTopic> topicList = pulsar().getBrokerService()
List<Topic> topicList = pulsar().getBrokerService()
.getAllTopicsFromNamespaceBundle(nsName.toString(), nsName.toString() + "/" + bundleRange);

List<CompletableFuture<Void>> futures = Lists.newArrayList();
if (subscription != null) {
if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
subscription = PersistentReplicator.getRemoteCluster(subscription);
}
for (PersistentTopic topic : topicList) {
futures.add(topic.clearBacklog(subscription));
for (Topic topic : topicList) {
if(topic instanceof PersistentTopic) {
futures.add(((PersistentTopic)topic).clearBacklog(subscription));
}
}
} else {
for (PersistentTopic topic : topicList) {
futures.add(topic.clearBacklog());
for (Topic topic : topicList) {
if(topic instanceof PersistentTopic) {
futures.add(((PersistentTopic)topic).clearBacklog());
}
}
}

Expand All @@ -1300,14 +1304,14 @@ private void clearBacklog(NamespaceName nsName, String bundleRange, String subsc

private void unsubscribe(NamespaceName nsName, String bundleRange, String subscription) {
try {
List<PersistentTopic> topicList = pulsar().getBrokerService()
List<Topic> topicList = pulsar().getBrokerService()
.getAllTopicsFromNamespaceBundle(nsName.toString(), nsName.toString() + "/" + bundleRange);
List<CompletableFuture<Void>> futures = Lists.newArrayList();
if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
throw new RestException(Status.PRECONDITION_FAILED, "Cannot unsubscribe a replication cursor");
} else {
for (PersistentTopic topic : topicList) {
PersistentSubscription sub = topic.getPersistentSubscription(subscription);
for (Topic topic : topicList) {
Subscription sub = topic.getSubscription(subscription);
if (sub != null) {
futures.add(sub.delete());
}
Expand Down
Loading