From 56aea6de66f72f58e671ea82ee6ff28e79481d6f Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 25 Mar 2022 11:18:37 +0000 Subject: [PATCH 1/3] tests: cut over si-verifier->kgo-verifier This is a rename of the tool (which was never really SI-specific) and a move to the redpanda-data org out of jcsp's personal repo. --- tests/docker/Dockerfile | 6 +++--- .../services/franz_go_verifiable_services.py | 16 ++++++++-------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 1afeac7ae1fd5..f020625ea4b17 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -117,9 +117,9 @@ RUN git -C /opt clone https://github.com/twmb/franz-go.git && cd /opt/franz-go & RUN go install github.com/twmb/kcl@latest && \ mv /root/go/bin/kcl /usr/local/bin/ -# Should fork to redpanda repo -RUN git -C /opt clone https://github.com/jcsp/si-verifier.git && \ - cd /opt/si-verifier && git reset --hard bcf4996d063e96497f8ff8be35d82fa1cec53180 && \ +# Install the kgo-verifier tool +RUN git -C /opt clone https://github.com/redpanda-data/kgo-verifier.git && \ + cd /opt/kgo-verifier && git reset --hard 30c56af && \ go mod tidy && go build # Expose port 8080 for any http examples within clients diff --git a/tests/rptest/services/franz_go_verifiable_services.py b/tests/rptest/services/franz_go_verifiable_services.py index f776c30a2754c..a199a3049bf4b 100644 --- a/tests/rptest/services/franz_go_verifiable_services.py +++ b/tests/rptest/services/franz_go_verifiable_services.py @@ -12,7 +12,7 @@ from ducktape.services.background_thread import BackgroundThreadService # The franz-go root directory -TESTS_DIR = os.path.join("/opt", "si-verifier") +TESTS_DIR = os.path.join("/opt", "kgo-verifier") from enum import Enum @@ -25,7 +25,7 @@ class ServiceStatus(Enum): class FranzGoVerifiableService(BackgroundThreadService): """ - FranzGoVerifiableService is si-verifier service. + FranzGoVerifiableService is kgo-verifier service. 
To validate produced record user should run consumer and producer in one node. Use ctx.cluster.alloc(ClusterSpec.simple_linux(1)) to allocate node and pass it to constructor """ @@ -87,8 +87,8 @@ def stop_node(self, node): self.logger.debug("Killing pid %s" % {self._pid}) node.account.signal(self._pid, 9, allow_fail=True) else: - self.logger.debug("Killing si-verifier") - node.account.kill_process("si-verifier", + self.logger.debug("Killing kgo-verifier") + node.account.kill_process("kgo-verifier", clean_shutdown=False) except RemoteCommandError as e: if b"No such process" not in e.msg: @@ -99,7 +99,7 @@ def stop_node(self, node): def clean_node(self, node): self._redpanda.logger.info(f"{self.__class__.__name__}.clean_node") - node.account.kill_process("si-verifier", clean_shutdown=False) + node.account.kill_process("kgo-verifier", clean_shutdown=False) node.account.remove("valid_offsets*json", True) def start_node(self, node, clean=None): @@ -144,7 +144,7 @@ def _worker(self, idx, node): while not self._stopping.is_set( ) and not self._shutting_down.is_set(): cmd = 'echo $$ ; %s --brokers %s --topic %s --msg_size %s --produce_msgs 0 --rand_read_msgs 0 --seq_read=1' % ( - f"{TESTS_DIR}/si-verifier", self._redpanda.brokers(), + f"{TESTS_DIR}/kgo-verifier", self._redpanda.brokers(), self._topic, self._msg_size) self.execute_cmd(cmd, node) except Exception as ex: @@ -174,7 +174,7 @@ def _worker(self, idx, node): while not self._stopping.is_set( ) and not self._shutting_down.is_set(): cmd = 'echo $$ ; %s --brokers %s --topic %s --msg_size %s --produce_msgs 0 --rand_read_msgs %s --parallel %s --seq_read=0' % ( - f"{TESTS_DIR}/si-verifier", self._redpanda.brokers(), + f"{TESTS_DIR}/kgo-verifier", self._redpanda.brokers(), self._topic, self._msg_size, self._rand_read_msgs, self._parallel) @@ -202,7 +202,7 @@ def _worker(self, idx, node): self._stopping.clear() try: cmd = 'echo $$ ; %s --brokers %s --topic %s --msg_size %s --produce_msgs %s --rand_read_msgs 0 --seq_read=0' 
% ( - f"{TESTS_DIR}/si-verifier", self._redpanda.brokers(), + f"{TESTS_DIR}/kgo-verifier", self._redpanda.brokers(), self._topic, self._msg_size, self._msg_count) self.execute_cmd(cmd, node) From 6bde87d1c5c41c8a9a348d36f6ffebb5989e76c9 Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 28 Mar 2022 11:17:25 +0100 Subject: [PATCH 2/3] tests: update kgo-verifier wrappers to read status --- tests/docker/Dockerfile | 2 +- .../services/franz_go_verifiable_services.py | 77 ++++++++++++++++++- 2 files changed, 74 insertions(+), 5 deletions(-) diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index f020625ea4b17..a6290f5fb229c 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -114,7 +114,7 @@ RUN cd /tmp/go/sarama/produce_test && go mod tidy && go build RUN git -C /opt clone https://github.com/twmb/franz-go.git && cd /opt/franz-go && \ cd /opt/franz-go/examples/bench && go mod tidy && go build -RUN go install github.com/twmb/kcl@latest && \ +RUN go install github.com/twmb/kcl@v0.8.0 && \ mv /root/go/bin/kcl /usr/local/bin/ # Install the kgo-verifier tool diff --git a/tests/rptest/services/franz_go_verifiable_services.py b/tests/rptest/services/franz_go_verifiable_services.py index a199a3049bf4b..2876f8224b0ed 100644 --- a/tests/rptest/services/franz_go_verifiable_services.py +++ b/tests/rptest/services/franz_go_verifiable_services.py @@ -8,6 +8,7 @@ # by the Apache License, Version 2.0 import os +import json import threading from ducktape.services.background_thread import BackgroundThreadService @@ -64,10 +65,13 @@ def execute_cmd(self, cmd, node): if self._pid is None: self._pid = line.strip() - self.logger.debug(line.rstrip()) if self._stopping.is_set(): break + line = line.rstrip() + self.logger.debug(line) + yield line + def save_exception(self, ex): if self._stopping.is_set(): pass @@ -130,12 +134,33 @@ def free(self): return super(FranzGoVerifiableService, self).free_all() +class ConsumerStatus: + def __init__(self, valid_reads, 
invalid_reads, out_of_scope_invalid_reads): + self.valid_reads = valid_reads + self.invalid_reads = invalid_reads + self.out_of_scope_invalid_reads = out_of_scope_invalid_reads + + @property + def total_reads(self): + # At time of writing, invalid reads is never nonzero, because the program + # terminates as soon as it sees an invalid read + return self.valid_reads + self.out_of_scope_invalid_reads + + def __str__(self): + return f"ConsumerStatus<{self.valid_reads} {self.invalid_reads} {self.out_of_scope_invalid_reads}>" + + class FranzGoVerifiableSeqConsumer(FranzGoVerifiableService): def __init__(self, context, redpanda, topic, msg_size, nodes=None): super(FranzGoVerifiableSeqConsumer, self).__init__(context, redpanda, topic, msg_size, nodes) self._shutting_down = threading.Event() + self._consumer_status = ConsumerStatus(0, 0, 0) + + @property + def consumer_status(self): + return self._consumer_status def _worker(self, idx, node): self.status = ServiceStatus.RUNNING @@ -146,7 +171,15 @@ def _worker(self, idx, node): cmd = 'echo $$ ; %s --brokers %s --topic %s --msg_size %s --produce_msgs 0 --rand_read_msgs 0 --seq_read=1' % ( f"{TESTS_DIR}/kgo-verifier", self._redpanda.brokers(), self._topic, self._msg_size) - self.execute_cmd(cmd, node) + for line in self.execute_cmd(cmd, node): + if not line.startswith("{"): + continue + data = json.loads(line) + self._consumer_status = ConsumerStatus( + data['ValidReads'], data['InvalidReads'], + data['OutOfScopeInvalidReads']) + self.logger.info(f"SeqConsumer {self._consumer_status}") + except Exception as ex: self.save_exception(ex) finally: @@ -166,6 +199,11 @@ def __init__(self, self).__init__(context, redpanda, topic, msg_size, nodes) self._rand_read_msgs = rand_read_msgs self._parallel = parallel + self._consumer_status = ConsumerStatus(0, 0, 0) + + @property + def consumer_status(self): + return self._consumer_status def _worker(self, idx, node): self.status = ServiceStatus.RUNNING @@ -178,13 +216,32 @@ def 
_worker(self, idx, node): self._topic, self._msg_size, self._rand_read_msgs, self._parallel) - self.execute_cmd(cmd, node) + for line in self.execute_cmd(cmd, node): + if not line.startswith("{"): + continue + data = json.loads(line) + self._consumer_status = ConsumerStatus( + data['ValidReads'], data['InvalidReads'], + data['OutOfScopeInvalidReads']) + self.logger.info(f"RandomConsumer {self._consumer_status}") + except Exception as ex: self.save_exception(ex) finally: self.status = ServiceStatus.FINISH +class ProduceStatus: + def __init__(self, sent, acked, bad_offsets, restarts): + self.sent = sent + self.acked = acked + self.bad_offsets = bad_offsets + self.restarts = restarts + + def __str__(self): + return f"ProduceStatus<{self.sent} {self.acked} {self.bad_offsets} {self.restarts}>" + + class FranzGoVerifiableProducer(FranzGoVerifiableService): def __init__(self, context, @@ -196,6 +253,11 @@ def __init__(self, super(FranzGoVerifiableProducer, self).__init__(context, redpanda, topic, msg_size, custom_node) self._msg_count = msg_count + self._status = ProduceStatus(0, 0, 0, 0) + + @property + def produce_status(self): + return self._status def _worker(self, idx, node): self.status = ServiceStatus.RUNNING @@ -205,7 +267,14 @@ def _worker(self, idx, node): f"{TESTS_DIR}/kgo-verifier", self._redpanda.brokers(), self._topic, self._msg_size, self._msg_count) - self.execute_cmd(cmd, node) + for line in self.execute_cmd(cmd, node): + if line.startswith("{"): + data = json.loads(line) + self._status = ProduceStatus(data['Sent'], data['Acked'], + data['BadOffsets'], + data['Restarts']) + self.logger.info(str(self._status)) + except Exception as ex: self.save_exception(ex) finally: From 73399ea96f5e97d6a972fc275f497f63d2d3bd1f Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 28 Mar 2022 11:17:42 +0100 Subject: [PATCH 3/3] tests: update kgo-verifier tests to check statuses This makes the tests a lot more robust: rather than simply assuming that a non-fatal exit means 
test success, we check that the test workload sees the expected number of messages. --- .../rptest/tests/franz_go_verifiable_test.py | 51 ++++++++++++------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/tests/rptest/tests/franz_go_verifiable_test.py b/tests/rptest/tests/franz_go_verifiable_test.py index 713a9e85be414..ade9a4b210293 100644 --- a/tests/rptest/tests/franz_go_verifiable_test.py +++ b/tests/rptest/tests/franz_go_verifiable_test.py @@ -51,15 +51,6 @@ def __init__(self, test_context, *args, **kwargs): self.RANDOM_READ_COUNT, self.RANDOM_READ_PARALLEL, self._node_for_franz_go) - # In the future producer will signal about json creation - def _create_json_file(self, msg_count): - small_producer = FranzGoVerifiableProducer(self.test_context, - self.redpanda, self.topic, - self.MSG_SIZE, msg_count, - self._node_for_franz_go) - small_producer.start() - small_producer.wait() - def free_nodes(self): # Free the normally allocated nodes (e.g. RedpandaService) super().free_nodes() @@ -96,19 +87,30 @@ def test_with_all_type_of_loads(self): "Skipping test in debug mode (requires release build)") return - # Need create json file for consumer at first - self._create_json_file(1000) - self._producer.start(clean=False) + + # Don't start consumers until the producer has written out its first + # checkpoint with valid ranges. 
+ wait_until(lambda: self._producer.produce_status.acked > 0, + timeout_sec=30, + backoff_sec=0.1) + wrote_at_least = self._producer.produce_status.acked + self._seq_consumer.start(clean=False) self._rand_consumer.start(clean=False) self._producer.wait() + assert self._producer.produce_status.acked == self.PRODUCE_COUNT + self._seq_consumer.shutdown() self._rand_consumer.shutdown() + self._seq_consumer.wait() self._rand_consumer.wait() + assert self._seq_consumer.consumer_status.valid_reads >= wrote_at_least + assert self._rand_consumer.consumer_status.total_reads == self.RANDOM_READ_COUNT * self.RANDOM_READ_PARALLEL + class FranzGoVerifiableWithSiTest(FranzGoVerifiableBase): MSG_SIZE = 1000000 @@ -159,22 +161,37 @@ def test_with_all_type_of_loads_and_si(self): rpk.alter_topic_config(self.topic, 'retention.bytes', str(self.segment_size)) - # Need create json file for consumer at first - self._create_json_file(10000) + self._producer.start(clean=False) + + # Don't start consumers until the producer has written out its first + # checkpoint with valid ranges. 
+ wait_until(lambda: self._producer.produce_status.acked > 0, + timeout_sec=30, + backoff_sec=5.0) - # Verify that we really enabled shadow indexing correctly, such - # that some objects were written + # Once we've written a lot of data, check that some of it showed up in S3 + wait_until(lambda: self._producer.produce_status.acked > 10000, + timeout_sec=300, + backoff_sec=5) objects = list(self.redpanda.get_objects_from_si()) assert len(objects) > 0 for o in objects: self.logger.info(f"S3 object: {o.Key}, {o.ContentLength}") - self._producer.start(clean=False) + wrote_at_least = self._producer.produce_status.acked self._seq_consumer.start(clean=False) self._rand_consumer.start(clean=False) + # Wait until we have written all the data we expected to write self._producer.wait() + assert self._producer.produce_status.acked >= self.PRODUCE_COUNT + + # Wait for last iteration of consumers to finish: if they are currently + # mid-run, they'll run to completion. self._seq_consumer.shutdown() self._rand_consumer.shutdown() self._seq_consumer.wait() self._rand_consumer.wait() + + assert self._seq_consumer.consumer_status.valid_reads >= wrote_at_least + assert self._rand_consumer.consumer_status.total_reads == self.RANDOM_READ_COUNT * self.RANDOM_READ_PARALLEL