Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broker side deduplication #751

Merged
merged 3 commits into from
Sep 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,25 @@ brokerDeleteInactiveTopicsFrequencySeconds=60
# How frequently to proactively check and purge expired messages
messageExpiryCheckIntervalInMinutes=5

# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
brokerDeduplicationEnabled=false

# Maximum number of producers for which deduplication information
# is going to be persisted
brokerDeduplicationMaxNumberOfProducers=10000

# Number of entries after which a dedup info snapshot is taken.
# A bigger interval will lead to fewer snapshots being taken, though it would
# increase the topic recovery time, when the entries published after the
# snapshot need to be replayed
brokerDeduplicationEntriesInterval=1000

# Time of inactivity after which the broker will discard the deduplication information
# relative to a disconnected producer. Default is 6 hours.
brokerDeduplicationProducerInactivityTimeoutMinutes=360

# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

Expand Down Expand Up @@ -103,11 +122,11 @@ maxUnackedMessagesPerBroker=0

# Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages
# than this percentage limit and subscription will not receive any new messages until that subscription acks back
# limit/2 messages
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default
# message dispatch-throttling
# message dispatch-throttling
dispatchThrottlingRatePerTopicInMsg=0

# Default bytes per second dispatch throttling-limit for every topic. Using a value of 0, is disabling
Expand Down
50 changes: 35 additions & 15 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,33 @@ brokerDeleteInactiveTopicsEnabled=true
# How often to check for inactive topics
brokerDeleteInactiveTopicsFrequencySeconds=60

# How frequently to proactively check and purge expired messages
# How frequently to proactively check and purge expired messages
messageExpiryCheckIntervalInMinutes=5

# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
brokerDeduplicationEnabled=false

# Maximum number of producers for which deduplication information
# is going to be persisted
brokerDeduplicationMaxNumberOfProducers=10000

# Number of entries after which a dedup info snapshot is taken.
# A bigger interval will lead to fewer snapshots being taken, though it would
# increase the topic recovery time, when the entries published after the
# snapshot need to be replayed
brokerDeduplicationEntriesInterval=1000

# Time of inactivity after which the broker will discard the deduplication information
# relative to a disconnected producer. Default is 6 hours.
brokerDeduplicationProducerInactivityTimeoutMinutes=360


# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false

# Allow client libraries with no version information
# Allow client libraries with no version information
clientLibraryVersionCheckAllowUnversioned=true

# Path for the file used to determine the rotation status for the broker when responding
Expand All @@ -92,11 +112,11 @@ maxUnackedMessagesPerBroker=0

# Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages
# than this percentage limit and subscription will not receive any new messages until that subscription acks back
# limit/2 messages
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default
# message dispatch-throttling
# message dispatch-throttling
dispatchThrottlingRatePerTopicInMsg=0

# Default bytes per second dispatch throttling-limit for every topic. Using a value of 0, is disabling
Expand Down Expand Up @@ -158,7 +178,7 @@ bookkeeperClientAuthenticationPlugin=
bookkeeperClientAuthenticationParametersName=
bookkeeperClientAuthenticationParameters=

# Timeout for BK add / read operations
# Timeout for BK add / read operations
bookkeeperClientTimeoutInSeconds=30

# Speculative reads are initiated if a read request doesn't complete within a certain time
Expand All @@ -174,11 +194,11 @@ bookkeeperClientHealthCheckErrorThresholdPerInterval=5
bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800

# Enable rack-aware bookie selection policy. BK will choose bookies from different racks when
# forming a new bookie ensemble
# forming a new bookie ensemble
bookkeeperClientRackawarePolicyEnabled=true

# Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie
# outside the specified groups will not be used by the broker
# Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie
# outside the specified groups will not be used by the broker
bookkeeperClientIsolationGroups=

### --- Managed Ledger --- ###
Expand All @@ -194,7 +214,7 @@ managedLedgerDefaultAckQuorum=1

# Amount of memory to use for caching data payload in managed ledger. This memory
# is allocated from JVM direct memory and it's shared across all the topics
# running in the same broker
# running in the same broker
managedLedgerCacheSizeMB=1024

# Threshold to which bring down the cache level when eviction is triggered
Expand All @@ -206,7 +226,7 @@ managedLedgerDefaultMarkDeleteRateLimit=0.1
# Max number of entries to append to a ledger before triggering a rollover
# A ledger rollover is triggered on these conditions
# * Either the max rollover time has been reached
# * or max entries have been written to the ledger and at least min-time
# * or max entries have been written to the ledger and at least min-time
# has passed
managedLedgerMaxEntriesPerLedger=50000

Expand All @@ -224,7 +244,7 @@ managedLedgerCursorRolloverTimeInSeconds=14400



### --- Load balancer --- ###
### --- Load balancer --- ###

# Enable load balancer
loadBalancerEnabled=false
Expand All @@ -242,13 +262,13 @@ loadBalancerReportUpdateMaxIntervalMinutes=15
loadBalancerHostUsageCheckIntervalMinutes=1

