Skip to content

Commit

Permalink
Merge pull request redpanda-data#24073 from bashtanov/migrations-test…
Browse files Browse the repository at this point in the history
…-fixes2

Fixes for migrations tests
  • Loading branch information
bashtanov authored Nov 12, 2024
2 parents 1fb3f4e + eafe51f commit ac52b7e
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 51 deletions.
2 changes: 1 addition & 1 deletion src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ consensus::linearizable_barrier(model::timeout_clock::time_point deadline) {
});
} catch (const ss::broken_condition_variable& e) {
co_return ret_t(make_error_code(errc::shutting_down));
} catch (const ss::timed_out_error& e) {
} catch (const ss::condition_variable_timed_out& e) {
co_return errc::timeout;
}
// grab an oplock to serialize state updates i.e. wait for all updates in
Expand Down
97 changes: 47 additions & 50 deletions tests/rptest/tests/data_migrations_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,29 @@ def poll_hard(consumer, timeout):
self.logger.debug(f"second msg={format_message(msg)}")
assert msg is None

def check_migrations(self, migration_id, exp_topics_cnt,
exp_migrations_cnt):
""" make sure that, when the migration appears,
- it its state is planned,
- it contains this many topics,
- and also there are that many migrations in total """
def check():
migrations_map = self.get_migrations_map()
self.logger.info(f"migrations: {migrations_map}")
if migration_id not in migrations_map:
return False # finj may make things lag ...
migration = migrations_map[migration_id]
# ... but not lie
assert migration['state'] == 'planned'
assert len(migration['migration']['topics']) == exp_topics_cnt
assert len(migrations_map) == exp_migrations_cnt
return True

wait_until(check,
timeout_sec=10,
backoff_sec=1,
err_msg=f"Failed waiting for migration")

@cluster(num_nodes=3, log_allow_list=MIGRATION_LOG_ALLOW_LIST)
def test_mount_inexistent(self):
topic = TopicSpec(partition_count=3)
Expand All @@ -491,14 +514,7 @@ def test_mount_inexistent(self):
topics=[InboundTopic(make_namespaced_topic(topic.name))],
consumer_groups=[])
in_migration_id = self.create_and_wait(in_migration)

migrations_map = self.get_migrations_map()
self.logger.info(f"migrations: {migrations_map}")
assert len(
migrations_map) == 1, "There should be one data migration"

assert len(migrations_map[in_migration_id]['migration']
['topics']) == 1, "migration should contain one topic"
self.check_migrations(in_migration_id, 1, 1)

self.execute_data_migration_action_flaky(in_migration_id,
MigrationAction.prepare)
Expand Down Expand Up @@ -528,7 +544,6 @@ def test_creating_and_listing_migrations(self):
self.client().create_topic(t)

migrations_map = self.get_migrations_map()

assert len(migrations_map) == 0, "There should be no data migrations"

with self.finj_thread():
Expand All @@ -538,16 +553,7 @@ def test_creating_and_listing_migrations(self):
consumer_groups=[])

out_migration_id = self.create_and_wait(out_migration)

migrations_map = self.get_migrations_map()
self.logger.info(f"migrations: {migrations_map}")
assert len(
migrations_map) == 1, "There should be one data migration"

assert migrations_map[out_migration_id]['state'] == 'planned'

assert len(migrations_map[out_migration_id]['migration']['topics']
) == len(topics), "migration should contain all topics"
self.check_migrations(out_migration_id, len(topics), 1)

self.execute_data_migration_action_flaky(out_migration_id,
MigrationAction.prepare)
Expand Down Expand Up @@ -578,15 +584,7 @@ def test_creating_and_listing_migrations(self):
in_migration = InboundDataMigration(topics=inbound_topics,
consumer_groups=["g-1", "g-2"])
in_migration_id = self.create_and_wait(in_migration)

migrations_map = self.get_migrations_map()
self.logger.info(f"migrations: {migrations_map}")
assert len(
migrations_map) == 2, "There should be two data migrations"

assert len(
migrations_map[in_migration_id]['migration']['topics']) == len(
inbound_topics), "migration should contain all topics"
self.check_migrations(in_migration_id, len(inbound_topics), 2)

for t in inbound_topics:
self.logger.info(
Expand Down Expand Up @@ -677,13 +675,12 @@ def test_higher_level_migration_api(self):
producer.begin_transaction()
producer.produce(topics[0].name, key="key2", value="value2")

# out
outbound_topics = [make_namespaced_topic(t.name) for t in topics]
reply = self.admin.unmount_topics(outbound_topics).json()
self.logger.info(f"create migration reply: {reply}")
out_migration_id = reply["id"]
with self.finj_thread():
# out
outbound_topics = [make_namespaced_topic(t.name) for t in topics]
reply = self.admin.unmount_topics(outbound_topics).json()
self.logger.info(f"create migration reply: {reply}")
out_migration_id = reply["id"]

self.logger.info('waiting for partitions be deleted')
self.wait_partitions_disappear(topics)
self.logger.info('waiting for migration to be deleted')
Expand All @@ -692,22 +689,22 @@ def test_higher_level_migration_api(self):
migrations_map = self.get_migrations_map()
self.logger.info(f"migrations: {migrations_map}")

# in
inbound_topics = [
InboundTopic(make_namespaced_topic(t.name),
alias=\
None if i == 0
else make_namespaced_topic(f"{t.name}-alias"))
for i, t in enumerate(topics[:3])
]
inbound_topics_spec = [
TopicSpec(name=(it.alias or it.source_topic_reference).topic,
partition_count=3) for it in inbound_topics
]
reply = self.admin.mount_topics(inbound_topics).json()
self.logger.info(f"create migration reply: {reply}")
in_migration_id = reply["id"]

# in
inbound_topics = [
InboundTopic(make_namespaced_topic(t.name),
alias=\
None if i == 0
else make_namespaced_topic(f"{t.name}-alias"))
for i, t in enumerate(topics[:3])
]
inbound_topics_spec = [
TopicSpec(name=(it.alias or it.source_topic_reference).topic,
partition_count=3) for it in inbound_topics
]
reply = self.admin.mount_topics(inbound_topics).json()
self.logger.info(f"create migration reply: {reply}")
in_migration_id = reply["id"]
with self.finj_thread():
self.logger.info('waiting for partitions to come back')
self.wait_partitions_appear(inbound_topics_spec)
self.logger.info('waiting for migration to be deleted')
Expand Down

0 comments on commit ac52b7e

Please sign in to comment.