diff --git a/tests/rptest/tests/ot_consistency_test.py b/tests/rptest/tests/ot_consistency_test.py new file mode 100644 index 000000000000..de9b401a1c7d --- /dev/null +++ b/tests/rptest/tests/ot_consistency_test.py @@ -0,0 +1,50 @@ +# Copyright 2020 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 +import json +import os +import re +import sys +import time +import traceback +from collections import namedtuple, defaultdict +from typing import DefaultDict, List, Optional + +from ducktape.mark import matrix +from ducktape.utils.util import wait_until + +from rptest.clients.kafka_cat import KafkaCat +from rptest.clients.kafka_cli_tools import KafkaCliTools +from rptest.clients.rpk import RpkTool +from rptest.clients.types import TopicSpec +from rptest.services.cluster import cluster +from rptest.services.redpanda import RedpandaService, SISettings, CloudStorageTypeAndUrlStyle, get_cloud_storage_type, get_cloud_storage_type_and_url_style +from rptest.tests.redpanda_test import RedpandaTest +from rptest.utils.node_operations import verify_offset_translator_state_consistent + + +class OffsetTranslatorConsistencyTest(RedpandaTest): + def __init__(self, test_ctx, *args, **kwargs): + self._ctx = test_ctx + super(OffsetTranslatorConsistencyTest, self).__init__( + test_ctx, + si_settings=SISettings(test_ctx, + log_segment_size=1024 * 1024, + fast_uploads=True), + *args, + **kwargs, + ) + + @cluster(num_nodes=3) + def test_offset_translator_state_consistent(self): + cli = KafkaCliTools(self.redpanda) + topic = TopicSpec(partition_count=3, replication_factor=3) + + cli.create_topic(topic) + cli.produce(topic.name, 1000, 100) + verify_offset_translator_state_consistent(self.redpanda) diff --git a/tests/rptest/tests/random_node_operations_test.py b/tests/rptest/tests/random_node_operations_test.py index c152396bc20a..8ff8fc9ad924 100644 --- a/tests/rptest/tests/random_node_operations_test.py +++ b/tests/rptest/tests/random_node_operations_test.py @@ -26,7 +26,7 @@ from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, PREV_VERSION_LOG_ALLOW_LIST, CloudStorageType, LoggingConfig, PandaproxyConfig, SISettings, SchemaRegistryConfig, get_cloud_storage_type from rptest.services.redpanda_installer import RedpandaInstaller from rptest.utils.mode_checks import cleanup_on_early_exit, skip_debug_mode, skip_fips_mode -from rptest.utils.node_operations import FailureInjectorBackgroundThread, NodeOpsExecutor, generate_random_workload +from rptest.utils.node_operations import FailureInjectorBackgroundThread, NodeOpsExecutor, generate_random_workload, verify_offset_translator_state_consistent from rptest.clients.offline_log_viewer import OfflineLogViewer from rptest.tests.datalake.utils import supported_storage_types @@ -58,6 +58,8 @@ def __init__(self, test_context, *args, **kwargs): # set disk timeout to value greater than max suspend time # not to emit spurious errors "raft_io_timeout_ms": 20000, + "compacted_log_segment_size": 1024 * 1024, + "log_segment_size": 2 * 1024 * 1024, }, # 2 nodes for kgo producer/consumer workloads node_prealloc_count=3, @@ -593,6 +595,7 @@ def cluster_version_updated(): err_msg="Error waiting for cluster to report consistent version" ) + verify_offset_translator_state_consistent(self.redpanda) # Validate that the controller log written during the test is readable by offline log viewer log_viewer = OfflineLogViewer(self.redpanda) for node in self.redpanda.started_nodes(): diff --git a/tests/rptest/utils/node_operations.py b/tests/rptest/utils/node_operations.py index 816d2dbe9472..3b00581d1811 100644 --- a/tests/rptest/utils/node_operations.py +++ b/tests/rptest/utils/node_operations.py @@ -10,6 +10,7 @@ from collections import defaultdict from enum import Enum import random +import re import threading import time import requests @@ -20,6 +21,7 @@ from rptest.services.failure_injector import FailureInjector, FailureSpec from rptest.services.redpanda import RedpandaService from rptest.services.redpanda_installer import VERSION_RE, int_tuple +from rptest.util import wait_until_result class OperationType(Enum): @@ -79,6 +81,51 @@ def add(idx): yield NodeOperation(op, idx, random.choice([True, False])) +def verify_offset_translator_state_consistent(redpanda: RedpandaService): + logger = redpanda.logger + last_delta_pattern = re.compile('^\\{.*, last delta: (?P\\d+)\\}$') + admin = Admin(redpanda) + + for n in redpanda.started_nodes(): + node_id = redpanda.node_id(n) + all_partitions = admin.get_partitions(node=n) + + def _state_consistent(ns, topic, partition): + + state = admin.get_partition_state(ns, topic, partition, node=n) + dirty_offset = state['replicas'][0]['dirty_offset'] + if all(r['dirty_offset'] == dirty_offset + for r in state['replicas']): + return True, state + return False, None + + for p in all_partitions: + namespace = p['ns'] + topic = p['topic'] + partition = p['partition_id'] + partition_name = f"{namespace}/{topic}/{partition}" + state = wait_until_result( + lambda: _state_consistent(namespace, topic, partition), + timeout_sec=180, + backoff_sec=1, + err_msg="Error waiting for offsets to be consistent") + + logger.debug( + f"debug state of {partition_name} replica on node {node_id}: {state}" + ) + last_deltas = set() + for r_state in state['replicas']: + ot_state = r_state['raft_state']['offset_translator_state'] + if "empty" in ot_state: + continue + m = last_delta_pattern.match(ot_state) + assert m, f"offset translator state {ot_state} does not match expected pattern" + last_deltas.add(m['delta']) + assert len( + last_deltas + ) <= 1, f"partition {p} has inconsistent offset translation. Last deltas: {last_deltas}" + + class NodeDecommissionWaiter(): def __init__(self, redpanda,