[RFC/WIP] Embed client in producer and consumer #430

Closed
wants to merge 1 commit into from
23 changes: 9 additions & 14 deletions async_producer.go
@@ -20,19 +20,14 @@ func forceFlushThreshold() int {
// leaks: it will not be garbage-collected automatically when it passes out of
// scope.
type AsyncProducer interface {
+ Client

// AsyncClose triggers a shutdown of the producer, flushing any messages it may have
// buffered. The shutdown has completed when both the Errors and Successes channels
// have been closed. When calling AsyncClose, you *must* continue to read from those
// channels in order to drain the results of any messages in flight.
AsyncClose()

- // Close shuts down the producer and flushes any messages it may have buffered.
- // You must call this function before a producer object passes out of scope, as
- // it may otherwise leak memory. You must call this before calling Close on the
- // underlying client.
- Close() error

// Input is the input channel for the user to write messages to that they wish to send.
Input() chan<- *ProducerMessage

@@ -48,7 +43,7 @@ type AsyncProducer interface {
}

type asyncProducer struct {
- client Client
+ Client
conf *Config
ownClient bool

@@ -84,7 +79,7 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
}

p := &asyncProducer{
- client: client,
+ Client: client,
conf: client.Config(),
errors: make(chan *ProducerError),
input: make(chan *ProducerMessage),
@@ -252,7 +247,7 @@ func (p *asyncProducer) topicDispatcher() {
}

if p.ownClient {
- err := p.client.Close()
+ err := p.Client.Close()
if err != nil {
Logger.Println("producer/shutdown failed to close the embedded client:", err)
}
@@ -308,11 +303,11 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch

breaker := breaker.New(3, 1, 10*time.Second)
doUpdate := func() (err error) {
- if err = p.client.RefreshMetadata(topic); err != nil {
+ if err = p.RefreshMetadata(topic); err != nil {
return err
}

- if leader, err = p.client.Leader(topic, partition); err != nil {
+ if leader, err = p.Leader(topic, partition); err != nil {
return err
}

@@ -322,7 +317,7 @@

// try to prefetch the leader; if this doesn't work, we'll do a proper breaker-protected refresh-and-fetch
// on the first message
- leader, _ = p.client.Leader(topic, partition)
+ leader, _ = p.Leader(topic, partition)
if leader != nil {
output = p.getBrokerProducer(leader)
}
@@ -646,9 +641,9 @@ func (p *asyncProducer) assignPartition(partitioner Partitioner, msg *ProducerMe
var err error

if partitioner.RequiresConsistency() {
- partitions, err = p.client.Partitions(msg.Topic)
+ partitions, err = p.Partitions(msg.Topic)
} else {
- partitions, err = p.client.WritablePartitions(msg.Topic)
+ partitions, err = p.WritablePartitions(msg.Topic)
}

if err != nil {
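
The pattern throughout this diff is Go embedding: when a struct embeds an interface value, the interface's methods are promoted onto the struct, so a call site like p.client.Leader(topic, partition) can drop the named field and become p.Leader(topic, partition), and the type automatically satisfies any interface that itself embeds Client. A minimal standalone sketch of the mechanics (illustrative names only, not Sarama code):

package main

import "fmt"

// Greeter stands in for sarama's Client interface in this illustration.
type Greeter interface {
	Greet(name string) string
}

type english struct{}

func (english) Greet(name string) string { return "hello, " + name }

// producer embeds the interface value the same way asyncProducer now embeds Client:
// the embedded value's methods are promoted, so p.Greet(...) works without a named field.
type producer struct {
	Greeter
}

func main() {
	p := producer{Greeter: english{}}
	fmt.Println(p.Greet("kafka")) // promoted method, no p.client indirection
}
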
21 changes: 10 additions & 11 deletions consumer.go
@@ -39,17 +39,16 @@ func (ce ConsumerErrors) Error() string {
// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
// scope.
type Consumer interface {
+ Client

// ConsumePartition creates a PartitionConsumer on the given topic/partition with the given offset. It will
// return an error if this Consumer is already consuming on the given topic/partition. Offset can be a
// literal offset, or OffsetNewest or OffsetOldest
ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)

- // Close shuts down the consumer. It must be called after all child PartitionConsumers have already been closed.
- Close() error
}

type consumer struct {
- client Client
+ Client
conf *Config
ownClient bool

@@ -82,7 +81,7 @@ func NewConsumerFromClient(client Client) (Consumer, error) {
}

c := &consumer{
- client: client,
+ Client: client,
conf: client.Config(),
children: make(map[string]map[int32]*partitionConsumer),
brokerConsumers: make(map[*Broker]*brokerConsumer),
@@ -93,7 +92,7 @@ func NewConsumerFromClient(client Client) (Consumer, error) {

func (c *consumer) Close() error {
if c.ownClient {
- return c.client.Close()
+ return c.Client.Close()
}
return nil
}
@@ -117,7 +116,7 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)

var leader *Broker
var err error
- if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
+ if leader, err = c.Leader(child.topic, child.partition); err != nil {
return nil, err
}

@@ -297,13 +296,13 @@ func (child *partitionConsumer) dispatcher() {
}

func (child *partitionConsumer) dispatch() error {
- if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
+ if err := child.consumer.RefreshMetadata(child.topic); err != nil {
return err
}

var leader *Broker
var err error
- if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
+ if leader, err = child.consumer.Leader(child.topic, child.partition); err != nil {
return err
}

@@ -315,11 +314,11 @@ func (child *partitionConsumer) dispatch() error {
}

func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
- newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
+ newestOffset, err := child.consumer.GetOffset(child.topic, child.partition, OffsetNewest)
if err != nil {
return err
}
- oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
+ oldestOffset, err := child.consumer.GetOffset(child.topic, child.partition, OffsetOldest)
if err != nil {
return err
}
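
Because the Consumer interface now embeds Client, a consumer value can answer metadata queries directly; callers no longer need to keep a separate client handle around just to list partitions before consuming. A hedged usage sketch, assuming the post-PR API and a reachable broker (the address and topic are placeholders):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatalln(err)
	}
	defer consumer.Close()

	// Partitions is promoted from the embedded Client, so no separate
	// client is needed for metadata lookups.
	partitions, err := consumer.Partitions("my-topic")
	if err != nil {
		log.Fatalln(err)
	}

	for _, partition := range partitions {
		pc, err := consumer.ConsumePartition("my-topic", partition, sarama.OffsetNewest)
		if err != nil {
			log.Fatalln(err)
		}
		defer pc.Close()
	}
}
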
9 changes: 3 additions & 6 deletions sync_producer.go
@@ -6,19 +6,16 @@ import "sync"
// and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
// it passes out of scope.
type SyncProducer interface {
+ Client

// SendMessage produces a given message, and returns only when it either has succeeded or failed to produce.
// It will return the partition and the offset of the produced message, or an error if the message
// failed to produce.
SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)

- // Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
- // a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
- // on the underlying client.
- Close() error
}

type syncProducer struct {
+ Client
producer *asyncProducer
wg sync.WaitGroup
}
@@ -45,7 +42,7 @@ func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
p.conf.Producer.Return.Successes = true
p.conf.Producer.Return.Errors = true
- sp := &syncProducer{producer: p}
+ sp := &syncProducer{Client: p.Client, producer: p}

sp.wg.Add(2)
go withRecover(sp.handleSuccesses)
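
For the sync producer, embedding the whole asyncProducer would also promote the async-only methods such as Input and AsyncClose, so the constructor instead copies the async producer's embedded Client into syncProducer's own embedded field; that single assignment (Client: p.Client) is what lets a syncProducer satisfy the enlarged SyncProducer interface. A hedged end-to-end sketch of how the shared client flows through (placeholder broker address; the producer is still closed before the client it was built from, since ownClient is false on this path):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatalln(err)
	}

	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		log.Fatalln(err)
	}

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "my-topic",
		Value: sarama.StringEncoder("hello"),
	})
	if err != nil {
		log.Fatalln(err)
	}
	log.Printf("stored message at partition %d, offset %d", partition, offset)

	// The producer was built from an existing client, so close the producer
	// first, then the client created above.
	if err := producer.Close(); err != nil {
		log.Fatalln(err)
	}
	if err := client.Close(); err != nil {
		log.Fatalln(err)
	}
}
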