Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ProduceResponse v1 and v2 encoding #970

Conversation

slaunay
Copy link
Contributor

@slaunay slaunay commented Oct 18, 2017

I have been running into an issue when using v1.13.0 of Sarama for unit testing a producer with the MockBroker abstraction.

When using an AsyncProducer with config.version >= V0_9_0_0 decoding produce responses from a MockBroker fails with: insufficient data to decode packet, more bytes expected.

Example to reproduce error

Example of application that fails when sending a V0_10_0_0 ProduceRequest as it expects the same version in the ProduceResponse:

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/Shopify/sarama"
)

func main() {
	sarama.Logger = log.New(os.Stdout, "[SARAMA]", log.LstdFlags|log.Lmicroseconds)
	reporter := reporter{}
	seedBroker := sarama.NewMockBroker(reporter, 1)
	leader := sarama.NewMockBroker(reporter, 2)

	metadataResponse := new(sarama.MetadataResponse)
	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
	metadataResponse.AddTopicPartition("topic", 0, leader.BrokerID(), nil, nil, sarama.ErrNoError)
	seedBroker.Returns(metadataResponse)

	prodSuccess := new(sarama.ProduceResponse)
	// We are using V0_10_0_0 so the response version should match the request version
	prodSuccess.Version = 2
	prodSuccess.AddTopicPartition("topic", 0, sarama.ErrNoError)
	leader.Returns(prodSuccess)

	config := sarama.NewConfig()
	config.Metadata.Retry.Max = 0
	config.Producer.Retry.Max = 0
	config.Producer.Return.Successes = true
	config.Version = sarama.V0_10_0_0

	producer, err := sarama.NewAsyncProducer([]string{seedBroker.Addr()}, config)
	if err != nil {
		log.Fatal(err)
	}

	producer.Input() <- &sarama.ProducerMessage{
		Topic: "topic",
		Value: sarama.StringEncoder("value"),
	}

	select {
	case <-producer.Successes():
		log.Println("Success")
	case err := <-producer.Errors():
		log.Println("Error", err)
	}

	producer.Close()
	leader.Close()
	seedBroker.Close()
}

type reporter struct{}

func (r reporter) Error(args ...interface{}) {
	log.Print(args...)
}
func (r reporter) Errorf(format string, args ...interface{}) {
	log.Printf(format, args...)
}
func (r reporter) Fatal(args ...interface{}) {
	log.Fatal(args...)
}
func (r reporter) Fatalf(format string, args ...interface{}) {
	log.Fatalf(format, args...)
}

Logs

The application fails with:

