From 43035f6653fc5f06ec1ba1322a622688e3e9d40b Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Sun, 8 Sep 2024 11:21:28 -0500 Subject: [PATCH] broker/client: quiet more noisy logging Don't warn on a clean server-side close of a List RPC that doesn't ever return a response. This is expected for a resumed listing which doesn't change. Use an attached Route of a previously completed Read, if available, and more importantly: clear the last Response of the previous stream. We could enter a condition where this was never cleared (for example, on NOT_JOURNAL_BROKER) if the _following_ request went to the correct broker, stayed open for a while with no data, and was then closed server-side. This can cause mis-leading and false-positive logged warnings. Update RetryReader to account for attempt being incremented on the next loop iteration. --- broker/client/list.go | 3 +++ broker/client/reader.go | 16 ++++++++++++---- broker/client/retry_reader.go | 4 +++- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/broker/client/list.go b/broker/client/list.go index 6f54125d..f6903943 100644 --- a/broker/client/list.go +++ b/broker/client/list.go @@ -97,6 +97,9 @@ func (pl *WatchedList) watch() { } else { stream = nil // Must restart. + if err == io.EOF { + attempt = 0 // Clean server-side close. + } // Wait for back-off timer or context cancellation. select { case <-pl.ctx.Done(): diff --git a/broker/client/reader.go b/broker/client/reader.go index 0f7d6dfe..05418769 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -95,10 +95,18 @@ func (r *Reader) Read(p []byte) (n int, err error) { // Lazy initialization: begin the Read RPC. if r.stream == nil { - if r.stream, err = r.client.Read( - pb.WithDispatchItemRoute(r.ctx, r.client, r.Request.Journal.String(), false), - &r.Request, - ); err == nil { + var ctx context.Context + + // Prefer a prior response header route, and fall back to the route cache. + if r.Response.Header != nil { + ctx = pb.WithDispatchRoute(r.ctx, r.Response.Header.Route, pb.ProcessSpec_ID{}) + } else { + ctx = pb.WithDispatchItemRoute(r.ctx, r.client, r.Request.Journal.String(), false) + } + // Clear last response of previous stream. + r.Response = pb.ReadResponse{} + + if r.stream, err = r.client.Read(ctx, &r.Request); err == nil { n, err = r.Read(p) // Recurse to attempt read against opened |r.stream|. } else { err = mapGRPCCtxErr(r.ctx, err) diff --git a/broker/client/retry_reader.go b/broker/client/retry_reader.go index c62da1ea..00409720 100644 --- a/broker/client/retry_reader.go +++ b/broker/client/retry_reader.go @@ -117,7 +117,7 @@ func (rr *RetryReader) Read(p []byte) (n int, err error) { // any data before being closed server-side, so clear `attempts` to // reduce log noise: it's common to next see ErrNotJournalBroker // on broker topology changes or when authorizations are refreshed. - attempt = 0 + attempt = -1 squelch = true default: } @@ -222,6 +222,8 @@ func backoff(attempt int) time.Duration { // involves a couple of Nagle-like read delays (~30ms) as Etcd watch // updates are applied by participants. switch attempt { + case -1: + return 0 case 0, 1: return time.Millisecond * 50 case 2, 3: