From f981077e71d247fedb00189dc91a301cb4e3790b Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 23 Jan 2024 17:53:08 -0800 Subject: [PATCH] distsql: reduce logging around outbox / inbox connection errors This commit unifies the error handling around outboxes of two execution engines as well as hides those errors behind verbosity level of 1 to reduce the log spam that might occur in some expected cases (e.g. during the rolling upgrade). It also hides a warning during distsql physical planning in UA / Serverless mode behind a verbosity level. Release note: None --- pkg/sql/colflow/colrpc/inbox.go | 4 +--- pkg/sql/colflow/colrpc/outbox.go | 20 ++++---------------- pkg/sql/distsql_physical_planner.go | 2 +- pkg/sql/flowinfra/outbox.go | 11 +++-------- 4 files changed, 9 insertions(+), 28 deletions(-) diff --git a/pkg/sql/colflow/colrpc/inbox.go b/pkg/sql/colflow/colrpc/inbox.go index 6896425ee6c3..b31e08bbf1f4 100644 --- a/pkg/sql/colflow/colrpc/inbox.go +++ b/pkg/sql/colflow/colrpc/inbox.go @@ -442,9 +442,7 @@ func (i *Inbox) GetNumMessages() int64 { func (i *Inbox) sendDrainSignal(ctx context.Context) error { log.VEvent(ctx, 2, "Inbox sending drain signal to Outbox") if err := i.stream.Send(&execinfrapb.ConsumerSignal{DrainRequest: &execinfrapb.DrainRequest{}}); err != nil { - if log.V(1) { - log.Warningf(ctx, "Inbox unable to send drain signal to Outbox: %+v", err) - } + log.VWarningf(ctx, 1, "Inbox unable to send drain signal to Outbox: %+v", err) return err } return nil diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index 9d1549118800..5a3fd945e456 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -186,11 +186,7 @@ func (o *Outbox) Run( if err := func() error { conn, err := execinfra.GetConnForOutbox(ctx, dialer, sqlInstanceID, connectionTimeout) if err != nil { - log.Warningf( - ctx, - "Outbox Dial connection error, distributed query will fail: %+v", - err, - ) + log.VWarningf(ctx, 
1, "Outbox Dial connection error, distributed query will fail: %+v", err) return err } @@ -202,11 +198,7 @@ func (o *Outbox) Run( // gRPC stream being ungracefully shutdown too. stream, err = client.FlowStream(flowCtx) if err != nil { - log.Warningf( - ctx, - "Outbox FlowStream connection error, distributed query will fail: %+v", - err, - ) + log.VWarningf(ctx, 1, "Outbox FlowStream connection error, distributed query will fail: %+v", err) return err } @@ -214,14 +206,10 @@ func (o *Outbox) Run( // the first message with data, consider doing that here too. log.VEvent(ctx, 2, "Outbox sending header") // Send header message to establish the remote server (consumer). - if err := stream.Send( + if err = stream.Send( &execinfrapb.ProducerMessage{Header: &execinfrapb.ProducerHeader{FlowID: o.flowCtx.ID, StreamID: streamID}}, ); err != nil { - log.Warningf( - ctx, - "Outbox Send header error, distributed query will fail: %+v", - err, - ) + log.VWarningf(ctx, 1, "Outbox Send header error, distributed query will fail: %+v", err) return err } return nil diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 567f2fdce585..3ab8f05dd600 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -1577,7 +1577,7 @@ func (dsp *DistSQLPlanner) healthySQLInstanceIDForKVNodeHostedInstanceResolver( if _, ok := healthyNodes[sqlInstance]; ok { return sqlInstance, SpanPartitionReason_TARGET_HEALTHY } - log.Warningf(ctx, "not planning on node %d", sqlInstance) + log.VWarningf(ctx, 1, "not planning on node %d", sqlInstance) return dsp.gatewaySQLInstanceID, SpanPartitionReason_GATEWAY_TARGET_UNHEALTHY } } diff --git a/pkg/sql/flowinfra/outbox.go b/pkg/sql/flowinfra/outbox.go index 917d5f534e7c..7f8608bccda1 100644 --- a/pkg/sql/flowinfra/outbox.go +++ b/pkg/sql/flowinfra/outbox.go @@ -189,7 +189,7 @@ func (m *Outbox) flush(ctx context.Context) error { HandleStreamErr(ctx, "flushing", sendErr, m.flowCtxCancel, 
m.outboxCtxCancel) // Make sure the stream is not used any more. m.stream = nil - log.VErrEventf(ctx, 1, "Outbox flush error: %s", sendErr) + log.VWarningf(ctx, 1, "Outbox flush error: %s", sendErr) } else { log.VEvent(ctx, 2, "Outbox flushed") } @@ -240,10 +240,7 @@ func (m *Outbox) mainLoop(ctx context.Context, wg *sync.WaitGroup) (retErr error ctx, m.flowCtx.Cfg.SQLInstanceDialer, m.sqlInstanceID, SettingFlowStreamTimeout.Get(&m.flowCtx.Cfg.Settings.SV), ) if err != nil { - // Log any Dial errors. This does not have a verbosity check due to being - // a critical part of query execution: if this step doesn't work, the - // receiving side might end up hanging or timing out. - log.Infof(ctx, "outbox: connection dial error: %+v", err) + log.VWarningf(ctx, 1, "Outbox Dial connection error, distributed query will fail: %+v", err) return err } client := execinfrapb.NewDistSQLClient(conn) @@ -252,9 +249,7 @@ func (m *Outbox) mainLoop(ctx context.Context, wg *sync.WaitGroup) (retErr error } m.stream, err = client.FlowStream(ctx) if err != nil { - if log.V(1) { - log.Infof(ctx, "FlowStream error: %s", err) - } + log.VWarningf(ctx, 1, "Outbox FlowStream connection error, distributed query will fail: %+v", err) return err } return nil