Skip to content

Commit

Permalink
tx: reject producers with timeouts larger than transaction_max_timeou…
Browse files Browse the repository at this point in the history
…t_ms

error code is inline with Apache Kafka return code
invalid_transaction_timeout(50)
  • Loading branch information
bharathv committed Jul 18, 2024
1 parent 2d7f722 commit a9cdabb
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 5 deletions.
2 changes: 2 additions & 0 deletions src/v/cluster/tx_errc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ std::ostream& operator<<(std::ostream& o, errc err) {
return o << "tx::errc::partition_disabled";
case errc::concurrent_transactions:
return o << "tx::errc::concurrent_transactions";
case errc::invalid_timeout:
return o << "tx::errc::invalid_timeout";
}
return o;
}
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/tx_errc.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ enum class errc {
tx_id_not_found,
partition_disabled,
concurrent_transactions,
// invalid timeout requested by the client.
invalid_timeout,
};

std::ostream& operator<<(std::ostream& o, errc err);
Expand Down
13 changes: 13 additions & 0 deletions src/v/cluster/tx_gateway_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,19 @@ ss::future<cluster::init_tm_tx_reply> tx_gateway_frontend::init_tm_tx_locally(
tx_id,
transaction_timeout_ms);

if (unlikely(
transaction_timeout_ms
> config::shard_local_cfg().transaction_max_timeout_ms())) {
vlog(
txlog.warn,
"[tx_id={}] Transactional timeout requested {}ms exceeds configured "
"maximum timeout {}ms",
tx_id,
transaction_timeout_ms,
config::shard_local_cfg().transaction_max_timeout_ms());
co_return init_tm_tx_reply{tx::errc::invalid_timeout};
}

model::ntp tx_ntp(model::tx_manager_nt.ns, model::tx_manager_nt.tp, tm);
auto shard = _shard_table.local().shard_for(tx_ntp);

Expand Down
2 changes: 2 additions & 0 deletions src/v/kafka/server/errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ constexpr error_code map_tx_errc(cluster::tx::errc ec) {
case cluster::tx::errc::request_rejected:
case cluster::tx::errc::unknown_server_error:
return error_code::unknown_server_error;
case cluster::tx::errc::invalid_timeout:
return error_code::invalid_transaction_timeout;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public static interface StringAction {
}

final static int RETRY_TIMEOUT_SEC = 5;
final static int MAX_TRANSACTION_TIMEOUT_MS = 900000;
final static String txId1 = "tx1";
final static String txId2 = "tx2";
final static String topic1 = "topic1";
Expand Down Expand Up @@ -407,7 +408,7 @@ static void readCommittedSeekRespectsLongHangingTx(String connection)
TxProducer producer = null;
TxConsumer consumer = null;
try {
producer = new TxProducer(connection, txId1, Integer.MAX_VALUE);
producer = new TxProducer(connection, txId1, MAX_TRANSACTION_TIMEOUT_MS);
producer.initTransactions();
producer.beginTransaction();
long offset = producer.send(topic1, "key1", "value1");
Expand Down
4 changes: 2 additions & 2 deletions tests/rptest/transactions/transactions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ def transaction_id_expiration_test(self):
producer = ck.Producer({
'bootstrap.servers': self.redpanda.brokers(),
'transactional.id': '0',
'transaction.timeout.ms': 3600000, # to avoid timing out
'transaction.timeout.ms': 900000, # to avoid timing out
})
producer.init_transactions()
producer.begin_transaction()
Expand Down Expand Up @@ -574,7 +574,7 @@ def graceful_leadership_transfer_tx_coordinator_test(self):
ck.Producer({
'bootstrap.servers': self.redpanda.brokers(),
'transactional.id': str(i),
'transaction.timeout.ms': 1000000,
'transaction.timeout.ms': 900000,
}) for i in range(0, p_count)
]

Expand Down
4 changes: 2 additions & 2 deletions tests/rptest/transactions/tx_admin_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ def test_mark_transaction_expired(self):
producer1 = ck.Producer({
'bootstrap.servers': self.redpanda.brokers(),
'transactional.id': '0',
'transaction.timeout.ms': 3600000, # avoid auto timeout
'transaction.timeout.ms': 900000, # avoid auto timeout
})
producer2 = ck.Producer({
'bootstrap.servers': self.redpanda.brokers(),
'transactional.id': '1',
'transaction.timeout.ms': 3600000, # avoid auto timeout
'transaction.timeout.ms': 900000, # avoid auto timeout
})
producer1.init_transactions()
producer2.init_transactions()
Expand Down

0 comments on commit a9cdabb

Please sign in to comment.