topicPartitions) {
checkState(
- getTopics() == null || getTopics().isEmpty(),
- "Only topics or topicPartitions can be set, not both");
+ (getTopics() == null || getTopics().isEmpty()) && getTopicPattern() == null,
+ "Only one of topics, topicPartitions or topicPattern can be set");
return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
}
+ /**
+ * Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
+ * from each of the matching topics are read.
+ *
+    * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for a description of how the
+    * partitions are distributed among the splits.
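+    *
+    * <p>A minimal usage sketch (the bootstrap servers and the pattern here are illustrative):
+    *
+    * <pre>{@code
+    * PCollection<KafkaRecord<byte[], byte[]>> records =
+    *     pipeline.apply(
+    *         KafkaIO.readBytes()
+    *             .withBootstrapServers("broker_1:9092,broker_2:9092")
+    *             .withTopicPattern("beam.*"));
+    * }</pre>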
+ */
+   public Read<K, V> withTopicPattern(String topicPattern) {
+ checkState(
+ (getTopics() == null || getTopics().isEmpty())
+ && (getTopicPartitions() == null || getTopicPartitions().isEmpty()),
+ "Only one of topics, topicPartitions or topicPattern can be set");
+ return toBuilder().setTopicPattern(Pattern.compile(topicPattern)).build();
+ }
+
/**
* Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
*
@@ -1274,8 +1297,9 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
if (!isDynamicRead()) {
checkArgument(
(getTopics() != null && getTopics().size() > 0)
- || (getTopicPartitions() != null && getTopicPartitions().size() > 0),
- "Either withTopic(), withTopics() or withTopicPartitions() is required");
+ || (getTopicPartitions() != null && getTopicPartitions().size() > 0)
+ || getTopicPattern() != null,
+ "Either withTopic(), withTopics(), withTopicPartitions() or withTopicPattern() is required");
} else {
checkArgument(
ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api"),
@@ -1537,6 +1561,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
kafkaRead.getConsumerConfig(),
kafkaRead.getCheckStopReadingFn(),
topics,
+ kafkaRead.getTopicPattern(),
kafkaRead.getStartReadTime(),
kafkaRead.getStopReadTime()));
} else {
@@ -1561,6 +1586,7 @@ static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescriptor> {
    private final @Nullable List<String> topics;
+ private final @Nullable Pattern topicPattern;
+
@ProcessElement
   public void processElement(OutputReceiver<KafkaSourceDescriptor> receiver) {
     List<TopicPartition> partitions =
new ArrayList<>(Preconditions.checkStateNotNull(topicPartitions));
if (partitions.isEmpty()) {
       try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
- for (String topic : Preconditions.checkStateNotNull(topics)) {
- for (PartitionInfo p : consumer.partitionsFor(topic)) {
- partitions.add(new TopicPartition(p.topic(), p.partition()));
+        List<String> topics = Preconditions.checkStateNotNull(this.topics);
+ if (topics.isEmpty()) {
+ Pattern pattern = Preconditions.checkStateNotNull(topicPattern);
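+          // listTopics() returns every topic visible to this consumer; keep only the
+          // partitions of topics whose names match the pattern.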
+          for (Map.Entry<String, List<PartitionInfo>> entry :
+              consumer.listTopics().entrySet()) {
+ if (pattern.matcher(entry.getKey()).matches()) {
+ for (PartitionInfo p : entry.getValue()) {
+ partitions.add(new TopicPartition(p.topic(), p.partition()));
+ }
+ }
+ }
+ } else {
+ for (String topic : topics) {
+ for (PartitionInfo p : consumer.partitionsFor(topic)) {
+ partitions.add(new TopicPartition(p.topic(), p.partition()));
+ }
}
}
}
@@ -1634,12 +1675,16 @@ public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
     List<String> topics = Preconditions.checkStateNotNull(getTopics());
     List<TopicPartition> topicPartitions = Preconditions.checkStateNotNull(getTopicPartitions());
+ Pattern topicPattern = getTopicPattern();
if (topics.size() > 0) {
builder.add(DisplayData.item("topics", Joiner.on(",").join(topics)).withLabel("Topic/s"));
} else if (topicPartitions.size() > 0) {
builder.add(
DisplayData.item("topicPartitions", Joiner.on(",").join(topicPartitions))
.withLabel("Topic Partition/s"));
+ } else if (topicPattern != null) {
+ builder.add(
+ DisplayData.item("topicPattern", topicPattern.pattern()).withLabel("Topic Pattern"));
}
     Set<String> disallowedConsumerPropertiesKeys =
KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.keySet();
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
index 1db28d2a3e6f..1c974f3bcc1e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
@@ -79,6 +79,7 @@ enum KafkaIOReadProperties {
CONSUMER_CONFIG,
TOPICS,
TOPIC_PARTITIONS,
+ TOPIC_PATTERN,
KEY_CODER,
VALUE_CODER,
CONSUMER_FACTORY_FN,
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
index 4af13bbf4749..bfb375b5eec9 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
@@ -24,6 +24,8 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.UnboundedSource;
@@ -65,14 +67,26 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
if (partitions.isEmpty()) {
     try (Consumer<?, ?> consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig())) {
- for (String topic : Preconditions.checkStateNotNull(spec.getTopics())) {
-        List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
- checkState(
- partitionInfoList != null,
- "Could not find any partitions info. Please check Kafka configuration and make sure "
- + "that provided topics exist.");
- for (PartitionInfo p : partitionInfoList) {
- partitions.add(new TopicPartition(p.topic(), p.partition()));
+      List<String> topics = Preconditions.checkStateNotNull(spec.getTopics());
+ if (topics.isEmpty()) {
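+        // No explicit topics were configured, so expand the topic pattern against
+        // the topics visible to the consumer.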
+ Pattern pattern = Preconditions.checkStateNotNull(spec.getTopicPattern());
+        for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {
+ if (pattern.matcher(entry.getKey()).matches()) {
+ for (PartitionInfo p : entry.getValue()) {
+ partitions.add(new TopicPartition(p.topic(), p.partition()));
+ }
+ }
+ }
+ } else {
+ for (String topic : topics) {
+          List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
+ checkState(
+ partitionInfoList != null,
+ "Could not find any partitions info. Please check Kafka configuration and make sure "
+ + "that provided topics exist.");
+ for (PartitionInfo p : partitionInfoList) {
+ partitions.add(new TopicPartition(p.topic(), p.partition()));
+ }
}
}
}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java
index 21fec1bd27b1..6a25257bdac3 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java
@@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.regex.Pattern;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
@@ -64,6 +65,7 @@ class WatchForKafkaTopicPartitions extends PTransform<PBegin, PCollection<KafkaSourceDescriptor>> {
   private final Map<String, Object> kafkaConsumerConfig;
   private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
   private final Set<String> topics;
+ private final @Nullable Pattern topicPattern;
private final @Nullable Instant startReadTime;
private final @Nullable Instant stopReadTime;
@@ -73,6 +75,7 @@ public WatchForKafkaTopicPartitions(
       Map<String, Object> kafkaConsumerConfig,
       @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
       Set<String> topics,
+ @Nullable Pattern topicPattern,
@Nullable Instant startReadTime,
@Nullable Instant stopReadTime) {
this.checkDuration = firstNonNull(checkDuration, DEFAULT_CHECK_DURATION);
@@ -80,6 +83,7 @@ public WatchForKafkaTopicPartitions(
this.kafkaConsumerConfig = kafkaConsumerConfig;
this.checkStopReadingFn = checkStopReadingFn;
this.topics = topics;
+ this.topicPattern = topicPattern;
this.startReadTime = startReadTime;
this.stopReadTime = stopReadTime;
}
@@ -91,7 +95,8 @@ public PCollection<KafkaSourceDescriptor> expand(PBegin input) {
.apply(
"Match new TopicPartitions",
Watch.growthOf(
- new WatchPartitionFn(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics))
+ new WatchPartitionFn(
+ kafkaConsumerFactoryFn, kafkaConsumerConfig, topics, topicPattern))
.withPollInterval(checkDuration))
.apply(ParDo.of(new ConvertToDescriptor(checkStopReadingFn, startReadTime, stopReadTime)));
}
@@ -134,14 +139,17 @@ private static class WatchPartitionFn extends PollFn<byte[], TopicPartition> {
     private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
         kafkaConsumerFactoryFn;
     private final Map<String, Object> kafkaConsumerConfig;
     private final Set<String> topics;
+ private final @Nullable Pattern topicPattern;
private WatchPartitionFn(
SerializableFunction