Single message consumer with 0.11.0 API #1022

Closed
bobrik opened this issue Jan 12, 2018 · 3 comments

bobrik (Contributor) commented Jan 12, 2018

Versions

Sarama Version: f0c3255
Kafka Version: 0.11.0.1
Go Version: go1.9.2

Configuration

Sarama is using Kafka API 0.11.0.0.
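
For context, this corresponds roughly to a consumer config along the lines of the sketch below; the only detail confirmed above is the protocol version, everything else is a sarama default (the package and function names are made up for illustration):

package sketch

import "github.com/Shopify/sarama"

// newConsumerConfig selects the 0.11.0.0 protocol, which makes Sarama send
// FetchRequest v4 and decode the new RecordBatch format in FetchResponse v4.
// Only the Version field is confirmed by this report; the rest are defaults.
func newConsumerConfig() *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_11_0_0
	return cfg
}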

Logs

I added some debug logging; here's the output:

2018/01/12 06:17:11 Request: max wait time = 500, min bytes = 1, max bytes = 104857600, isolation = 0, num blocks = 1
2018/01/12 06:17:11 Claimed logs.www.nginx.access: [1]
2018/01/12 06:17:11   fetch request block for partition 1: &sarama.fetchRequestBlock{fetchOffset:15168197722, maxBytes:10000}
2018/01/12 06:17:11 Request: max wait time = 500, min bytes = 1, max bytes = 104857600, isolation = 0, num blocks = 1
2018/01/12 06:17:11   fetch request block for partition 1: &sarama.fetchRequestBlock{fetchOffset:15168197722, maxBytes:10000}
2018/01/12 06:17:11 Got response: version = 4, numTopics = 1, size = 10057 bytes
2018/01/12 06:17:11   Num blocks: 1
2018/01/12 06:17:11   Remaining: 10030
2018/01/12 06:17:11    block recordsSize = 10000
2018/01/12 06:17:11    block recordsSize after = 10000
2018/01/12 06:17:11      decoding record type = 2
2018/01/12 06:17:11      decoding record batch: offset = 15168197722, delta = 0, len = 901
2018/01/12 06:17:11      decoding record batch: numRecs = 1 (9939 bytes remaining)
2018/01/12 06:17:11      decoding record batch: codec = 0 (9087 bytes remaining)
2018/01/12 06:17:11   Remaining after block 0 (1 / 1 records): 0

Here's the diff that added the logging:

diff --git a/vendor/github.com/Shopify/sarama/fetch_request.go b/vendor/github.com/Shopify/sarama/fetch_request.go
index 8c8e3a5..9701402 100644
--- a/vendor/github.com/Shopify/sarama/fetch_request.go
+++ b/vendor/github.com/Shopify/sarama/fetch_request.go
@@ -1,5 +1,7 @@
 package sarama
 
+import "log"
+
 type fetchRequestBlock struct {
 	fetchOffset int64
 	maxBytes    int32
@@ -54,6 +56,9 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
 	if err != nil {
 		return err
 	}
+
+	log.Printf("Request: max wait time = %d, min bytes = %d, max bytes = %d, isolation = %d, num blocks = %d", r.MaxWaitTime, r.MinBytes, r.MaxBytes, r.Isolation, len(r.blocks))
+
 	for topic, blocks := range r.blocks {
 		err = pe.putString(topic)
 		if err != nil {
@@ -65,12 +70,14 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
 		}
 		for partition, block := range blocks {
 			pe.putInt32(partition)
+			log.Printf("  fetch request block for partition %d: %#v", partition, block)
 			err = block.encode(pe)
 			if err != nil {
 				return err
 			}
 		}
 	}
+
 	return nil
 }
 
diff --git a/vendor/github.com/Shopify/sarama/fetch_response.go b/vendor/github.com/Shopify/sarama/fetch_response.go
index 86fa549..bb46305 100644
--- a/vendor/github.com/Shopify/sarama/fetch_response.go
+++ b/vendor/github.com/Shopify/sarama/fetch_response.go
@@ -1,6 +1,10 @@
 package sarama
 
-import "time"
+import (
+	"log"
+	"os"
+	"time"
+)
 
 type AbortedTransaction struct {
 	ProducerID  int64
@@ -75,10 +79,15 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
 		return err
 	}
 
+	log.Printf("   block recordsSize = %d", recordsSize)
+
 	recordsDecoder, err := pd.getSubset(int(recordsSize))
 	if err != nil {
 		return err
 	}
+
+	log.Printf("   block recordsSize after = %d", recordsSize)
+
 	if recordsSize > 0 {
 		if err = b.Records.decode(recordsDecoder); err != nil {
 			return err
@@ -136,6 +145,8 @@ func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
 		return err
 	}
 
+	log.Printf("Got response: version = %d, numTopics = %d, size = %d bytes", r.Version, numTopics, pd.remaining())
+
 	r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
 	for i := 0; i < numTopics; i++ {
 		name, err := pd.getString()
@@ -148,6 +159,9 @@ func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
 			return err
 		}
 
+		log.Printf("  Num blocks: %d", numBlocks)
+		log.Printf("  Remaining: %d", pd.remaining())
+
 		r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
 
 		for j := 0; j < numBlocks; j++ {
@@ -162,9 +176,14 @@ func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
 				return err
 			}
 			r.Blocks[name][id] = block
+
+			nr, _ := block.Records.numRecords()
+			log.Printf("  Remaining after block %d (%d / %d records): %d", j, len(block.Records.recordBatch.Records), nr, pd.remaining())
 		}
 	}
 
+	os.Exit(42)
+
 	return nil
 }
 
diff --git a/vendor/github.com/Shopify/sarama/record_batch.go b/vendor/github.com/Shopify/sarama/record_batch.go
index a8c533b..c798653 100644
--- a/vendor/github.com/Shopify/sarama/record_batch.go
+++ b/vendor/github.com/Shopify/sarama/record_batch.go
@@ -5,6 +5,7 @@ import (
 	"compress/gzip"
 	"fmt"
 	"io/ioutil"
+	"log"
 	"time"
 
 	"github.com/eapache/go-xerial-snappy"
@@ -123,6 +124,7 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 	if err != nil {
 		return err
 	}
+
 	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
 	b.Control = attributes&controlMask == controlMask
 
@@ -130,6 +132,8 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
+	log.Printf("     decoding record batch: offset = %d, delta = %d, len = %d", b.FirstOffset, b.LastOffsetDelta, batchLen)
+
 	if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
 		return err
 	}
@@ -154,6 +158,9 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 	if err != nil {
 		return err
 	}
+
+	log.Printf("     decoding record batch: numRecs = %d (%d bytes remaining)", numRecs, pd.remaining())
+
 	if numRecs >= 0 {
 		b.Records = make([]*Record, numRecs)
 	}
@@ -173,6 +180,8 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
+	log.Printf("     decoding record batch: codec = %d (%d bytes remaining)", b.Codec, pd.remaining())
+
 	switch b.Codec {
 	case CompressionNone:
 	case CompressionGZIP:
diff --git a/vendor/github.com/Shopify/sarama/records.go b/vendor/github.com/Shopify/sarama/records.go
index 54ee7e3..ec873f5 100644
--- a/vendor/github.com/Shopify/sarama/records.go
+++ b/vendor/github.com/Shopify/sarama/records.go
@@ -1,6 +1,9 @@
 package sarama
 
-import "fmt"
+import (
+	"fmt"
+	"log"
+)
 
 const (
 	unknownRecords = iota
@@ -90,6 +93,8 @@ func (r *Records) decode(pd packetDecoder) error {
 		}
 	}
 
+	log.Printf("     decoding record type = %d", r.recordsType)
+
 	switch r.recordsType {
 	case legacyRecords:
 		r.msgSet = &MessageSet{}

Problem Description

This is related to #901.

See the ticket I filed previously for Kafka itself: https://issues.apache.org/jira/browse/KAFKA-6441

The problem is that Sarama decodes the 0.11.0.0 FetchResponse in such a way that only the very first message is extracted. We have pretty deep buffers, and this resulted in Kafka outbound traffic jumping from 125 Mbit/s to 55000 Mbit/s for a tiny topic. Essentially, consumers can only advance one message at a time, while Kafka always replies with full buffers containing thousands of messages.

I'm not sure if the problem is within Kafka or Sarama.

In reply to the logged request above, we get this response (truncated):

	reply := []byte{
		// base offset: 8 bytes
		0x0, 0x0, 0x0, 0x3, 0x88, 0x18, 0x54, 0x5a,
		// len: 4 bytes
		0x0, 0x0, 0x3, 0x85,
		// leader epoch: 4 bytes,
		0x0, 0x0, 0x0, 0x8c,
		// version: 1 byte,
		0x2,
		// crc: 4 bytes
		0x78, 0xb4, 0x40, 0x58,
		// attributes: 2 bytes
		0x0, 0x0,
		// last offset delta: 4 bytes
		0x0, 0x0, 0x0, 0x0,
		// first timestamp: 8 bytes
		0x0, 0x0, 0x1, 0x60, 0xe9, 0x3, 0x46, 0x1e,
		// last timestamp: 8 bytes
		0x0, 0x0, 0x1, 0x60, 0xe9, 0x3, 0x46, 0x1e,
		// producer id: 8 bytes
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		// producer epoch: 2 bytes
		0xff, 0xff,
		// base sequence: 4 bytes
		0xff, 0xff, 0xff, 0xff,
		// records count: 4 bytes
		0x0, 0x0, 0x0, 0x1,
		// excerpt from the first record below
		0xa4, 0xd, 0x0, 0x0, 0x0, 0x10, 0x31, 0x30, 0x38, 0x63, 0x6f, 0x6d, 0x32,
		0x32, 0x86, 0xd, 0x7b, 0x22, 0x40, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74,
		0x61, 0x6d, 0x70, 0x22, 0x3a, 0x20, 0x22, 0x32, 0x30, 0x31, 0x38, 0x2d,
		0x30, 0x31, 0x2d, 0x31, 0x32, 0x54, 0x30, 0x36, 0x3a, 0x31, 0x36, 0x3a,
		0x33, 0x34, 0x2b, 0x30, 0x30, 0x3a, 0x30, 0x30, 0x22, 0x2c, 0x20, 0x22,
		0x40, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3a, 0x20, 0x31,
		0x2c, 0x22, 0x68, 0x6f, 0x73, 0x74, 0x22, 0x3a,
		// ... lots more bytes than just one record ...
	}

Kafka sets recordsSize in the FetchResponse to a large number (10000 in the logs) that is supposed to hold multiple messages (each a lot shorter than 10000 bytes). Then in the RecordBatch, Kafka clearly says that there is only one record (0x0, 0x0, 0x0, 0x1) in this batch and that the length of the whole thing is 901 bytes (0x3, 0x85).

The gap between 901 bytes and 10000 bytes is filled with other messages (records? record batches?), which Sarama does not see.
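
To make the layout concrete, here is a rough sketch (this is not Sarama code; walkBatches and its signature are made up for illustration) that walks the records area using only the two fields at the start of every record batch, the 8-byte base offset and the 4-byte batch length. Applied to the reply above: the batch length 0x385 = 901 counts everything after the length field, so the first batch occupies 8 + 4 + 901 = 913 bytes, leaving 10000 - 913 = 9087 bytes of further batches, which is exactly the "9087 bytes remaining" number in the logs:

package sketch

import (
	"encoding/binary"
	"fmt"
)

// walkBatches prints where each record batch in the records area of one
// FetchResponse partition starts and ends. Each batch occupies
// 12 + batchLen bytes, and the next batch starts immediately after it.
func walkBatches(records []byte) {
	for pos := 0; pos+12 <= len(records); {
		baseOffset := int64(binary.BigEndian.Uint64(records[pos:]))
		batchLen := int(binary.BigEndian.Uint32(records[pos+8:]))
		fmt.Printf("batch at %d: baseOffset=%d, len=%d, next at %d\n",
			pos, baseOffset, batchLen, pos+12+batchLen)
		pos += 12 + batchLen
	}
}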

I looked at the Kafka protocol docs, but they do not mention anything about this.

cc @wladh

wladh (Contributor) commented Jan 12, 2018

Digging around the Java code a bit, it seems that indeed there can be more than one batch in a stream (e.g., a fetch response). I guess this is done for efficiency reasons, as the batches are stored in and retrieved from the log with zero-copy, so the records are not re-batched in the broker. I will work on a fix for this.
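
The shape of such a fix is roughly a loop over the records area instead of a single decode. A sketch of the control flow follows; batchDecoder and decodeOneBatch are stand-ins invented for illustration (the real change has to go through Sarama's internal packetDecoder and RecordBatch types), so treat it as pseudocode rather than the actual patch:

package sketch

// batchDecoder is a made-up stand-in for Sarama's internal packet decoder.
type batchDecoder interface {
	remaining() int
	decodeOneBatch() (numRecords int, err error)
}

// decodeAllBatches keeps decoding record batches until the records area
// reported by the broker is exhausted, instead of stopping after the first
// batch, since one fetch response partition may carry many back-to-back
// batches copied straight from the log.
func decodeAllBatches(d batchDecoder) (int, error) {
	total := 0
	for d.remaining() > 0 {
		n, err := d.decodeOneBatch()
		if err != nil {
			return total, err
		}
		total += n
	}
	return total, nil
}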

bobrik (Contributor, Author) commented Jan 12, 2018

@wladh, can you point me at the relevant Java code so I can also take a look?

bobrik (Contributor, Author) commented Jan 12, 2018

Indeed, there may be multiple RecordBatches.

It's a bit confusing that Kafka and Sarama use different names for the same entities; you have to match concepts by the field names within them.

eapache added a commit that referenced this issue Jan 22, 2018
Support multiple record batches, closes #1022