[SARAMA]2017/10/18 15:57:30.136867 *** mockbroker/1 listening on 127.0.0.1:55595
[SARAMA]2017/10/18 15:57:30.137777 *** mockbroker/2 listening on 127.0.0.1:55596
[SARAMA]2017/10/18 15:57:30.137819 Initializing new client
[SARAMA]2017/10/18 15:57:30.137826 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[SARAMA]2017/10/18 15:57:30.137866 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[SARAMA]2017/10/18 15:57:30.137874 client/metadata fetching metadata for all topics from broker 127.0.0.1:55595
[SARAMA]2017/10/18 15:57:30.138172 Connected to broker at 127.0.0.1:55595 (unregistered)
[SARAMA]2017/10/18 15:57:30.138253 *** mockbroker/1/0: connection opened
[SARAMA]2017/10/18 15:57:30.138375 *** mockbroker/1/0: served &{0 sarama 0xc420110020} -> &{[0xc4200d6000] [0xc42007cc30]}
[SARAMA]2017/10/18 15:57:30.138493 client/brokers registered new broker #2 at 127.0.0.1:55596
[SARAMA]2017/10/18 15:57:30.138535 Successfully initialized new client
[SARAMA]2017/10/18 15:57:30.138645 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[SARAMA]2017/10/18 15:57:30.138739 producer/broker/2 starting up
[SARAMA]2017/10/18 15:57:30.138760 producer/broker/2 state change to [open] on topic/0
[SARAMA]2017/10/18 15:57:30.138925 Connected to broker at 127.0.0.1:55596 (registered as #2)
[SARAMA]2017/10/18 15:57:30.138986 *** mockbroker/2/0: connection opened
[SARAMA]2017/10/18 15:57:30.139205 *** mockbroker/2/0: served &{0 sarama 0xc420140400} -> &{map[topic:map[0:0xc42007ccc0]] 2 0s}
[SARAMA]2017/10/18 15:57:30.139299 producer/broker/2 state change to [closing] because kafka: insufficient data to decode packet, more bytes expected
[SARAMA]2017/10/18 15:57:30.139331 Closed connection to broker 127.0.0.1:55596
2017/10/18 15:57:30 Error kafka: Failed to produce message to topic topic: kafka: insufficient data to decode packet, more bytes expected
[SARAMA]2017/10/18 15:57:30.139371 Producer shutting down.
[SARAMA]2017/10/18 15:57:30.139379 Closing Client
[SARAMA]2017/10/18 15:57:30.139414 producer/broker/2 shut down
[SARAMA]2017/10/18 15:57:30.139433 Closed connection to broker 127.0.0.1:55595
[SARAMA]2017/10/18 15:57:30.139385 *** mockbroker/2/0: invalid request: err=EOF, (*sarama.request)(<nil>)
[SARAMA]2017/10/18 15:57:30.139443 *** mockbroker/2/0: connection closed, err=<nil>
[SARAMA]2017/10/18 15:57:30.139466 *** mockbroker/2: listener closed, err=accept tcp 127.0.0.1:55596: use of closed network connection
[SARAMA]2017/10/18 15:57:30.139484 *** mockbroker/1/0: invalid request: err=EOF, (*sarama.request)(<nil>)
[SARAMA]2017/10/18 15:57:30.139492 *** mockbroker/1/0: connection closed, err=<nil>
[SARAMA]2017/10/18 15:57:30.139510 *** mockbroker/1: listener closed, err=accept tcp 127.0.0.1:55595: use of closed network connection

Code change

  • sort Blocks by increasing order prior to encoding to ensure deterministic payload to simplify unit tests
  • encode ProduceResponse ThrottleTime when version >= 1
  • encode ProduceResponseBlock Timestamp when version >= 2
  • add unit tests for ProduceResponse decoding (version 1 and 2)
  • add unit tests for ProduceResponse encoding

if !prb.Timestamp.Before(time.Unix(0, 0)) {
timestamp = prb.Timestamp.UnixNano() / int64(time.Millisecond)
} else if !prb.Timestamp.IsZero() {
return PacketDecodingError{fmt.Sprintf("invalid timestamp (%v)", prb.Timestamp)}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PacketEncodingError?

}
sort.Ints(partitionNumbers)
for _, p := range partitionNumbers {
prb := partitions[int32(p)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

arguably this whole block should get moved to ProduceResponseBlock.encode?

@eapache
Copy link
Contributor

eapache commented Oct 19, 2017

Not entirely sold on the sorting, but since these encode methods are basically test-only I guess it's fine.

encode ProduceResponse ThrottleTime when version >= 1

This... was not changed in this PR?

@slaunay
Copy link
Contributor Author

slaunay commented Oct 19, 2017

Thanks for the quick review @eapache.

Not entirely sold on the sorting, but since these encode methods are basically test-only I guess it's fine.

Not supper happy with that either so I'm going to remove the sorting and update the test to use a single topic and a single partition so that order does not matter (that's how unit tests are done for ProduceRequest that also relies on a map for the msgSets field).

encode ProduceResponse ThrottleTime when version >= 1
This... was not changed in this PR?

No it was not, I guess by adding the unit test for it (decoding and encoding I got confused).

- add ProduceResponseBlock.encode(packetEncoder, int16)
- encode ProduceResponseBlock Timestamp field when version >= 2
- add unit tests for ProduceResponse decoding (version 1 and 2)
- add unit tests for ProduceResponse encoding
@slaunay slaunay force-pushed the enhancement/support-produce-response-timestamp-encoding branch from c1bc8e1 to ca457b7 Compare October 19, 2017 17:14
@slaunay
Copy link
Contributor Author

slaunay commented Oct 19, 2017

I decided to rebase the changes as they are quite simple now and the commit message has been updated accordingly.

Let me know what you think, the only drawback IMHO is that the coverage for the unit tests has decreased (not checking multiple topics/partitions or validating the -1 timestamp case anymore).

@eapache
Copy link
Contributor

eapache commented Oct 19, 2017

Looks good, thanks!

@eapache eapache merged commit 630b33c into IBM:master Oct 19, 2017
@slaunay slaunay deleted the enhancement/support-produce-response-timestamp-encoding branch October 19, 2017 19:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants