[transactions] KIP-664 Implement abortTransaction from KafkaAdmin (proxy) (streamnative#79)
eolivelli authored Mar 15, 2023
1 parent c3376e5 commit c17c5b8
Showing 3 changed files with 166 additions and 7 deletions.
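
For orientation: the client-side entry point for KIP-664 is Admin#abortTransaction, fed by Admin#describeProducers. Below is a minimal sketch of the call sequence this commit teaches the proxy to serve (bootstrap address and topic name are placeholders, error handling omitted; the same flow appears in the new test at the end of this diff):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AbortTransactionSpec;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.ProducerState;
    import org.apache.kafka.common.TopicPartition;

    public class AbortHangingTransaction {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            try (Admin admin = Admin.create(props)) {
                TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder
                // Ask the partition leader which producers still hold open state.
                ProducerState producer = admin
                        .describeProducers(Collections.singletonList(tp))
                        .partitionResult(tp).get()
                        .activeProducers().get(0);
                // Write an ABORT marker straight to the partition leader,
                // bypassing the transaction coordinator.
                admin.abortTransaction(new AbortTransactionSpec(tp,
                        producer.producerId(),
                        (short) producer.producerEpoch(),
                        producer.coordinatorEpoch().orElse(-1))).all().get();
            }
        }
    }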
@@ -1202,7 +1202,8 @@ public DescribeProducersResponseData.PartitionResponse activeProducerState() {
                 .setProducerId(producerStateEntry.producerId())
                 .setLastSequence(-1) // NOT HANDLED YET
                 .setProducerEpoch(producerStateEntry.producerEpoch() != null
-                        ? producerStateEntry.producerEpoch().intValue() : -1)
+                        && producerStateEntry.producerEpoch() >= 0
+                        ? producerStateEntry.producerEpoch().intValue() : 0)
                 .setLastTimestamp(producerStateEntry.lastTimestamp() != null
                         ? producerStateEntry.lastTimestamp().longValue() : -1)
                 .setCoordinatorEpoch(producerStateEntry.coordinatorEpoch())
@@ -113,6 +113,8 @@
 import org.apache.kafka.common.message.SaslHandshakeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestData;
+import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
@@ -169,6 +171,8 @@
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
+import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -2587,8 +2591,107 @@ protected void handleEndTxn(KafkaHeaderAndRequest kafkaHeaderAndRequest,
 
     @Override
     protected void handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest,
-                                         CompletableFuture<AbstractResponse> resultFuture) {
-        resultFuture.completeExceptionally(new UnsupportedOperationException("not a proxy operation"));
+                                         CompletableFuture<AbstractResponse> response) {
+        WriteTxnMarkersRequest request = (WriteTxnMarkersRequest) kafkaHeaderAndRequest.getRequest();
+
+        if (request.data().markers().size() > 1 || request.data().markers().isEmpty()) {
+            // on the proxy, according to KIP-664, we only ever see a single marker per request
+            response.completeExceptionally(
+                    new UnsupportedOperationException("not a proxy operation (multiple markers)"));
+            return;
+        }
+
+        boolean someCommits = request.data().markers().stream().anyMatch(t -> t.transactionResult());
+        if (someCommits) {
+            // on the proxy, according to KIP-664, we only ever see "ABORT" markers from clients
+            response.completeExceptionally(
+                    new UnsupportedOperationException("not a proxy operation (commit tx marker)"));
+            return;
+        }
+
+        if (request.data().markers().get(0).topics().isEmpty() || request.data().markers().get(0).topics().size() > 1) {
+            // on the proxy, according to KIP-664, the single "ABORT" marker targets one topic partition
+            response.completeExceptionally(
+                    new UnsupportedOperationException("not a proxy operation (multiple topics)"));
+            return;
+        }
+
+        if (request.data().markers().get(0).topics().get(0).partitionIndexes().size() > 1) {
+            // on the proxy, according to KIP-664, the single "ABORT" marker targets one topic partition
+            response.completeExceptionally(
+                    new UnsupportedOperationException("not a proxy operation (multiple partitions)"));
+            return;
+        }
+
+        this.<WriteTxnMarkersRequest, WriteTxnMarkersRequestData, WriteTxnMarkersResponse>
+                sendRequestToAllTopicOwners(kafkaHeaderAndRequest, response,
+                (WriteTxnMarkersRequestData data) -> {
+
+                    // as we have only 1 TopicPartition this code may be simplified
+
+                    List<TopicPartition> topics = new ArrayList<>();
+                    data.markers().forEach(tm -> {
+                        tm.topics().forEach(t -> {
+                            t.partitionIndexes().forEach(index -> {
+                                topics.add(new TopicPartition(t.name(), index));
+                            });
+                        });
+                    });
+                    return topics;
+                },
+                WriteTxnMarkersRequest.class,
+                WriteTxnMarkersRequestData.class,
+                (WriteTxnMarkersRequest writeTxnMarkersRequest, List<TopicPartition> keys) -> {
+
+                    // as we have only 1 TopicPartition this code may be simplified
+
+                    // keep only the markers for the partitions owned by one broker
+                    WriteTxnMarkersRequestData data = new WriteTxnMarkersRequestData()
+                            .setMarkers(new ArrayList<>());
+
+                    // one marker may span partitions owned by different brokers, so we need to split it
+                    writeTxnMarkersRequest.data()
+                            .markers().forEach((WriteTxnMarkersRequestData.WritableTxnMarker tm) -> {
+                        WriteTxnMarkersRequestData.WritableTxnMarker singlePartitionMarker =
+                                new WriteTxnMarkersRequestData.WritableTxnMarker()
+                                        .setCoordinatorEpoch(tm.coordinatorEpoch())
+                                        .setTopics(new ArrayList<>())
+                                        .setProducerId(tm.producerId())
+                                        .setTransactionResult(tm.transactionResult())
+                                        .setProducerEpoch(tm.producerEpoch());
+                        tm.topics().forEach((WriteTxnMarkersRequestData.WritableTxnMarkerTopic t) -> {
+                            WriteTxnMarkersRequestData.WritableTxnMarkerTopic topicCopy =
+                                    new WriteTxnMarkersRequestData.WritableTxnMarkerTopic()
+                                            .setName(t.name())
+                                            .setPartitionIndexes(new ArrayList<>());
+                            // keep only the partitions owned by this broker
+                            t.partitionIndexes().forEach(index -> {
+                                if (keys.contains(new TopicPartition(t.name(), index))) {
+                                    topicCopy.partitionIndexes().add(index);
+                                }
+                            });
+                            // add the topic only if it has partitions
+                            if (!topicCopy.partitionIndexes().isEmpty()) {
+                                singlePartitionMarker.topics().add(topicCopy);
+                            }
+                        });
+                        // add the marker only if it has topics
+                        if (!singlePartitionMarker.topics().isEmpty()) {
+                            data.markers().add(singlePartitionMarker);
+                        }
+                    });
+                    return new WriteTxnMarkersRequest.Builder(data).build(writeTxnMarkersRequest.version());
+                },
+                (allResponses) -> {
+                    // merging is simple, as we expect only 1 topic partition
+                    WriteTxnMarkersResponseData responseData = new WriteTxnMarkersResponseData();
+                    responseData.setMarkers(allResponses
+                            .stream()
+                            .flatMap(d -> d.data().markers().stream())
+                            .collect(Collectors.toList()));
+                    return new WriteTxnMarkersResponse(responseData);
+                },
+                null);
     }
 
     private ConnectionToBroker grabConnectionToBroker(String brokerHost, int brokerPort) {
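
For reference, the only request shape the handler above accepts end-to-end is a single ABORT marker addressed to a single topic partition. A sketch of such a request body, assembled with the same generated message classes the hunk uses (IDs, epochs, and the topic name are placeholder values; transactionResult == false denotes ABORT, mirroring the someCommits check):

    import java.util.Collections;
    import org.apache.kafka.common.message.WriteTxnMarkersRequestData;

    WriteTxnMarkersRequestData data = new WriteTxnMarkersRequestData()
            .setMarkers(Collections.singletonList(
                    new WriteTxnMarkersRequestData.WritableTxnMarker()
                            .setProducerId(9000L)         // placeholder
                            .setProducerEpoch((short) 1)  // placeholder
                            .setCoordinatorEpoch(0)       // placeholder
                            .setTransactionResult(false)  // false == ABORT
                            .setTopics(Collections.singletonList(
                                    new WriteTxnMarkersRequestData.WritableTxnMarkerTopic()
                                            .setName("my-topic") // placeholder
                                            .setPartitionIndexes(
                                                    Collections.singletonList(0))))));

Such a request passes all four guards: one marker, no commit result, one topic, one partition.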
@@ -53,6 +53,7 @@
 import java.util.function.Function;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AbortTransactionSpec;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.DescribeProducersResult;
 import org.apache.kafka.clients.admin.ListTransactionsOptions;
@@ -783,7 +784,7 @@ public void basicRecoveryAfterDeleteCreateTopic()
         // use Kafka API, this way we assign a topic UUID
         @Cleanup
         AdminClient kafkaAdmin = AdminClient.create(newKafkaAdminClientProperties());
-        kafkaAdmin.createTopics(Arrays.asList(new NewTopic(topicName, 4, (short) 1)));
+        kafkaAdmin.createTopics(Arrays.asList(new NewTopic(topicName, 4, (short) 1))).all().get();
 
         KafkaProducer<Integer, String> producer = buildTransactionProducer(transactionalId, 1000);
 
@@ -823,7 +824,7 @@ public void basicRecoveryAfterDeleteCreateTopic()
         // the PH is notified of the deletion using TopicEventListener
 
         // create the topic again, using the kafka APIs
-        kafkaAdmin.createTopics(Arrays.asList(new NewTopic(topicName, 4, (short) 1)));
+        kafkaAdmin.createTopics(Arrays.asList(new NewTopic(topicName, 4, (short) 1))).all().get();
 
         // the snapshot now points to an offset that doesn't make sense in the new topic
         // because the new topic is empty
@@ -1369,7 +1370,7 @@ public void testSnapshotEventuallyTaken() throws Exception {
 
         @Cleanup
         AdminClient kafkaAdmin = AdminClient.create(newKafkaAdminClientProperties());
-        kafkaAdmin.createTopics(Arrays.asList(new NewTopic(topicName, 1, (short) 1)));
+        kafkaAdmin.createTopics(Arrays.asList(new NewTopic(topicName, 1, (short) 1))).all().get();
 
         // no snapshot initially
         assertNull(protocolHandler.getTransactionCoordinator(tenant)
@@ -1436,7 +1437,7 @@ public void testAbortedTxEventuallyPurged() throws Exception {
 
         @Cleanup
         AdminClient kafkaAdmin = AdminClient.create(newKafkaAdminClientProperties());
-        kafkaAdmin.createTopics(Arrays.asList(new NewTopic(topicName, 1, (short) 1)));
+        kafkaAdmin.createTopics(Arrays.asList(new NewTopic(topicName, 1, (short) 1))).all().get();
 
         final KafkaProducer<Integer, String> producer1 = buildTransactionProducer(transactionalId);
 
@@ -1662,6 +1663,7 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa
                 assertEquals(0, transactionDescription.topicPartitions().size());
                 break;
             case ONGOING:
+            case PREPARE_ABORT:
                 assertTrue(transactionDescription.transactionStartTimeMs().orElseThrow() > 0);
                 assertEquals(1, transactionDescription.topicPartitions().size());
                 break;
@@ -1683,6 +1685,7 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa
                 assertEquals(0, topicPartitionPartitionProducerStateMap.size());
                 break;
             case ONGOING:
+            case PREPARE_ABORT:
                 assertEquals(1, topicPartitionPartitionProducerStateMap.size());
                 TopicPartition tp = transactionDescription.topicPartitions().iterator().next();
                 DescribeProducersResult.PartitionProducerState partitionProducerState =
@@ -1702,6 +1705,58 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa
 
     }
 
+    @Test(timeOut = 1000 * 30)
+    public void testAbortTransactionsFromAdmin() throws Exception {
+
+        String topicName = "testAbortTransactionsFromAdmin";
+        String transactionalId = "myProducer_" + UUID.randomUUID();
+
+        @Cleanup
+        KafkaProducer<Integer, String> producer = buildTransactionProducer(transactionalId);
+        @Cleanup
+        AdminClient kafkaAdmin = AdminClient.create(newKafkaAdminClientProperties());
+        kafkaAdmin.createTopics(Arrays.asList(new NewTopic(topicName, 1, (short) 1)))
+                .all().get();
+
+        producer.initTransactions();
+        producer.beginTransaction();
+        producer.send(new ProducerRecord<>(topicName, 1, "bar")).get();
+        producer.flush();
+
+        // the transaction is in ONGOING state
+        assertTransactionState(kafkaAdmin, transactionalId,
+                org.apache.kafka.clients.admin.TransactionState.ONGOING,
+                (stateOnBroker, stateOnCoordinator) -> {
+                });
+
+        TopicPartition topicPartition = new TopicPartition(topicName, 0);
+
+        DescribeProducersResult.PartitionProducerState partitionProducerState =
+                kafkaAdmin.describeProducers(Collections.singletonList(topicPartition))
+                        .partitionResult(topicPartition).get();
+        ProducerState producerState = partitionProducerState.activeProducers().get(0);
+
+        // we send the ABORT transaction marker directly to the broker
+        kafkaAdmin.abortTransaction(new AbortTransactionSpec(topicPartition,
+                producerState.producerId(),
+                (short) producerState.producerEpoch(),
+                producerState.coordinatorEpoch().orElse(-1))).all().get();
+
+        // the coordinator isn't aware of the markers sent directly to the brokers,
+        // so it still allows the producer to commit the transaction
+        producer.commitTransaction();
+
+        producer.close();
+
+        // the transaction eventually reaches COMPLETE_COMMIT
+        Awaitility.await().untilAsserted(() -> {
+            assertTransactionState(kafkaAdmin, transactionalId,
+                    org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT,
+                    (stateOnBroker, stateOnCoordinator) -> {
+                    });
+        });
+    }
+
     /**
      * Get the Kafka server address.
      */
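
assertTransactionState is a helper defined earlier in this test class and only partially visible in this diff. A plausible minimal analogue using the same KIP-664 admin API it wraps (the method shape and assertion style here are illustrative, not the project's actual helper, which also cross-checks broker-side producer state):

    import java.util.Collections;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.TransactionDescription;
    import org.apache.kafka.clients.admin.TransactionState;

    static void assertTransactionState(Admin admin, String transactionalId,
                                       TransactionState expected) throws Exception {
        // describeTransactions queries the transaction coordinator for the
        // current state of the given transactional id
        TransactionDescription description = admin
                .describeTransactions(Collections.singleton(transactionalId))
                .description(transactionalId).get();
        if (description.state() != expected) {
            throw new AssertionError("expected " + expected
                    + " but was " + description.state());
        }
    }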
