Skip to content

Commit

Permalink
Merge #118260
Browse files Browse the repository at this point in the history
118260: distsql: reduce logging around outbox / inbox connection errors r=yuzefovich a=yuzefovich

This commit unifies the error handling around the outboxes of the two execution engines and hides those errors behind a verbosity level of 1, reducing the log spam that might occur in some expected cases (e.g. during a rolling upgrade).

It also hides a warning during distsql physical planning in UA / Serverless mode behind a verbosity level.

Fixes: #117975.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Jan 24, 2024
2 parents 7f540c4 + f981077 commit a5ed7a0
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 28 deletions.
4 changes: 1 addition & 3 deletions pkg/sql/colflow/colrpc/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,9 +442,7 @@ func (i *Inbox) GetNumMessages() int64 {
func (i *Inbox) sendDrainSignal(ctx context.Context) error {
log.VEvent(ctx, 2, "Inbox sending drain signal to Outbox")
if err := i.stream.Send(&execinfrapb.ConsumerSignal{DrainRequest: &execinfrapb.DrainRequest{}}); err != nil {
if log.V(1) {
log.Warningf(ctx, "Inbox unable to send drain signal to Outbox: %+v", err)
}
log.VWarningf(ctx, 1, "Inbox unable to send drain signal to Outbox: %+v", err)
return err
}
return nil
Expand Down
20 changes: 4 additions & 16 deletions pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,7 @@ func (o *Outbox) Run(
if err := func() error {
conn, err := execinfra.GetConnForOutbox(ctx, dialer, sqlInstanceID, connectionTimeout)
if err != nil {
log.Warningf(
ctx,
"Outbox Dial connection error, distributed query will fail: %+v",
err,
)
log.VWarningf(ctx, 1, "Outbox Dial connection error, distributed query will fail: %+v", err)
return err
}

Expand All @@ -202,26 +198,18 @@ func (o *Outbox) Run(
// gRPC stream being ungracefully shutdown too.
stream, err = client.FlowStream(flowCtx)
if err != nil {
log.Warningf(
ctx,
"Outbox FlowStream connection error, distributed query will fail: %+v",
err,
)
log.VWarningf(ctx, 1, "Outbox FlowStream connection error, distributed query will fail: %+v", err)
return err
}

// TODO(yuzefovich): the row-based outbox sends the header as part of
// the first message with data, consider doing that here too.
log.VEvent(ctx, 2, "Outbox sending header")
// Send header message to establish the remote server (consumer).
if err := stream.Send(
if err = stream.Send(
&execinfrapb.ProducerMessage{Header: &execinfrapb.ProducerHeader{FlowID: o.flowCtx.ID, StreamID: streamID}},
); err != nil {
log.Warningf(
ctx,
"Outbox Send header error, distributed query will fail: %+v",
err,
)
log.VWarningf(ctx, 1, "Outbox Send header error, distributed query will fail: %+v", err)
return err
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1577,7 +1577,7 @@ func (dsp *DistSQLPlanner) healthySQLInstanceIDForKVNodeHostedInstanceResolver(
if _, ok := healthyNodes[sqlInstance]; ok {
return sqlInstance, SpanPartitionReason_TARGET_HEALTHY
}
log.Warningf(ctx, "not planning on node %d", sqlInstance)
log.VWarningf(ctx, 1, "not planning on node %d", sqlInstance)
return dsp.gatewaySQLInstanceID, SpanPartitionReason_GATEWAY_TARGET_UNHEALTHY
}
}
Expand Down
11 changes: 3 additions & 8 deletions pkg/sql/flowinfra/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (m *Outbox) flush(ctx context.Context) error {
HandleStreamErr(ctx, "flushing", sendErr, m.flowCtxCancel, m.outboxCtxCancel)
// Make sure the stream is not used any more.
m.stream = nil
log.VErrEventf(ctx, 1, "Outbox flush error: %s", sendErr)
log.VWarningf(ctx, 1, "Outbox flush error: %s", sendErr)
} else {
log.VEvent(ctx, 2, "Outbox flushed")
}
Expand Down Expand Up @@ -240,10 +240,7 @@ func (m *Outbox) mainLoop(ctx context.Context, wg *sync.WaitGroup) (retErr error
ctx, m.flowCtx.Cfg.SQLInstanceDialer, m.sqlInstanceID, SettingFlowStreamTimeout.Get(&m.flowCtx.Cfg.Settings.SV),
)
if err != nil {
// Log any Dial errors. This does not have a verbosity check due to being
// a critical part of query execution: if this step doesn't work, the
// receiving side might end up hanging or timing out.
log.Infof(ctx, "outbox: connection dial error: %+v", err)
log.VWarningf(ctx, 1, "Outbox Dial connection error, distributed query will fail: %+v", err)
return err
}
client := execinfrapb.NewDistSQLClient(conn)
Expand All @@ -252,9 +249,7 @@ func (m *Outbox) mainLoop(ctx context.Context, wg *sync.WaitGroup) (retErr error
}
m.stream, err = client.FlowStream(ctx)
if err != nil {
if log.V(1) {
log.Infof(ctx, "FlowStream error: %s", err)
}
log.VWarningf(ctx, 1, "Outbox FlowStream connection error, distributed query will fail: %+v", err)
return err
}
return nil
Expand Down

0 comments on commit a5ed7a0

Please sign in to comment.