diff --git a/crates/fluvio-cli/src/client/produce/mod.rs b/crates/fluvio-cli/src/client/produce/mod.rs index 828547788c..84ec2f2eb4 100644 --- a/crates/fluvio-cli/src/client/produce/mod.rs +++ b/crates/fluvio-cli/src/client/produce/mod.rs @@ -12,12 +12,14 @@ mod cmd { use std::path::PathBuf; use async_trait::async_trait; + use fluvio_sc_schema::partition::PartitionMirrorConfig; + use fluvio_sc_schema::topic::{MirrorConfig, PartitionMap, ReplicaSpec, TopicSpec}; #[cfg(feature = "producer-file-io")] use futures::future::join_all; use clap::Parser; use tracing::{error, warn}; use humantime::parse_duration; - use anyhow::Result; + use anyhow::{bail, Result}; use fluvio::{ Compression, Fluvio, FluvioError, TopicProducerPool, TopicProducerConfigBuilder, RecordKey, @@ -173,8 +175,12 @@ mod cmd { pub transforms_line: Vec, /// Partition id - #[arg(short = 'p', long, value_name = "integer")] + #[arg(short = 'p', long, value_name = "integer", conflicts_with = "mirror")] pub partition: Option, + + /// Remote cluster to consume from + #[arg(short = 'm', long, conflicts_with = "partition")] + pub mirror: Option, } fn validate_key_separator(separator: &str) -> std::result::Result { @@ -247,6 +253,46 @@ mod cmd { let config_builder = config_builder.smartmodules(self.smartmodule_invocations(initial_param)?); + let config_builder = if let Some(mirror) = &self.mirror { + let admin = fluvio.admin().await; + let topics = admin.all::().await?; + let partition = topics.into_iter().find_map(|t| match t.spec.replicas() { + ReplicaSpec::Mirror(MirrorConfig::Home(home_mirror_config)) => { + let partitions_maps = + Vec::::from(home_mirror_config.as_partition_maps()); + partitions_maps.iter().find_map(|p| { + if let Some(PartitionMirrorConfig::Home(remote)) = &p.mirror { + if remote.remote_cluster == *mirror && remote.source { + return Some(p.id); + } + } + None + }) + } + ReplicaSpec::Mirror(MirrorConfig::Remote(remote_mirror_config)) => { + let partitions_maps = + Vec::::from(remote_mirror_config.as_partition_maps()); + partitions_maps.iter().find_map(|p| { + if let Some(PartitionMirrorConfig::Remote(remote)) = &p.mirror { + if remote.home_cluster == *mirror && remote.target { + return Some(p.id); + } + } + None + }) + } + _ => None, + }); + + if let Some(partition) = partition { + config_builder.set_specific_partitioner(partition) + } else { + bail!("No partition found for mirror '{}'", mirror); + } + } else { + config_builder + }; + let config_builder = if let Some(partition) = self.partition { config_builder.set_specific_partitioner(partition) } else { diff --git a/crates/fluvio/src/fluvio.rs b/crates/fluvio/src/fluvio.rs index 7ba828bf0a..eba10d03ab 100644 --- a/crates/fluvio/src/fluvio.rs +++ b/crates/fluvio/src/fluvio.rs @@ -358,6 +358,18 @@ impl Fluvio { None }) } + ReplicaSpec::Mirror(MirrorConfig::Remote(remote_mirror_config)) => { + let partitions_maps = + Vec::::from(remote_mirror_config.as_partition_maps()); + partitions_maps.iter().find_map(|p| { + if let Some(PartitionMirrorConfig::Remote(remote)) = &p.mirror { + if remote.home_cluster == *mirror { + return Some(p.id); + } + } + None + }) + } _ => None, } } else { diff --git a/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats b/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats index 9903ac7e56..a6187197bb 100644 --- a/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats +++ b/tests/cli/mirroring_smoke_tests/e2e/fluvio-core.bats @@ -122,11 +122,11 @@ setup_file() { @test "Can produce message to reverse mirror topic from home" { run bash -c 'echo 3 | timeout 15s "$FLUVIO_BIN" produce "$REVERSE_TOPIC_NAME"' assert_success - run bash -c 'echo c | timeout 15s "$FLUVIO_BIN" produce "$REVERSE_TOPIC_NAME"' + run bash -c 'echo c | timeout 15s "$FLUVIO_BIN" produce -p 0 "$REVERSE_TOPIC_NAME"' assert_success - run bash -c 'echo 4 | timeout 15s "$FLUVIO_BIN" produce "$REVERSE_TOPIC_NAME"' + run bash -c 'echo 4 | timeout 15s "$FLUVIO_BIN" produce -m "$REMOTE_NAME" "$REVERSE_TOPIC_NAME"' assert_success - run bash -c 'echo d | timeout 15s "$FLUVIO_BIN" produce "$REVERSE_TOPIC_NAME"' + run bash -c 'echo d | timeout 15s "$FLUVIO_BIN" produce -m "$REMOTE_NAME" "$REVERSE_TOPIC_NAME"' assert_success } @@ -186,6 +186,10 @@ setup_file() { run timeout 15s "$FLUVIO_BIN" consume "$REVERSE_TOPIC_NAME" -p 0 -B -d assert_output 3$'\n'c$'\n'4$'\n'd assert_success + + run timeout 15s "$FLUVIO_BIN" consume "$REVERSE_TOPIC_NAME" --mirror "$REMOTE_NAME" -B -d + assert_output 3$'\n'c$'\n'4$'\n'd + assert_success } @test "Can switch back to home cluster" { @@ -200,9 +204,9 @@ setup_file() { assert_success run bash -c 'echo e | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"' assert_success - run bash -c 'echo 6 | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"' + run bash -c 'echo 6 | timeout 15s "$FLUVIO_BIN" produce -m "$REMOTE_NAME_2" "$REVERSE_TOPIC_NAME"' assert_success - run bash -c 'echo f | timeout 15s "$FLUVIO_BIN" produce -p 1 "$REVERSE_TOPIC_NAME"' + run bash -c 'echo f | timeout 15s "$FLUVIO_BIN" produce -m "$REMOTE_NAME_2" "$REVERSE_TOPIC_NAME"' assert_success } @@ -253,6 +257,10 @@ setup_file() { run timeout 15s "$FLUVIO_BIN" consume "$REVERSE_TOPIC_NAME" -p 0 -B -d assert_output 5$'\n'e$'\n'6$'\n'f assert_success + + run timeout 15s "$FLUVIO_BIN" consume "$REVERSE_TOPIC_NAME" --mirror "$REMOTE_NAME_2" -B -d + assert_output 5$'\n'e$'\n'6$'\n'f + assert_success } @test "Can't delete mirror topic from remote 2" { @@ -283,6 +291,14 @@ setup_file() { run timeout 15s "$FLUVIO_BIN" consume "$REVERSE_TOPIC_NAME" -p 0 -B -d assert_output 3$'\n'c$'\n'4$'\n'd assert_success + + run timeout 15s "$FLUVIO_BIN" consume "$REVERSE_TOPIC_NAME" -p 1 -B -d + assert_output 5$'\n'e$'\n'6$'\n'f + assert_success + + run timeout 15s "$FLUVIO_BIN" consume "$REVERSE_TOPIC_NAME" --mirror "$HOME" -B -d + assert_output 3$'\n'c$'\n'4$'\n'd$'\n'5$'\n'e$'\n'6$'\n'f + assert_success } @test "Can consume message from mirror topic produced from remote 1 by partition or remote" {