-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update kafka broker query #1
Update kafka broker query #1
Conversation
@urso Change LGTM. Tests are failing because updates to the fields are missing. Can you update this? |
- on connect try to find the broker id (address must match advertised host). - check broker is leader before querying offsets - query offsets for all replicas - remove 'isr' from event, and replace with boolean flag `insync_replica` - replace `replicas` from event with per event `replica`-id - update sarama to get offset per replica id
19a2e8d
to
b0ceb7a
Compare
|
||
response, err := m.broker.GetMetadata(&sarama.MetadataRequest{}) | ||
defer b.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we close and reconnect every time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so. We query the broker every 10 seconds. No need to keep mostly idle connections.
offsets = common.MapStr{ | ||
"newest": offNewest, | ||
"oldest": offOldest, | ||
"error": nil, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we even add error if it is nil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using nil, to keep mostly the same event format then before. We can remove the error field though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would remove it if empty as we do the same for other fields which do not have content.
@@ -22,11 +22,20 @@ func (b *offsetRequestBlock) decode(pd packetDecoder) (err error) { | |||
} | |||
|
|||
type OffsetRequest struct { | |||
blocks map[string]map[int32]*offsetRequestBlock | |||
replicaID *int32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to make sure that we port these changes back to your fork of sarama
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's already in my fork. Isn't the glide.yaml file updated in this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got it. it is updated.
return false | ||
} | ||
|
||
func queryOffsetRange( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a doc comment would be nice here on what the different offset are that are returned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hehe, this one needs some more testing (like, see if it really returns correct indidces).
@urso Merging this one, so I can continue on the other PR. |
on connect try to find the broker id (address must match advertised host).
check broker is leader before querying offsets
query offsets for all replicas
remove 'isr' from event, and replace with boolean flag
insync_replica
replace
replicas
from event with per eventreplica
-idupdate sarama to get offset per replica id