Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle kafka "null messages" for deleting in a compacted topic #634

Closed
tcrayford opened this issue Mar 26, 2016 · 4 comments · Fixed by #669
Closed

Handle kafka "null messages" for deleting in a compacted topic #634

tcrayford opened this issue Mar 26, 2016 · 4 comments · Fixed by #669

Comments

@tcrayford
Copy link
Contributor

According to Kafka's source, the way to "delete" messages is to send a payload with a size of -1. Looking at sarama, it doesn't seem like this is supported, and https://github.com/Shopify/sarama/blob/2b18ad7079cca9c46a012cb6e7043e7de2985482/async_producer.go#L151 seems to suggest that this will end up with an incorrect header being sent to Kafka. Are you interested in a change to support this? I think we'd just want an additional check in that function that doesn't change size if the value encoder's length is -1.

As another supporting addition, it seems like it would be a good idea to have a DeleteMessageEncoder that is just an empty struct that handles this logic.

Thoughts? I'm happy doing all the coding/testing work here.

@tcrayford
Copy link
Contributor Author

I've been digging into this a bit further. There are some things:

  1. Sarama does seem to support this, if and only if the topic is uncompressed, and the Message.Value == nil. Then it'll write a -1 byte just fine.
  2. As of Kafka 0.8, compression wasn't supported in compacted topics, so writing a nil value wouldn't work. However, as of 0.9, compression is supported with compacted topics. This breaks in sarama, because of https://github.com/Shopify/sarama/blob/master/message.go#L118 - getting a nil message in a compacted topic returns a decoding error.

I'm not 100% clear on what sarama's opinion on backwards compatibility with Kafka broker versions is here - getting a nil Value on a compressed topic in 0.8 is "impossible" and right now sarama errors there. However, under 0.9, getting a nil on a compressed topic is a very normal thing.

Thoughts? I've already written the sarama code for handling this case and tested it against a kafka broker. I'm somewhat loathe to use uncompressed topics just to workaround this issue with sarama.

@eapache
Copy link
Contributor

eapache commented Mar 27, 2016

Quick note since it's the weekend: compaction is not something that we use internally, so we have put no effort into properly supporting it, but we have no objections to merging fixes or taking suggestions of that sort. Our supported versions of Kafka are documented in the README; we've had enough of these cases that I plan to introduce a configuration value so you can specify which broker version you are talking to and it will behave correctly.

More tomorrow.

@eapache
Copy link
Contributor

eapache commented Mar 28, 2016

Kafka uses a length of -1 for bytes and strings to distinguish null values from empty-but-non-null values (length of 0). The Length value from sarama's encoder is for calculating batch sizes and has no concept of nulls, so it should return 0 in either case. An encoder that returns 0 for length and nil for encode should be enough to send the -1 byte, as I think you've already discovered.

Because kafka's protocol is weird about compression (e.g. the compressed payload must be a nested message set) I would not be surprised if we have some issues sending or receiving nils in those cases. Bug fixes welcome, and we're happy to err on the side of being more permissive for older versions, at least until I get around to implementing the "which kafka version" config value.

@zanhsieh
Copy link

zanhsieh commented Apr 6, 2016

@tcrayford : Do you mind to share your sample code for deleting message? I set ProducerMessage.Value to nil but found nothing deleted. Tried Kafka 0.9 and 0.8.1.1.

eapache added a commit that referenced this issue Jun 9, 2016
Per https://kafka.apache.org/documentation.html#compaction messages with null
payloads are valid when using log compaction. Make this work for all messages,
not just uncompressed ones. Should fix #634.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants