From 818728705301fa1ef99a5959441ffb5f62bc9915 Mon Sep 17 00:00:00 2001
From: Alexey Bashtanov
Date: Thu, 7 Nov 2024 14:42:58 +0000
Subject: [PATCH 1/3] tests/migrations: run mount/unmount commands without
 finjector

otherwise we would need to be prepared for a situation where we get no
successful reply but the migration is created anyway
---
 .../rptest/tests/data_migrations_api_test.py | 43 +++++++++----------
 1 file changed, 21 insertions(+), 22 deletions(-)

diff --git a/tests/rptest/tests/data_migrations_api_test.py b/tests/rptest/tests/data_migrations_api_test.py
index 0a3eaa1bac1e7..fb2e9d3036507 100644
--- a/tests/rptest/tests/data_migrations_api_test.py
+++ b/tests/rptest/tests/data_migrations_api_test.py
@@ -661,13 +661,12 @@ def test_higher_level_migration_api(self):
         producer.begin_transaction()
         producer.produce(topics[0].name, key="key2", value="value2")
 
+        # out
+        outbound_topics = [make_namespaced_topic(t.name) for t in topics]
+        reply = self.admin.unmount_topics(outbound_topics).json()
+        self.logger.info(f"create migration reply: {reply}")
+        out_migration_id = reply["id"]
         with self.finj_thread():
-            # out
-            outbound_topics = [make_namespaced_topic(t.name) for t in topics]
-            reply = self.admin.unmount_topics(outbound_topics).json()
-            self.logger.info(f"create migration reply: {reply}")
-            out_migration_id = reply["id"]
-
             self.logger.info('waiting for partitions be deleted')
             self.wait_partitions_disappear(topics)
             self.logger.info('waiting for migration to be deleted')
@@ -676,22 +675,22 @@ def test_higher_level_migration_api(self):
             migrations_map = self.get_migrations_map()
             self.logger.info(f"migrations: {migrations_map}")
 
-            # in
-            inbound_topics = [
-                InboundTopic(make_namespaced_topic(t.name),
-                             alias=\
-                None if i == 0
-                else make_namespaced_topic(f"{t.name}-alias"))
-                for i, t in enumerate(topics[:3])
-            ]
-            inbound_topics_spec = [
-                TopicSpec(name=(it.alias or it.source_topic_reference).topic,
-                          partition_count=3) for it in inbound_topics
-            ]
-            reply = self.admin.mount_topics(inbound_topics).json()
-            self.logger.info(f"create migration reply: {reply}")
-            in_migration_id = reply["id"]
-
+        # in
+        inbound_topics = [
+            InboundTopic(make_namespaced_topic(t.name),
+                         alias=\
+            None if i == 0
+            else make_namespaced_topic(f"{t.name}-alias"))
+            for i, t in enumerate(topics[:3])
+        ]
+        inbound_topics_spec = [
+            TopicSpec(name=(it.alias or it.source_topic_reference).topic,
+                      partition_count=3) for it in inbound_topics
+        ]
+        reply = self.admin.mount_topics(inbound_topics).json()
+        self.logger.info(f"create migration reply: {reply}")
+        in_migration_id = reply["id"]
+        with self.finj_thread():
             self.logger.info('waiting for partitions to come back')
             self.wait_partitions_appear(inbound_topics_spec)
             self.logger.info('waiting for migration to be deleted')

From a6c86fdbfc0e727ca26362f3318a3f5108f3b215 Mon Sep 17 00:00:00 2001
From: Alexey Bashtanov
Date: Thu, 7 Nov 2024 17:29:37 +0000
Subject: [PATCH 2/3] raft/consensus: catch ss::condition_variable_timed_out

not timed_out_error, since that is not what waiting on a condvar with a
timeout throws
---
 src/v/raft/consensus.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc
index 2c1a45ab445af..6460b971e9c86 100644
--- a/src/v/raft/consensus.cc
+++ b/src/v/raft/consensus.cc
@@ -767,7 +767,7 @@ consensus::linearizable_barrier(model::timeout_clock::time_point deadline) {
           });
     } catch (const ss::broken_condition_variable& e) {
         co_return ret_t(make_error_code(errc::shutting_down));
-    } catch (const ss::timed_out_error& e) {
+    } catch (const ss::condition_variable_timed_out& e) {
        co_return errc::timeout;
     }
     // grab an oplock to serialize state updates i.e. wait for all updates in
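Note: for context on the fix above, Seastar signals an expired
condition-variable wait by failing the future with
ss::condition_variable_timed_out, while ss::timed_out_error is what the
future-based helpers such as ss::with_timeout throw (ss is Redpanda's alias
for the seastar namespace). A minimal sketch of the distinction, assuming a
Seastar build environment; wait_with_deadline is a hypothetical helper, not
code from these patches:

    // hypothetical sketch, not part of the patches above
    #include <chrono>
    #include <seastar/core/condition-variable.hh>
    #include <seastar/core/coroutine.hh>

    // Returns true if woken by signal()/broadcast() within a second,
    // false if the wait timed out.
    seastar::future<bool> wait_with_deadline(seastar::condition_variable& cv) {
        try {
            // wait(duration) fails with condition_variable_timed_out on
            // expiry, not with timed_out_error
            co_await cv.wait(std::chrono::seconds(1));
            co_return true;
        } catch (const seastar::condition_variable_timed_out&) {
            // catching seastar::timed_out_error here instead would let the
            // exception escape, which is the bug patch 2 fixes
            co_return false;
        }
    }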
From eafe51f70c4eab42421d6e409c1f3117c7fb2b89 Mon Sep 17 00:00:00 2001
From: Alexey Bashtanov
Date: Fri, 8 Nov 2024 16:44:45 +0000
Subject: [PATCH 3/3] tests/migrations: when under failure injector, allow for
 migration absence

Finj may make things lag, so tolerate temporary migration absence, but
not wrong data.
---
 .../rptest/tests/data_migrations_api_test.py | 54 +++++++++----------
 1 file changed, 26 insertions(+), 28 deletions(-)

diff --git a/tests/rptest/tests/data_migrations_api_test.py b/tests/rptest/tests/data_migrations_api_test.py
index fb2e9d3036507..e5049e4ae1050 100644
--- a/tests/rptest/tests/data_migrations_api_test.py
+++ b/tests/rptest/tests/data_migrations_api_test.py
@@ -466,6 +466,29 @@ def assure_exactly_one_message(self,
         self.logger.debug(f"second msg={msg}")
         assert msg is None
 
+    def check_migrations(self, migration_id, exp_topics_cnt,
+                         exp_migrations_cnt):
+        """ make sure that, when the migration appears,
+        - its state is planned,
+        - it contains this many topics,
+        - and also there are that many migrations in total """
+        def check():
+            migrations_map = self.get_migrations_map()
+            self.logger.info(f"migrations: {migrations_map}")
+            if migration_id not in migrations_map:
+                return False  # finj may make things lag ...
+            migration = migrations_map[migration_id]
+            # ... but not lie
+            assert migration['state'] == 'planned'
+            assert len(migration['migration']['topics']) == exp_topics_cnt
+            assert len(migrations_map) == exp_migrations_cnt
+            return True
+
+        wait_until(check,
+                   timeout_sec=10,
+                   backoff_sec=1,
+                   err_msg="Failed waiting for migration")
+
     @cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
     def test_mount_inexistent(self):
         topic = TopicSpec(partition_count=3)
@@ -475,14 +498,7 @@ def test_mount_inexistent(self):
             topics=[InboundTopic(make_namespaced_topic(topic.name))],
             consumer_groups=[])
         in_migration_id = self.create_and_wait(in_migration)
-
-        migrations_map = self.get_migrations_map()
-        self.logger.info(f"migrations: {migrations_map}")
-        assert len(
-            migrations_map) == 1, "There should be one data migration"
-
-        assert len(migrations_map[in_migration_id]['migration']
-                   ['topics']) == 1, "migration should contain one topic"
+        self.check_migrations(in_migration_id, 1, 1)
 
         self.execute_data_migration_action_flaky(in_migration_id,
                                                  MigrationAction.prepare)
@@ -512,7 +528,6 @@ def test_creating_and_listing_migrations(self):
 
             self.client().create_topic(t)
 
         migrations_map = self.get_migrations_map()
-        assert len(migrations_map) == 0, "There should be no data migrations"
 
         with self.finj_thread():
@@ -522,16 +537,7 @@ def test_creating_and_listing_migrations(self):
             out_migration = OutboundDataMigration(
                 topics=[make_namespaced_topic(t.name) for t in topics],
                 consumer_groups=[])
             out_migration_id = self.create_and_wait(out_migration)
-
-            migrations_map = self.get_migrations_map()
-            self.logger.info(f"migrations: {migrations_map}")
-            assert len(
-                migrations_map) == 1, "There should be one data migration"
-
-            assert migrations_map[out_migration_id]['state'] == 'planned'
-
-            assert len(migrations_map[out_migration_id]['migration']['topics']
-                       ) == len(topics), "migration should contain all topics"
+            self.check_migrations(out_migration_id, len(topics), 1)
             self.execute_data_migration_action_flaky(out_migration_id,
                                                      MigrationAction.prepare)
@@ -562,15 +568,7 @@ def test_creating_and_listing_migrations(self):
             in_migration = InboundDataMigration(topics=inbound_topics,
                                                 consumer_groups=["g-1", "g-2"])
             in_migration_id = self.create_and_wait(in_migration)
-
-            migrations_map = self.get_migrations_map()
-            self.logger.info(f"migrations: {migrations_map}")
-            assert len(
-                migrations_map) == 2, "There should be two data migrations"
-
-            assert len(
-                migrations_map[in_migration_id]['migration']['topics']) == len(
-                    inbound_topics), "migration should contain all topics"
+            self.check_migrations(in_migration_id, len(inbound_topics), 2)
 
             for t in inbound_topics:
                 self.logger.info(