
Add two new optional channels to the ProducerConfig for handling failures and acks #114

Closed
wants to merge 3 commits
Conversation

schleppy
Contributor

FailedChannel, when provided, will receive every message that was not acked, sent as type *ConsumerEvent. The application using this package is responsible for reading from the failed channel or it will eventually block.

AckingChannel, when provided, will receive counts of acked messages per partition, per topic after each flush. The application using this package is responsible for reading from the acking channel or it will eventually block.

This commit also changes the error sent to the errorCb in the DroppedMessagesError struct to be the block error instead of the error returned from Produce.
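The intent of the two proposed channels can be sketched as follows. This is a minimal stand-alone illustration; the ConsumerEvent fields here are assumed shapes for the sketch, not the library's actual definitions:

```go
package main

import "fmt"

// Assumed shape of the event carrying a failed message (illustrative only).
type ConsumerEvent struct {
	Key, Value []byte
	Topic      string
	Partition  int32
	Err        error
}

func main() {
	// The application owns both channels and must drain them,
	// otherwise the producer's sends will eventually block.
	failed := make(chan *ConsumerEvent, 16)
	acks := make(chan map[int32]int64, 16)

	// Simulate the producer reporting one failure and one ack flush.
	failed <- &ConsumerEvent{Topic: "events", Partition: 0, Value: []byte("payload")}
	acks <- map[int32]int64{0: 42}

	ev := <-failed
	fmt.Printf("failed message on %s/%d: %s\n", ev.Topic, ev.Partition, ev.Value)
	counts := <-acks
	fmt.Println("acked on partition 0:", counts[0])
}
```

Buffered channels only delay the blocking problem; the contract described above still requires a dedicated reader.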

@schleppy
Contributor Author

Please let me know any suggestions or required changes to get this accepted.

@eapache
Contributor

eapache commented Jun 26, 2014

@wvanbergen @fw42 @burke

MaxBufferedBytes uint32 // The maximum number of bytes to buffer per-broker before sending to Kafka.
MaxBufferTime uint32 // The maximum number of milliseconds to buffer messages before sending to a broker.
ReturnFailedMessages bool // Whether messages that have failed to be stored in Kafka should be returned via the Failed() channel. If true (default false) then you *must* read the values from the Failed() channel.
ReturnAckCounts bool // Whether to return the number of acks per partition (per topic) on the Acked() channel. If true (defaul false) then you *must* read the values from the Acked() channel.
s/defaul/default

@eapache
Contributor

eapache commented Jun 27, 2014

In the commit message, the AckCounts paragraph still refs ReturnFailedMessages in one spot instead of ReturnAckCounts

// Setting ReturnAckCounts = true in the producer config is required for the ack counts to
// be sent. You must consume this channel or it
// will eventually block
func (p *Producer) Acked() chan map[int32]int64 {

This (and Failed) should return read-only channels.
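The suggestion to return read-only channels might look like the following sketch; the Producer and ConsumerEvent here are simplified stand-ins, not the real sarama types:

```go
package main

import "fmt"

// Simplified stand-in for the event type under discussion.
type ConsumerEvent struct {
	Value []byte
	Err   error
}

// Simplified stand-in producer holding the two internal channels.
type Producer struct {
	failed chan *ConsumerEvent
	acked  chan map[int32]int64
}

// Failed returns a receive-only view, so callers cannot send on or close it.
func (p *Producer) Failed() <-chan *ConsumerEvent { return p.failed }

// Acked likewise exposes the per-partition ack counts as receive-only.
func (p *Producer) Acked() <-chan map[int32]int64 { return p.acked }

func main() {
	p := &Producer{
		failed: make(chan *ConsumerEvent, 1),
		acked:  make(chan map[int32]int64, 1),
	}
	p.failed <- &ConsumerEvent{Value: []byte("lost")}
	fmt.Printf("failed: %s\n", (<-p.Failed()).Value)
}
```

The `<-chan` return type makes the ownership explicit at compile time: only the producer can send or close, which is exactly the invariant the review comment asks for.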

@schleppy
Contributor Author

For getting the key back with the failed message, how would you prefer it done? It could just be a simple struct like

type failedMessage struct {
    key []byte
    value []byte
}

and they could be sent one at a time. Or it could be a []map[string][]byte where the key is cast to a string, and all failed messages for the prb are sent as one slice.

@eapache
Contributor

eapache commented Jun 30, 2014

I'd almost be tempted to just return each entire produceMessage since the topic and partition might also be useful information, but I think a new struct makes the most sense. Perhaps the produceMessage struct could embed it to reduce duplication? I'm not sure what makes the most sense long-term.
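The embedding idea floated here could be sketched as follows; all type and field names are illustrative, not the library's actual API:

```go
package main

import "fmt"

// Hypothetical exported failure type, embedded by the internal
// produce message to avoid duplicating the key/value fields.
type FailedMessage struct {
	Key   []byte
	Value []byte
}

// Hypothetical internal message: embeds FailedMessage and adds
// the routing information the reviewer mentions might be useful.
type produceMessage struct {
	FailedMessage
	topic     string
	partition int32
}

func main() {
	m := produceMessage{
		FailedMessage: FailedMessage{Key: []byte("k"), Value: []byte("v")},
		topic:         "events",
		partition:     3,
	}
	// Embedded fields are promoted, so m.Key and m.Value work directly.
	fmt.Printf("%s/%d key=%s value=%s\n", m.topic, m.partition, m.Key, m.Value)
}
```

On failure the producer could then hand back `m.FailedMessage` as a value, without copying fields one by one.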

@eapache
Contributor

eapache commented Jun 30, 2014

Other than figuring out the return format this looks fine to me now.

@wvanbergen
Contributor

Maybe we can just reuse the ConsumerEvent type (http://godoc.org/github.com/Shopify/sarama#ConsumerEvent) or Message type (http://godoc.org/github.com/Shopify/sarama#Message) for failed messages, instead of introducing another type for it?

@schleppy
Contributor Author

@wvanbergen I am all for reuse of an existing type. Of the two, ConsumerEvent seems to fit the bill pretty well. The only thing that cannot be populated is the offset field. Since it is an int64, I suppose it could be set to -1, as I don't believe offsets are ever given as negative values. I'm going to move forward with that solution, unless I hear otherwise.

@wvanbergen
Contributor

I think not setting it (so value = 0) is acceptable as well. Also, we should make sure Err is set to something. @eapache what do you think?

@eapache
Contributor

eapache commented Jun 30, 2014

I prefer -1 I think, since 0 is in principle a valid offset. Err should be set if possible to the same err that comes out the Errors channel.

Also, should we rename ConsumerEvent at this point to something less consumer-specific?

@wvanbergen
Contributor

+1 on the rename. Does it make sense to merge Message and ConsumerEvent while we are at it?

@schleppy
Contributor Author

schleppy commented Jul 1, 2014

Is the current plan to do whatever naming rewrite before merging this? Or is that something that can be done afterward?

@wvanbergen
Contributor

Let's leave the renaming for a separate PR; use ConsumerEvent for now.

@wvanbergen
Contributor

After looking at this again, I think the API is a bit too complicated. You have to set a config variable AND consume a channel that gets returned by a function call. Can we change it so that, as a user, you just provide a channel to the ProducerConfig if you want to use it? ProducerConfig.Errors = make(chan ...). Sarama then only produces errors onto this channel if it is set.

Does this make sense?

@schleppy
Contributor Author

schleppy commented Jul 2, 2014

Sure, it makes sense. I will get on this tonight. I think after it is done it will be a lot more apparent how much improvement it offers to the api.

Is this only for Errors, or do you want a similar approach taken for acks?

Add two new channels to the Producer for handling acknowledgments from
Kafka and to retrieve messages that failed for possible replay.

ReturnFailedMessages in the config will trigger any messages that were not
successfully sent / acked to be sent on the failed channel as []byte.
The application using this package is responsible for reading from the
failed channel, retrieved via the Failed() function on the Producer. If
ReturnFailedMessages is true, then the application must consume the
Failed() channel or it will eventually block.

ReturnAckCounts in the config will trigger ack counts per partition, per
topic to be sent on the acked channel after each flush.
The application using this package is responsible for reading from the
acked channel, retrieved via the Acked() function on the Producer. If
ReturnFailedMessages is true, then the application must consume the
Acked() channel or it will eventually block.

This commit also includes a change for the error sent in the
DroppedMessagesError struct to the errorCb to be the block error instead
of the error returned from Produce.
@schleppy
Contributor Author

schleppy commented Jul 3, 2014

I don't know what the heck happened with that upstream merge, but I hope all is right in the world.

 	}
 	default:
-		errorCb(DroppedMessagesError{len(prb), err})
+		errorCb(DroppedMessagesError{len(prb), block.Err})
oop, good catch

@eapache
Contributor

eapache commented Jul 3, 2014

Not sure how one of my commits ended up in the history here... time for a rebase squish I think.

schleppy added 2 commits July 3, 2014 07:43

Changes based on PR review. Still need to decide on best way to return potential key with []byte message of failures. Possibly a new struct.

Change failed channel to type *ConsumerEvent so all relevant information including Key, Value, Partition, Topic and Err can be consumed by the async caller.

A rewrite based on PR comments. Instead of boolean config values and
needing to get a channel via a function call, put the onus on the user
to provide an AckedChannel and/or a FailedChannel to read the acks /
failures from.
@schleppy
Contributor Author

schleppy commented Jul 3, 2014

If you like I can open a new, fresh PR with all of the latest changes to clean things up a bit.

@schleppy schleppy changed the title Add two new channels to the Producer for handling acknowledgments from Add two new optional channels to the ProducerConfig for handling failures and acks Jul 3, 2014
@eapache
Contributor

eapache commented Jul 3, 2014

Fine by me.

@schleppy schleppy closed this Jul 3, 2014

3 participants