Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[transactions] KIP-664 Implement abortTransaction from KafkaAdmin (pr…
Browse files Browse the repository at this point in the history
…oxy) (#79)

(cherry picked from commit c17c5b8)
  • Loading branch information
eolivelli authored and gaoran10 committed Jul 30, 2023
1 parent d60d786 commit 0ec6d7f
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1194,7 +1194,8 @@ public DescribeProducersResponseData.PartitionResponse activeProducerState() {
.setProducerId(producerStateEntry.producerId())
.setLastSequence(-1) // NOT HANDLED YET
.setProducerEpoch(producerStateEntry.producerEpoch() != null
? producerStateEntry.producerEpoch().intValue() : -1)
&& producerStateEntry.producerEpoch() >= 0
? producerStateEntry.producerEpoch().intValue() : 0)
.setLastTimestamp(producerStateEntry.lastTimestamp() != null
? producerStateEntry.lastTimestamp().longValue() : -1)
.setCoordinatorEpoch(producerStateEntry.coordinatorEpoch())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.admin.AbortTransactionSpec;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeProducersResult;
import org.apache.kafka.clients.admin.ListTransactionsOptions;
Expand Down Expand Up @@ -1477,6 +1478,7 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa
assertEquals(0, transactionDescription.topicPartitions().size());
break;
case ONGOING:
case PREPARE_ABORT:
assertTrue(transactionDescription.transactionStartTimeMs().orElseThrow() > 0);
assertEquals(1, transactionDescription.topicPartitions().size());
break;
Expand All @@ -1498,6 +1500,7 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa
assertEquals(0, topicPartitionPartitionProducerStateMap.size());
break;
case ONGOING:
case PREPARE_ABORT:
assertEquals(1, topicPartitionPartitionProducerStateMap.size());
TopicPartition tp = transactionDescription.topicPartitions().iterator().next();
DescribeProducersResult.PartitionProducerState partitionProducerState =
Expand All @@ -1517,6 +1520,58 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa

}

@Test(timeOut = 1000 * 30)
public void testAbortTransactinsFromAdmin() throws Exception {

String topicName = "testAbortTransactinsFromAdmin";
String transactionalId = "myProducer_" + UUID.randomUUID();

@Cleanup
KafkaProducer<Integer, String> producer = buildTransactionProducer(transactionalId);
@Cleanup
AdminClient kafkaAdmin = AdminClient.create(newKafkaAdminClientProperties());
kafkaAdmin.createTopics(Arrays.asList(new NewTopic(topicName, 1, (short) 1)))
.all().get();

producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>(topicName, 1, "bar")).get();
producer.flush();

// the transaction is in ONGOING state
assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.ONGOING,
(stateOnBroker, stateOnCoodinator) -> {
});

TopicPartition topicPartition = new TopicPartition(topicName, 0);

DescribeProducersResult.PartitionProducerState partitionProducerState =
kafkaAdmin.describeProducers(Collections.singletonList(topicPartition))
.partitionResult(topicPartition).get();
ProducerState producerState = partitionProducerState.activeProducers().get(0);

// we send the ABORT transaction marker to the broker
kafkaAdmin.abortTransaction(new AbortTransactionSpec(topicPartition,
producerState.producerId(),
(short) producerState.producerEpoch(),
producerState.coordinatorEpoch().orElse(-1))).all().get();

// the coordinator isn't aware of the operation sent to the brokers
// so it allows to abort the transaction
producer.commitTransaction();

producer.close();

// the transaction is eventually committed
Awaitility.await().untilAsserted(() -> {
assertTransactionState(kafkaAdmin, transactionalId,
org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT,
(stateOnBroker, stateOnCoodinator) -> {
});
});
}

/**
* Get the Kafka server address.
*/
Expand Down

0 comments on commit 0ec6d7f

Please sign in to comment.