# Load shedding interval. Broker periodically checks whether some traffic should be offload from
# some over-loaded broker to other under-loaded brokers
# some over-loaded broker to other under-loaded brokers
loadBalancerSheddingIntervalMinutes=30

# Prevent the same topics to be shed and moved to other broker more than once within this timeframe
# Prevent the same topics to be shed and moved to other broker more than once within this timeframe
loadBalancerSheddingGracePeriodMinutes=30

# Usage threshold to determine a broker as under-loaded
# Usage threshold to determine a broker as under-loaded
loadBalancerBrokerUnderloadedThresholdPercentage=1

# Usage threshold to determine a broker as over-loaded
Expand Down Expand Up @@ -285,7 +305,7 @@ replicationMetricsEnabled=true

# Max number of connections to open for each broker in a remote cluster
# More connections host-to-host lead to better throughput over high-latency
# links.
# links.
replicationConnectionsPerBroker=16

# Replicator producer queue size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,26 @@ public class ServiceConfiguration implements PulsarConfiguration {
private long brokerDeleteInactiveTopicsFrequencySeconds = 60;
// How frequently to proactively check and purge expired messages
private int messageExpiryCheckIntervalInMinutes = 5;

// Default behavior for message deduplication in the broker.
// This can be overridden per-namespace. If enabled, broker will reject
// messages that were already stored in the topic
private boolean brokerDeduplicationEnabled = false;

// Maximum number of producers for which deduplication information
// is going to be persisted
private int brokerDeduplicationMaxNumberOfProducers = 10000;

// Number of entries after which a dedup info snapshot is taken.
// A bigger interval will lead to fewer snapshots being taken, though it would
// increase the topic recovery time, when the entries published after the
// snapshot need to be replayed
private int brokerDeduplicationEntriesInterval = 1000;

// Time of inactivity after which the broker will discard the deduplication information
// relative to a disconnected producer. Default is 6 hours (360 minutes).
private int brokerDeduplicationProducerInactivityTimeoutMinutes = 360;

// Enable check for minimum allowed client library version
private boolean clientLibraryVersionCheckEnabled = false;
// Allow client libraries with no version information
Expand Down Expand Up @@ -105,7 +125,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
// limit/2 messages
private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16;
// Default number of message dispatching throttling-limit for every topic. Using a value of 0, is disabling default
// message dispatch-throttling
// message dispatch-throttling
@FieldContext(dynamic = true)
private int dispatchThrottlingRatePerTopicInMsg = 0;
// Default number of message-bytes dispatching throttling-limit for every topic. Using a value of 0, is disabling
Expand All @@ -120,10 +140,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int maxConcurrentTopicLoadRequest = 5000;
// Max concurrent non-persistent message can be processed per connection
private int maxConcurrentNonPersistentMessagePerConnection = 1000;
// Number of worker threads to serve non-persistent topic
// Number of worker threads to serve non-persistent topic
private int numWorkerThreadsForNonPersistentTopic = 8;

// Enable broker to load persistent topics
private boolean enablePersistentTopics = true;

// Enable broker to load non-persistent topics
private boolean enableNonPersistentTopics = true;

Expand Down Expand Up @@ -433,6 +455,31 @@ public void setBrokerDeleteInactiveTopicsEnabled(boolean brokerDeleteInactiveTop
this.brokerDeleteInactiveTopicsEnabled = brokerDeleteInactiveTopicsEnabled;
}

/**
 * @return the current value of {@code brokerDeduplicationMaxNumberOfProducers}
 */
public int getBrokerDeduplicationMaxNumberOfProducers() {
    return this.brokerDeduplicationMaxNumberOfProducers;
}

/**
 * Replaces the value of {@code brokerDeduplicationMaxNumberOfProducers}.
 *
 * @param brokerDeduplicationMaxNumberOfProducers
 *            the new value; stored as given, without validation
 */
public void setBrokerDeduplicationMaxNumberOfProducers(int brokerDeduplicationMaxNumberOfProducers) {
    this.brokerDeduplicationMaxNumberOfProducers = brokerDeduplicationMaxNumberOfProducers;
}

/**
 * @return the current value of {@code brokerDeduplicationEntriesInterval}
 */
public int getBrokerDeduplicationEntriesInterval() {
    return this.brokerDeduplicationEntriesInterval;
}

/**
 * Replaces the value of {@code brokerDeduplicationEntriesInterval}.
 *
 * @param brokerDeduplicationEntriesInterval
 *            the new value; stored as given, without validation
 */
public void setBrokerDeduplicationEntriesInterval(int brokerDeduplicationEntriesInterval) {
    this.brokerDeduplicationEntriesInterval = brokerDeduplicationEntriesInterval;
}

/**
 * @return the current value of {@code brokerDeduplicationProducerInactivityTimeoutMinutes}
 */
public int getBrokerDeduplicationProducerInactivityTimeoutMinutes() {
    return this.brokerDeduplicationProducerInactivityTimeoutMinutes;
}

