Skip to content

Commit

Permalink
Merge pull request #24406 from bharathv/cross_core_test
Browse files Browse the repository at this point in the history
datalake/tests: add a verifier test with cross core partition movements
  • Loading branch information
bharathv authored Dec 9, 2024
2 parents 247c698 + 92e5aa5 commit 25163d2
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 46 deletions.
35 changes: 23 additions & 12 deletions tests/rptest/services/redpanda_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import requests
from rptest.services.redpanda import RedpandaService
from ducktape.utils.util import wait_until
from prometheus_client.parser import text_string_to_metric_families

from ducktape.cluster.cluster import ClusterNode

Expand Down Expand Up @@ -119,25 +120,35 @@ def start_stream(self, name: str, config: dict):
def remove_stream(self, name: str):
self._request("DELETE", f"streams/{name}")

def wait_for_stream_to_finish(self,
name: str,
timeout_sec=60,
remove=True):
def stream_metrics(self, name: str):
metrics_resp = self._request("GET", "metrics")
assert metrics_resp.status_code == 200
families = text_string_to_metric_families(metrics_resp.text)
metrics = dict()
for family in families:
for sample in family.samples:
if sample.labels.get('stream') == name:
family_metrics = metrics.get(family.name, [])
family_metrics.append(sample)
metrics[family.name] = family_metrics
return metrics

def stop_stream(self, name: str, wait_to_finish=True, timeout_sec=60):
"""
Waits for all streams to finish and then removes the stream
Optionally waits for the stream to finish and then removes the stream
"""
def _finished():
streams = self._request("GET", f"streams").json()
return name not in streams or streams[name]["active"] == False

wait_until(_finished,
timeout_sec=timeout_sec,
backoff_sec=0.5,
err_msg=f"Timeout waiting for {name} stream to finish",
retry_on_exc=True)
if wait_to_finish:
wait_until(_finished,
timeout_sec=timeout_sec,
backoff_sec=0.5,
err_msg=f"Timeout waiting for {name} stream to finish",
retry_on_exc=True)

if remove:
self.remove_stream(name)
self.remove_stream(name)

def _request(self, method, endpoint, **kwargs):
self.logger.debug(f"Executing request {method} {self.url}/{endpoint}")
Expand Down
31 changes: 31 additions & 0 deletions tests/rptest/tests/datalake/datalake_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from rptest.services.spark_service import SparkService
from rptest.services.trino_service import TrinoService
from rptest.tests.datalake.query_engine_base import QueryEngineType
from rptest.services.redpanda_connect import RedpandaConnectService
from rptest.tests.datalake.query_engine_factory import get_query_engine_by_type


Expand Down Expand Up @@ -94,6 +95,36 @@ def spark(self) -> SparkService:
assert spark, "Missing Spark service"
return spark

def start_counter_stream(self,
topic: str,
name: str = "ducky_stream",
count: int = 100,
interval: str = "") -> RedpandaConnectService:
stream_conf = {
"input": {
"generate": {
"mapping": "root = counter()",
"interval": interval,
"count": count,
"batch_size": 1
}
},
"pipeline": {
"processors": []
},
"output": {
"redpanda": {
"seed_brokers": self.redpanda.brokers_list(),
"topic": topic,
}
}
}
connect = RedpandaConnectService(self.test_ctx, self.redpanda)
connect.start()
# create a stream
connect.start_stream(name, config=stream_conf)
return connect

def service(self, engine_type: QueryEngineType):
for e in self.query_engines:
if e.engine_name() == engine_type:
Expand Down
7 changes: 4 additions & 3 deletions tests/rptest/tests/datalake/datalake_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ def _consumer_thread(self):

with self._lock:
self._total_msgs_cnt += 1
self.logger.debug(
f"Consumed message partition: {msg.partition()} at offset {msg.offset()}"
)
if self._total_msgs_cnt % 100 == 0:
self.logger.debug(
f"Consumed message partition: {msg.partition()} at offset {msg.offset()}"
)
self._consumed_messages[msg.partition()].append(msg)
if len(self._errors) > 0:
return
Expand Down
31 changes: 2 additions & 29 deletions tests/rptest/tests/datalake/datalake_verifier_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,6 @@ def __init__(self, test_context):
},
schema_registry_config=SchemaRegistryConfig())

def simple_stream(self, topic, subject):
return {
"input": {
"generate": {
"mapping": "root = counter()",
"interval": "",
"count": 100,
"batch_size": 1
}
},
"pipeline": {
"processors": []
},
"output": {
"redpanda": {
"seed_brokers": self.redpanda.brokers_list(),
"topic": topic,
}
}
}

def setUp(self):
pass

