feat: Add PubSubLiteSinkConnector which acts as a sink connector for Pub/Sub Lite #243

Merged
Changes from 3 commits
77 changes: 70 additions & 7 deletions kafka-connector/README.md
@@ -27,7 +27,7 @@ These instructions assume you are using [Maven](https://maven.apache.org/).

`mvn package`

The resulting jar is at target/cps-kafka-connector.jar.
The resulting jar is at target/pubsub-kafka-connector.jar.
Collaborator: Should we mention Lite in the Introduction paragraph?

Collaborator (Author): Done.


### Pre-Running Steps

@@ -53,12 +53,13 @@ The resulting jar is at target/cps-kafka-connector.jar.

### Running a Connector

1. Copy the cps-kafka-connector.jar to the place where you will run your Kafka
1. Copy the pubsub-kafka-connector.jar to the place where you will run your Kafka
connector.

2. Create a configuration file for the Cloud Pub/Sub connector and copy it to
the place where you will run Kafka connect. The configuration should set up
the proper Kafka topics, Cloud Pub/Sub topic, and Cloud Pub/Sub project.
2. Create a configuration file for the Pub/Sub connector and copy it to the
place where you will run Kafka Connect. The configuration should set up the
proper Kafka topics, Pub/Sub topic, and Pub/Sub project. For Pub/Sub Lite,
this should also set the correct location (Google Cloud zone).
Sample configuration files for the source and sink connectors are provided
at configs/.

@@ -75,7 +76,7 @@ The resulting jar is at target/cps-kafka-connector.jar.

### CloudPubSubConnector Configs

In addition to the configs supplied by the Kafka Connect API, the Pubsub
In addition to the configs supplied by the Kafka Connect API, the Cloud Pub/Sub
Connector supports the following configs:

#### Source Connector
@@ -111,7 +112,22 @@ Connector supports the following configs:
| headers.publish | Boolean | false | When true, include any headers as attributes when a message is published to Cloud Pub/Sub. |
| orderingKeySource | String | none, key, partition | none | When set to "none", do not set the ordering key. When set to "key", uses a message's key as the ordering key. If set to "partition", converts the partition number to a String and uses that as the ordering key. |

#### Schema Support and Data Model
### PubSubLiteConnector Configs

In addition to the configs supplied by the Kafka Connect API, the Pub/Sub Lite
Connector supports the following configs:

#### Sink Connector

| Config | Value Range | Default | Description |
|---------------|-------------|-----------------------|------------------------------------------------------------------------------------------------------------------------------------|
| pubsublite.topic | String | REQUIRED (No default) | The topic in Pub/Sub Lite to publish to, e.g. "foo" for topic "/projects/bar/locations/europe-south7-q/topics/foo". |
| pubsublite.project | String | REQUIRED (No default) | The project in Pub/Sub Lite containing the topic, e.g. "bar" from above. |
| pubsublite.location | String | REQUIRED (No default) | The location in Pub/Sub Lite containing the topic, e.g. "europe-south7-q" from above. |

### Schema Support and Data Model

#### Cloud Pub/Sub Connector

A pubsub message has two main parts: the message body and attributes. The
message body is a [ByteString](https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/ByteString)
@@ -169,4 +185,51 @@ from a Pubsub message into a SourceRecord with a relevant Schema.
* In these cases, to carry forward the structure of data stored in
attributes, we recommend using a converter that can represent a struct
schema type in a useful way, e.g. JsonConverter.


#### Pub/Sub Lite Connector

Pub/Sub Lite's messages have the following structure:

```java
class Message {
ByteString key;
ByteString data;
ListMultimap<String, ByteString> attributes;
Optional<Timestamp> eventTime;
}
```

This maps quite closely to the SinkRecord class, except for serialization. The
table below shows how each field in SinkRecord will be mapped to the underlying
message:

| SinkRecord | Message |
|---|---|
| key{Schema} | key |
| value{Schema} | data |
| headers | attributes |
| topic | attributes["x-goog-pubsublite-source-kafka-topic"] |
| kafkaPartition | attributes["x-goog-pubsublite-source-kafka-partition"] |
| kafkaOffset | attributes["x-goog-pubsublite-source-kafka-offset"] |
| timestamp | eventTime |
| timestampType | attributes["x-goog-pubsublite-source-kafka-event-time-type"] |
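
For example, the following sketch (illustrative only, not part of this change) shows how a single SinkRecord with a string key and value would land in a Pub/Sub Lite message under this mapping; the attribute names match the Constants class added in this PR, and the record values are hypothetical.

```java
// Illustrative sketch of the mapping above; values are hypothetical.
import com.google.cloud.pubsublite.Message;
import com.google.common.collect.ImmutableListMultimap;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

// A record read from Kafka topic "my-kafka-topic", partition 2, offset 1234.
SinkRecord record = new SinkRecord(
    "my-kafka-topic", 2,
    Schema.STRING_SCHEMA, "user-42",
    Schema.STRING_SCHEMA, "hello",
    1234L, 1600000000000L, TimestampType.CREATE_TIME);

// The message the sink task would publish for that record.
Message mapped = Message.builder()
    .setKey(ByteString.copyFromUtf8("user-42"))
    .setData(ByteString.copyFromUtf8("hello"))
    .setEventTime(Timestamps.fromMillis(1600000000000L))
    .setAttributes(ImmutableListMultimap.<String, ByteString>builder()
        .put("x-goog-pubsublite-source-kafka-topic", ByteString.copyFromUtf8("my-kafka-topic"))
        .put("x-goog-pubsublite-source-kafka-partition", ByteString.copyFromUtf8("2"))
        .put("x-goog-pubsublite-source-kafka-offset", ByteString.copyFromUtf8("1234"))
        .put("x-goog-pubsublite-source-kafka-event-time-type", ByteString.copyFromUtf8("CreateTime"))
        .build())
    .build();
```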

When a key, value, or header value with a schema is encoded as a ByteString, the
following logic is used (a sketch of the top-level rules follows the list):

- null schemas are treated as Schema.STRING_SCHEMA
- Top level BYTES payloads are unmodified.
- Top level STRING payloads are encoded using copyFromUtf8.
- Top level Integral payloads are converted using
copyFromUtf8(Long.toString(x.longValue()))
- Top level Floating point payloads are converted using
copyFromUtf8(Double.toString(x.doubleValue()))
- All other payloads are encoded into a protobuf Value, then converted to a ByteString.
Collaborator: We may ultimately need to make this configurable. For people who don't use proto at all, they probably aren't going to want to have to convert to using it. In particular, generic protocol buffers like this can be a little hard to use, I think. Granted, the vast majority of people should probably just be using a BytesConverter that treats the payload as BYTES, which leaves the data unmodified. Maybe we don't need to do anything about it now, but it's something to keep in mind.

Collaborator (Author):

> the vast majority of people should probably just be using a BytesConverter that treats the payload as BYTES

Yeah, that would be best. But I think there should be a way to handle a more complex schema, even if it's a bit clunky.

> We may ultimately need to make this configurable

Agreed. If someone opens a FR for it, we can definitely add this. I'm just not sure anyone will care, since they'll probably want the more compressed data.

> For people who don't use proto at all, they probably aren't going to want to have to convert to using it

It's made a bit easier by the fact that Pub/Sub Lite has no push delivery, so you have to have proto built in to use it at all. In addition, the protobuf JSON formatter special-cases Value and Struct to produce nice JSON output. But I agree, it's not the most user-friendly fallback, just a reasonable one that is also lossless.

This also gives us an easy config-only option if we get asked for it: if set, dump the Value as JSON instead of as bytes.

- Nested STRING fields are encoded into a protobuf Value.
- Nested BYTES fields are encoded to a protobuf Value holding the base64 encoded bytes.
- Nested Numeric fields are encoded as a double into a protobuf Value.
- Maps with Array, Map, or Struct keys are not supported.
- BYTES keys in maps are base64 encoded.
- Integral keys are converted using Long.toString(x.longValue())
- Floating point keys are converted using Double.toString(x.doubleValue())
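
A minimal sketch of the top-level rules above, for illustration only. The real logic lives in this PR's Schemas.encodeToBytes (not shown in this diff); toValue() below is a hypothetical helper standing in for the protobuf Value fallback.

```java
// Hedged sketch of the top-level encoding rules; not the actual implementation.
import com.google.protobuf.ByteString;
import java.nio.ByteBuffer;
import org.apache.kafka.connect.data.Schema;

static ByteString encodeTopLevel(Schema schema, Object value) {
  // Null schemas are treated as STRING.
  Schema.Type type = (schema == null) ? Schema.Type.STRING : schema.type();
  switch (type) {
    case BYTES:
      // BYTES payloads pass through unmodified.
      return value instanceof byte[]
          ? ByteString.copyFrom((byte[]) value)
          : ByteString.copyFrom(((ByteBuffer) value).duplicate());
    case STRING:
      return ByteString.copyFromUtf8((String) value);
    case INT8:
    case INT16:
    case INT32:
    case INT64:
      return ByteString.copyFromUtf8(Long.toString(((Number) value).longValue()));
    case FLOAT32:
    case FLOAT64:
      return ByteString.copyFromUtf8(Double.toString(((Number) value).doubleValue()));
    default:
      // Structs, maps, arrays, and everything else fall back to a protobuf
      // Value, which is then serialized to a ByteString.
      return toValue(schema, value).toByteString();
  }
}
```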
7 changes: 7 additions & 0 deletions kafka-connector/config/pubsub-lite-sink-connector.properties
@@ -0,0 +1,7 @@
name=PubSubLiteSinkConnector
connector.class=com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector
tasks.max=10
topics=my-kafka-topic
pubsublite.project=my-project
pubsublite.location=europe-south7-q
pubsublite.topic=my-topic
17 changes: 14 additions & 3 deletions kafka-connector/pom.xml
@@ -45,6 +45,11 @@
<artifactId>google-cloud-pubsub</artifactId>
<version>1.108.3</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>0.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
@@ -68,6 +73,12 @@
<version>3.3.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -113,7 +124,7 @@
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" />
</transformers>
<finalName>cps-kafka-connector</finalName>
<finalName>pubsub-kafka-connector</finalName>
</configuration>
</execution>
</executions>
@@ -122,8 +133,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
@@ -0,0 +1,24 @@
package com.google.pubsublite.kafka.sink;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;

final class ConfigDefs {

private ConfigDefs() {
}

static final String PROJECT_FLAG = "pubsublite.project";
static final String LOCATION_FLAG = "pubsublite.location";
static final String TOPIC_NAME_FLAG = "pubsublite.topic";

static ConfigDef config() {
return new ConfigDef()
.define(PROJECT_FLAG, ConfigDef.Type.STRING, Importance.HIGH,
"The project containing the topic to which to publish.")
.define(LOCATION_FLAG, ConfigDef.Type.STRING, Importance.HIGH,
"The cloud zone (like europe-south7-q) containing the topic to which to publish.")
.define(TOPIC_NAME_FLAG, ConfigDef.Type.STRING, Importance.HIGH,
"The name of the topic to which to publish.");
}
}
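
For illustration only (the connector itself hands the raw properties map straight to the publisher factory), the ConfigDef above can be used to parse and validate a properties map; the values below are placeholders, and the snippet assumes it runs in the same package since the flag constants are package-private.

```java
// Hypothetical usage of ConfigDefs.config(); not part of this PR.
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;

Map<String, String> props = new HashMap<>();
props.put(ConfigDefs.PROJECT_FLAG, "my-project");
props.put(ConfigDefs.LOCATION_FLAG, "europe-south7-q");
props.put(ConfigDefs.TOPIC_NAME_FLAG, "my-topic");

// AbstractConfig applies the ConfigDef's types and throws ConfigException
// if any of the required keys (all three have no default) is missing.
AbstractConfig parsed = new AbstractConfig(ConfigDefs.config(), props);
String topic = parsed.getString(ConfigDefs.TOPIC_NAME_FLAG);
```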
@@ -0,0 +1,13 @@
package com.google.pubsublite.kafka.sink;

public final class Constants {

private Constants() {
}

public static final String KAFKA_TOPIC_HEADER = "x-goog-pubsublite-source-kafka-topic";
public static final String KAFKA_PARTITION_HEADER = "x-goog-pubsublite-source-kafka-partition";
public static final String KAFKA_OFFSET_HEADER = "x-goog-pubsublite-source-kafka-offset";
public static final String KAFKA_EVENT_TIME_TYPE_HEADER = "x-goog-pubsublite-source-kafka-event-time-type";

}
@@ -0,0 +1,42 @@
package com.google.pubsublite.kafka.sink;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

public class PubSubLiteSinkConnector extends SinkConnector {
private Map<String, String> props;

@Override
public String version() {
return AppInfoParser.getVersion();
}

@Override
public void start(Map<String, String> map) {
props = map;
}

@Override
public Class<? extends Task> taskClass() {
return PubSubLiteSinkTask.class;
}

@Override
public List<Map<String, String>> taskConfigs(int i) {
return Collections.nCopies(i, props);
}

@Override
public void stop() {
}

@Override
public ConfigDef config() {
return ConfigDefs.config();
}
}
@@ -0,0 +1,137 @@
package com.google.pubsublite.kafka.sink;

import static com.google.pubsublite.kafka.sink.Schemas.encodeToBytes;

import com.google.api.core.ApiService.State;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableListMultimap;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import com.google.cloud.pubsublite.internal.Publisher;

public class PubSubLiteSinkTask extends SinkTask {

private final PublisherFactory factory;
private @Nullable Publisher<PublishMetadata> publisher;

@VisibleForTesting
PubSubLiteSinkTask(PublisherFactory factory) {
this.factory = factory;
}

public PubSubLiteSinkTask() {
this(new PublisherFactoryImpl());
}

@Override
public String version() {
return AppInfoParser.getVersion();
}

@Override
public void start(Map<String, String> map) {
if (publisher != null) {
throw new IllegalStateException("Called start when publisher already exists.");
}
publisher = factory.newPublisher(map);
publisher.startAsync().awaitRunning();
}

@Override
public void put(Collection<SinkRecord> collection) {
if (publisher.state() != State.RUNNING) {
if (publisher.state() == State.FAILED) {
throw new IllegalStateException("Publisher has failed.", publisher.failureCause());
} else {
throw new IllegalStateException("Publisher not currently running.");
}
}
for (SinkRecord record : collection) {
Message.Builder message = Message.builder();
if (record.key() != null) {
message.setKey(encodeToBytes(record.keySchema(), record.key()));
}
if (record.value() != null) {
message.setData(encodeToBytes(record.valueSchema(), record.value()));
}
ImmutableListMultimap.Builder<String, ByteString> attributes = ImmutableListMultimap
.builder();
getRecordHeaders(record).forEach(header -> attributes
.put(header.key(), Schemas.encodeToBytes(header.schema(), header.value())));
if (record.topic() != null) {
attributes.put(Constants.KAFKA_TOPIC_HEADER, ByteString.copyFromUtf8(record.topic()));
}
if (record.kafkaPartition() != null) {
attributes.put(Constants.KAFKA_PARTITION_HEADER,
ByteString.copyFromUtf8(record.kafkaPartition().toString()));
attributes.put(Constants.KAFKA_OFFSET_HEADER,
ByteString.copyFromUtf8(Long.toString(record.kafkaOffset())));
}
if (record.timestamp() != null) {
attributes.put(Constants.KAFKA_EVENT_TIME_TYPE_HEADER,
ByteString.copyFromUtf8(record.timestampType().name));
message.setEventTime(Timestamps.fromMillis(record.timestamp()));
}
message.setAttributes(attributes.build());
publisher.publish(message.build());
}
}

private Iterable<? extends Header> getRecordHeaders(SinkRecord record) {
ConnectHeaders headers = new ConnectHeaders();
if (record.headers() != null) {
int headerCount = 0;
for (Header header : record.headers()) {
if (header.key().getBytes().length < 257 &&
String.valueOf(header.value()).getBytes().length < 1025) {
headers.add(header);
headerCount++;
}
if (headerCount > 100) {
Collaborator: If we are going to drop headers, the documentation should indicate that this happens.

Collaborator (Author): We actually don't have any header count or value limits in Pub/Sub Lite, so I've just removed this. We're using duplicate keyed headers anyway, so exact Pub/Sub parity isn't a concern.
break;
}
}
}
return headers;
}

@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
try {
if (publisher != null) {
publisher.flush();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void stop() {
if (publisher == null) {
throw new IllegalStateException("Called stop when publisher doesn't exist.");
}
try {
publisher.flush();
publisher.stopAsync().awaitTerminated();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
publisher = null;
}
}
}
@@ -0,0 +1,10 @@
package com.google.pubsublite.kafka.sink;

import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.internal.Publisher;
import java.util.Map;

interface PublisherFactory {

Publisher<PublishMetadata> newPublisher(Map<String, String> params);
}
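
The PublisherFactory seam plus the @VisibleForTesting constructor on PubSubLiteSinkTask make the task testable without touching the service. Below is a rough sketch (not part of this PR) using Truth, which this PR adds to the pom, and Mockito, assumed to already be a test dependency; it also assumes the Pub/Sub Lite Message accessor data() and that Publisher extends ApiService, and it must live in the com.google.pubsublite.kafka.sink package to reach the package-private constructor.

```java
// Hedged test sketch; class and record values are hypothetical.
package com.google.pubsublite.kafka.sink;

import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiService.State;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

public class PubSubLiteSinkTaskSketchTest {

  @Test
  public void putPublishesOneMessagePerRecord() {
    @SuppressWarnings("unchecked")
    Publisher<PublishMetadata> publisher = (Publisher<PublishMetadata>) mock(Publisher.class);
    when(publisher.startAsync()).thenReturn(publisher);
    when(publisher.state()).thenReturn(State.RUNNING);

    // The factory lambda ignores the config map and hands back the mock.
    PubSubLiteSinkTask task = new PubSubLiteSinkTask(props -> publisher);
    task.start(ImmutableMap.of());

    task.put(ImmutableList.of(new SinkRecord(
        "my-kafka-topic", 0, Schema.STRING_SCHEMA, "key",
        Schema.STRING_SCHEMA, "value", 1234L)));

    // One message should be published, with the string value copied into data.
    ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
    verify(publisher).publish(captor.capture());
    assertThat(captor.getValue().data().toStringUtf8()).isEqualTo("value");
  }
}
```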