diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index daa3ff239280f..3b1ee1163e4e7 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.ReplacementOutputs; @@ -350,10 +351,11 @@ * href="https://beam.apache.org/blog/splittable-do-fn/">blog post and design doc. The major difference from {@link * KafkaIO.Read} is, {@link ReadSourceDescriptors} doesn't require source descriptions(e.g., {@link - * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link - * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction time. Instead, the - * pipeline can populate these source descriptions during runtime. For example, the pipeline can - * query Kafka topics from a BigQuery table and read these topics via {@link ReadSourceDescriptors}. + * KafkaIO.Read#getTopicPattern()}, {@link KafkaIO.Read#getTopicPartitions()}, {@link + * KafkaIO.Read#getTopics()}, {@link KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline + * construction time. Instead, the pipeline can populate these source descriptions during runtime. + * For example, the pipeline can query Kafka topics from a BigQuery table and read these topics via + * {@link ReadSourceDescriptors}. * *
<h3>Common Kafka Consumer Configurations</h3>
* @@ -633,6 +635,9 @@ public abstract static class Read @Pure abstract @Nullable List getTopicPartitions(); + @Pure + abstract @Nullable Pattern getTopicPattern(); + @Pure abstract @Nullable Coder getKeyCoder(); @@ -692,6 +697,8 @@ abstract static class Builder { abstract Builder setTopicPartitions(List topicPartitions); + abstract Builder setTopicPattern(Pattern topicPattern); + abstract Builder setKeyCoder(Coder keyCoder); abstract Builder setValueCoder(Coder valueCoder); @@ -922,8 +929,9 @@ public Read withTopic(String topic) { */ public Read withTopics(List topics) { checkState( - getTopicPartitions() == null || getTopicPartitions().isEmpty(), - "Only topics or topicPartitions can be set, not both"); + (getTopicPartitions() == null || getTopicPartitions().isEmpty()) + && getTopicPattern() == null, + "Only one of topics, topicPartitions or topicPattern can be set"); return toBuilder().setTopics(ImmutableList.copyOf(topics)).build(); } @@ -936,11 +944,26 @@ public Read withTopics(List topics) { */ public Read withTopicPartitions(List topicPartitions) { checkState( - getTopics() == null || getTopics().isEmpty(), - "Only topics or topicPartitions can be set, not both"); + (getTopics() == null || getTopics().isEmpty()) && getTopicPattern() == null, + "Only one of topics, topicPartitions or topicPattern can be set"); return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build(); } + /** + * Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions + * from each of the matching topics are read. + * + *
<p>
See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the + * partitions are distributed among the splits. + */ + public Read withTopicPattern(String topicPattern) { + checkState( + (getTopics() == null || getTopics().isEmpty()) + && (getTopicPartitions() == null || getTopicPartitions().isEmpty()), + "Only one of topics, topicPartitions or topicPattern can be set"); + return toBuilder().setTopicPattern(Pattern.compile(topicPattern)).build(); + } + /** * Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka. * @@ -1274,8 +1297,9 @@ public PCollection> expand(PBegin input) { if (!isDynamicRead()) { checkArgument( (getTopics() != null && getTopics().size() > 0) - || (getTopicPartitions() != null && getTopicPartitions().size() > 0), - "Either withTopic(), withTopics() or withTopicPartitions() is required"); + || (getTopicPartitions() != null && getTopicPartitions().size() > 0) + || getTopicPattern() != null, + "Either withTopic(), withTopics(), withTopicPartitions() or withTopicPattern() is required"); } else { checkArgument( ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api"), @@ -1537,6 +1561,7 @@ public PCollection> expand(PBegin input) { kafkaRead.getConsumerConfig(), kafkaRead.getCheckStopReadingFn(), topics, + kafkaRead.getTopicPattern(), kafkaRead.getStartReadTime(), kafkaRead.getStopReadTime())); } else { @@ -1561,6 +1586,7 @@ static class GenerateKafkaSourceDescriptor extends DoFn topics; + private final @Nullable Pattern topicPattern; + @ProcessElement public void processElement(OutputReceiver receiver) { List partitions = new ArrayList<>(Preconditions.checkStateNotNull(topicPartitions)); if (partitions.isEmpty()) { try (Consumer consumer = consumerFactoryFn.apply(consumerConfig)) { - for (String topic : Preconditions.checkStateNotNull(topics)) { - for (PartitionInfo p : consumer.partitionsFor(topic)) { - partitions.add(new TopicPartition(p.topic(), p.partition())); + List topics = Preconditions.checkStateNotNull(this.topics); + if (topics.isEmpty()) { + Pattern pattern = Preconditions.checkStateNotNull(topicPattern); + for (Map.Entry> entry : + consumer.listTopics().entrySet()) { + if (pattern.matcher(entry.getKey()).matches()) { + for (PartitionInfo p : entry.getValue()) { + partitions.add(new TopicPartition(p.topic(), p.partition())); + } + } + } + } else { + for (String topic : topics) { + for (PartitionInfo p : consumer.partitionsFor(topic)) { + partitions.add(new TopicPartition(p.topic(), p.partition())); + } } } } @@ -1634,12 +1675,16 @@ public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); List topics = Preconditions.checkStateNotNull(getTopics()); List topicPartitions = Preconditions.checkStateNotNull(getTopicPartitions()); + Pattern topicPattern = getTopicPattern(); if (topics.size() > 0) { builder.add(DisplayData.item("topics", Joiner.on(",").join(topics)).withLabel("Topic/s")); } else if (topicPartitions.size() > 0) { builder.add( DisplayData.item("topicPartitions", Joiner.on(",").join(topicPartitions)) .withLabel("Topic Partition/s")); + } else if (topicPattern != null) { + builder.add( + DisplayData.item("topicPattern", topicPattern.pattern()).withLabel("Topic Pattern")); } Set disallowedConsumerPropertiesKeys = KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.keySet(); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java index 1db28d2a3e6f0..1c974f3bcc1e6 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java @@ -79,6 +79,7 @@ enum KafkaIOReadProperties { CONSUMER_CONFIG, TOPICS, TOPIC_PARTITIONS, + TOPIC_PATTERN, KEY_CODER, VALUE_CODER, CONSUMER_FACTORY_FN, diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java index 4af13bbf4749d..bfb375b5eec90 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java @@ -24,6 +24,8 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.io.UnboundedSource; @@ -65,14 +67,26 @@ public List> split(int desiredNumSplits, PipelineOpti if (partitions.isEmpty()) { try (Consumer consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig())) { - for (String topic : Preconditions.checkStateNotNull(spec.getTopics())) { - List partitionInfoList = consumer.partitionsFor(topic); - checkState( - partitionInfoList != null, - "Could not find any partitions info. Please check Kafka configuration and make sure " - + "that provided topics exist."); - for (PartitionInfo p : partitionInfoList) { - partitions.add(new TopicPartition(p.topic(), p.partition())); + List topics = Preconditions.checkStateNotNull(spec.getTopics()); + if (topics.isEmpty()) { + Pattern pattern = Preconditions.checkStateNotNull(spec.getTopicPattern()); + for (Map.Entry> entry : consumer.listTopics().entrySet()) { + if (pattern.matcher(entry.getKey()).matches()) { + for (PartitionInfo p : entry.getValue()) { + partitions.add(new TopicPartition(p.topic(), p.partition())); + } + } + } + } else { + for (String topic : topics) { + List partitionInfoList = consumer.partitionsFor(topic); + checkState( + partitionInfoList != null, + "Could not find any partitions info. 
Please check Kafka configuration and make sure " + + "that provided topics exist."); + for (PartitionInfo p : partitionInfoList) { + partitions.add(new TopicPartition(p.topic(), p.partition())); + } } } } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java index 21fec1bd27b1c..6a25257bdac38 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.regex.Pattern; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.DoFn; @@ -64,6 +65,7 @@ class WatchForKafkaTopicPartitions extends PTransform kafkaConsumerConfig; private final @Nullable SerializableFunction checkStopReadingFn; private final Set topics; + private final @Nullable Pattern topicPattern; private final @Nullable Instant startReadTime; private final @Nullable Instant stopReadTime; @@ -73,6 +75,7 @@ public WatchForKafkaTopicPartitions( Map kafkaConsumerConfig, @Nullable SerializableFunction checkStopReadingFn, Set topics, + @Nullable Pattern topicPattern, @Nullable Instant startReadTime, @Nullable Instant stopReadTime) { this.checkDuration = firstNonNull(checkDuration, DEFAULT_CHECK_DURATION); @@ -80,6 +83,7 @@ public WatchForKafkaTopicPartitions( this.kafkaConsumerConfig = kafkaConsumerConfig; this.checkStopReadingFn = checkStopReadingFn; this.topics = topics; + this.topicPattern = topicPattern; this.startReadTime = startReadTime; this.stopReadTime = stopReadTime; } @@ -91,7 +95,8 @@ public PCollection expand(PBegin input) { .apply( "Match new TopicPartitions", Watch.growthOf( - new WatchPartitionFn(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics)) + new WatchPartitionFn( + kafkaConsumerFactoryFn, kafkaConsumerConfig, topics, topicPattern)) .withPollInterval(checkDuration)) .apply(ParDo.of(new ConvertToDescriptor(checkStopReadingFn, startReadTime, stopReadTime))); } @@ -134,14 +139,17 @@ private static class WatchPartitionFn extends PollFn { kafkaConsumerFactoryFn; private final Map kafkaConsumerConfig; private final Set topics; + private final @Nullable Pattern topicPattern; private WatchPartitionFn( SerializableFunction, Consumer> kafkaConsumerFactoryFn, Map kafkaConsumerConfig, - Set topics) { + Set topics, + @Nullable Pattern topicPattern) { this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn; this.kafkaConsumerConfig = kafkaConsumerConfig; this.topics = topics; + this.topicPattern = topicPattern; } @Override @@ -149,7 +157,9 @@ public Watch.Growth.PollResult apply(byte[] element, Context c) throws Exception { Instant now = Instant.now(); return Watch.Growth.PollResult.incomplete( - now, getAllTopicPartitions(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics)) + now, + getAllTopicPartitions( + kafkaConsumerFactoryFn, kafkaConsumerConfig, topics, topicPattern)) .withWatermark(now); } } @@ -158,7 +168,8 @@ now, getAllTopicPartitions(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics)) static List getAllTopicPartitions( SerializableFunction, Consumer> kafkaConsumerFactoryFn, Map kafkaConsumerConfig, - Set topics) { + Set topics, + @Nullable Pattern topicPattern) { List current = new ArrayList<>(); try (Consumer kafkaConsumer = 
kafkaConsumerFactoryFn.apply(kafkaConsumerConfig)) { @@ -168,12 +179,13 @@ static List getAllTopicPartitions( current.add(new TopicPartition(topic, partition.partition())); } } - } else { for (Map.Entry> topicInfo : kafkaConsumer.listTopics().entrySet()) { - for (PartitionInfo partition : topicInfo.getValue()) { - current.add(new TopicPartition(topicInfo.getKey(), partition.partition())); + if (topicPattern == null || topicPattern.matcher(topicInfo.getKey()).matches()) { + for (PartitionInfo partition : topicInfo.getValue()) { + current.add(new TopicPartition(partition.topic(), partition.partition())); + } } } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index d170b2a237b25..577dbaf9827de 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -723,6 +723,87 @@ public void testUnboundedSourceWithExplicitPartitions() { p.run(); } + @Test + public void testUnboundedSourceWithPattern() { + int numElements = 1000; + + List topics = + ImmutableList.of( + "best", "gest", "hest", "jest", "lest", "nest", "pest", "rest", "test", "vest", "west", + "zest"); + + KafkaIO.Read reader = + KafkaIO.read() + .withBootstrapServers("none") + .withTopicPattern("[a-z]est") + .withConsumerFactoryFn( + new ConsumerFactoryFn(topics, 10, numElements, OffsetResetStrategy.EARLIEST)) + .withKeyDeserializer(ByteArrayDeserializer.class) + .withValueDeserializer(LongDeserializer.class) + .withMaxNumRecords(numElements); + + PCollection input = p.apply(reader.withoutMetadata()).apply(Values.create()); + + addCountingAsserts(input, numElements); + p.run(); + } + + @Test + public void testUnboundedSourceWithPartiallyMatchedPattern() { + int numElements = 1000; + long numMatchedElements = numElements / 2; // Expected elements if split across 2 topics + + List topics = ImmutableList.of("test", "Test"); + + KafkaIO.Read reader = + KafkaIO.read() + .withBootstrapServers("none") + .withTopicPattern("[a-z]est") + .withConsumerFactoryFn( + new ConsumerFactoryFn(topics, 1, numElements, OffsetResetStrategy.EARLIEST)) + .withKeyDeserializer(ByteArrayDeserializer.class) + .withValueDeserializer(LongDeserializer.class) + .withMaxNumRecords(numMatchedElements); + + PCollection input = p.apply(reader.withoutMetadata()).apply(Values.create()); + + // With 1 partition per topic element to partition allocation alternates between test and Test, + // producing even elements for test and odd elements for Test. + // The pattern only matches test, so we expect even elements. + PAssert.that(input).satisfies(new AssertMultipleOf(2)); + + PAssert.thatSingleton(input.apply("Count", Count.globally())).isEqualTo(numMatchedElements); + + p.run(); + } + + @Test + public void testUnboundedSourceWithUnmatchedPattern() { + // Expect an exception when provided pattern doesn't match any Kafka topics. + thrown.expect(PipelineExecutionException.class); + thrown.expectCause(instanceOf(IllegalStateException.class)); + thrown.expectMessage( + "Could not find any partitions. 
Please check Kafka configuration and topic names"); + + int numElements = 1000; + + List topics = ImmutableList.of("chest", "crest", "egest", "guest", "quest", "wrest"); + + KafkaIO.Read reader = + KafkaIO.read() + .withBootstrapServers("none") + .withTopicPattern("[a-z]est") + .withConsumerFactoryFn( + new ConsumerFactoryFn(topics, 10, numElements, OffsetResetStrategy.EARLIEST)) + .withKeyDeserializer(ByteArrayDeserializer.class) + .withValueDeserializer(LongDeserializer.class) + .withMaxNumRecords(numElements); + + p.apply(reader.withoutMetadata()).apply(Values.create()); + + p.run(); + } + @Test public void testUnboundedSourceWithWrongTopic() { // Expect an exception when provided Kafka topic doesn't exist. @@ -1829,6 +1910,25 @@ public void testSourceWithExplicitPartitionsDisplayData() { assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288)); } + @Test + public void testSourceWithPatternDisplayData() { + KafkaIO.Read read = + KafkaIO.readBytes() + .withBootstrapServers("myServer1:9092,myServer2:9092") + .withTopicPattern("[a-z]est") + .withConsumerFactoryFn( + new ConsumerFactoryFn( + Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)); + + DisplayData displayData = DisplayData.from(read); + + assertThat(displayData, hasDisplayItem("topicPattern", "[a-z]est")); + assertThat(displayData, hasDisplayItem("enable.auto.commit", false)); + assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092")); + assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest")); + assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288)); + } + @Test public void testSinkDisplayData() { try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) { diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitionsTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitionsTest.java index 78ec57728199c..25ca17ba91a99 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitionsTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitionsTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.when; import java.util.Set; +import java.util.regex.Pattern; import org.apache.beam.sdk.io.kafka.KafkaMocks.PartitionGrowthMockConsumer; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SerializableMatcher; @@ -77,7 +78,8 @@ public void testGetAllTopicPartitions() throws Exception { new TopicPartition("topic1", 1), new TopicPartition("topic2", 0), new TopicPartition("topic2", 1)), - WatchForKafkaTopicPartitions.getAllTopicPartitions((input) -> mockConsumer, null, null)); + WatchForKafkaTopicPartitions.getAllTopicPartitions( + (input) -> mockConsumer, null, null, null)); } @Test @@ -103,7 +105,47 @@ public void testGetAllTopicPartitionsWithGivenTopics() throws Exception { new TopicPartition("topic2", 0), new TopicPartition("topic2", 1)), WatchForKafkaTopicPartitions.getAllTopicPartitions( - (input) -> mockConsumer, null, givenTopics)); + (input) -> mockConsumer, null, givenTopics, null)); + } + + @Test + public void testGetAllTopicPartitionsWithGivenPattern() throws Exception { + Consumer mockConsumer = Mockito.mock(Consumer.class); + when(mockConsumer.listTopics()) + .thenReturn( + ImmutableMap.of( + "topic1", + ImmutableList.of( + new PartitionInfo("topic1", 0, null, null, null), + new PartitionInfo("topic1", 1, null, null, 
null)), + "topic2", + ImmutableList.of( + new PartitionInfo("topic2", 0, null, null, null), + new PartitionInfo("topic2", 1, null, null, null)), + "topicA", + ImmutableList.of( + new PartitionInfo("topicA", 0, null, null, null), + new PartitionInfo("topicA", 1, null, null, null)), + "topicB", + ImmutableList.of( + new PartitionInfo("topicB", 0, null, null, null), + new PartitionInfo("topicB", 1, null, null, null)))); + assertEquals( + ImmutableList.of( + new TopicPartition("topic1", 0), + new TopicPartition("topic1", 1), + new TopicPartition("topic2", 0), + new TopicPartition("topic2", 1)), + WatchForKafkaTopicPartitions.getAllTopicPartitions( + (input) -> mockConsumer, null, null, Pattern.compile("topic[0-9]"))); + assertEquals( + ImmutableList.of( + new TopicPartition("topicA", 0), + new TopicPartition("topicA", 1), + new TopicPartition("topicB", 0), + new TopicPartition("topicB", 1)), + WatchForKafkaTopicPartitions.getAllTopicPartitions( + (input) -> mockConsumer, null, null, Pattern.compile("topic[A-Z]"))); } @Test @@ -120,6 +162,7 @@ public void testPartitionSingle() { null, givenTopics, null, + null, null); PCollection descriptors = p.apply(watchForKafkaTopicPartitions); @@ -145,6 +188,7 @@ public void testPartitionGrowth() { null, givenTopics, null, + null, null); PCollection descriptors = p.apply(watchForKafkaTopicPartitions);
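For reference, the patch itself ships no usage snippet; the following is only a rough sketch of how the new withTopicPattern() option could be wired into a pipeline. The broker addresses, the "[a-z]est" pattern (borrowed from the tests above), and the Long/String key-value types are illustrative assumptions, not part of this diff.

// Hypothetical usage sketch for the new KafkaIO.Read#withTopicPattern(String) option.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TopicPatternReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    PCollection<KV<Long, String>> records =
        p.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker-1:9092,broker-2:9092") // illustrative servers
                // Reads all partitions of every existing topic whose name matches the regex.
                .withTopicPattern("[a-z]est")
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata());

    p.run().waitUntilFinish();
  }
}

As the checkState messages added in this patch indicate, withTopic()/withTopics(), withTopicPartitions() and withTopicPattern() are mutually exclusive. In the static path the pattern is expanded against Consumer#listTopics() once at split time (KafkaUnboundedSource#split), whereas in the dynamic-read path it is handed to WatchForKafkaTopicPartitions, which re-lists topics at each poll interval so partitions of newly created matching topics are picked up.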