Enables zstd (for real this time) #1574
Conversation
A little bit more info: I think the message got produced; the error is in handling the produce response. I'm also running kafkacat in debug mode, and I think it's consuming messages from a topic with zstd compression, according to:
Error is being fired by: https://github.com/Shopify/sarama/blob/diego_zstd-support-kafka-2_1_0_0/broker.go#L735-L751 which confirms my data that the message is actually being sent.
It looks like we haven't read all bytes from the response, according to the error message and https://github.com/Shopify/sarama/blob/diego_zstd-support-kafka-2_1_0_0/real_decoder.go
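For context on that failure mode, here's a minimal sketch (illustrative names only, not sarama's actual realDecoder) of why leftover bytes trigger the error: when the struct definitions lag behind the wire format, e.g. fields added in newer protocol versions, decoding finishes early and unread bytes remain in the buffer.

package main

import "fmt"

// toyDecoder is an illustrative stand-in for sarama's real_decoder: it walks
// a raw response buffer and tracks how many bytes have been consumed.
type toyDecoder struct {
	raw []byte
	off int
}

// getInt32 reads a big-endian int32, failing if fewer than 4 bytes remain.
func (d *toyDecoder) getInt32() (int32, error) {
	if d.off+4 > len(d.raw) {
		return 0, fmt.Errorf("insufficient data to decode packet")
	}
	v := int32(d.raw[d.off])<<24 | int32(d.raw[d.off+1])<<16 |
		int32(d.raw[d.off+2])<<8 | int32(d.raw[d.off+3])
	d.off += 4
	return v, nil
}

func main() {
	// A 4-byte field followed by 2 bytes this decoder doesn't know about,
	// e.g. fields introduced by a newer protocol version.
	d := &toyDecoder{raw: []byte{0x00, 0x00, 0x00, 0x2A, 0x00, 0x01}}
	v, _ := d.getInt32()
	fmt.Println("decoded:", v)
	if rem := len(d.raw) - d.off; rem != 0 {
		// This mirrors the "haven't read all bytes" failure seen above.
		fmt.Printf("error: %d unread bytes left in response\n", rem)
	}
}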
I noticed you were interested in this @lizthegrey, you might want to help me test it and report back, thanks
Oh thank fuck. Yes, we'd be delighted to test this. Let me figure out how to feature flag this... cc @ianwilkes @tredman |
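If it helps, a hedged sketch of one way to feature-flag the codec (the ENABLE_ZSTD env var is made up for this example; Producer.Compression, CompressionZSTD, and V2_1_0_0 are real sarama config knobs):

package main

import (
	"log"
	"os"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	// zstd requires produce/fetch protocol versions that only Kafka 2.1.0+
	// brokers accept, so the client must advertise at least that version.
	cfg.Version = sarama.V2_1_0_0
	cfg.Producer.Return.Successes = true // required by SyncProducer

	// Feature flag: ENABLE_ZSTD is a hypothetical env var for this sketch;
	// fall back to an already-proven codec when the flag is off.
	cfg.Producer.Compression = sarama.CompressionSnappy
	if os.Getenv("ENABLE_ZSTD") == "true" {
		cfg.Producer.Compression = sarama.CompressionZSTD
	}

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()
}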
Apart from the nits mentioned, code LGTM. Good stuff, thanks.
It looks like the consumer is still not working:
Might be similar to the producer part: protocol changes between the latest sarama protocol support and
Not looking good for the consumer side :( It's what @lizthegrey mentioned here: #1443
Wondering if we should merge this as it is right now and leave a note like: the consumer part looks like it has more protocol gaps that need to be fixed; I can try to work on that in the next few weeks in a new PR. What do you think @bai? We also need to wait a little for more confirmation that producing works. I'm still not sure how ruby-kafka added zstd support with only these minor changes https://github.com/zendesk/ruby-kafka/pull/724/files I'd like to test it to see if that even works 🤷♂
@d1egoaz can you add this patch to your branch?
From aea82107ffee2880ed8394326da7efa14ef586d0 Mon Sep 17 00:00:00 2001
From: Dominic Evans <dominic.evans@uk.ibm.com>
Date: Thu, 16 Jan 2020 23:23:36 +0000
Subject: [PATCH] fix: fill in the Fetch{Request,Response} protocol
In order to consume zstd-compressed records the consumer needs to send
and receive version 10 FetchRequest/FetchResponses, but they need to do
so in a well-formed manner that adheres to the encoding format.
Ref: https://kafka.apache.org/protocol
Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
---
fetch_request.go | 123 +++++++++++++++++++++++++++++++++++++-----
fetch_request_test.go | 44 +++++++++------
fetch_response.go | 33 +++++++++++-
request.go | 2 +-
4 files changed, 172 insertions(+), 30 deletions(-)
diff --git a/fetch_request.go b/fetch_request.go
index f2a4643..d5f0776 100644
--- a/fetch_request.go
+++ b/fetch_request.go
@@ -1,20 +1,41 @@
package sarama
type fetchRequestBlock struct {
- fetchOffset int64
- maxBytes int32
+ Version int16
+ currentLeaderEpoch int32
+ fetchOffset int64
+ logStartOffset int64
+ maxBytes int32
}
-func (b *fetchRequestBlock) encode(pe packetEncoder) error {
+func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error {
+ b.Version = version
+ if b.Version >= 9 {
+ pe.putInt32(b.currentLeaderEpoch)
+ }
pe.putInt64(b.fetchOffset)
+ if b.Version >= 5 {
+ pe.putInt64(b.logStartOffset)
+ }
pe.putInt32(b.maxBytes)
return nil
}
-func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
+func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error) {
+ b.Version = version
+ if b.Version >= 9 {
+ if b.currentLeaderEpoch, err = pd.getInt32(); err != nil {
+ return err
+ }
+ }
if b.fetchOffset, err = pd.getInt64(); err != nil {
return err
}
+ if b.Version >= 5 {
+ if b.logStartOffset, err = pd.getInt64(); err != nil {
+ return err
+ }
+ }
if b.maxBytes, err = pd.getInt32(); err != nil {
return err
}
@@ -25,12 +46,15 @@ func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
type FetchRequest struct {
- MaxWaitTime int32
- MinBytes int32
- MaxBytes int32
- Version int16
- Isolation IsolationLevel
- blocks map[string]map[int32]*fetchRequestBlock
+ MaxWaitTime int32
+ MinBytes int32
+ MaxBytes int32
+ Version int16
+ Isolation IsolationLevel
+ SessionID int32
+ SessionEpoch int32
+ blocks map[string]map[int32]*fetchRequestBlock
+ forgotten map[string][]int32
}
type IsolationLevel int8
@@ -50,6 +74,10 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
if r.Version >= 4 {
pe.putInt8(int8(r.Isolation))
}
+ if r.Version >= 7 {
+ pe.putInt32(r.SessionID)
+ pe.putInt32(r.SessionEpoch)
+ }
err = pe.putArrayLength(len(r.blocks))
if err != nil {
return err
@@ -65,17 +93,38 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
}
for partition, block := range blocks {
pe.putInt32(partition)
- err = block.encode(pe)
+ err = block.encode(pe, r.Version)
if err != nil {
return err
}
}
}
+ if r.Version >= 7 {
+ err = pe.putArrayLength(len(r.forgotten))
+ if err != nil {
+ return err
+ }
+ for topic, partitions := range r.forgotten {
+ err = pe.putString(topic)
+ if err != nil {
+ return err
+ }
+ err = pe.putArrayLength(len(partitions))
+ if err != nil {
+ return err
+ }
+ for _, partition := range partitions {
+ pe.putInt32(partition)
+ }
+ }
+ }
+
return nil
}
func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
+
if _, err = pd.getInt32(); err != nil {
return err
}
@@ -97,6 +146,16 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
}
r.Isolation = IsolationLevel(isolation)
}
+ if r.Version >= 7 {
+ r.SessionID, err = pd.getInt32()
+ if err != nil {
+ return err
+ }
+ r.SessionEpoch, err = pd.getInt32()
+ if err != nil {
+ return err
+ }
+ }
topicCount, err := pd.getArrayLength()
if err != nil {
return err
@@ -121,12 +180,43 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
return err
}
fetchBlock := &fetchRequestBlock{}
- if err = fetchBlock.decode(pd); err != nil {
+ if err = fetchBlock.decode(pd, r.Version); err != nil {
return err
}
r.blocks[topic][partition] = fetchBlock
}
}
+
+ if r.Version >= 7 {
+ forgottenCount, err := pd.getArrayLength()
+ if err != nil {
+ return err
+ }
+ if forgottenCount == 0 {
+ return nil
+ }
+ r.forgotten = make(map[string][]int32)
+ for i := 0; i < forgottenCount; i++ {
+ topic, err := pd.getString()
+ if err != nil {
+ return err
+ }
+ partitionCount, err := pd.getArrayLength()
+ if err != nil {
+ return err
+ }
+ r.forgotten[topic] = make([]int32, partitionCount)
+
+ for j := 0; j < partitionCount; j++ {
+ partition, err := pd.getInt32()
+ if err != nil {
+ return err
+ }
+ r.forgotten[topic][j] = partition
+ }
+ }
+ }
+
return nil
}
@@ -138,6 +228,7 @@ func (r *FetchRequest) version() int16 {
return r.Version
}
+// FIXME: get these correct
func (r *FetchRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
@@ -160,13 +251,21 @@ func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int
r.blocks = make(map[string]map[int32]*fetchRequestBlock)
}
+ if r.Version >= 7 && r.forgotten == nil {
+ r.forgotten = make(map[string][]int32)
+ }
+
if r.blocks[topic] == nil {
r.blocks[topic] = make(map[int32]*fetchRequestBlock)
}
tmp := new(fetchRequestBlock)
+ tmp.Version = r.Version
tmp.maxBytes = maxBytes
tmp.fetchOffset = fetchOffset
+ if r.Version >= 9 {
+ tmp.currentLeaderEpoch = int32(-1)
+ }
r.blocks[topic][partitionID] = tmp
}
diff --git a/fetch_request_test.go b/fetch_request_test.go
index 1a94c2d..2fdd905 100644
--- a/fetch_request_test.go
+++ b/fetch_request_test.go
@@ -29,20 +29,32 @@ var (
)
func TestFetchRequest(t *testing.T) {
- request := new(FetchRequest)
- testRequest(t, "no blocks", request, fetchRequestNoBlocks)
-
- request.MaxWaitTime = 0x20
- request.MinBytes = 0xEF
- testRequest(t, "with properties", request, fetchRequestWithProperties)
-
- request.MaxWaitTime = 0
- request.MinBytes = 0
- request.AddBlock("topic", 0x12, 0x34, 0x56)
- testRequest(t, "one block", request, fetchRequestOneBlock)
-
- request.Version = 4
- request.MaxBytes = 0xFF
- request.Isolation = ReadCommitted
- testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
+ t.Run("no blocks", func(t *testing.T) {
+ request := new(FetchRequest)
+ testRequest(t, "no blocks", request, fetchRequestNoBlocks)
+ })
+
+ t.Run("with properties", func(t *testing.T) {
+ request := new(FetchRequest)
+ request.MaxWaitTime = 0x20
+ request.MinBytes = 0xEF
+ testRequest(t, "with properties", request, fetchRequestWithProperties)
+ })
+
+ t.Run("one block", func(t *testing.T) {
+ request := new(FetchRequest)
+ request.MaxWaitTime = 0
+ request.MinBytes = 0
+ request.AddBlock("topic", 0x12, 0x34, 0x56)
+ testRequest(t, "one block", request, fetchRequestOneBlock)
+ })
+
+ t.Run("one block v4", func(t *testing.T) {
+ request := new(FetchRequest)
+ request.Version = 4
+ request.MaxBytes = 0xFF
+ request.Isolation = ReadCommitted
+ request.AddBlock("topic", 0x12, 0x34, 0x56)
+ testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
+ })
}
diff --git a/fetch_response.go b/fetch_response.go
index 3afc187..79cc015 100644
--- a/fetch_response.go
+++ b/fetch_response.go
@@ -33,6 +33,7 @@ type FetchResponseBlock struct {
Err KError
HighWaterMarkOffset int64
LastStableOffset int64
+ LogStartOffset int64
AbortedTransactions []*AbortedTransaction
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
RecordsSet []*Records
@@ -57,6 +58,13 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
return err
}
+ if version >= 5 {
+ b.LogStartOffset, err = pd.getInt64()
+ if err != nil {
+ return err
+ }
+ }
+
numTransact, err := pd.getArrayLength()
if err != nil {
return err
@@ -166,6 +174,10 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
if version >= 4 {
pe.putInt64(b.LastStableOffset)
+ if version >= 5 {
+ pe.putInt64(b.LogStartOffset)
+ }
+
if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
return err
}
@@ -200,7 +212,9 @@ func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
- Version int16 // v1 requires 0.9+, v2 requires 0.10+
+ ErrorCode int16
+ SessionID int32
+ Version int16
LogAppendTime bool
Timestamp time.Time
}
@@ -216,6 +230,17 @@ func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
r.ThrottleTime = time.Duration(throttle) * time.Millisecond
}
+ if r.Version >= 7 {
+ r.ErrorCode, err = pd.getInt16()
+ if err != nil {
+ return err
+ }
+ r.SessionID, err = pd.getInt32()
+ if err != nil {
+ return err
+ }
+ }
+
numTopics, err := pd.getArrayLength()
if err != nil {
return err
@@ -258,6 +283,11 @@ func (r *FetchResponse) encode(pe packetEncoder) (err error) {
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
}
+ if r.Version >= 7 {
+ pe.putInt16(r.ErrorCode)
+ pe.putInt32(r.SessionID)
+ }
+
err = pe.putArrayLength(len(r.Blocks))
if err != nil {
return err
@@ -294,6 +324,7 @@ func (r *FetchResponse) version() int16 {
return r.Version
}
+// FIXME: get these correct
func (r *FetchResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
diff --git a/request.go b/request.go
index 97437d6..6e4ad87 100644
--- a/request.go
+++ b/request.go
@@ -105,7 +105,7 @@ func allocateBody(key, version int16) protocolBody {
case 0:
return &ProduceRequest{}
case 1:
- return &FetchRequest{}
+ return &FetchRequest{Version: version}
case 2:
return &OffsetRequest{Version: version}
case 3:
--
2.24.0
I've pushed the above patch up under a separate PR here #1582 |
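For anyone wanting to try it, a hedged usage sketch of what the patched Fetch{Request,Response} enables (broker address and topic name are placeholders): once the client advertises Kafka 2.1.0+, sarama can negotiate a zstd-capable fetch version and consume the topic as usual.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0 // required so the fetch protocol version permits zstd

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Consume partition 0 of a hypothetical zstd-compressed topic from the start.
	pc, err := consumer.ConsumePartition("zstd-topic", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		log.Printf("offset %d: %s", msg.Offset, msg.Value)
	}
}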
Great @dnwe, I'll try to give it a go. I'm currently checking the producer as I've got some other
@d1egoaz hmm, I had already checked the ProduceRequest and it seemed there weren't any new fields required. V4 added a new error type, V5 and V6 were identical to V4, and V7 just meant "supports zstd"
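Based on that reading, the "FIXME: get these correct" placeholders above could plausibly resolve to something like this sketch (an assumption from Dominic's notes, not the merged code):

// Sketch only: assumed mapping from ProduceRequest versions to the minimum
// broker release, per the notes above (v5/v6 identical to v4, v7 = zstd).
func (r *ProduceRequest) requiredVersion() KafkaVersion {
	switch r.Version {
	case 1:
		return V0_9_0_0
	case 2:
		return V0_10_0_0
	case 3:
		return V0_11_0_0
	case 7:
		return V2_1_0_0 // first release that accepts zstd-compressed batches
	default:
		return MinVersion
	}
}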
@lizthegrey my tests found an error that has since been fixed; in my local env and some extended tests this has been working great! @dnwe also added support for the consumer, and it worked great for me and in CI. Please let us know if you/your team were able to test this
Thank you for saving us 35% off our Kafka bill <3 <3
[Screenshot: https://user-images.githubusercontent.com/614704/73580102-4c2c0780-4439-11ea-9e93-656492482dbe.png]
That’s amazing. 👍🏼
Love that, thanks everyone for making it happen.
Might also want to report this in https://github.com/klauspost/compress/
Couldn't it be related? klauspost/compress#264
Zstd support was initially added to sarama before https://issues.apache.org/jira/browse/KAFKA-4514 and https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression were finalized. The final release added some changes, like bumping the produce and fetch request versions so that only new clients can use zstd. This PR tries to do that; however, there are some other protocol changes that are not addressed in this PR, and I'm not sure what the effect would be of bumping the produce and fetch requests without filling the protocol gaps.
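For illustration, a sketch of the gate that bumping implies (the helper function is hypothetical; CompressionCodec, CompressionZSTD, and PacketEncodingError are real sarama types): clients must refuse to send zstd batches below produce v7.

// Sketch (an assumption, not code from this PR) of the constraint the Kafka
// protocol imposes: zstd-compressed batches are only legal on produce v7+.
func checkZstdAllowed(version int16, codec CompressionCodec) error {
	if codec == CompressionZSTD && version < 7 {
		return PacketEncodingError{"zstd requires produce request v7 (Kafka 2.1.0) or newer"}
	}
	return nil
}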
Tries to solve #1252
TODO:
Update 2020-01-15 13:00 ET
Now I need to figure out what to do with the new producer response field, or should I just leave it like that?
Update 2020-01-15 11:00 ET
It's nevertheless an improvement over the current state; this is the new error message:
Current master result (from #1252):
Proposed change (make testzstd):
Code is failing here:
I'm not sure what I'm missing; I'm currently 👀 the Java source code.
Any help would be appreciated.