Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-22: Dead Letter Topic #2400

Closed
wants to merge 46 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
be74b4e
Add `redeliveryCount` and `deadLetterTopic` to CommandSubscribe in Pu…
Aug 13, 2018
3cc25d1
Add `maxRedeliveryCount` and `deadLetterTopic` to java client
Aug 13, 2018
44cf08e
Add RedeliveryTracker
Aug 14, 2018
d4ff25c
Implement of Dead Letter Topic
Aug 14, 2018
2f4957a
Fix import
Aug 14, 2018
8867373
Update maxRedeliveryCount and deadLetterTopic in PersistentDispatcher…
Aug 15, 2018
195bf0b
Fix bug:
Aug 15, 2018
5ae3c46
Add `redeliveryCount` and `deadLetterTopic` to CommandSubscribe in Pu…
Aug 13, 2018
1529fcb
Add `maxRedeliveryCount` and `deadLetterTopic` to java client
Aug 13, 2018
ec6da32
Add RedeliveryTracker
Aug 14, 2018
8e6c96a
Implement of Dead Letter Topic
Aug 14, 2018
6bb9281
Fix import
Aug 14, 2018
ccf8abb
Update maxRedeliveryCount and deadLetterTopic in PersistentDispatcher…
Aug 15, 2018
eeae4ea
Fix bug:
Aug 15, 2018
22f5e58
Fix bug of generate default dead letter topic. Add dead letter tutorial.
Aug 15, 2018
49b7cac
Merge remote-tracking branch 'origin/DLQ' into DLQ
Aug 15, 2018
74e1e8d
Fix bug of deserialize entry
Aug 15, 2018
5716023
Fix send dead letter topic with exception message can't re-use
Aug 15, 2018
05aaad2
Fix bug in tutorial of SubscriptionWithDeadLetter
Aug 15, 2018
9fe5541
Add read entry by position in ManagedCursor
Aug 16, 2018
c2503c4
Add `redeliveryCount` and `deadLetterTopic` to CommandSubscribe in Pu…
Aug 13, 2018
6bd204c
Add `maxRedeliveryCount` and `deadLetterTopic` to java client
Aug 13, 2018
f4a77f8
Add RedeliveryTracker
Aug 14, 2018
30d2bf1
Implement of Dead Letter Topic
Aug 14, 2018
f5ea1c4
Fix import
Aug 14, 2018
1107481
Update maxRedeliveryCount and deadLetterTopic in PersistentDispatcher…
Aug 15, 2018
9e895f8
Fix bug:
Aug 15, 2018
4c41659
Fix bug of generate default dead letter topic. Add dead letter tutorial.
Aug 15, 2018
3ab4834
Fix bug of deserialize entry
Aug 15, 2018
b5bee1f
Fix send dead letter topic with exception message can't re-use
Aug 15, 2018
a419677
Fix bug in tutorial of SubscriptionWithDeadLetter
Aug 15, 2018
8543ed5
Add read entry by position in ManagedCursor
Aug 16, 2018
256a9b9
Merge remote-tracking branch 'origin/DLQ' into DLQ
Aug 16, 2018
2283c9d
Fix bug in consumer subscribe with dead letter
Aug 16, 2018
c6a505b
Add test for subscribe info changed(maxRedeliverCount, deadLetterTopic)
Aug 16, 2018
7d578a0
enable module pulsar-sql
Aug 16, 2018
e141968
Optimization send messages to dead letter topic.
Aug 17, 2018
c714af0
Support set maxUnackedMessagePerConsumer by client.
Aug 17, 2018
9cd60c4
add UT for dead letter topic.
Aug 20, 2018
1882707
Fix UT with error didn't finish within the time-out 120000
Aug 20, 2018
d1fe915
Fix bug of getRedeliveryCount in NonPersistentRedeliveryTracker.
Aug 21, 2018
dc31b2f
Fix block client when any messages should send to dead letter topic.
Aug 22, 2018
b1b1378
Add comments to Quorum.
Aug 22, 2018
4b2c3a9
Add some assert and logs.
Aug 24, 2018
2f1028e
Change messagesToDeadLetter store ledgerId and entryId as primitive v…
Aug 24, 2018
2c46fc3
Delete unused import.
Aug 24, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
import org.apache.bookkeeper.mledger.impl.PositionImpl;

/**
* A ManangedCursor is a persisted cursor inside a ManagedLedger.
Expand Down Expand Up @@ -75,6 +78,16 @@ enum IndividualDeletedEntries {
*/
Map<String, Long> getProperties();

/**
* Return entry at the position.
*/
Entry readEntry(PositionImpl position) throws InterruptedException, ExecutionException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can already be done through replayEntries() and asyncReplayEntries() methods

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When i use asyncReplayEntries(), i should change ConcurrentLongPairSet to Set then call asyncReplayEntries() and in asyncReplayEntries() implement call ledger.asyncReadEntry() foreach. So i think i need a readEntry() method to read entry that no need to change ConcurrentLongPairSet to Set.


