Skip to content

Commit

Permalink
Merge pull request #4119 from jcsp/kgo-verifier
Browse files Browse the repository at this point in the history
tests: rename si-verifier to kgo-verifier, and improve it.
  • Loading branch information
jcsp authored Mar 29, 2022
2 parents b744123 + 73399ea commit 3957dd5
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 33 deletions.
8 changes: 4 additions & 4 deletions tests/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ RUN cd /tmp/go/sarama/produce_test && go mod tidy && go build
RUN git -C /opt clone https://github.com/twmb/franz-go.git && cd /opt/franz-go && \
cd /opt/franz-go/examples/bench && go mod tidy && go build

RUN go install github.com/twmb/kcl@latest && \
RUN go install github.com/twmb/kcl@v0.8.0 && \
mv /root/go/bin/kcl /usr/local/bin/

# Should fork to redpanda repo
RUN git -C /opt clone https://github.com/jcsp/si-verifier.git && \
cd /opt/si-verifier && git reset --hard bcf4996d063e96497f8ff8be35d82fa1cec53180 && \
# Install the kgo-verifier tool
RUN git -C /opt clone https://github.com/redpanda-data/kgo-verifier.git && \
cd /opt/kgo-verifier && git reset --hard 30c56af && \
go mod tidy && go build

# Expose port 8080 for any http examples within clients
Expand Down
93 changes: 81 additions & 12 deletions tests/rptest/services/franz_go_verifiable_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
# by the Apache License, Version 2.0

import os
import json
import threading
from ducktape.services.background_thread import BackgroundThreadService

# The franz-go root directory
TESTS_DIR = os.path.join("/opt", "si-verifier")
TESTS_DIR = os.path.join("/opt", "kgo-verifier")

from enum import Enum

Expand All @@ -25,7 +26,7 @@ class ServiceStatus(Enum):

class FranzGoVerifiableService(BackgroundThreadService):
"""
FranzGoVerifiableService is si-verifier service.
FranzGoVerifiableService is the kgo-verifier service.
To validate produced records, the user should run the consumer and producer on one node.
Use ctx.cluster.alloc(ClusterSpec.simple_linux(1)) to allocate node and pass it to constructor
"""
Expand Down Expand Up @@ -64,10 +65,13 @@ def execute_cmd(self, cmd, node):
if self._pid is None:
self._pid = line.strip()

self.logger.debug(line.rstrip())
if self._stopping.is_set():
break

line = line.rstrip()
self.logger.debug(line)
yield line

