-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Kafka 0.9 protocol additions #577
Conversation
type GroupProtocol interface { | ||
encodeGroupProtocol(packetEncoder) error | ||
decodeGroupProtocol(packetDecoder) error | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part is a bit tricky; the protocol defines this to be extensible based on the ProtocolType
. It continues to only define the specifics for the "consumer" protocol.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am tempted to just make this []byte
and handle the consumer-specific implementation via Encoder
interface? Not sure, but I dislike a public interface with no public methods.
c48c940
to
7a36173
Compare
@@ -37,6 +37,9 @@ var ErrShuttingDown = errors.New("kafka: message received by producer in process | |||
// ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max | |||
var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max") | |||
|
|||
// ErrMessageTooLarge is returned when a JoinGroup request returns a protocol type that is not supported by sarama. | |||
var ErrUnknownGroupProtocol = errors.New("kafka: encountered an unknown group protocol") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can't happen, the broker selects the protocols supported by all members, and if no such set exists throws a InconsistentGroupProtocol
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now this error is only used when the ProtocolType on the request is set (by the user) to an unknown value (i.e. anything but consumer
).
You're going to need to add all the new |
} | ||
|
||
func (r *JoinGroupRequest) decode(pd packetDecoder) (err error) { | ||
r.GroupId, err = pd.getString() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these can all be collapsed into their if
lines because err
is predefined
dce94b6
to
821ca5b
Compare
Added new |
07eec68
to
18a7ed2
Compare
case ErrGroupAuthorizationFailed: | ||
return "kafka server: The client is not authorized to access this group" | ||
case ErrClusterAuthorizationFailed: | ||
return "kafka server: The client is not authorized to send this request type" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for consistency, the existing errors end in punctuation
3bb8696
to
21f7755
Compare
I've implemented all the new response/request pairs. I have also removed the ConsumerGroup specialization stuff and left it at just byte-arrays. We can later decide how to handle that best. |
LGTM. Collapse all the |
39e4207
to
611688b
Compare
0, 3, 't', 'w', 'o', // Protocol name | ||
0, 0, 0, 3, 0x04, 0x05, 0x06, // protocol metadata | ||
0, 3, 'o', 'n', 'e', // Protocol name | ||
0, 0, 0, 3, 0x01, 0x02, 0x03, // protocol metadata |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This gets assembled from a map, so the order is not deterministic. @eapache how have you gotten around this elsewhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only test the 0/1 cases :P
611688b
to
9a8900b
Compare
9a8900b
to
cacc655
Compare
I changed some slice types into maps to make the types easier to use, and added tests to all the things. This is ready for final review. |
cacc655
to
756801d
Compare
Note: some messages, fields, and errors are also renamed as part of the 0.9 release. For backwards compatibility reasons, these are not included. |
package sarama | ||
|
||
type DescribeGroupsResponse struct { | ||
Groups []*GroupDescription |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kinda/almost/sorta makes sense for this to be a map keyed by group ID, but not sure if it's worth it at this point
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would make decode rather more complex
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also like that this matches the request: you ask for a slice of groups, you get a slice of group descriptions back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair
Implement Kafka 0.9 protocol additions
@eapache Am I on the right track here?