Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transaction] Fix transaction buffer client handle endTxn op when topic or sub have been deleted. #11304

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.apache.bookkeeper.common.annotation.InterfaceAudience;
Expand Down Expand Up @@ -160,4 +161,14 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M
*/
void shutdown() throws InterruptedException, ManagedLedgerException;

/**
* Check managed ledger store has been initialized before.
*
* @param name {@link String}
* @return a future represents the result of the operation.
* an instance of {@link Boolean} is returned
* if the operation succeeds.
*/
CompletableFuture<Boolean> checkManagedLedgerInitializedBefore(String name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
CompletableFuture<Boolean> checkManagedLedgerInitializedBefore(String name);
CompletableFuture<Boolean> asyncExists(String name);


}
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,11 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
entryCacheManager.clear();
}

@Override
public CompletableFuture<Boolean> checkManagedLedgerInitializedBefore(String name) {
return store.exists(name);
}

@Override
public ManagedLedgerInfo getManagedLedgerInfo(String name) throws InterruptedException, ManagedLedgerException {
class Result {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.bookkeeper.mledger.impl;

import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
Expand Down Expand Up @@ -129,4 +131,14 @@ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorIn
* @throws MetaStoreException
*/
Iterable<String> getManagedLedgers() throws MetaStoreException;

/**
* Check path exists.
*
* @param path {@link String}
* @return a future represents the result of the operation.
* an instance of {@link Boolean} is returned
* if the operation succeeds.
*/
CompletableFuture<Boolean> exists(String path);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
CompletableFuture<Boolean> exists(String path);
CompletableFuture<Boolean> asyncExists(String ledgerName);

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -229,6 +230,11 @@ public Iterable<String> getManagedLedgers() throws MetaStoreException {
}
}

@Override
public CompletableFuture<Boolean> exists(String path) {
return store.exists(PREFIX + path);
}

//
// update timestamp if missing or 0
// 3 cases - timestamp does not exist for ledgers serialized before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1925,43 +1925,68 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
final String topic = command.getTopic();
final int txnAction = command.getTxnAction().getValue();
TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();

if (log.isDebugEnabled()) {
log.debug("[{}] handleEndTxnOnPartition txnId: [{}], txnAction: [{}]", topic,
txnID, txnAction);
}
CompletableFuture<Optional<Topic>> topicFuture = service.getTopics().get(TopicName.get(topic).toString());
if (topicFuture != null) {
topicFuture.whenComplete((optionalTopic, t) -> {
if (!optionalTopic.isPresent()) {
log.error("handleEndTxnOnPartition fail ! The topic {} does not exist in broker, "
+ "txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, ServerError.ServiceNotReady,
"Topic " + topic + " is not found."));
return;
}
optionalTopic.get().endTxn(txnID, txnAction, command.getTxnidLeastBitsOfLowWatermark())
CompletableFuture<Optional<Topic>> topicFuture = service.getTopicIfExists(TopicName.get(topic).toString());
topicFuture.thenAccept(optionalTopic -> {
if (optionalTopic.isPresent()) {
optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark)
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
log.error("Handle endTxnOnPartition {} failed.", topic, throwable);
log.error("handleEndTxnOnPartition fail!, topic {}, txnId: [{}], "
+ "txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction), throwable);
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, BrokerServiceException.getClientErrorCode(throwable),
throwable.getMessage()));
throwable.getMessage(),
txnID.getLeastSigBits(), txnID.getMostSigBits()));
return;
}
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
});
});
} else {
log.error("handleEndTxnOnPartition faile ! The topic {} does not exist in broker, "
+ "txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
ServerError.ServiceNotReady,
"The topic " + topic + " is not exist in broker."));
}

} else {
getBrokerService().getManagedLedgerFactory()
.checkManagedLedgerInitializedBefore(TopicName.get(topic).getPersistenceNamingEncoding())
.thenAccept((b) -> {
if (b) {
log.error("handleEndTxnOnPartition fail ! The topic {} does not exist in broker, "
+ "txnId: [{}], txnAction: [{}]", topic,
txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
ServerError.ServiceNotReady,
"The topic " + topic + " is not exist in broker.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a typo, what about: "The topic " + topic + " does not exist in broker."

txnID.getMostSigBits(), txnID.getLeastSigBits()));
} else {
log.warn("handleEndTxnOnPartition fail ! The topic {} have not been created, "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: "has not been created"

+ "txnId: [{}], txnAction: [{}]",
topic, txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
}
}).exceptionally(e -> {
log.error("handleEndTxnOnPartition fail ! topic {} , "
+ "txnId: [{}], txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, ServerError.ServiceNotReady,
e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
return null;
});
}
}).exceptionally(e -> {
log.error("handleEndTxnOnPartition fail ! topic {} , "
+ "txnId: [{}], txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, ServerError.ServiceNotReady,
e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
return null;
});
}

