-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ClusterAdmin Support #1055
ClusterAdmin Support #1055
Conversation
admin.go
Outdated
} | ||
|
||
// Check that we are not dealing with a closed Client before processing any other arguments | ||
if client.Closed() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unnecessary since you literally just created it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will delete that
admin.go
Outdated
return ca, nil | ||
} | ||
|
||
func (ca *clusterAdmin) handleResponses(rsp interface{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand the motivation for grouping all the response-handling in a giant switch here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will handle all the responses . For now I clubbed it but I will handle them individually.
client.go
Outdated
@@ -35,6 +35,8 @@ type Client interface { | |||
// topic/partition, as determined by querying the cluster metadata. | |||
Leader(topic string, partitionID int32) (*Broker, error) | |||
|
|||
Any() *Broker |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think there's a good reason to expose this publicly, it's never been a well-defined method in the first place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will use client.Brokers instead of any.
@eapache what is the convenient way to access broker (CreateTopic , DeleteTopic from client. I exposed Any() from client interface but i agree it is not needed.I tried using brokers but it is not guaranteed they are open always. My tests failed with broker not connected. Can you please suggest any other way to access the underlying brokers which are opened . I will make those changes . |
I would suggest that we first settle on the design in #1048 before finalising the implementation. Also, part of the answer to the question about how to access the broker is that some of the methods ( |
Thanks @buyology , will wait for the design to be finalized and method for retrieving the controller. |
b12bde3
to
ce27e4e
Compare
admin.go
Outdated
// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics, | ||
// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0. | ||
// Methods with stricter requirements will specify the minimum broker version required. | ||
// This client was introduced in 0.11.0.0 and the API is still evolving. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this line is necessary in the documentation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. will remove that . I was thinking mentioning the version dependency would be needed especially admin related stuff is supported only in latest versions of kafka. i just followed how we have description on brokers package.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed it
admin.go
Outdated
// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0. | ||
// Methods with stricter requirements will specify the minimum broker version required. | ||
// This client was introduced in 0.11.0.0 and the API is still evolving. | ||
// You MUST call Close() on a client to avoid leaks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/client/cluster admin
admin.go
Outdated
// This client was introduced in 0.11.0.0 and the API is still evolving. | ||
// You MUST call Close() on a client to avoid leaks | ||
type ClusterAdmin interface { | ||
// Creates a new topic .This operation is supported by brokers with version 0.10.1.0 or higher. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo on the period
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will address that.
admin.go
Outdated
// This client was introduced in 0.11.0.0 and the API is still evolving. | ||
// You MUST call Close() on a client to avoid leaks | ||
type ClusterAdmin interface { | ||
// Creates a new topic .This operation is supported by brokers with version 0.10.1.0 or higher. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need to specify the version requirement here, I prefer it at the end of the description where it already is for other methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok , will do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
admin.go
Outdated
// The validateOnly option is supported from version 0.10.2.0. | ||
CreateTopic(topic string, detail *TopicDetail) error | ||
|
||
// Delete a batch of topics. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the method only takes one topic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that is what we though in the initial design. So i went with that . Should i change it to take multiple topics ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably? At the very least, the code and the documentation should be consistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed the comments to singular topic.
admin.go
Outdated
// to become aware that the topics have been created. During this time, ClusterAdmin#listTopics | ||
// and ClusterAdmin#describeTopics may not return information about the new topics. | ||
// This operation is supported by brokers with version 0.10.1.0 or higher. | ||
// The validateOnly option is supported from version 0.10.2.0. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the method doesn't take a validateOnly
flag
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gave option for validateOnly flag
admin.go
Outdated
// Creates access control lists (ACLs) which are bound to specific resources. | ||
// This operation is not transactional so it may succeed for some ACLs while fail for others. | ||
// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but | ||
// no changes will be made.This operation is supported by brokers with version 0.11.0.0 or higher. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing space
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
admin.go
Outdated
// no changes will be made.This operation is supported by brokers with version 0.11.0.0 or higher. | ||
CreateACL(resource Resource, acl Acl) error | ||
|
||
ListAcls(filter AclFilter) ([]ResourceAcls, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing description
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gave support for ListAcls and added the description for it.
admin.go
Outdated
} | ||
|
||
if detail == nil { | ||
return ErrInvalidInput |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's worth naming an error for this, it's better to give something less computer-friendly and more human-friendly (e.g. errors.New("You must specify topic details")
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok sure , will do that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed the ErrInvalidInput from errors.go
Did you intend to push #1116 to this PR instead? |
admin.go
Outdated
|
||
import "errors" | ||
|
||
type ClusterAdmin interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry my previous comment wasn't clear. I still think we should have a description for ClusterAdmin
, I just think it doesn't need to mention the bit about "This client was introduced in 0.11.0.0 and the API is still evolving."
admin.go
Outdated
import "errors" | ||
|
||
type ClusterAdmin interface { | ||
// Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You've still got a duplicate version sentence here.
admin.go
Outdated
// The validateOnly option is supported from version 0.10.2.0. | ||
CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error | ||
|
||
// Delete a batch of topics. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sentence needs to be singular still since the method only deletes one at a time.
admin.go
Outdated
// new partitions. This operation is supported by brokers with version 1.0.0 or higher. | ||
CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error | ||
|
||
//Delete records whose offset is smaller than the given offset of the corresponding partition. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spacing after the //
errors.go
Outdated
@@ -49,6 +49,8 @@ var ErrControllerNotAvailable = errors.New("kafka: controller is not avaiable") | |||
// the metadata. | |||
var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata") | |||
|
|||
var ErrReferenceNodeNotFound = errors.New("kafka: response did not contain a reference to node") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this fundamentally any different from ErrIncompleteResponse
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i was looking at the java documentation in kafka when ever topics creation is not successful or the response we dont get a response to the topic which we requested changes for it was returning some thing like that . I can change it to use ErrIncompleteResponse but it doesn't say the intent that the specified resource is not found. I will wait for your response and take that action.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I fully understand, if something doesn't work the broker should still reply with that topic/partition, just with an error code set? If it's omitting the topic/partition entirely that's new behaviour.
At least as written, NodeNotFound doesn't contain any more information beyond IncompleteResponse about the "why", so I'd prefer to stick with IncompleteResponse until we have something more meaningful to put here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok sure will switch to do that .
This looks pretty good to me now. Could you please squash/rebase your commits? |
Done squashing all the commits . |
Thanks! |
@eapache @buyology @Mongey @mimaison @ongardie-ebay
I started working on ClusterAdmin support and wrote an initial draft version. Want to get your comments that I am moving in the right direction. After your comments i will close to the pull request.