
feat: Add PubSubLiteSinkConnector which acts as a sink connector for Pub/Sub Lite #243

Merged

Conversation

dpcollins-google
Collaborator

No description provided.

@antonmry

I've also been working on a sink connector, but with a slightly different approach: https://github.com/antonmry/pubsub/tree/feature/pubsublite

Are you also going to work on a source connector?

@dpcollins-google
Collaborator Author

> with a slightly different approach:

I considered (and independently implemented) this; however, I thought it would be possible to provide a better mapping of the Kafka concepts to the Pub/Sub Lite wire protocol than to the Cloud Pub/Sub emulation layer. The approach you took is also valid, with different tradeoffs.

> Are you also going to work on a source connector?

I was going to defer implementing a source connector until someone asked for it: given how new Pub/Sub Lite is, it's unlikely that anyone already has data in Pub/Sub Lite that they want to clone to Kafka. But if you have a use case for this, I have no problem implementing one.

@antonmry

Our use case is replication between cloud and on-prem, so services generating data in the cloud can easily send it on-prem using the source connector (and vice versa with the sink connector). So it's an interesting feature for us, but we aren't planning to use it in the short term. Right now we are focusing more on Pub/Sub than on Lite, but that may change in the future when things like this are solved.

I guess the approach of the Pub/Sub source connector doesn't work for the Lite version, right? It doesn't have a poll() method the way the Pub/Sub version does. Does it make sense to use the Pub/Sub Lite Kafka shim for this?

@dpcollins-google
Collaborator Author

> doesn't work for the Lite version

A bit off topic for the PR; perhaps this would be best moved to a feature request thread. But you can adapt streaming to pull-based by buffering, which is what we've done for both Apache Beam and Kafka, both of which use a pull model. The issue with directly using the Cloud Pub/Sub connector is that there can be messages in the Pub/Sub Lite system that cannot be interpreted as Cloud Pub/Sub messages without custom handling, which cannot be easily installed by a user of the source connector. Using the Kafka shim to implement the source connector (since it already handles buffering and auto-assignment) was what I was planning on doing.
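
For the record, a minimal sketch of that plan (not this PR's code): a source task that wraps a pull-model consumer such as the one the Kafka shim exposes, whose poll() drains an internal buffer fed by the streaming pull. The class name and TARGET_TOPIC are illustrative, and consumer construction is omitted.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class LiteSourceTaskSketch extends SourceTask {
  private static final String TARGET_TOPIC = "kafka-target-topic"; // illustrative
  private Consumer<byte[], byte[]> consumer;

  @Override
  public void start(Map<String, String> props) {
    // Build the shim-backed consumer here; it handles buffering and
    // partition auto-assignment internally. Construction omitted in this sketch.
  }

  @Override
  public List<SourceRecord> poll() {
    // The shim buffers the streaming pull, so a plain poll() works.
    List<SourceRecord> records = new ArrayList<>();
    for (ConsumerRecord<byte[], byte[]> in : consumer.poll(Duration.ofMillis(100))) {
      records.add(new SourceRecord(
          null, null,         // source partition/offset: left to the consumer's commits
          TARGET_TOPIC, null, // target topic, partition (let Kafka choose)
          null, in.key(),     // key schema, key
          null, in.value())); // value schema, value
    }
    return records;
  }

  @Override
  public void stop() {
    if (consumer != null) {
      consumer.close();
    }
  }

  @Override
  public String version() {
    return "sketch";
  }
}
```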

```diff
@@ -27,7 +27,7 @@ These instructions assume you are using [Maven](https://maven.apache.org/).
 
 `mvn package`
 
-The resulting jar is at target/cps-kafka-connector.jar.
+The resulting jar is at target/pubsub-kafka-connector.jar.
```
Collaborator

Should we mention Lite in the Introduction paragraph?

Collaborator Author

Done.

- Top-level integer payloads are converted using `copyFromUtf8(Long.toString(x.longValue()))`
- Top-level floating point payloads are converted using `copyFromUtf8(Double.toString(x.doubleValue()))`
- All other payloads are encoded into a protobuf Value, then converted to a ByteString.
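
As a rough illustration of those three cases (a standalone sketch, not the connector's actual code):

```java
import com.google.protobuf.ByteString;
import com.google.protobuf.Value;

public class PayloadEncodingDemo {
  public static void main(String[] args) {
    // Integer and floating point payloads become their decimal string form.
    ByteString fromLong = ByteString.copyFromUtf8(Long.toString(42L));
    ByteString fromDouble = ByteString.copyFromUtf8(Double.toString(2.5));
    // Anything else is wrapped in a protobuf Value and serialized losslessly.
    Value fallback = Value.newBuilder().setStringValue("structured-data").build();
    ByteString fromValue = fallback.toByteString();
    System.out.println(fromLong.toStringUtf8() + " " + fromDouble.toStringUtf8()
        + " " + fromValue.size());
  }
}
```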
Collaborator

We may ultimately need to make this configurable. People who don't use proto at all probably aren't going to want to have to convert to using it; in particular, generic protocol buffers like this can be a little hard to use, I think. Granted, the vast majority of people should probably just be using a BytesConverter that treats the payload as BYTES, which leaves the data unmodified. Maybe we don't need to do anything about it now, but it's something to keep in mind.
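
For reference, that pass-through setup would be something like the following in the connector properties. This is a hypothetical snippet (ByteArrayConverter is Kafka Connect's stock bytes converter; the surrounding config depends on the deployment):

```properties
# Hand record keys and values through unmodified as raw bytes.
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```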

Collaborator Author

> the vast majority of people should probably just be using a BytesConverter that treats the payload as BYTES

Yeah, that would be best :/ But I think there should be a way to handle a more complex schema, even if it's a bit clunky.

> We may ultimately need to make this configurable

Agreed. If someone opens an FR for it, we can definitely add this. I'm just not sure anyone will care, since they'll probably want the more compressed data.

> People who don't use proto at all probably aren't going to want to have to convert to using it

It's made a bit easier by the fact that Pub/Sub Lite has no push, so you have to have proto built in to use it at all. In addition, the protobuf JSON formatter special-cases Value and Struct to produce nice JSON output. But I agree, it's not the most user-friendly fallback, just a reasonable one that is also lossless.

This also gives us an easy config-only option if we get asked for it: if set, dump the Value as JSON instead of as bytes.
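
For illustration, that config-only option would boil down to something like this sketch (not committed code):

```java
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Value;
import com.google.protobuf.util.JsonFormat;

public class ValueJsonDemo {
  public static void main(String[] args) throws InvalidProtocolBufferException {
    // JsonFormat special-cases Value/Struct: this prints "hello",
    // not a wrapper object around the field.
    Value value = Value.newBuilder().setStringValue("hello").build();
    System.out.println(JsonFormat.printer().print(value));
  }
}
```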

```java
    headers.add(header);
    headerCount++;
  }
  if (headerCount > 100) {
```
Collaborator

If we are going to drop headers, the documentation should indicate that this happens.

Collaborator Author

We actually don't have any header count or value limits in Pub/Sub Lite, so I've just removed this check. We're using duplicate-keyed headers anyway, so exact Pub/Sub parity isn't a concern.
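
A hypothetical sketch of the resulting uncapped copy; the multimap attribute container and the string conversion of header values are assumptions for illustration, not the PR's actual code:

```java
import com.google.common.collect.ImmutableListMultimap;
import com.google.protobuf.ByteString;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;

public class HeaderCopySketch {
  // No cap: every header is copied, and a multimap keeps duplicate keys intact.
  static ImmutableListMultimap<String, ByteString> copyHeaders(SinkRecord record) {
    ImmutableListMultimap.Builder<String, ByteString> attributes =
        ImmutableListMultimap.builder();
    for (Header header : record.headers()) {
      attributes.put(header.key(),
          ByteString.copyFromUtf8(String.valueOf(header.value())));
    }
    return attributes.build();
  }
}
```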

@kamalaboulhosn kamalaboulhosn merged commit e13f610 into GoogleCloudPlatform:master Nov 24, 2020