From bed79b9f03b4482bb432940fefd6b8eae22608d6 Mon Sep 17 00:00:00 2001 From: Dirkjan Bussink Date: Thu, 2 Nov 2023 15:37:15 +0100 Subject: [PATCH] vtgate/engine: Fix race condition in join logic This change fixes a race condition in the join logic. This is a classic check-then-act style bug. The problem is that we were using a flag to track whether we had already sent fields. This can lead to a concurrency issue though: - Thread 1 checks if it needs to send fields, sees that it does, and clears the flag, but has not yet called the callback. - Thread 2 checks if it needs to send fields and sees it does not. - Thread 2 sends the result with no fields. - Downstream consumers crash because they get a first result set with no fields. There's no locking around the critical section of checking if we need to send fields and actually calling the callback. We need both of those to be covered by the same critical section. This change introduces a lock around both steps to guarantee that. With the lock, it normally would not be necessary to use an atomic boolean flag, as we'd only read and write the flag inside the lock. We have another optimization here though, which is that we only want to ask the right hand side for fields once as well, but it's called in each callback for the left hand side. By still using an atomic, we can also read this value outside the lock. Note that this read is still racy, but the worst case is reading the fields of the right hand side twice, which is not optimal but will not break things. 
Signed-off-by: Dirkjan Bussink --- go/vt/vtgate/engine/join.go | 39 ++++++++++++++++------- go/vt/vtgate/engine/scalar_aggregation.go | 4 +-- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/go/vt/vtgate/engine/join.go b/go/vt/vtgate/engine/join.go index 1c3adc1f5c9..ef50389c989 100644 --- a/go/vt/vtgate/engine/join.go +++ b/go/vt/vtgate/engine/join.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strings" + "sync" "sync/atomic" "vitess.io/vitess/go/sqltypes" @@ -115,22 +116,31 @@ func bindvarForType(t querypb.Type) *querypb.BindVariable { // TryStreamExecute performs a streaming exec. func (jn *Join) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { - var fieldNeeded atomic.Bool - fieldNeeded.Store(wantfields) - err := vcursor.StreamExecutePrimitive(ctx, jn.Left, bindVars, fieldNeeded.Load(), func(lresult *sqltypes.Result) error { + var mu sync.Mutex + // We need to use this atomic since we're also reading this + // value outside of it being locked with the mu lock. + // This is still racy, but worst case it means that we may + // retrieve the right hand side fields twice instead of once. + var fieldsSent atomic.Bool + fieldsSent.Store(!wantfields) + err := vcursor.StreamExecutePrimitive(ctx, jn.Left, bindVars, wantfields, func(lresult *sqltypes.Result) error { joinVars := make(map[string]*querypb.BindVariable) for _, lrow := range lresult.Rows { for k, col := range jn.Vars { joinVars[k] = sqltypes.ValueBindVariable(lrow[col]) } var rowSent atomic.Bool - err := vcursor.StreamExecutePrimitive(ctx, jn.Right, combineVars(bindVars, joinVars), fieldNeeded.Load(), func(rresult *sqltypes.Result) error { + err := vcursor.StreamExecutePrimitive(ctx, jn.Right, combineVars(bindVars, joinVars), !fieldsSent.Load(), func(rresult *sqltypes.Result) error { + // This needs to be locking since it's not safe to just use + // fieldsSent. 
This is because we can't have a race between + // checking fieldsSent and then actually calling the callback + // and in parallel another goroutine doing the same. That + // can lead to out of order execution of the callback. So the callback + // itself and the check need to be covered by the same lock. + mu.Lock() + defer mu.Unlock() result := &sqltypes.Result{} - if fieldNeeded.Load() { - // This code is currently unreachable because the first result - // will always be just the field info, which will cause the outer - // wantfields code path to be executed. But this may change in the future. - fieldNeeded.Store(false) + if fieldsSent.CompareAndSwap(false, true) { result.Fields = joinFields(lresult.Fields, rresult.Fields, jn.Cols) } for _, rrow := range rresult.Rows { @@ -154,8 +164,15 @@ func (jn *Join) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars return callback(result) } } - if fieldNeeded.Load() { - fieldNeeded.Store(false) + // This needs to be locking since it's not safe to just use + // fieldsSent. This is because we can't have a race between + // checking fieldsSent and then actually calling the callback + // and in parallel another goroutine doing the same. That + // can lead to out of order execution of the callback. So the callback + // itself and the check need to be covered by the same lock. 
+ mu.Lock() + defer mu.Unlock() + if fieldsSent.CompareAndSwap(false, true) { for k := range jn.Vars { joinVars[k] = sqltypes.NullBindVariable } diff --git a/go/vt/vtgate/engine/scalar_aggregation.go b/go/vt/vtgate/engine/scalar_aggregation.go index 6190e2e5fd6..85e90420ff9 100644 --- a/go/vt/vtgate/engine/scalar_aggregation.go +++ b/go/vt/vtgate/engine/scalar_aggregation.go @@ -112,7 +112,7 @@ func (sa *ScalarAggregate) TryStreamExecute(ctx context.Context, vcursor VCursor var mu sync.Mutex var agg aggregationState var fields []*querypb.Field - var fieldsSent bool + fieldsSent := !wantfields err := vcursor.StreamExecutePrimitive(ctx, sa.Input, bindVars, wantfields, func(result *sqltypes.Result) error { // as the underlying primitive call is not sync @@ -121,7 +121,7 @@ func (sa *ScalarAggregate) TryStreamExecute(ctx context.Context, vcursor VCursor mu.Lock() defer mu.Unlock() - if agg == nil { + if agg == nil && len(result.Fields) != 0 { var err error agg, fields, err = newAggregation(result.Fields, sa.Aggregates) if err != nil {