Refactor PubsubIO Lineage metrics to work with all runners (#32319)
* Move Lineage report outside of PubsubUnboundedSource

* Move Lineage report outside of PubsubUnboundedSink
Abacn authored Aug 27, 2024
1 parent d1157d8 commit 857ecce
Showing 5 changed files with 56 additions and 33 deletions.

Lineage.java
@@ -130,7 +130,11 @@ public static Set<String> query(MetricResults results, Type type) {
             .build();
     Set<String> result = new HashSet<>();
     for (MetricResult<StringSetResult> metrics : results.queryMetrics(filter).getStringSets()) {
-      result.addAll(metrics.getCommitted().getStringSet());
+      try {
+        result.addAll(metrics.getCommitted().getStringSet());
+      } catch (UnsupportedOperationException unused) {
+        // MetricResult.getCommitted throws when runner support is missing; just skip.
+      }
       result.addAll(metrics.getAttempted().getStringSet());
     }
     return result;
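
Note on this hunk: MetricResults.getCommitted() throws UnsupportedOperationException on runners that only track attempted metrics, so the query now degrades gracefully instead of failing outright. A minimal caller sketch under that assumption (the helper class and method names below are illustrative, not part of this commit):

import java.util.Set;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.Lineage;

class LineageQuerySketch {
  // After this change, query() works on any runner: committed values are used
  // where supported, and attempted values are always merged in as a fallback.
  static Set<String> sourcesOf(PipelineResult result) {
    result.waitUntilFinish();
    return Lineage.query(result.metrics(), Lineage.Type.SOURCE);
  }
}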

PreparePubsubWriteDoFn.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import javax.naming.SizeLimitExceededException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
@@ -43,6 +44,8 @@ public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage>
 
   private SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction;
   @Nullable SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction;
+  /** Last TopicPath that reported Lineage. */
+  private transient @Nullable String reportedLineage;
 
   private final BadRecordRouter badRecordRouter;
 
@@ -165,6 +168,13 @@ public void process(
         return;
       }
     }
+    String topic = message.getTopic();
+    // topic shouldn't be null, but lineage report is fail-safe
+    if (topic != null && !topic.equals(reportedLineage)) {
+      Lineage.getSinks()
+          .add("pubsub", "topic", PubsubClient.topicPathFromPath(topic).getDataCatalogSegments());
+      reportedLineage = topic;
+    }
     try {
       validatePubsubMessageSize(message, maxPublishBatchSize);
     } catch (SizeLimitExceededException e) {
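
Note on this hunk: the sink-side report now lives in PreparePubsubWriteDoFn, which executes on every runner, whereas the downstream sink may be swapped out for a runner-native implementation (as Dataflow does for the source, per the Javadoc in PubsubIO.java below). The transient reportedLineage field keeps the report to once per topic per DoFn instance. A rough sketch of what the added lines compute, with an assumed topic path; the exact segment layout is whatever getDataCatalogSegments() produces:

import java.util.List;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;

class TopicSegmentsSketch {
  // For "projects/my-project/topics/my-topic" this is expected to yield
  // segments like ["my-project", "my-topic"], which Lineage assembles into a
  // fully qualified name for the sink.
  static List<String> segmentsFor(String fullTopicPath) {
    return PubsubClient.topicPathFromPath(fullTopicPath).getDataCatalogSegments();
  }
}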

PubsubIO.java
@@ -56,6 +56,7 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -1148,6 +1149,20 @@ public PCollection<T> expand(PBegin input) {
               getNeedsOrderingKey());
 
       PCollection<PubsubMessage> preParse = input.apply(source);
+      return expandReadContinued(preParse, topicPath, subscriptionPath);
+    }
+
+    /**
+     * Runner-agnostic part of the expansion.
+     *
+     * <p>Common logic (MapElements, SDK metrics, DLQ, etc.) lives here because
+     * PubsubUnboundedSource is overridden on the Dataflow runner.
+     */
+    private PCollection<T> expandReadContinued(
+        PCollection<PubsubMessage> preParse,
+        @Nullable ValueProvider<TopicPath> topicPath,
+        @Nullable ValueProvider<SubscriptionPath> subscriptionPath) {
+
       TypeDescriptor<T> typeDescriptor = new TypeDescriptor<T>() {};
       PCollection<T> read;
       if (getDeadLetterTopicProvider() == null
@@ -1174,7 +1189,7 @@ public PCollection<T> expand(PBegin input) {
                 "Map Failures To BadRecords",
                 ParDo.of(new ParseReadFailuresToBadRecords(preParse.getCoder())));
         getBadRecordErrorHandler()
-            .addErrorCollection(badRecords.setCoder(BadRecord.getCoder(input.getPipeline())));
+            .addErrorCollection(badRecords.setCoder(BadRecord.getCoder(preParse.getPipeline())));
       } else {
         // Write out failures to the provided dead-letter topic.
         result
@@ -1215,7 +1230,31 @@ public PCollection<T> expand(PBegin input) {
                   .withClientFactory(getPubsubClientFactory()));
         }
       }
+
+      // Report Lineage once per pipeline.
+      preParse
+          .getPipeline()
+          .apply(Impulse.create())
+          .apply(
+              ParDo.of(
+                  new DoFn<byte[], Void>() {
+                    @ProcessElement
+                    public void process() {
+                      if (topicPath != null) {
+                        TopicPath topic = topicPath.get();
+                        if (topic != null) {
+                          Lineage.getSources()
+                              .add("pubsub", "topic", topic.getDataCatalogSegments());
+                        }
+                      }
+                      if (subscriptionPath != null) {
+                        SubscriptionPath sub = subscriptionPath.get();
+                        if (sub != null) {
+                          Lineage.getSources()
+                              .add("pubsub", "subscription", sub.getDataCatalogSegments());
+                        }
+                      }
+                    }
+                  }));
       return read.setCoder(getCoder());
     }
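
Note on this hunk: the Impulse + ParDo pair is the runner-agnostic replacement for the reporting previously done in PubsubUnboundedSource.split() (removed at the end of this commit). Impulse emits exactly one element, so the metric is recorded once per pipeline execution no matter which source implementation the runner substitutes. A stripped-down sketch of the same run-once pattern (the body is a placeholder):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;

class RunOnceSketch {
  // Impulse produces a single empty byte[] element, so process() executes once
  // per pipeline run (modulo retries), independent of source splitting.
  static void attach(Pipeline pipeline) {
    pipeline
        .apply(Impulse.create())
        .apply(
            ParDo.of(
                new DoFn<byte[], Void>() {
                  @ProcessElement
                  public void process() {
                    System.out.println("one-shot side effect goes here");
                  }
                }));
  }
}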

@@ -1623,10 +1662,6 @@ public void finishBundle() throws IOException {
       for (Map.Entry<PubsubTopic, OutgoingData> entry : output.entrySet()) {
         publish(entry.getKey(), entry.getValue().messages);
       }
-      // Report lineage for all topics seen
-      for (PubsubTopic topic : output.keySet()) {
-        Lineage.getSinks().add("pubsub", "topic", topic.dataCatalogSegments());
-      }
       output = null;
       pubsubClient.close();
       pubsubClient = null;

PubsubUnboundedSink.java
@@ -41,7 +41,6 @@
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.PubsubClientFactory;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
 import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.metrics.SinkMetrics;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -232,9 +231,6 @@ private static class WriterFn extends DoFn<Iterable<OutgoingMessage>, Void> {
     /** Client on which to talk to Pubsub. Null until created by {@link #startBundle}. */
     private transient @Nullable PubsubClient pubsubClient;
 
-    /** Last TopicPath that reported Lineage. */
-    private transient @Nullable TopicPath reportedLineage;
-
     private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches");
     private final Counter elementCounter = SinkMetrics.elementsWritten();
    private final Counter byteCounter = SinkMetrics.bytesWritten();
@@ -294,14 +290,6 @@ private void publishBatch(List<OutgoingMessage> messages, int bytes) throws IOException {
       batchCounter.inc();
       elementCounter.inc(messages.size());
       byteCounter.inc(bytes);
-      // Report Lineage multiple once for same topic
-      if (!topicPath.equals(reportedLineage)) {
-        List<String> segments = topicPath.getDataCatalogSegments();
-        if (segments.size() != 0) {
-          Lineage.getSinks().add("pubsub", "topic", segments);
-        }
-        reportedLineage = topicPath;
-      }
     }
 
     @StartBundle
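
Note on this hunk: with WriterFn out of the picture, sink lineage for unbounded writes comes solely from PreparePubsubWriteDoFn upstream. A hypothetical runner-agnostic check built on the Lineage.query change above (the fragment-matching helper is an assumption, not an existing API):

import java.util.Set;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.Lineage;

class SinkLineageCheck {
  // Works on any runner after this commit, since Lineage.query falls back to
  // attempted metrics when committed metrics are unsupported.
  static boolean reportedTopic(PipelineResult result, String topicFragment) {
    Set<String> sinks = Lineage.query(result.metrics(), Lineage.Type.SINK);
    return sinks.stream().anyMatch(fqn -> fqn.contains(topicFragment));
  }
}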

PubsubUnboundedSource.java
@@ -56,7 +56,6 @@
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages.DeserializeBytesIntoPubsubMessagePayloadOnly;
 import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -1042,19 +1041,6 @@ public List<PubsubSource> split(int desiredNumSplits, PipelineOptions options)
       splitSource =
           new PubsubSource(
               outer, StaticValueProvider.of(outer.createRandomSubscription(options)));
-      TopicPath topic = outer.getTopic();
-      if (topic != null) {
-        // is initial split on Read.fromTopic, report Lineage based on topic
-        Lineage.getSources().add("pubsub", "source", topic.getDataCatalogSegments());
-      }
-    } else {
-      if (subscriptionPath.equals(outer.getSubscriptionProvider())) {
-        SubscriptionPath sub = subscriptionPath.get();
-        if (sub != null) {
-          // is a split on Read.fromSubscription
-          Lineage.getSources().add("pubsub", "subscription", sub.getDataCatalogSegments());
-        }
-      }
     }
     for (int i = 0; i < desiredNumSplits * SCALE_OUT; i++) {
       // Since the source is immutable and Pubsub automatically shards we simply
