Issue when producing a message with multiple headers #994

Closed
acehko opened this issue Dec 3, 2017 · 4 comments

@acehko

acehko commented Dec 3, 2017

Versions

Please specify real version numbers or git SHAs, not just "Latest" since that changes fairly regularly.
Sarama Version: 6a8d89d
Kafka Version: 1.0.0
Go Version: go1.9.2 linux/amd64

Configuration

What configuration values are you using for Sarama and Kafka?

Kafka: wurstmeister/kafka:1.0.0 docker image with default config.
Sarama: cfg.Version = sarama.V1_0_0_0

Logs

When filing an issue please provide logs from Sarama and Kafka if at all
possible. You can set sarama.Logger to a log.Logger to capture Sarama debug
output.

Producer logs:

2017/12/03 16:04:57 Initializing new client
2017/12/03 16:04:57 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2017/12/03 16:04:57 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2017/12/03 16:04:57 client/metadata fetching metadata for all topics from broker localhost:9092
2017/12/03 16:04:57 Connected to broker at localhost:9092 (unregistered)
2017/12/03 16:04:57 client/brokers registered new broker #1 at 127.0.0.1:9092
2017/12/03 16:04:57 Successfully initialized new client
2017/12/03 16:04:57 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2017/12/03 16:04:57 producer/broker/1 starting up
2017/12/03 16:04:57 producer/broker/1 state change to [open] on my_topic/0
2017/12/03 16:04:57 Connected to broker at 127.0.0.1:9092 (registered as #1)

Consumer logs:

2017/12/03 16:04:56 Initializing new client
2017/12/03 16:04:56 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2017/12/03 16:04:56 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2017/12/03 16:04:56 client/metadata fetching metadata for all topics from broker localhost:9092
2017/12/03 16:04:56 Connected to broker at localhost:9092 (unregistered)
2017/12/03 16:04:56 client/brokers registered new broker #1 at 127.0.0.1:9092
2017/12/03 16:04:56 Successfully initialized new client
2017/12/03 16:04:56 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2017/12/03 16:04:56 Connected to broker at 127.0.0.1:9092 (registered as #1)
2017/12/03 16:04:56 consumer/broker/1 added subscription to my_topic/0
2017/12/03 16:04:57 Key: header-3, Value: value-3
2017/12/03 16:04:57 Key: header-3, Value: value-3
2017/12/03 16:04:57 Key: header-3, Value: value-3

Problem Description

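When producing a message with multiple headers, the last header overwrites all the others: the consumer receives the right number of headers, but each one carries the key and value of the last header (see the consumer logs above).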

Code examples:
Producer:

package main

import (
	"log"
	"os"

	"github.com/Shopify/sarama"
)

func main() {
	sarama.Logger = log.New(os.Stdout, "", log.LstdFlags)

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V1_0_0_0
	cfg.Producer.Return.Successes = true
	cfg.Producer.Return.Errors = true

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		panic(err)
	}

	msg := &sarama.ProducerMessage{
		Topic: "my_topic",
		Value: sarama.StringEncoder("testing 123"),
		Headers: []sarama.RecordHeader{
			sarama.RecordHeader{
				Key:   []byte("header-1"),
				Value: []byte("value-1"),
			},
			sarama.RecordHeader{
				Key:   []byte("header-2"),
				Value: []byte("value-2"),
			},
			sarama.RecordHeader{
				Key:   []byte("header-3"),
				Value: []byte("value-3"),
			},
		},
	}
	_, _, err = producer.SendMessage(msg)
	if err != nil {
		panic(err)
	}
}

Consumer:

package main

import (
	"log"
	"os"

	"github.com/Shopify/sarama"
)

func main() {
	sarama.Logger = log.New(os.Stdout, "", log.LstdFlags)

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V1_0_0_0

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, cfg)
	if err != nil {
		panic(err)
	}

	partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}

	msg := <-partitionConsumer.Messages()
	for _, h := range msg.Headers {
		log.Printf("Key: %s, Value: %s", h.Key, h.Value)
	}
}
@eapache
Contributor

eapache commented Dec 4, 2017

Hmm, the encode/decode logic looks right. Can you take a look on the network and see if this is a bug happening on encode or on decode? Or maybe (though unlikely) it's a bug on the broker side?

cc @wladh on the off-chance you know what could be causing this.

@wladh
Contributor

wladh commented Dec 4, 2017

The problem is with the way we copy the headers from ProducerMessage to Record. We shouldn't take the address of the loop variable, because that variable is a copy of the header, not the actual header in the slice.
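
A minimal sketch of the pitfall described above, not Sarama's actual code: the `header` type and the `copyBuggy` and `copyFixed` helpers are hypothetical names used only for illustration. Before Go 1.22 (this issue was reported on go1.9.2), a `for _, h := range` loop reused a single variable `h` for every iteration, so `&h` yielded the same pointer each time. To show that behavior on any Go version, the sketch declares the shared variable explicitly.

```go
package main

import "fmt"

// header is a hypothetical stand-in for a record header.
type header struct {
	Key, Value string
}

// copyBuggy stores the address of one shared variable on every iteration,
// which is what taking &h of a range loop variable did before Go 1.22.
func copyBuggy(in []header) []*header {
	out := make([]*header, 0, len(in))
	var h header // single variable reused across iterations
	for i := range in {
		h = in[i]
		out = append(out, &h) // every element points at the same h
	}
	return out
}

// copyFixed takes the address of each slice element instead.
func copyFixed(in []header) []*header {
	out := make([]*header, 0, len(in))
	for i := range in {
		out = append(out, &in[i])
	}
	return out
}

func main() {
	in := []header{
		{"header-1", "value-1"},
		{"header-2", "value-2"},
		{"header-3", "value-3"},
	}

	// Prints header-3/value-3 three times, matching the consumer logs.
	for _, h := range copyBuggy(in) {
		fmt.Printf("buggy: Key: %s, Value: %s\n", h.Key, h.Value)
	}

	// Prints header-1, header-2, header-3 with their own values.
	for _, h := range copyFixed(in) {
		fmt.Printf("fixed: Key: %s, Value: %s\n", h.Key, h.Value)
	}
}
```

Indexing into the slice avoids the extra copy and gives each pointer a distinct target. Note that the pointers returned by `copyFixed` alias the caller's slice, so later changes to `in` are visible through them.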

@eapache
Contributor

eapache commented Dec 4, 2017

Oh right. This is one of the more annoying subtleties of Go; it's bitten me before too.

@acehko
Author

acehko commented Dec 4, 2017

Merge fixed the issue. Thanks.

@acehko acehko closed this as completed Dec 4, 2017