@Override
Expand All @@ -1972,70 +1997,82 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
final String topic = command.getSubscription().getTopic();
final String subName = command.getSubscription().getSubscription();
final int txnAction = command.getTxnAction().getValue();
final TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();

if (log.isDebugEnabled()) {
log.debug("[{}] handleEndTxnOnSubscription txnId: [{}], txnAction: [{}]", topic,
log.debug("[{}] [{}] handleEndTxnOnSubscription txnId: [{}], txnAction: [{}]", topic, subName,
new TxnID(txnidMostBits, txnidLeastBits), txnAction);
}

CompletableFuture<Optional<Topic>> topicFuture = service.getTopics().get(TopicName.get(topic).toString());
if (topicFuture != null) {
topicFuture.thenAccept(optionalTopic -> {

if (!optionalTopic.isPresent()) {
log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, txnId: "
+ "[{}], txnAction: [{}]", topic,
new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
"The topic " + topic + " is not exist in broker."));
return;
}

CompletableFuture<Optional<Topic>> topicFuture = service.getTopicIfExists(TopicName.get(topic).toString());
topicFuture.thenAccept(optionalTopic -> {
if (optionalTopic.isPresent()) {
Subscription subscription = optionalTopic.get().getSubscription(subName);
if (subscription == null) {
log.error("Topic {} subscription {} is not exist.", optionalTopic.get().getName(), subName);
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
"Topic " + optionalTopic.get().getName()
+ " subscription " + subName + " is not exist."));
log.warn("handleEndTxnOnSubscription fail! "
+ "topic {} subscription {} is not exist. txnId: [{}], txnAction: [{}]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"is not exist" -> "does not exist"

optionalTopic.get().getName(), subName, txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
return;
}

CompletableFuture<Void> completableFuture =
subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction,
command.getTxnidLeastBitsOfLowWatermark());
completableFuture.whenComplete((ignored, throwable) -> {
if (throwable != null) {
log.error("Handle end txn on subscription failed for request {}", requestId, throwable);
subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark);
completableFuture.whenComplete((ignored, e) -> {
if (e != null) {
log.error("handleEndTxnOnSubscription fail ! topic: {} , subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
BrokerServiceException.getClientErrorCode(throwable),
BrokerServiceException.getClientErrorCode(e),
"Handle end txn on subscription failed."));
return;
}
ctx.writeAndFlush(
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
});
}).exceptionally(e -> {
log.error("Handle end txn on subscription failed for request {}", requestId, e);
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
"Handle end txn on subscription failed."));
return null;
});
} else {
log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, txnId: "
+ "[{}], txnAction: [{}]", topic,
new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction));
} else {
getBrokerService().getManagedLedgerFactory()
.checkManagedLedgerInitializedBefore(TopicName.get(topic).getPersistenceNamingEncoding())
.thenAccept((b) -> {
if (b) {
log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, "
+ "subscription: {} ,txnId: [{}], txnAction: [{}]", topic, subName,
new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
ServerError.ServiceNotReady,
"The topic " + topic + " is not exist in broker."));
} else {
log.warn("handleEndTxnOnSubscription fail ! The topic {} have not been created, "
+ "subscription: {} txnId: [{}], txnAction: [{}]",
topic, subName, txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
}
}).exceptionally(e -> {
log.error("handleEndTxnOnSubscription fail ! topic {} , subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
ServerError.ServiceNotReady, e.getMessage()));
return null;
});
}
}).exceptionally(e -> {
log.error("handleEndTxnOnSubscription fail ! topic: {} , subscription: {}"
+ "txnId: [{}], txnAction: [{}]", topic, subName,
txnID, TxnAction.valueOf(txnAction), e.getCause());
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
"The topic " + topic + " is not exist in broker."));
}
"Handle end txn on subscription failed."));
return null;
});
}

private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) {
Expand Down
Loading