Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transaction] Fix transaction buffer client handle endTxn op when topic or sub have been deleted. #11304

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.apache.bookkeeper.common.annotation.InterfaceAudience;
Expand Down Expand Up @@ -160,4 +161,14 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M
*/
void shutdown() throws InterruptedException, ManagedLedgerException;

/**
* Check managed ledger has been initialized before.
*
* @param ledgerName {@link String}
* @return a future represents the result of the operation.
* an instance of {@link Boolean} is returned
* if the operation succeeds.
*/
CompletableFuture<Boolean> asyncExists(String ledgerName);

}
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,11 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
entryCacheManager.clear();
}

@Override
public CompletableFuture<Boolean> asyncExists(String ledgerName) {
return store.asyncExists(ledgerName);
}

@Override
public ManagedLedgerInfo getManagedLedgerInfo(String name) throws InterruptedException, ManagedLedgerException {
class Result {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.bookkeeper.mledger.impl;

import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
Expand Down Expand Up @@ -129,4 +131,14 @@ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorIn
* @throws MetaStoreException
*/
Iterable<String> getManagedLedgers() throws MetaStoreException;

/**
* Check ledger exists.
*
* @param ledgerName {@link String}
* @return a future represents the result of the operation.
* an instance of {@link Boolean} is returned
* if the operation succeeds.
*/
CompletableFuture<Boolean> asyncExists(String ledgerName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -257,6 +258,11 @@ public Iterable<String> getManagedLedgers() throws MetaStoreException {
}
}

@Override
public CompletableFuture<Boolean> asyncExists(String path) {
return store.exists(PREFIX + path);
}

//
// update timestamp if missing or 0
// 3 cases - timestamp does not exist for ledgers serialized before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1998,43 +1998,68 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
final String topic = command.getTopic();
final int txnAction = command.getTxnAction().getValue();
TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();

if (log.isDebugEnabled()) {
log.debug("[{}] handleEndTxnOnPartition txnId: [{}], txnAction: [{}]", topic,
txnID, txnAction);
}
CompletableFuture<Optional<Topic>> topicFuture = service.getTopics().get(TopicName.get(topic).toString());
if (topicFuture != null) {
topicFuture.whenComplete((optionalTopic, t) -> {
if (!optionalTopic.isPresent()) {
log.error("handleEndTxnOnPartition fail ! The topic {} does not exist in broker, "
+ "txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, ServerError.ServiceNotReady,
"Topic " + topic + " is not found."));
return;
}
optionalTopic.get().endTxn(txnID, txnAction, command.getTxnidLeastBitsOfLowWatermark())
CompletableFuture<Optional<Topic>> topicFuture = service.getTopicIfExists(TopicName.get(topic).toString());
topicFuture.thenAccept(optionalTopic -> {
if (optionalTopic.isPresent()) {
optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark)
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
log.error("Handle endTxnOnPartition {} failed.", topic, throwable);
log.error("handleEndTxnOnPartition fail!, topic {}, txnId: [{}], "
+ "txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction), throwable);
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, BrokerServiceException.getClientErrorCode(throwable),
throwable.getMessage()));
throwable.getMessage(),
txnID.getLeastSigBits(), txnID.getMostSigBits()));
return;
}
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
});
});
} else {
log.error("handleEndTxnOnPartition faile ! The topic {} does not exist in broker, "
+ "txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
ServerError.ServiceNotReady,
"The topic " + topic + " is not exist in broker."));
}

} else {
getBrokerService().getManagedLedgerFactory()
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
.thenAccept((b) -> {
if (b) {
log.error("handleEndTxnOnPartition fail ! The topic {} does not exist in broker, "
+ "txnId: [{}], txnAction: [{}]", topic,
txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
ServerError.ServiceNotReady,
"The topic " + topic + " does not exist in broker.",
txnID.getMostSigBits(), txnID.getLeastSigBits()));
} else {
log.warn("handleEndTxnOnPartition fail ! The topic {} has not been created, "
+ "txnId: [{}], txnAction: [{}]",
topic, txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
}
}).exceptionally(e -> {
log.error("handleEndTxnOnPartition fail ! topic {} , "
+ "txnId: [{}], txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, ServerError.ServiceNotReady,
e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
return null;
});
}
}).exceptionally(e -> {
log.error("handleEndTxnOnPartition fail ! topic {} , "
+ "txnId: [{}], txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, ServerError.ServiceNotReady,
e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
return null;
});
}

