Commit

Merge pull request #28782 [YAML] Java PubSubIO
robertwb authored Oct 17, 2023
2 parents 2a764dd + f12f529 commit ef97c4d
Showing 6 changed files with 487 additions and 82 deletions.
Row.java
@@ -786,12 +786,12 @@ public FieldValueBuilder withFieldValues(Map<String, Object> values) {
// withFieldValue or
// withFieldValues.

public Builder addValue(@Nullable Object values) {
this.values.add(values);
public Builder addValue(@Nullable Object value) {
this.values.add(value);
return this;
}

public Builder addValues(List<Object> values) {
public Builder addValues(List<@Nullable Object> values) {
this.values.addAll(values);
return this;
}
@@ -822,7 +822,7 @@ public <T> Builder addIterable(Iterable<T> values) {
// method is largely
// used internal to Beam.
@Internal
public Row attachValues(List<Object> attachedValues) {
public Row attachValues(List<@Nullable Object> attachedValues) {
checkState(this.values.isEmpty());
return new RowWithStorage(schema, attachedValues);
}
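The relaxed signatures above let callers pass null field values through addValue and addValues, which the Pub/Sub read further down relies on when an attribute is absent. A minimal illustrative sketch of the builder usage (the schema and values here are not part of this change):

```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

Schema schema =
    Schema.builder()
        .addStringField("id")
        .addNullableField("origin", Schema.FieldType.STRING)
        .build();

// addValue takes one (possibly null) value; addValues appends a whole list whose
// elements may themselves be null, matching the widened @Nullable signatures.
Row row =
    Row.withSchema(schema)
        .addValue("record-1")
        .addValue(null) // allowed for the nullable "origin" field
        .build();
```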
PubsubReadSchemaTransformConfiguration.java
@@ -19,6 +19,7 @@

import com.google.api.client.util.Clock;
import com.google.auto.value.AutoValue;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
import org.apache.beam.sdk.schemas.AutoValueSchema;
@@ -60,12 +60,61 @@ public abstract class PubsubReadSchemaTransformConfiguration {
+ "For JSON data, this is a schema defined with JSON-schema syntax (https://json-schema.org/).")
public abstract String getSchema();

@SchemaFieldDescription(
"Any additional pubsub attributes that should be populated as String fields in the ouptut rows.")
public abstract @Nullable List<String> getAttributes();

@SchemaFieldDescription(
"Any additional field that should be populated with the full set of PubSub attributes.")
public abstract @Nullable String getAttributesMap();

@SchemaFieldDescription(
"When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub message attributes, "
+ "specifies the name of the attribute containing the unique identifier. "
+ "The value of the attribute can be any string that uniquely identifies this record. "
+ "Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream. "
+ "If idAttribute is not provided, Beam cannot guarantee that no duplicate data will be delivered, "
+ "and deduplication of the stream will be strictly best effort.")
public abstract @Nullable String getIdAttribute();

@SchemaFieldDescription(
"Specifies the name of the attribute that contains the timestamp, if any. "
+ "The timestamp value is expected to be represented in the attribute as either "
+ "(1) a numerical value representing the number of milliseconds since the Unix epoch. "
+ "For example, if using the Joda time classes, "
+ "Instant.getMillis() returns the correct value for this attribute."
+ " or (2) a String in RFC 3339 format. For example, 2015-10-29T23:41:41.123Z. "
+ "The sub-second component of the timestamp is optional, and digits beyond the first three "
+ "(i.e., time units smaller than milliseconds) will be ignored.")
public abstract @Nullable String getTimestampAttribute();

@SchemaFieldDescription("Specifies how to handle errors.")
public abstract @Nullable ErrorHandling getErrorHandling();

// Used for testing only.
public abstract @Nullable PubsubTestClientFactory getClientFactory();

// Used for testing only.
public abstract @Nullable Clock getClock();

@AutoValue
public abstract static class ErrorHandling {
@SchemaFieldDescription("The name of the output PCollection containing failed reads.")
public abstract String getOutput();

public static PubsubReadSchemaTransformConfiguration.ErrorHandling.Builder builder() {
return new AutoValue_PubsubReadSchemaTransformConfiguration_ErrorHandling.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract PubsubReadSchemaTransformConfiguration.ErrorHandling.Builder setOutput(
String output);

public abstract PubsubReadSchemaTransformConfiguration.ErrorHandling build();
}
}

public static Builder builder() {
return new AutoValue_PubsubReadSchemaTransformConfiguration.Builder();
}
@@ -80,6 +130,16 @@ public abstract static class Builder {

public abstract Builder setSchema(String schema);

public abstract Builder setAttributes(@Nullable List<String> attributes);

public abstract Builder setAttributesMap(@Nullable String attributesMap);

public abstract Builder setIdAttribute(@Nullable String idAttribute);

public abstract Builder setTimestampAttribute(@Nullable String timestampAttribute);

public abstract Builder setErrorHandling(@Nullable ErrorHandling errorHandling);

// Used for testing only.
public abstract Builder setClientFactory(@Nullable PubsubTestClientFactory clientFactory);

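Taken together, the new getters and setters give the configuration the shape sketched below. This is illustrative only: setSubscription, setFormat, and all literal values are assumptions (the provider reads getSubscription() and getFormat() later in this change, so matching setters are presumed to exist on the Builder).

```java
import java.util.Arrays;

PubsubReadSchemaTransformConfiguration config =
    PubsubReadSchemaTransformConfiguration.builder()
        .setSubscription("projects/my-project/subscriptions/my-subscription") // placeholder
        .setFormat("JSON")                      // RAW, AVRO, or JSON per VALID_FORMATS_STR below
        .setSchema("{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"}}}")
        .setAttributes(Arrays.asList("origin")) // promote this attribute to a String column
        .setAttributesMap("attrs")              // plus a map column with all attributes
        .setTimestampAttribute("event_time")
        .setErrorHandling(
            PubsubReadSchemaTransformConfiguration.ErrorHandling.builder()
                .setOutput("errors")
                .build())
        .build();
```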
PubsubReadSchemaTransformProvider.java
@@ -23,7 +23,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
import org.apache.beam.sdk.metrics.Counter;
@@ -35,8 +34,6 @@
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollectionRowTuple;
@@ -63,7 +60,7 @@
public class PubsubReadSchemaTransformProvider
extends TypedSchemaTransformProvider<PubsubReadSchemaTransformConfiguration> {

public static final String VALID_FORMATS_STR = "AVRO,JSON";
public static final String VALID_FORMATS_STR = "RAW,AVRO,JSON";
public static final Set<String> VALID_DATA_FORMATS =
Sets.newHashSet(VALID_FORMATS_STR.split(","));

@@ -89,38 +86,43 @@ public SchemaTransform from(PubsubReadSchemaTransformConfiguration configuration
"To read from Pubsub, a subscription name or a topic name must be provided. Not both.");
}

if ((Strings.isNullOrEmpty(configuration.getSchema())
&& !Strings.isNullOrEmpty(configuration.getFormat()))
|| (!Strings.isNullOrEmpty(configuration.getSchema())
&& Strings.isNullOrEmpty(configuration.getFormat()))) {
throw new IllegalArgumentException(
"A schema was provided without a data format (or viceversa). Please provide "
+ "both of these parameters to read from Pubsub, or if you would like to use the Pubsub schema service,"
+ " please leave both of these blank.");
if (!"RAW".equals(configuration.getFormat())) {
if ((Strings.isNullOrEmpty(configuration.getSchema())
&& !Strings.isNullOrEmpty(configuration.getFormat()))
|| (!Strings.isNullOrEmpty(configuration.getSchema())
&& Strings.isNullOrEmpty(configuration.getFormat()))) {
throw new IllegalArgumentException(
"A schema was provided without a data format (or viceversa). Please provide "
+ "both of these parameters to read from Pubsub, or if you would like to use the Pubsub schema service,"
+ " please leave both of these blank.");
}
}

Schema beamSchema;
SerializableFunction<byte[], Row> valueMapper;
Schema payloadSchema;
SerializableFunction<byte[], Row> payloadMapper;

if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
String format =
configuration.getFormat() == null ? null : configuration.getFormat().toUpperCase();
if ("RAW".equals(format)) {
payloadSchema = Schema.of(Schema.Field.of("payload", Schema.FieldType.BYTES));
payloadMapper = input -> Row.withSchema(payloadSchema).addValue(input).build();
} else if ("JSON".equals(format)) {
payloadSchema = JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema());
payloadMapper = JsonUtils.getJsonBytesToRowFunction(payloadSchema);
} else if ("AVRO".equals(format)) {
payloadSchema =
AvroUtils.toBeamSchema(
new org.apache.avro.Schema.Parser().parse(configuration.getSchema()));
payloadMapper = AvroUtils.getAvroBytesToRowFunction(payloadSchema);
} else {
throw new IllegalArgumentException(
String.format(
"Format %s not supported. Only supported formats are %s",
configuration.getFormat(), VALID_FORMATS_STR));
}
beamSchema =
Objects.equals(configuration.getFormat(), "JSON")
? JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema())
: AvroUtils.toBeamSchema(
new org.apache.avro.Schema.Parser().parse(configuration.getSchema()));
valueMapper =
Objects.equals(configuration.getFormat(), "JSON")
? JsonUtils.getJsonBytesToRowFunction(beamSchema)
: AvroUtils.getAvroBytesToRowFunction(beamSchema);

PubsubReadSchemaTransform transform =
new PubsubReadSchemaTransform(
configuration.getTopic(), configuration.getSubscription(), beamSchema, valueMapper);
new PubsubReadSchemaTransform(configuration, payloadSchema, payloadMapper);

if (configuration.getClientFactory() != null) {
transform.setClientFactory(configuration.getClientFactory());
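For the new RAW branch, the payload schema and mapper above reduce to a single BYTES column. A minimal sketch of what the mapper produces (the input bytes here are illustrative):

```java
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

Schema rawSchema = Schema.of(Schema.Field.of("payload", Schema.FieldType.BYTES));

byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
// The RAW payloadMapper simply wraps the message bytes in a one-column Row.
Row rawRow = Row.withSchema(rawSchema).addValue(payload).build();
// rawRow.getBytes("payload") returns the original bytes unchanged.
```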
@@ -135,45 +137,101 @@ public SchemaTransform from(PubsubReadSchemaTransformConfiguration configuration
private static class PubsubReadSchemaTransform extends SchemaTransform implements Serializable {
final Schema beamSchema;
final SerializableFunction<byte[], Row> valueMapper;
final @Nullable String topic;
final @Nullable String subscription;
final PubsubReadSchemaTransformConfiguration configuration;
@Nullable PubsubTestClientFactory clientFactory;
@Nullable Clock clock;

PubsubReadSchemaTransform(
@Nullable String topic,
@Nullable String subscription,
Schema beamSchema,
PubsubReadSchemaTransformConfiguration configuration,
Schema payloadSchema,
SerializableFunction<byte[], Row> valueMapper) {
this.topic = topic;
this.subscription = subscription;
this.beamSchema = beamSchema;
this.configuration = configuration;
Schema outputSchema;
List<String> attributes = configuration.getAttributes();
String attributesMap = configuration.getAttributesMap();
if (attributes == null && attributesMap == null) {
outputSchema = payloadSchema;
} else {
Schema.Builder outputSchemaBuilder = Schema.builder();
outputSchemaBuilder.addFields(payloadSchema.getFields());
if (attributes != null) {
for (String attribute : attributes) {
outputSchemaBuilder.addStringField(attribute);
}
}
if (attributesMap != null) {
outputSchemaBuilder.addMapField(
attributesMap, Schema.FieldType.STRING, Schema.FieldType.STRING);
}
outputSchema = outputSchemaBuilder.build();
}
this.beamSchema = outputSchema;
this.valueMapper = valueMapper;
}

private static class ErrorCounterFn extends DoFn<PubsubMessage, Row> {
private Counter pubsubErrorCounter;
private final Counter pubsubErrorCounter;
private Long errorsInBundle = 0L;
private SerializableFunction<byte[], Row> valueMapper;
private final SerializableFunction<byte[], Row> valueMapper;
private final @Nullable List<String> attributes;
private final @Nullable String attributesMap;
private final Schema outputSchema;

final boolean useErrorOutput;

ErrorCounterFn(String name, SerializableFunction<byte[], Row> valueMapper) {
ErrorCounterFn(
String name,
SerializableFunction<byte[], Row> valueMapper,
@Nullable List<String> attributes,
@Nullable String attributesMap,
Schema outputSchema,
boolean useErrorOutput) {
this.pubsubErrorCounter = Metrics.counter(PubsubReadSchemaTransformProvider.class, name);
this.valueMapper = valueMapper;
this.attributes = attributes;
this.attributesMap = attributesMap;
this.outputSchema = outputSchema;
this.useErrorOutput = useErrorOutput;
}

@ProcessElement
public void process(@DoFn.Element PubsubMessage message, MultiOutputReceiver receiver) {
public void process(@DoFn.Element PubsubMessage message, MultiOutputReceiver receiver)
throws Exception {

try {
receiver.get(OUTPUT_TAG).output(valueMapper.apply(message.getPayload()));
Row payloadRow = valueMapper.apply(message.getPayload());
Row outputRow;
if (attributes == null && attributesMap == null) {
outputRow = payloadRow;
} else {
Row.Builder rowBuilder = Row.withSchema(outputSchema);
List<@Nullable Object> payloadValues = payloadRow.getValues();
if (payloadValues != null) {
rowBuilder.addValues(payloadValues);
}
if (attributes != null) {
for (String attribute : attributes) {
rowBuilder.addValue(message.getAttribute(attribute));
}
}
if (attributesMap != null) {
rowBuilder.addValue(message.getAttributeMap());
}
outputRow = rowBuilder.build();
}
receiver.get(OUTPUT_TAG).output(outputRow);
} catch (Exception e) {
errorsInBundle += 1;
receiver
.get(ERROR_TAG)
.output(
Row.withSchema(ERROR_SCHEMA)
.addValues(e.toString(), message.getPayload())
.build());
if (useErrorOutput) {
receiver
.get(ERROR_TAG)
.output(
Row.withSchema(ERROR_SCHEMA)
.addValues(e.toString(), message.getPayload())
.build());
} else {
throw e;
}
}
}

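The constructor above composes the output schema as payload fields first, then one String field per requested attribute, then an optional String-to-String map. A short sketch of the resulting shape (field names are placeholders):

```java
import org.apache.beam.sdk.schemas.Schema;

Schema payloadSchema = Schema.builder().addStringField("id").build();

Schema outputSchema =
    Schema.builder()
        .addFields(payloadSchema.getFields())   // payload columns come first
        .addStringField("origin")               // one column per entry in getAttributes()
        .addMapField(                           // full attribute map, if getAttributesMap() is set
            "attrs", Schema.FieldType.STRING, Schema.FieldType.STRING)
        .build();
// ErrorCounterFn.process fills the row in the same order: payload values, then
// message.getAttribute(...) per requested attribute, then message.getAttributeMap().
```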
Expand All @@ -194,11 +252,14 @@ void setClock(@Nullable Clock clock) {

@SuppressWarnings("nullness")
PubsubIO.Read<PubsubMessage> buildPubsubRead() {
PubsubIO.Read<PubsubMessage> pubsubRead = PubsubIO.readMessages();
if (!Strings.isNullOrEmpty(topic)) {
pubsubRead = pubsubRead.fromTopic(topic);
PubsubIO.Read<PubsubMessage> pubsubRead =
(configuration.getAttributes() == null && configuration.getAttributesMap() == null)
? PubsubIO.readMessages()
: PubsubIO.readMessagesWithAttributes();
if (!Strings.isNullOrEmpty(configuration.getTopic())) {
pubsubRead = pubsubRead.fromTopic(configuration.getTopic());
} else {
pubsubRead = pubsubRead.fromSubscription(subscription);
pubsubRead = pubsubRead.fromSubscription(configuration.getSubscription());
}
if (clientFactory != null && clock != null) {
pubsubRead = pubsubRead.withClientFactory(clientFactory);
@@ -207,26 +268,47 @@ PubsubIO.Read<PubsubMessage> buildPubsubRead() {
throw new IllegalArgumentException(
"Both PubsubTestClientFactory and Clock need to be specified for testing, but only one is provided");
}
if (!Strings.isNullOrEmpty(configuration.getIdAttribute())) {
pubsubRead = pubsubRead.withIdAttribute(configuration.getIdAttribute());
}
if (!Strings.isNullOrEmpty(configuration.getTimestampAttribute())) {
pubsubRead = pubsubRead.withTimestampAttribute(configuration.getTimestampAttribute());
}
return pubsubRead;
}

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
PubsubIO.Read<PubsubMessage> pubsubRead = buildPubsubRead();
@SuppressWarnings("nullness")
String errorOutput =
configuration.getErrorHandling() == null
? null
: configuration.getErrorHandling().getOutput();

PCollectionTuple outputTuple =
input
.getPipeline()
.apply(pubsubRead)
.apply(
ParDo.of(new ErrorCounterFn("PubSub-read-error-counter", valueMapper))
ParDo.of(
new ErrorCounterFn(
"PubSub-read-error-counter",
valueMapper,
configuration.getAttributes(),
configuration.getAttributesMap(),
beamSchema,
errorOutput != null))
.withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema);
outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA);

return PCollectionRowTuple.of(
"output",
outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema),
"errors",
outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
PCollectionRowTuple result = PCollectionRowTuple.of("output", outputTuple.get(OUTPUT_TAG));
if (errorOutput == null) {
return result;
} else {
return result.and(errorOutput, outputTuple.get(ERROR_TAG));
}
}
}

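With the new error-handling plumbing, the transform always emits the main rows under "output" and, only when error handling is configured, the failed records under the user-chosen tag. A hedged usage sketch (the pipeline setup, the direct provider call, and the "errors" tag are assumptions based on the configuration example above):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;

Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

// `config` is the illustrative PubsubReadSchemaTransformConfiguration built earlier.
PCollectionRowTuple result =
    PCollectionRowTuple.empty(pipeline)
        .apply(new PubsubReadSchemaTransformProvider().from(config));

PCollection<Row> rows = result.get("output");
// Present only if ErrorHandling was configured; the tag matches ErrorHandling.getOutput().
PCollection<Row> failures = result.get("errors");
```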