Expand All @@ -70,15 +49,9 @@ def _prepare_test_data(self, topic_name: str, dl: DatalakeServices):
partitions=1,
replicas=1,
iceberg_mode="key_value")

connect = RedpandaConnectService(self.test_context, self.redpanda)
connect.start()
# create a stream
connect.start_stream(name="ducky_stream",
config=self.simple_stream(topic_name,
"verifier_schema"))
connect = dl.start_counter_stream(topic=topic_name)
dl.wait_for_translation(topic_name, 100)
connect.wait_for_stream_to_finish("ducky_stream")
connect.stop_stream("ducky_stream")

@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types())
Expand Down
109 changes: 109 additions & 0 deletions tests/rptest/tests/datalake/partition_movement_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.services.spark_service import QueryEngineType
from rptest.tests.datalake.datalake_services import DatalakeServices
from rptest.tests.datalake.datalake_verifier import DatalakeVerifier
from rptest.tests.partition_movement import PartitionMovementMixin
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.redpanda import PandaproxyConfig, SchemaRegistryConfig, SISettings
from ducktape.mark import matrix
from ducktape.utils.util import wait_until
from rptest.services.cluster import cluster
from rptest.tests.datalake.utils import supported_storage_types


class PartitionMovementTest(PartitionMovementMixin, RedpandaTest):
def __init__(self, test_ctx, *args, **kwargs):
super(PartitionMovementTest,
self).__init__(test_ctx,
num_brokers=1,
si_settings=SISettings(test_context=test_ctx),
extra_rp_conf={
"iceberg_enabled": "true",
"iceberg_catalog_commit_interval_ms": 5000
},
schema_registry_config=SchemaRegistryConfig(),
pandaproxy_config=PandaproxyConfig(),
*args,
**kwargs)
self.test_ctx = test_ctx
self.topic_name = "test"

def setUp(self):
pass

@cluster(num_nodes=4)
@matrix(cloud_storage_type=supported_storage_types())
def test_cross_core_movements(self, cloud_storage_type):
"""Tests interaction between cross core partition movement and iceberg translation.
Cross core partition movement involves shutting down the partition replica machinery on one
core and restarting it on another core. This test ensures the translation can make progress
during these cross core movements."""

moves = 15
admin = self.redpanda._admin
topic = self.topic_name
partition = 0
stream = "cross_core_test"

with DatalakeServices(self.test_context,
redpanda=self.redpanda,
filesystem_catalog_mode=False,
include_query_engines=[QueryEngineType.TRINO
]) as dl:
dl.create_iceberg_enabled_topic(topic)
# A long running counter that runs until stopped
connect = dl.start_counter_stream(name=stream,
topic=topic,
count=0,
interval="1ms")

def total_records_ingested():
metrics = connect.stream_metrics(name=stream)
samples = metrics["output_sent"]
for s in samples:
if s.name == "output_sent_total":
return s.value
assert False, f"Unable to probe metrics for stream {stream}"

def ensure_stream_progress(target: int):
wait_until(
lambda: total_records_ingested() >= target,
timeout_sec=20,
backoff_sec=5,
err_msg=
f"Timed out waiting for stream producer to reach target: {target}"
)

for _ in range(moves):
assignments = self._get_current_node_cores(
admin, topic, partition)
for a in assignments:
a['core'] = (a['core'] +
1) % self.redpanda.get_node_cpu_count()

counter_before = total_records_ingested()

self._set_partition_assignments(topic,
partition,
assignments,
admin=admin)
self._wait_post_move(topic, partition, assignments, 180)
# Make sure the stream is not stuck
ensure_stream_progress(counter_before + 500)

connect.stop_stream(name=stream, wait_to_finish=False)

total_row_count = total_records_ingested()
dl.wait_for_translation_until_offset(topic, total_row_count - 1)

verifier = DatalakeVerifier(self.redpanda, topic, dl.trino())
verifier.start()
verifier.wait()
2 changes: 1 addition & 1 deletion tests/rptest/tests/datalake/simple_connect_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,5 +135,5 @@ def test_translating_avro_serialized_records(self, cloud_storage_type):
topic_name, "verifier_schema"))

verifier.start()
connect.wait_for_stream_to_finish("ducky_stream")
connect.stop_stream("ducky_stream")
verifier.wait()
2 changes: 1 addition & 1 deletion tests/rptest/tests/redpanda_connect_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def test_redpanda_connect_producer(self):
"test"))

# wait for the stream to finish
connect.wait_for_stream_to_finish(name="ducky_stream")
connect.stop_stream(name="ducky_stream")
connect.wait()

# check if the messages are in the topic
Expand Down

0 comments on commit 25163d2

Please sign in to comment.