@Override
Expand All @@ -2045,70 +2070,82 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
final String topic = command.getSubscription().getTopic();
final String subName = command.getSubscription().getSubscription();
final int txnAction = command.getTxnAction().getValue();
final TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();

if (log.isDebugEnabled()) {
log.debug("[{}] handleEndTxnOnSubscription txnId: [{}], txnAction: [{}]", topic,
log.debug("[{}] [{}] handleEndTxnOnSubscription txnId: [{}], txnAction: [{}]", topic, subName,
new TxnID(txnidMostBits, txnidLeastBits), txnAction);
}

CompletableFuture<Optional<Topic>> topicFuture = service.getTopics().get(TopicName.get(topic).toString());
if (topicFuture != null) {
topicFuture.thenAccept(optionalTopic -> {

if (!optionalTopic.isPresent()) {
log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, txnId: "
+ "[{}], txnAction: [{}]", topic,
new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
"The topic " + topic + " is not exist in broker."));
return;
}

CompletableFuture<Optional<Topic>> topicFuture = service.getTopicIfExists(TopicName.get(topic).toString());
topicFuture.thenAccept(optionalTopic -> {
if (optionalTopic.isPresent()) {
Subscription subscription = optionalTopic.get().getSubscription(subName);
if (subscription == null) {
log.error("Topic {} subscription {} is not exist.", optionalTopic.get().getName(), subName);
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
"Topic " + optionalTopic.get().getName()
+ " subscription " + subName + " is not exist."));
log.warn("handleEndTxnOnSubscription fail! "
+ "topic {} subscription {} does not exist. txnId: [{}], txnAction: [{}]",
optionalTopic.get().getName(), subName, txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
return;
}

CompletableFuture<Void> completableFuture =
subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction,
command.getTxnidLeastBitsOfLowWatermark());
completableFuture.whenComplete((ignored, throwable) -> {
if (throwable != null) {
log.error("Handle end txn on subscription failed for request {}", requestId, throwable);
subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark);
completableFuture.whenComplete((ignored, e) -> {
if (e != null) {
log.error("handleEndTxnOnSubscription fail ! topic: {} , subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
BrokerServiceException.getClientErrorCode(throwable),
BrokerServiceException.getClientErrorCode(e),
"Handle end txn on subscription failed."));
return;
}
ctx.writeAndFlush(
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
});
}).exceptionally(e -> {
log.error("Handle end txn on subscription failed for request {}", requestId, e);
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
"Handle end txn on subscription failed."));
return null;
});
} else {
log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, txnId: "
+ "[{}], txnAction: [{}]", topic,
new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction));
} else {
getBrokerService().getManagedLedgerFactory()
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
.thenAccept((b) -> {
if (b) {
log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, "
+ "subscription: {} ,txnId: [{}], txnAction: [{}]", topic, subName,
new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
ServerError.ServiceNotReady,
"The topic " + topic + " does not exist in broker."));
} else {
log.warn("handleEndTxnOnSubscription fail ! The topic {} has not been created, "
+ "subscription: {} txnId: [{}], txnAction: [{}]",
topic, subName, txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
}
}).exceptionally(e -> {
log.error("handleEndTxnOnSubscription fail ! topic {} , subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
ServerError.ServiceNotReady, e.getMessage()));
return null;
});
}
}).exceptionally(e -> {
log.error("handleEndTxnOnSubscription fail ! topic: {} , subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
"The topic " + topic + " is not exist in broker."));
}
"Handle end txn on subscription failed."));
return null;
});
}

private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import java.util.ArrayList;
Expand Down
Loading