Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

broker/client: quiet more noisy logging #399

Merged
merged 1 commit into from
Sep 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions broker/client/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func (pl *WatchedList) watch() {
} else {
stream = nil // Must restart.

if err == io.EOF {
attempt = 0 // Clean server-side close.
}
// Wait for back-off timer or context cancellation.
select {
case <-pl.ctx.Done():
Expand Down
16 changes: 12 additions & 4 deletions broker/client/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,18 @@ func (r *Reader) Read(p []byte) (n int, err error) {

// Lazy initialization: begin the Read RPC.
if r.stream == nil {
if r.stream, err = r.client.Read(
pb.WithDispatchItemRoute(r.ctx, r.client, r.Request.Journal.String(), false),
&r.Request,
); err == nil {
var ctx context.Context

// Prefer a prior response header route, and fall back to the route cache.
if r.Response.Header != nil {
ctx = pb.WithDispatchRoute(r.ctx, r.Response.Header.Route, pb.ProcessSpec_ID{})
} else {
ctx = pb.WithDispatchItemRoute(r.ctx, r.client, r.Request.Journal.String(), false)
}
// Clear last response of previous stream.
r.Response = pb.ReadResponse{}

if r.stream, err = r.client.Read(ctx, &r.Request); err == nil {
n, err = r.Read(p) // Recurse to attempt read against opened |r.stream|.
} else {
err = mapGRPCCtxErr(r.ctx, err)
Expand Down
4 changes: 3 additions & 1 deletion broker/client/retry_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (rr *RetryReader) Read(p []byte) (n int, err error) {
// any data before being closed server-side, so clear `attempts` to
// reduce log noise: it's common to next see ErrNotJournalBroker
// on broker topology changes or when authorizations are refreshed.
attempt = 0
attempt = -1
squelch = true
default:
}
Expand Down Expand Up @@ -222,6 +222,8 @@ func backoff(attempt int) time.Duration {
// involves a couple of Nagle-like read delays (~30ms) as Etcd watch
// updates are applied by participants.
switch attempt {
case -1:
return 0
case 0, 1:
return time.Millisecond * 50
case 2, 3:
Expand Down
Loading