Using newer client versions in Ducktape tests #24523

Open. 9 commits to merge into base branch dev.

In brief: this PR adds Kafka 3.8.0 and 3.9.0 to the client tooling baked into the ducktape Docker image (pointing /opt/kafka-dev at 3.9.0), bumps librdkafka and the confluent-kafka Python client to 2.6.1, and adapts tests that assumed older client defaults: KafkaCliTools.produce() gains an enable_idempotence switch, path and kvstore-key filters now also exclude kafka_internal data, and a transactions parametrization that no longer applies is removed.
5 changes: 3 additions & 2 deletions tests/docker/ducktape-deps/kafka-tools
@@ -1,11 +1,12 @@
 #!/usr/bin/env bash
 set -e
-for ver in "2.3.1" "2.4.1" "2.5.0" "2.7.0" "3.0.0" "3.7.0"; do
+for ver in "2.3.1" "2.4.1" "2.5.0" "2.7.0" "3.0.0" "3.7.0" "3.8.0" "3.9.0"; do
   mkdir -p "/opt/kafka-${ver}"
   chmod a+rw "/opt/kafka-${ver}"
   curl "$KAFKA_MIRROR/kafka_2.12-${ver}.tgz" | tar xz --strip-components=1 -C "/opt/kafka-${ver}"
 done
-ln -s /opt/kafka-3.0.0/ /opt/kafka-dev
+ln -s /opt/kafka-3.9.0/ /opt/kafka-dev

 set -e
 git -C /opt clone --depth 1 --branch 0.14.0-example-producer-args https://github.com/redpanda-data/strimzi-kafka-oauth.git
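The image keeps one install per release under /opt/kafka-<ver>, with /opt/kafka-dev symlinked to the newest one (now 3.9.0). A minimal sketch of how a test helper might resolve a script path under this layout; the function name and the bin/ subdirectory are assumptions based on the stock Apache Kafka tarball, not code from this PR:

def kafka_script(script, version=None):
    # /opt/kafka-dev is the symlink installed by the loop above; a pinned
    # version selects its dedicated install directory instead.
    root = f"/opt/kafka-{version}" if version else "/opt/kafka-dev"
    return f"{root}/bin/{script}"

print(kafka_script("kafka-producer-perf-test.sh", "3.9.0"))
# /opt/kafka-3.9.0/bin/kafka-producer-perf-test.sh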
2 changes: 1 addition & 1 deletion tests/docker/ducktape-deps/librdkafka
@@ -1,7 +1,7 @@
 #!/usr/bin/env bash
 set -e
 mkdir /opt/librdkafka
-curl -SL "https://github.com/confluentinc/librdkafka/archive/refs/tags/v2.2.0.tar.gz" | tar -xz --strip-components=1 -C /opt/librdkafka
+curl -SL "https://github.com/confluentinc/librdkafka/archive/refs/tags/v2.6.1.tar.gz" | tar -xz --strip-components=1 -C /opt/librdkafka
 cd /opt/librdkafka
 ./configure
 make -j$(nproc)
10 changes: 8 additions & 2 deletions tests/rptest/clients/kafka_cli_tools.py
@@ -65,7 +65,8 @@ class KafkaCliTools:
     """

     # See tests/docker/Dockerfile to add new versions
-    VERSIONS = ("3.0.0", "2.7.0", "2.5.0", "2.4.1", "2.3.1")
+    VERSIONS = ("3.9.0", "3.8.0", "3.7.0", "3.0.0", "2.7.0", "2.5.0", "2.4.1",
+                "2.3.1")

     def __init__(self,
                  redpanda: RedpandaServiceForClients,
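With 3.8.0 and 3.9.0 added to VERSIONS, a test can pin the client release it drives (archival_test.py below does exactly this). A usage sketch; the topic name and record counts are illustrative:

# Pin the CLI tooling to a specific release from VERSIONS; omitting the
# argument presumably falls back to the image default (/opt/kafka-dev).
kafka_tools = KafkaCliTools(self.redpanda, version="3.9.0")
kafka_tools.produce("panda-topic", 5000, 1024)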
@@ -328,7 +329,8 @@ def produce(self,
                 acks: int = -1,
                 throughput: int = -1,
                 batch_size: int = 81960,
-                linger_ms: int = 0):
+                linger_ms: int = 0,
+                enable_idempotence: bool = True):
         self._redpanda.logger.debug("Producing to topic: %s", topic)
         cmd = [self._script("kafka-producer-perf-test.sh")]
         cmd += ["--topic", topic]
@@ -343,6 +345,8 @@ def produce(self,
             "bootstrap.servers=%s" % self._redpanda.brokers(),
             "linger.ms=%d" % linger_ms,
         ]
+        if enable_idempotence is False:
+            cmd += ["enable.idempotence=false"]
         if self._command_config:
             cmd += ["--producer.config", self._command_config.name]
         return self._execute(cmd, "produce")
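Apache Kafka clients 3.0 and newer enable producer idempotence by default, so tests that depend on non-idempotent writes must now opt out explicitly. A sketch of the command line produce() assembles when idempotence is disabled; the flags not visible in this diff (--num-records, --record-size, --throughput, --producer-props) are assumed from the stock kafka-producer-perf-test.sh interface:

cmd = [
    "/opt/kafka-3.9.0/bin/kafka-producer-perf-test.sh",
    "--topic", "panda-topic",
    "--num-records", "10000",
    "--record-size", "1024",
    "--throughput", "-1",
    "--producer-props",
    "bootstrap.servers=host:9092",
    "linger.ms=0",
    "enable.idempotence=false",  # appended only when enable_idempotence=False
]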
@@ -386,6 +390,8 @@ def delete_records(self, offsets: dict[str, Any]):
         assert split_str[-1] == ""
         partition_watermark_lines = split_str[2:-1]
         for partition_watermark_line in partition_watermark_lines:
+            if not partition_watermark_line.startswith("partition: "):
+                continue
             topic_partition_str, result_str = partition_watermark_line.strip(
             ).split('\t')
             topic_partition_str_split = topic_partition_str.split(
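The new startswith guard makes the watermark parser tolerant of extra status lines that newer clients print. A self-contained illustration; the sample output shape is assumed from the parsing code above, and the extra line stands in for whatever a newer kafka-delete-records.sh emits:

raw = ("Executing records delete operation\n"
       "Records delete operation completed:\n"
       "partition: panda-topic-0\tlow_watermark: 5\n"
       "extra status line from a newer client\n")
split_str = raw.split("\n")
assert split_str[-1] == ""
for line in split_str[2:-1]:
    if not line.startswith("partition: "):
        continue  # skip anything that is not a watermark line
    topic_partition_str, result_str = line.strip().split('\t')
    print(topic_partition_str, "->", result_str)
# partition: panda-topic-0 -> low_watermark: 5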
13 changes: 8 additions & 5 deletions tests/rptest/tests/archival_test.py
@@ -45,6 +45,8 @@
 LOG_EXTENSION = ".log"

 CONTROLLER_LOG_PREFIX = os.path.join(RedpandaService.DATA_DIR, "redpanda")
+INTERNAL_TOPIC_PREFIX = os.path.join(RedpandaService.DATA_DIR,
+                                     "kafka_internal")

 # Log errors expected when connectivity between redpanda and the S3
 # backend is disrupted
@@ -222,7 +224,7 @@ def __init__(self, test_context):

         self._s3_port = self.si_settings.cloud_storage_api_endpoint_port

-        self.kafka_tools = KafkaCliTools(self.redpanda)
+        self.kafka_tools = KafkaCliTools(self.redpanda, version="3.9.0")
         self.rpk = RpkTool(self.redpanda)
         self.admin = Admin(self.redpanda)

@@ -375,7 +377,7 @@ def test_single_partition_leadership_transfer(self, cloud_storage_type):
         self.redpanda.start_node(node)
         time.sleep(5)
         self.kafka_tools.produce(self.topic, 5000, 1024)
-        validate(self._cross_node_verify, self.logger, 90)
+        validate(self._cross_node_verify, self.logger, 120)

     @cluster(num_nodes=3)
     @matrix(cloud_storage_type=get_cloud_storage_type())
@@ -392,7 +394,7 @@ def test_all_partitions_leadership_transfer(self, cloud_storage_type):
         self.redpanda.start_node(node)
         time.sleep(5)
         self.kafka_tools.produce(self.topic, 5000, 1024)
-        validate(self._cross_node_verify, self.logger, 90)
+        validate(self._cross_node_verify, self.logger, 120)

     @cluster(num_nodes=3)
     @matrix(acks=[-1, 0, 1], cloud_storage_type=get_cloud_storage_type())
@@ -803,8 +805,9 @@ def _get_redpanda_log_segment_checksums(self, node):

         # Filter out all unwanted paths
         def included(path):
-            return not path.startswith(
-                CONTROLLER_LOG_PREFIX) and path.endswith(LOG_EXTENSION)
+            return not (path.startswith(CONTROLLER_LOG_PREFIX)
+                        or path.startswith(INTERNAL_TOPIC_PREFIX)
+                        ) and path.endswith(LOG_EXTENSION)

         # Remove data dir from path
         def normalize_path(path):
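The extra prefix keeps kafka_internal data (for example the id_allocator partition that idempotent producers touch) out of the cross-node segment checksum comparison, since only user-topic segments are expected to match across nodes. A runnable sketch of the filter; the DATA_DIR value and the example paths are illustrative assumptions:

import os

DATA_DIR = "/var/lib/redpanda/data"  # assumed RedpandaService.DATA_DIR
CONTROLLER_LOG_PREFIX = os.path.join(DATA_DIR, "redpanda")
INTERNAL_TOPIC_PREFIX = os.path.join(DATA_DIR, "kafka_internal")
LOG_EXTENSION = ".log"

def included(path):
    # Keep only user-topic .log segments; drop controller and
    # kafka_internal storage.
    return not (path.startswith(CONTROLLER_LOG_PREFIX)
                or path.startswith(INTERNAL_TOPIC_PREFIX)
                ) and path.endswith(LOG_EXTENSION)

assert included(f"{DATA_DIR}/kafka/panda-topic/0_5/100-1-v1.log")
assert not included(f"{DATA_DIR}/redpanda/controller/0_0/0-1-v1.log")
assert not included(f"{DATA_DIR}/kafka_internal/id_allocator/0_5/0-1-v1.log")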
9 changes: 1 addition & 8 deletions tests/rptest/tests/fetch_after_deletion_test.py
@@ -38,18 +38,11 @@ def setUp(self):
         pass

     @cluster(num_nodes=3)
-    @parametrize(transactions_enabled=True)
-    @parametrize(transactions_enabled=False)
-    def test_fetch_after_committed_offset_was_removed(self,
-                                                      transactions_enabled):
+    def test_fetch_after_committed_offset_was_removed(self):
         """
         Test fetching when consumer offset was deleted by retention
         """

-        self.redpanda._extra_rp_conf[
-            "enable_transactions"] = transactions_enabled
-        self.redpanda._extra_rp_conf[
-            "enable_idempotence"] = transactions_enabled
         self.redpanda.start()

         topic = TopicSpec(partition_count=1,
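Dropping the parametrization is consistent with the client bump: enable_transactions and enable_idempotence have long defaulted to true in Redpanda, and 3.x clients produce idempotently by default, so the transactions_enabled=False variant presumably no longer reflected a configuration the new clients would exercise.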
3 changes: 2 additions & 1 deletion tests/rptest/tests/full_disk_test.py
@@ -378,7 +378,8 @@ def test_target_min_capacity_wanted_time_based(self):
                           1024,
                           throughput=500,
                           acks=-1,
-                          linger_ms=50)
+                          linger_ms=50,
+                          enable_idempotence=False)

         node = self.redpanda.nodes[0]
         reported = admin.get_local_storage_usage(
2 changes: 2 additions & 0 deletions tests/rptest/tests/schema_registry_test.py
@@ -3957,6 +3957,8 @@ def test_versions(self):
         result = self.sr_client.get_versions(test_subject)
         assert result == [1], f"Result: {result}"

+        # reinitialize client to drop the cache
+        self.sr_client = SchemaRegistryClient({'url': self._base_uri()})
         with expect_exception(SchemaRegistryError, lambda e: True):
             self.sr_client.get_version(test_subject, 2)
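The reinitialization matters because the newer confluent-kafka SchemaRegistryClient caches lookup results in-process (as the comment above implies; the exact caching behavior is version-dependent). Without a fresh client, the get_version(test_subject, 2) call could be answered from state cached by the earlier lookups rather than by the registry, and the expected SchemaRegistryError might never surface.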

5 changes: 4 additions & 1 deletion tests/rptest/tests/topic_delete_test.py
@@ -86,7 +86,10 @@ def get_kvstore_topic_key_counts(redpanda):
         if k['data'].get('ntp', {}).get('topic', None) == 'controller':
             # Controller storage item
             continue
-
+        if k['data'].get('ntp', {}).get('namespace',
+                                        None) == 'kafka_internal':
+            # Internal topic storage item
+            continue
         excess_keys.append(k)

     redpanda.logger.info(
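For illustration, the shape of the kvstore keys this filter walks, with payloads assumed from the .get() chains above rather than taken from a real Redpanda dump:

keys = [
    {'data': {'ntp': {'namespace': 'redpanda', 'topic': 'controller'}}},
    {'data': {'ntp': {'namespace': 'kafka_internal', 'topic': 'id_allocator'}}},
    {'data': {'ntp': {'namespace': 'kafka', 'topic': 'panda-topic'}}},
]
excess_keys = []
for k in keys:
    if k['data'].get('ntp', {}).get('topic', None) == 'controller':
        continue  # controller storage item
    if k['data'].get('ntp', {}).get('namespace', None) == 'kafka_internal':
        continue  # internal topic storage item
    excess_keys.append(k)
assert len(excess_keys) == 1  # only the user-topic key remains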
5 changes: 4 additions & 1 deletion tests/rptest/tests/topic_recovery_test.py
@@ -205,7 +205,10 @@ def watermark_is_present(self):
             err_msg=
             f'failed to get high watermark before produce for {topic_spec}')

-        self._kafka_tools.produce(topic_spec.name, 10000, 1024)
+        self._kafka_tools.produce(topic_spec.name,
+                                  10000,
+                                  1024,
+                                  enable_idempotence=False)

         new_state = PartitionState(self._rpk, topic_spec.name)
         wait_until(
2 changes: 1 addition & 1 deletion tests/setup.py
@@ -17,7 +17,7 @@
         'prometheus-client==0.9.0',
         'kafka-python==2.0.2',
         'crc32c==2.2',
-        'confluent-kafka==2.2.0',
+        'confluent-kafka==2.6.1',
         'zstandard==0.15.2',
         'xxhash==2.0.2',
         'protobuf==4.21.8',
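Note that this pin moves in lockstep with the Docker image change above: confluent-kafka 2.6.1 is the Python binding for librdkafka 2.6.1, so the C and Python client stacks in the test environment track the same release.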