flowinfra: fix incomplete shutdown in some cases
This commit fixes an incomplete shutdown of distributed plans in the
face of "no inbound stream connection" errors. Note that "incomplete"
here means allowing some parts of the plan to run to completion (rather
than leaking resources).

Whenever this "no inbound stream connection" error occurs, we send it
to the timed-out receivers, and eventually the error makes its way to
the DistSQLReceiver, which transitions the whole plan into the draining
state. However, the non-timed-out streams only learn about the draining
state when they next send something on the stream, which might not
happen for a while (and in the case of TTL processors will not happen
until completion).

This commit addresses this inefficiency by also cancelling the flow on
the node to which some streams didn't connect in time. This triggers a
quick, ungraceful shutdown of the whole plan, which seems like the more
intuitive behavior given that the whole plan resulted in an error
anyway. The only downside of this change that I can see is that the
resulting error can now be "query canceled" rather than "no inbound
stream connection", since there is a race between the original error
and the context cancellation error.

Release note: None
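
To make the mechanism above concrete, here is a minimal, self-contained
Go sketch of the two shutdown paths. All names in it (producer,
draining) are hypothetical simplifications, not the actual CockroachDB
flow machinery: a producer that only checks for draining when it sends
cannot react while it is busy computing, whereas a canceled context
interrupts it immediately.

package main

import (
	"context"
	"fmt"
	"time"
)

// producer models an outbound stream: it only observes the consumer's
// draining state at the moment it tries to send a row. If it spends a
// long time computing between sends (as a TTL processor would), it
// cannot notice draining until then. A canceled context, by contrast,
// interrupts it immediately.
func producer(ctx context.Context, draining <-chan struct{}) error {
	for {
		// Simulate a long stretch of work between sends.
		work := time.After(10 * time.Second)
		select {
		case <-ctx.Done():
			// Flow cancellation: observed right away, even mid-work.
			return ctx.Err()
		case <-work:
		}
		select {
		case <-draining:
			// Draining: only observed at the next send attempt.
			return nil
		default:
			fmt.Println("sent a row")
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	draining := make(chan struct{})

	errCh := make(chan error, 1)
	go func() { errCh <- producer(ctx, draining) }()

	// Mimic the fix: on a "no inbound stream connection" error, cancel
	// the flow's context instead of only marking the plan as draining.
	close(draining) // without the fix, noticed only after ~10s of work
	cancel()        // with the fix, noticed immediately
	fmt.Println("producer returned:", <-errCh)
}

Without the fix, only the draining signal would be delivered and the
producer would not return until its long stretch of work finished; the
f.Cancel() call added in flow_registry.go below corresponds to the
cancel() path.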
yuzefovich committed Jan 25, 2024
1 parent a5ed7a0 commit f931435
Showing 3 changed files with 27 additions and 7 deletions.
pkg/sql/flowinfra/flow.go (5 changes: 3 additions & 2 deletions)

@@ -626,8 +626,9 @@ func (f *FlowBase) MemUsage() int64 {
 func (f *FlowBase) Cancel() {
 	f.mu.Lock()
 	defer f.mu.Unlock()
-	if f.mu.status == flowFinished {
-		// The Flow is already done, nothing to cancel.
+	if f.mu.status == flowFinished || f.mu.ctxCancel == nil {
+		// The Flow is already done, nothing to cancel. ctxCancel can be nil in
+		// some tests.
 		return
 	}
 	f.mu.ctxCancel()
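
The guard above makes Cancel a no-op both after the flow has finished
and when no cancel function was ever installed (as in some tests). As a
stand-alone sketch of this nil-guarded, mutex-protected cancel pattern,
with hypothetical types rather than the real FlowBase:

package main

import (
	"context"
	"fmt"
	"sync"
)

// flow is a hypothetical stand-in for FlowBase: it owns a cancel func
// that may be nil (e.g., in tests) and a flag set on completion.
type flow struct {
	mu       sync.Mutex
	finished bool
	cancel   context.CancelFunc
}

// Cancel is safe to call at any time, from any goroutine, any number
// of times: it is a no-op once the flow has finished or if no cancel
// func was ever installed.
func (f *flow) Cancel() {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.finished || f.cancel == nil {
		return
	}
	f.cancel()
}

func main() {
	(&flow{}).Cancel() // nil cancel func: no panic

	ctx, cancel := context.WithCancel(context.Background())
	f := &flow{cancel: cancel}
	f.Cancel()
	f.Cancel() // idempotent: context.CancelFunc tolerates repeat calls
	fmt.Println("ctx err:", ctx.Err())
}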
pkg/sql/flowinfra/flow_registry.go (10 changes: 10 additions & 0 deletions)

@@ -315,6 +315,16 @@ func (fr *FlowRegistry) RegisterFlow(
 		// drain all the processors.
 		numTimedOutReceivers := fr.cancelPendingStreams(id, errNoInboundStreamConnection)
 		if numTimedOutReceivers != 0 {
+			// The whole plan will error out. So far we only pushed the
+			// error to the timed out receivers, and eventually it'll make
+			// its way to the DistSQLReceiver which will transition the plan
+			// into draining state. However, non-timed out streams will only
+			// learn about the error and the draining state the next time
+			// the producer sends something on the stream which might not
+			// happen for a while. To speed up the shutdown of the whole
+			// plan we cancel the flow on this node which will trigger quick
+			// ungraceful shutdown of the whole plan.
+			f.Cancel()
 			// The span in the context might be finished by the time this runs. In
 			// principle, we could ForkSpan() beforehand, but we don't want to
 			// create the extra span every time.
pkg/sql/flowinfra/flow_registry_test.go (19 changes: 14 additions & 5 deletions)

@@ -14,6 +14,7 @@ import (
 	"context"
 	"math"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -215,7 +216,10 @@ func TestStreamConnectionTimeout(t *testing.T) {
 	// to connect a stream, but it'll be too late.
 	id1 := execinfrapb.FlowID{UUID: uuid.MakeV4()}
 	f1 := &FlowBase{}
-	f1.mu.ctxCancel = func() {}
+	var canceled atomic.Bool
+	f1.mu.ctxCancel = func() {
+		canceled.Store(true)
+	}
 	streamID1 := execinfrapb.StreamID(1)
 	consumer := &distsqlutils.RowBuffer{}
 	wg := &sync.WaitGroup{}
@@ -239,6 +243,9 @@ func TestStreamConnectionTimeout(t *testing.T) {
 		if !si.mu.canceled {
 			return errors.Errorf("not timed out yet")
 		}
+		if !canceled.Load() {
+			return errors.New("expected ctxCancel to have been called")
+		}
 		return nil
 	})

@@ -739,10 +746,10 @@ func (s *delayedErrorServerStream) Send(*execinfrapb.ConsumerSignal) error {
 //   inbound stream timeout is reached
 // - that timeout "cancels" the single pending flow; this cancellation results
 //   in the flow being marked as "canceled" and the wait group being
-//   decremented (as well as calling Timeout() on the receiver)
+//   decremented (as well as calling Timeout() on the receiver). It also
+//   results in the flow cancellation.
 // - after the flow cancellation is performed, the "handshake" RPC results in
-//   an error which results in flow being properly canceled (by calling
-//   FlowBase.ctxCancel).
+//   an error too, which doesn't actually matter at this point.
 //
 // Before #94113 was fixed, the flow would be incorrectly marked as "connected"
 // before the "handshake" RPC was issued, so the inbound stream timeout would
@@ -814,7 +821,9 @@ func TestErrorOnSlowHandshake(t *testing.T) {
 	// Make sure that the wait group is properly decremented (this must have
 	// been done by flowEntry.streamTimer too).
 	wg.Wait()
-	// Since the RPC resulted in an error, we expect that the flow is canceled.
+	// We expect that Flow.Cancel is called twice - once when the inbound stream
+	// timed out, and again when the RPC results in an error.
+	<-cancelCh
 	<-cancelCh
 	err := <-errCh
 	if err == nil {
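
The test changes follow a common pattern: inject a cancel hook that
atomically records that it fired, then poll for the flag rather than
racing the background timeout goroutine. A minimal sketch, assuming a
hypothetical succeedsSoon retry helper in place of CockroachDB's
testutils.SucceedsSoon:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// succeedsSoon is a hypothetical stand-in for a retry helper like
// testutils.SucceedsSoon: it re-runs fn until it returns nil or the
// deadline expires.
func succeedsSoon(fn func() error) error {
	deadline := time.Now().Add(5 * time.Second)
	for {
		err := fn()
		if err == nil || time.Now().After(deadline) {
			return err
		}
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	// Install a cancel hook that records, atomically, that it fired;
	// assertions then poll the flag instead of racing the goroutine
	// that fires the timeout.
	var canceled atomic.Bool
	ctxCancel := func() { canceled.Store(true) }

	go ctxCancel() // stands in for the inbound stream timeout firing

	err := succeedsSoon(func() error {
		if !canceled.Load() {
			return errors.New("expected ctxCancel to have been called")
		}
		return nil
	})
	fmt.Println("assertion error:", err)
}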