def save_exception(self, ex):
if self._stopping.is_set():
pass
Expand All @@ -87,8 +91,8 @@ def stop_node(self, node):
self.logger.debug("Killing pid %s" % {self._pid})
node.account.signal(self._pid, 9, allow_fail=True)
else:
self.logger.debug("Killing si-verifier")
node.account.kill_process("si-verifier",
self.logger.debug("Killing kgo-verifier")
node.account.kill_process("kgo-verifier",
clean_shutdown=False)
except RemoteCommandError as e:
if b"No such process" not in e.msg:
Expand All @@ -99,7 +103,7 @@ def stop_node(self, node):

def clean_node(self, node):
self._redpanda.logger.info(f"{self.__class__.__name__}.clean_node")
node.account.kill_process("si-verifier", clean_shutdown=False)
node.account.kill_process("kgo-verifier", clean_shutdown=False)
node.account.remove("valid_offsets*json", True)

def start_node(self, node, clean=None):
Expand Down Expand Up @@ -130,12 +134,33 @@ def free(self):
return super(FranzGoVerifiableService, self).free_all()


class ConsumerStatus:
    """Snapshot of the read counters reported by a kgo-verifier consumer.

    The three values are stored exactly as given. The consumer services in
    this file fill them from the ``ValidReads``, ``InvalidReads`` and
    ``OutOfScopeInvalidReads`` fields of the verifier's JSON status lines.
    """
    def __init__(self, valid_reads, invalid_reads, out_of_scope_invalid_reads):
        self.valid_reads = valid_reads
        self.invalid_reads = invalid_reads
        self.out_of_scope_invalid_reads = out_of_scope_invalid_reads

    @property
    def total_reads(self):
        """Return ``valid_reads + out_of_scope_invalid_reads``."""
        # invalid_reads is deliberately left out of the sum: at time of
        # writing it is always zero, because the program terminates as soon
        # as it sees an invalid read.
        return self.valid_reads + self.out_of_scope_invalid_reads

    def __str__(self):
        return f"ConsumerStatus<{self.valid_reads} {self.invalid_reads} {self.out_of_scope_invalid_reads}>"

    # Reuse the compact form for repr() so instances stay readable when they
    # show up inside containers, debuggers or assertion messages.
    __repr__ = __str__


class FranzGoVerifiableSeqConsumer(FranzGoVerifiableService):
def __init__(self, context, redpanda, topic, msg_size, nodes=None):
super(FranzGoVerifiableSeqConsumer,
self).__init__(context, redpanda, topic, msg_size, nodes)

self._shutting_down = threading.Event()
self._consumer_status = ConsumerStatus(0, 0, 0)

@property
def consumer_status(self):
return self._consumer_status

def _worker(self, idx, node):
self.status = ServiceStatus.RUNNING
Expand All @@ -144,9 +169,17 @@ def _worker(self, idx, node):
while not self._stopping.is_set(
) and not self._shutting_down.is_set():
cmd = 'echo $$ ; %s --brokers %s --topic %s --msg_size %s --produce_msgs 0 --rand_read_msgs 0 --seq_read=1' % (
f"{TESTS_DIR}/si-verifier", self._redpanda.brokers(),
f"{TESTS_DIR}/kgo-verifier", self._redpanda.brokers(),
self._topic, self._msg_size)
self.execute_cmd(cmd, node)
for line in self.execute_cmd(cmd, node):
if not line.startswith("{"):
continue
data = json.loads(line)
self._consumer_status = ConsumerStatus(
data['ValidReads'], data['InvalidReads'],
data['OutOfScopeInvalidReads'])
self.logger.info(f"SeqConsumer {self._consumer_status}")

except Exception as ex:
self.save_exception(ex)
finally:
Expand All @@ -166,6 +199,11 @@ def __init__(self,
self).__init__(context, redpanda, topic, msg_size, nodes)
self._rand_read_msgs = rand_read_msgs
self._parallel = parallel
self._consumer_status = ConsumerStatus(0, 0, 0)

@property
def consumer_status(self):
return self._consumer_status

def _worker(self, idx, node):
self.status = ServiceStatus.RUNNING
Expand All @@ -174,17 +212,36 @@ def _worker(self, idx, node):
while not self._stopping.is_set(
) and not self._shutting_down.is_set():
cmd = 'echo $$ ; %s --brokers %s --topic %s --msg_size %s --produce_msgs 0 --rand_read_msgs %s --parallel %s --seq_read=0' % (
f"{TESTS_DIR}/si-verifier", self._redpanda.brokers(),
f"{TESTS_DIR}/kgo-verifier", self._redpanda.brokers(),
self._topic, self._msg_size, self._rand_read_msgs,
self._parallel)

self.execute_cmd(cmd, node)
for line in self.execute_cmd(cmd, node):
if not line.startswith("{"):
continue
data = json.loads(line)
self._consumer_status = ConsumerStatus(
data['ValidReads'], data['InvalidReads'],
data['OutOfScopeInvalidReads'])
self.logger.info(f"RandomConsumer {self._consumer_status}")

except Exception as ex:
self.save_exception(ex)
finally:
self.status = ServiceStatus.FINISH


class ProduceStatus:
    """Snapshot of the counters reported by a kgo-verifier producer.

    The four values are stored exactly as given. The producer service in
    this file fills them from the ``Sent``, ``Acked``, ``BadOffsets`` and
    ``Restarts`` fields of the verifier's JSON status lines.
    """
    def __init__(self, sent, acked, bad_offsets, restarts):
        self.sent = sent
        self.acked = acked
        self.bad_offsets = bad_offsets
        self.restarts = restarts

    def __str__(self):
        return f"ProduceStatus<{self.sent} {self.acked} {self.bad_offsets} {self.restarts}>"

    # Reuse the compact form for repr() so instances stay readable when they
    # show up inside containers, debuggers or assertion messages.
    __repr__ = __str__


class FranzGoVerifiableProducer(FranzGoVerifiableService):
def __init__(self,
context,
Expand All @@ -196,16 +253,28 @@ def __init__(self,
super(FranzGoVerifiableProducer,
self).__init__(context, redpanda, topic, msg_size, custom_node)
self._msg_count = msg_count
self._status = ProduceStatus(0, 0, 0, 0)

@property
def produce_status(self):
return self._status

def _worker(self, idx, node):
self.status = ServiceStatus.RUNNING
self._stopping.clear()
try:
cmd = 'echo $$ ; %s --brokers %s --topic %s --msg_size %s --produce_msgs %s --rand_read_msgs 0 --seq_read=0' % (
f"{TESTS_DIR}/si-verifier", self._redpanda.brokers(),
f"{TESTS_DIR}/kgo-verifier", self._redpanda.brokers(),
self._topic, self._msg_size, self._msg_count)

self.execute_cmd(cmd, node)
for line in self.execute_cmd(cmd, node):
if line.startswith("{"):
data = json.loads(line)
self._status = ProduceStatus(data['Sent'], data['Acked'],
data['BadOffsets'],
data['Restarts'])
self.logger.info(str(self._status))

except Exception as ex:
self.save_exception(ex)
finally:
Expand Down
51 changes: 34 additions & 17 deletions tests/rptest/tests/franz_go_verifiable_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,6 @@ def __init__(self, test_context, *args, **kwargs):
self.RANDOM_READ_COUNT, self.RANDOM_READ_PARALLEL,
self._node_for_franz_go)

# In the future producer will signal about json creation
def _create_json_file(self, msg_count):
small_producer = FranzGoVerifiableProducer(self.test_context,
self.redpanda, self.topic,
self.MSG_SIZE, msg_count,
self._node_for_franz_go)
small_producer.start()
small_producer.wait()

def free_nodes(self):
# Free the normally allocated nodes (e.g. RedpandaService)
super().free_nodes()
Expand Down Expand Up @@ -96,19 +87,30 @@ def test_with_all_type_of_loads(self):
"Skipping test in debug mode (requires release build)")
return

# Need create json file for consumer at first
self._create_json_file(1000)

self._producer.start(clean=False)

# Don't start consumers until the producer has written out its first
# checkpoint with valid ranges.
wait_until(lambda: self._producer.produce_status.acked > 0,
timeout_sec=30,
backoff_sec=0.1)
wrote_at_least = self._producer.produce_status.acked

self._seq_consumer.start(clean=False)
self._rand_consumer.start(clean=False)

self._producer.wait()
assert self._producer.produce_status.acked == self.PRODUCE_COUNT

self._seq_consumer.shutdown()
self._rand_consumer.shutdown()

self._seq_consumer.wait()
self._rand_consumer.wait()

assert self._seq_consumer.consumer_status.valid_reads >= wrote_at_least
assert self._rand_consumer.consumer_status.total_reads == self.RANDOM_READ_COUNT * self.RANDOM_READ_PARALLEL


class FranzGoVerifiableWithSiTest(FranzGoVerifiableBase):
MSG_SIZE = 1000000
Expand Down Expand Up @@ -159,22 +161,37 @@ def test_with_all_type_of_loads_and_si(self):
rpk.alter_topic_config(self.topic, 'retention.bytes',
str(self.segment_size))

# Need create json file for consumer at first
self._create_json_file(10000)
self._producer.start(clean=False)

# Don't start consumers until the producer has written out its first
# checkpoint with valid ranges.
wait_until(lambda: self._producer.produce_status.acked > 0,
timeout_sec=30,
backoff_sec=5.0)

# Verify that we really enabled shadow indexing correctly, such
# that some objects were written
# Once we've written a lot of data, check that some of it showed up in S3
wait_until(lambda: self._producer.produce_status.acked > 10000,
timeout_sec=300,
backoff_sec=5)
objects = list(self.redpanda.get_objects_from_si())
assert len(objects) > 0
for o in objects:
self.logger.info(f"S3 object: {o.Key}, {o.ContentLength}")

self._producer.start(clean=False)
wrote_at_least = self._producer.produce_status.acked
self._seq_consumer.start(clean=False)
self._rand_consumer.start(clean=False)

# Wait until we have written all the data we expected to write
self._producer.wait()
assert self._producer.produce_status.acked >= self.PRODUCE_COUNT

# Wait for last iteration of consumers to finish: if they are currently
# mid-run, they'll run to completion.
self._seq_consumer.shutdown()
self._rand_consumer.shutdown()
self._seq_consumer.wait()
self._rand_consumer.wait()

assert self._seq_consumer.consumer_status.valid_reads >= wrote_at_least
assert self._rand_consumer.consumer_status.total_reads == self.RANDOM_READ_COUNT * self.RANDOM_READ_PARALLEL

0 comments on commit 3957dd5

Please sign in to comment.