/**
 * Replaces the value of {@code brokerDeduplicationProducerInactivityTimeoutMinutes}.
 *
 * @param brokerDeduplicationProducerInactivityTimeoutMinutes
 *            the new value; stored as given, without validation
 */
public void setBrokerDeduplicationProducerInactivityTimeoutMinutes(
        int brokerDeduplicationProducerInactivityTimeoutMinutes) {
    this.brokerDeduplicationProducerInactivityTimeoutMinutes =
            brokerDeduplicationProducerInactivityTimeoutMinutes;
}

/**
 * @return the current value of {@code brokerDeleteInactiveTopicsFrequencySeconds}
 */
public long getBrokerDeleteInactiveTopicsFrequencySeconds() {
    return this.brokerDeleteInactiveTopicsFrequencySeconds;
}
Expand All @@ -449,6 +496,14 @@ public void setMessageExpiryCheckIntervalInMinutes(int messageExpiryCheckInterva
this.messageExpiryCheckIntervalInMinutes = messageExpiryCheckIntervalInMinutes;
}

/**
 * @return the current value of {@code brokerDeduplicationEnabled}
 */
public boolean isBrokerDeduplicationEnabled() {
    return this.brokerDeduplicationEnabled;
}

/**
 * Replaces the value of {@code brokerDeduplicationEnabled}.
 *
 * @param brokerDeduplicationEnabled
 *            the new value of the flag
 */
public void setBrokerDeduplicationEnabled(boolean brokerDeduplicationEnabled) {
    this.brokerDeduplicationEnabled = brokerDeduplicationEnabled;
}

/**
 * @return the current value of {@code clientLibraryVersionCheckEnabled}
 */
public boolean isClientLibraryVersionCheckEnabled() {
    return this.clientLibraryVersionCheckEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,49 @@ public void setNamespaceMessageTTL(@PathParam("property") String property, @Path
}
}

/**
 * Enables or disables broker side deduplication for all topics in a namespace.
 *
 * <p>The namespace policies are read together with their stat, the {@code deduplicationEnabled}
 * flag is updated, and the policies are written back conditioned on the version that was read.
 * A version mismatch on write is reported as a conflict instead of overwriting the other change.
 * After a successful write the cached policies entry is invalidated.
 *
 * @param property
 *            property (tenant) path segment
 * @param cluster
 *            cluster path segment
 * @param namespace
 *            namespace path segment
 * @param enableDeduplication
 *            request body; {@code true} to enable deduplication, {@code false} to disable it
 * @throws RestException
 *             with {@code NOT_FOUND} when the policies node does not exist, with {@code CONFLICT}
 *             when the policies were modified concurrently, or wrapping any other failure
 */
@POST
@Path("/{property}/{cluster}/{namespace}/deduplication")
@ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public void modifyDeduplication(@PathParam("property") String property, @PathParam("cluster") String cluster,
        @PathParam("namespace") String namespace, boolean enableDeduplication) {
    // NOTE(review): presumably these reject the call when the caller is not an admin of the
    // property or when policies are read-only -- confirm against their definitions.
    validateAdminAccessOnProperty(property);
    validatePoliciesReadOnlyAccess();

    NamespaceName nsName = new NamespaceName(property, cluster, namespace);
    Entry<Policies, Stat> policiesNode = null;

    try {
        final String policiesPath = path(POLICIES, property, cluster, namespace);

        // Force to read the data s.t. the watch to the cache content is setup.
        policiesNode = policiesCache().getWithStat(policiesPath)
                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace " + nsName + " does not exist"));
        policiesNode.getKey().deduplicationEnabled = enableDeduplication;

        // Write back the new policies into zookeeper, conditioned on the version read above
        globalZk().setData(policiesPath, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
                policiesNode.getValue().getVersion());
        policiesCache().invalidate(policiesPath);

        log.info("[{}] Successfully {} deduplication on namespace {}/{}/{}", clientAppId(),
                enableDeduplication ? "enabled" : "disabled", property, cluster, namespace);
    } catch (KeeperException.NoNodeException e) {
        log.warn("[{}] Failed to modify deduplication status for namespace {}/{}/{}: does not exist",
                clientAppId(), property, cluster, namespace);
        throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
    } catch (KeeperException.BadVersionException e) {
        log.warn(
                "[{}] Failed to modify deduplication status on namespace {}/{}/{} expected policy node version={} : concurrent modification",
                clientAppId(), property, cluster, namespace, policiesNode.getValue().getVersion());

        throw new RestException(Status.CONFLICT, "Concurrent modification");
    } catch (RestException e) {
        // The NOT_FOUND raised by orElseThrow above is an expected outcome: propagate it as-is
        // rather than letting the generic handler log it as an error and re-wrap it.
        throw e;
    } catch (Exception e) {
        log.error("[{}] Failed to modify deduplication status on namespace {}/{}/{}", clientAppId(), property,
                cluster, namespace, e);
        throw new RestException(e);
    }
}

@GET
@Path("/{property}/{cluster}/{namespace}/bundles")
@ApiOperation(value = "Get the bundles split data.")
Expand Down
Loading