/**
* Return entry at the position async.
*/
void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ctx);

/**
* Read entries from the ManagedLedger, up to the specified number. The returned list can be smaller.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down Expand Up @@ -385,6 +387,27 @@ public void operationFailed(ManagedLedgerException exception) {
});
}

@Override
public Entry readEntry(PositionImpl position) throws InterruptedException, ExecutionException {
final CompletableFuture<Entry> readFuture = new CompletableFuture<>();
ledger.asyncReadEntry(position, new ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
readFuture.complete(entry);
}
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
readFuture.completeExceptionally(exception);
}
}, null);
return readFuture.get();
}

@Override
public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ctx) {
ledger.asyncReadEntry(position, callback, ctx);
}

@Override
public List<Entry> readEntries(int numberOfEntriesToRead) throws InterruptedException, ManagedLedgerException {
checkArgument(numberOfEntriesToRead > 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand Down Expand Up @@ -60,6 +62,17 @@ public Map<String, Long> getProperties() {
return Collections.emptyMap();
}

@Override
public Entry readEntry(PositionImpl position) throws InterruptedException, ExecutionException {
return null;
}


@Override
public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ctx) {

}

@Override
public boolean isDurable() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,9 +867,8 @@ protected void internalCreateSubscription(String subscriptionName, MessageIdImpl
if (topic.getSubscriptions().containsKey(subscriptionName)) {
throw new RestException(Status.CONFLICT, "Subscription already exists for topic");
}

PersistentSubscription subscription = (PersistentSubscription) topic
.createSubscription(subscriptionName, InitialPosition.Latest).get();
.createSubscription(subscriptionName, InitialPosition.Latest, 0, null).get();
subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), topicName,
subscriptionName, messageId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import org.apache.bookkeeper.mledger.Position;

import java.util.List;

public interface RedeliveryTracker {

int incrementAndGetRedeliveryCount(Position position);

int getRedeliveryCount(Position position);

void remove(Position position);

void removeBatch(List<Position> positions);

void clear();

}
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
final boolean readCompacted = subscribe.getReadCompacted();
final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
final InitialPosition initialPosition = subscribe.getInitialPosition();
final int maxRedeliveryCount = subscribe.getMaxRedeliveryCount();
final String deadLetterTopic = subscribe.getDeadLetterTopic();
final int maxUnackedMessagesPerConsumer = subscribe.getMaxUnackedMessagePerConsumer();
final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;

CompletableFuture<Boolean> isProxyAuthorizedFuture;
Expand Down Expand Up @@ -591,7 +594,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
if (isCompatible) {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata, readCompacted, initialPosition);
startMessageId, metadata, readCompacted, initialPosition,
0, null, maxUnackedMessagesPerConsumer);
} else {
return FutureUtil.failedFuture(new BrokerServiceException(
"Trying to subscribe with incompatible schema"
Expand All @@ -601,7 +605,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
} else {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata, readCompacted, initialPosition);
startMessageId, metadata, readCompacted, initialPosition,
maxRedeliveryCount, deadLetterTopic, maxUnackedMessagesPerConsumer);
}
})
.thenAccept(consumer -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ default long getOriginalSequenceId() {

CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition);
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
int maxRedeliveryCount, String deadLetterTopic, int maxUnackedMessagesPerConsumer);

CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition);
CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
int maxRedeliveryCount, String deadLetterTopic);

CompletableFuture<Void> unsubscribe(String subName);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.nonpersistent;

import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.RedeliveryTracker;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class NonPersistentRedeliveryTracker implements RedeliveryTracker {

private ConcurrentHashMap<Position, AtomicInteger> trackerCache = new ConcurrentHashMap<>(16);

@Override
public int incrementAndGetRedeliveryCount(Position position) {
trackerCache.putIfAbsent(position, new AtomicInteger(0));
return trackerCache.get(position).incrementAndGet();
}

@Override
public int getRedeliveryCount(Position position) {
trackerCache.putIfAbsent(position, new AtomicInteger(0));
return trackerCache.get(position).get();
}

@Override
public void remove(Position position) {
trackerCache.remove(position);
}

@Override
public void removeBatch(List<Position> positions) {
if (positions != null) {
positions.forEach(this::remove);
}
}

@Override
public void clear() {
trackerCache.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ public void removeProducer(Producer producer) {
@Override
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition) {
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition, int maxRedeliveryCount,
String deadLetterTopic, int maxUnackedMessagesPerConsumer) {

final CompletableFuture<Consumer> future = new CompletableFuture<>();

Expand Down Expand Up @@ -388,7 +389,8 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
}

@Override
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition) {
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
int maxRedeliveryCount, String deadLetterTopic) {
return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class CompactorSubscription extends PersistentSubscription {

public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopic,
String subscriptionName, ManagedCursor cursor) {
super(topic, subscriptionName, cursor);
super(topic, subscriptionName, cursor, 0, null);
checkArgument(subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION));
this.compactedTopic = compactedTopic;

Expand Down
Loading