Commit db6bf5b

Merge remote-tracking branch 'upstream/release-3.0' into release-3.0-13734
tangenta committed Dec 17, 2019
2 parents f573d04 + a3ca490 commit db6bf5b
Showing 7 changed files with 82 additions and 101 deletions.
2 changes: 1 addition & 1 deletion ddl/db_integration_test.go
@@ -1938,7 +1938,7 @@ func (s *testIntegrationSuite3) TestInsertIntoGeneratedColumnWithDefaultExpr(c *

	// generated columns with default function is not allowed
	tk.MustExec("create table t5 (a int default 10, b int as (a+1))")
-	assertErrorCode(c, tk, "insert into t5 values (20, default(a))", mysql.ErrBadGeneratedColumn)
+	tk.MustGetErrCode("insert into t5 values (20, default(a))", mysql.ErrBadGeneratedColumn)

	tk.MustExec("drop table t1")
	tk.MustExec("drop table t2")
28 changes: 13 additions & 15 deletions executor/aggregate.go
@@ -86,8 +86,9 @@ type HashAggFinalWorker struct {

// AfFinalResult indicates aggregation functions final result.
type AfFinalResult struct {
-	chk *chunk.Chunk
-	err error
+	chk        *chunk.Chunk
+	err        error
+	giveBackCh chan *chunk.Chunk
}

// HashAggExec deals with all the aggregate functions.
@@ -152,7 +153,6 @@ type HashAggExec struct {

	finishCh         chan struct{}
	finalOutputCh    chan *AfFinalResult
-	finalInputCh     chan *chunk.Chunk
	partialOutputChs []chan *HashAggIntermData
	inputCh          chan *HashAggInput
	partialInputChs  []chan *chunk.Chunk
@@ -257,7 +257,6 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
	partialConcurrency := sessionVars.HashAggPartialConcurrency
	e.isChildReturnEmpty = true
	e.finalOutputCh = make(chan *AfFinalResult, finalConcurrency)
-	e.finalInputCh = make(chan *chunk.Chunk, finalConcurrency)
	e.inputCh = make(chan *HashAggInput, partialConcurrency)
	e.finishCh = make(chan struct{}, 1)

@@ -302,10 +301,11 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
			groupSet:            set.NewStringSet(),
			inputCh:             e.partialOutputChs[i],
			outputCh:            e.finalOutputCh,
-			finalResultHolderCh: e.finalInputCh,
+			finalResultHolderCh: make(chan *chunk.Chunk, 1),
			rowBuffer:           make([]types.Datum, 0, e.Schema().Len()),
			mutableRow:          chunk.MutRowFromTypes(retTypes(e)),
		}
+		e.finalWorkers[i].finalResultHolderCh <- newFirstChunk(e)
	}
}

@@ -490,14 +490,14 @@ func (w *HashAggFinalWorker) getFinalResult(sctx sessionctx.Context) {
			result.SetNumVirtualRows(result.NumRows() + 1)
		}
		if result.IsFull() {
-			w.outputCh <- &AfFinalResult{chk: result}
+			w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh}
			result, finished = w.receiveFinalResultHolder()
			if finished {
				return
			}
		}
	}
-	w.outputCh <- &AfFinalResult{chk: result}
+	w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh}
}

func (w *HashAggFinalWorker) receiveFinalResultHolder() (*chunk.Chunk, bool) {
@@ -626,28 +626,26 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error
	if e.executed {
		return nil
	}
-	for !chk.IsFull() {
-		e.finalInputCh <- chk
+	for {
		result, ok := <-e.finalOutputCh
-		if !ok { // all finalWorkers exited
+		if !ok {
			e.executed = true
-			if chk.NumRows() > 0 { // but there are some data left
-				return nil
-			}
			if e.isChildReturnEmpty && e.defaultVal != nil {
				chk.Append(e.defaultVal, 0, 1)
			}
			e.isChildReturnEmpty = false
			return nil
		}
		if result.err != nil {
			return result.err
		}
+		chk.SwapColumns(result.chk)
+		result.chk.Reset()
+		result.giveBackCh <- result.chk
		if chk.NumRows() > 0 {
			e.isChildReturnEmpty = false
+			return nil
		}
	}
-	return nil
}

// unparallelExec executes hash aggregation algorithm in single thread.
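
Taken together, these hunks replace the shared finalInputCh with a per-worker, one-slot finalResultHolderCh that travels with each result as giveBackCh: the worker sends a filled chunk out, and parallelExec swaps its columns, resets it, and returns it on giveBackCh for reuse. Below is a minimal, self-contained sketch of that give-back pattern; the names (Chunk, result, produce, consume) are illustrative stand-ins, not TiDB APIs.

package main

import "fmt"

// Chunk is a stand-in for chunk.Chunk: just a reusable batch of rows.
type Chunk struct{ rows []int }

// result mirrors AfFinalResult: a filled chunk plus the channel on which
// the consumer must hand the chunk back once it is done with it.
type result struct {
	chk        *Chunk
	giveBackCh chan *Chunk
}

// produce plays the HashAggFinalWorker role: take the holder chunk, fill
// it, send it out, then block until the consumer gives a chunk back.
func produce(out chan<- result, batches [][]int) {
	holder := make(chan *Chunk, 1) // one-slot holder, as in the patch
	holder <- &Chunk{}             // pre-seed it, like newFirstChunk(e)
	for _, rows := range batches {
		chk := <-holder
		chk.rows = append(chk.rows[:0], rows...)
		out <- result{chk: chk, giveBackCh: holder}
	}
	close(out)
}

// consume plays the parallelExec role: use the chunk, reset it, and give
// it straight back so the producer never allocates a second chunk.
func consume(out <-chan result) {
	for r := range out {
		fmt.Println(r.chk.rows)     // stand-in for chk.SwapColumns(result.chk)
		r.chk.rows = r.chk.rows[:0] // stand-in for result.chk.Reset()
		r.giveBackCh <- r.chk
	}
}

func main() {
	out := make(chan result)
	go produce(out, [][]int{{1, 2}, {3, 4}, {5}})
	consume(out)
}

Because each worker owns its holder channel, results from different workers can no longer race to refill one shared input channel, which is the point of the change.
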
14 changes: 14 additions & 0 deletions executor/aggregate_test.go
@@ -566,6 +566,20 @@ func (s *testSuite1) TestOnlyFullGroupBy(c *C) {
	c.Assert(terror.ErrorEqual(err, plannercore.ErrAmbiguous), IsTrue, Commentf("err %v", err))
}

+func (s *testSuite1) TestIssue13652(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("use test")
+	tk.MustExec("set sql_mode = 'ONLY_FULL_GROUP_BY'")
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("create table t(a real)")
+	tk.MustQuery("select a from t group by (a)")
+	tk.MustQuery("select a from t group by ((a))")
+	tk.MustQuery("select a from t group by +a")
+	tk.MustQuery("select a from t group by ((+a))")
+	_, err := tk.Exec("select a from t group by (-a)")
+	c.Assert(err.Error(), Equals, "[planner:1055]Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column 'test.t.a' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by")
+}
+
func (s *testSuite1) TestHaving(c *C) {
	tk := testkit.NewTestKitWithInit(c, s.store)

61 changes: 0 additions & 61 deletions executor/executor_required_rows_test.go
@@ -675,67 +675,6 @@ func (s *testExecSuite) TestStreamAggRequiredRows(c *C) {
	}
}

-func (s *testExecSuite) TestHashAggParallelRequiredRows(c *C) {
-	maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
-	testCases := []struct {
-		totalRows      int
-		aggFunc        string
-		requiredRows   []int
-		expectedRows   []int
-		expectedRowsDS []int
-		gen            func(valType *types.FieldType) interface{}
-	}{
-		{
-			totalRows:      maxChunkSize,
-			aggFunc:        ast.AggFuncSum,
-			requiredRows:   []int{1, 2, 3, 4, 5, 6, 7},
-			expectedRows:   []int{1, 2, 3, 4, 5, 6, 7},
-			expectedRowsDS: []int{maxChunkSize, 0},
-			gen:            divGenerator(1),
-		},
-		{
-			totalRows:      maxChunkSize * 3,
-			aggFunc:        ast.AggFuncAvg,
-			requiredRows:   []int{1, 3},
-			expectedRows:   []int{1, 2},
-			expectedRowsDS: []int{maxChunkSize, maxChunkSize, maxChunkSize, 0},
-			gen:            divGenerator(maxChunkSize),
-		},
-		{
-			totalRows:      maxChunkSize * 3,
-			aggFunc:        ast.AggFuncAvg,
-			requiredRows:   []int{maxChunkSize, maxChunkSize},
-			expectedRows:   []int{maxChunkSize, maxChunkSize / 2},
-			expectedRowsDS: []int{maxChunkSize, maxChunkSize, maxChunkSize, 0},
-			gen:            divGenerator(2),
-		},
-	}
-
-	for _, hasDistinct := range []bool{false, true} {
-		for _, testCase := range testCases {
-			sctx := defaultCtx()
-			ctx := context.Background()
-			ds := newRequiredRowsDataSourceWithGenerator(sctx, testCase.totalRows, testCase.expectedRowsDS, testCase.gen)
-			childCols := ds.Schema().Columns
-			schema := expression.NewSchema(childCols...)
-			groupBy := []expression.Expression{childCols[1]}
-			aggFunc, err := aggregation.NewAggFuncDesc(sctx, testCase.aggFunc, []expression.Expression{childCols[0]}, hasDistinct)
-			c.Assert(err, IsNil)
-			aggFuncs := []*aggregation.AggFuncDesc{aggFunc}
-			exec := buildHashAggExecutor(sctx, ds, schema, aggFuncs, groupBy)
-			c.Assert(exec.Open(ctx), IsNil)
-			chk := newFirstChunk(exec)
-			for i := range testCase.requiredRows {
-				chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
-				c.Assert(exec.Next(ctx, chk), IsNil)
-				c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
-			}
-			c.Assert(exec.Close(), IsNil)
-			c.Assert(ds.checkNumNextCalled(), IsNil)
-		}
-	}
-}

func (s *testExecSuite) TestMergeJoinRequiredRows(c *C) {
	justReturn1 := func(valType *types.FieldType) interface{} {
		switch valType.Tp {
5 changes: 3 additions & 2 deletions planner/core/logical_plan_builder.go
@@ -1737,14 +1737,15 @@ func (b *PlanBuilder) checkOnlyFullGroupByWithGroupClause(p LogicalPlan, sel *as
	gbyExprs := make([]ast.ExprNode, 0, len(sel.Fields.Fields))
	schema := p.Schema()
	for _, byItem := range sel.GroupBy.Items {
-		if colExpr, ok := byItem.Expr.(*ast.ColumnNameExpr); ok {
+		expr := getInnerFromParenthesesAndUnaryPlus(byItem.Expr)
+		if colExpr, ok := expr.(*ast.ColumnNameExpr); ok {
			col, err := schema.FindColumn(colExpr.Name)
			if err != nil || col == nil {
				continue
			}
			gbyCols[col] = struct{}{}
		} else {
-			gbyExprs = append(gbyExprs, byItem.Expr)
+			gbyExprs = append(gbyExprs, expr)
		}
	}

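
The planner fix routes each GROUP BY item through getInnerFromParenthesesAndUnaryPlus before the column lookup, so `group by ((+a))` is recognized as grouping on `a`, while `group by (-a)` remains a non-column expression and still trips ONLY_FULL_GROUP_BY, as TestIssue13652 above verifies. The helper itself lies outside the shown hunk; here is a plausible sketch of it, assuming the 3.0-era pingcap/parser AST types (ast.ParenthesesExpr, ast.UnaryOperationExpr, opcode.Plus):

import (
	"github.com/pingcap/parser/ast"
	"github.com/pingcap/parser/opcode"
)

// getInnerFromParenthesesAndUnaryPlus recursively peels parentheses and
// unary-plus wrappers off an expression, so ((+a)) and a are treated
// alike by the only_full_group_by check. Unary minus is left untouched,
// since -a is a genuinely different expression from a.
func getInnerFromParenthesesAndUnaryPlus(expr ast.ExprNode) ast.ExprNode {
	if pexpr, ok := expr.(*ast.ParenthesesExpr); ok {
		return getInnerFromParenthesesAndUnaryPlus(pexpr.Expr)
	}
	if uexpr, ok := expr.(*ast.UnaryOperationExpr); ok && uexpr.Op == opcode.Plus {
		return getInnerFromParenthesesAndUnaryPlus(uexpr.V)
	}
	return expr
}
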
39 changes: 17 additions & 22 deletions store/tikv/2pc.go
@@ -815,35 +815,30 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
			if err1 != nil {
				return errors.Trace(err1)
			}
-			// Check lock conflict error for nowait, if nowait set and key locked by others,
-			// report error immediately and do no more resolve locks.
-			// if the lock left behind whose related txn is already committed or rollbacked,
-			// (eg secondary locks not committed or rollbacked yet)
-			// we cant return "nowait conflict" directly
-			if lock.LockType == pb.Op_PessimisticLock {
-				if action.lockWaitTime == kv.LockNoWait {
-					// 3.0 release not supported yet
-					return kv.ErrNotImplemented
-				} else if action.lockWaitTime == kv.LockAlwaysWait {
-					// do nothing but keep wait
-				} else {
-					// the lockWaitTime is set, check the lock wait timeout or not
-					// the pessimistic lock found could be invalid locks which is timeout but not recycled yet
-					if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) {
-						if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime {
-							return ErrLockWaitTimeout
-						}
-					}
-				}
-			}
			locks = append(locks, lock)
		}
		// Because we already waited on tikv, no need to Backoff here.
-		_, err = c.store.lockResolver.ResolveLocks(bo, locks)
+		msBeforeTxnExpired, err := c.store.lockResolver.ResolveLocks(bo, locks)
		if err != nil {
			return errors.Trace(err)
		}

+		// If msBeforeTxnExpired is not zero, it means there are still locks blocking us acquiring
+		// the pessimistic lock. We should return timeout error if necessary.
+		if msBeforeTxnExpired > 0 {
+			if action.lockWaitTime == kv.LockNoWait {
+				// 3.0 release not supported yet
+				return kv.ErrNotImplemented
+			} else if action.lockWaitTime == kv.LockAlwaysWait {
+				// do nothing but keep wait
+			} else {
+				// the lockWaitTime is set, we should return wait timeout if we are still blocked by a lock
+				if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime {
+					return ErrLockWaitTimeout
+				}
+			}
+		}

		// Handle the killed flag when waiting for the pessimistic lock.
		// When a txn runs into LockKeys() and backoff here, it has no chance to call
		// executor.Next() and check the killed flag.
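
The relocated wait-or-timeout decision now runs once, after ResolveLocks, keyed off msBeforeTxnExpired (how long the blocking transaction's locks are still considered alive) rather than per individual lock. A self-contained sketch of just that decision; the sentinel values for kv.LockAlwaysWait/kv.LockNoWait and the error values are assumptions, not the repo's definitions:

package main

import (
	"errors"
	"fmt"
	"time"
)

// Assumed sentinels standing in for kv.LockAlwaysWait / kv.LockNoWait.
const (
	lockAlwaysWait int64 = 0
	lockNoWait     int64 = -1
)

var (
	errNotImplemented  = errors.New("not implemented") // stands in for kv.ErrNotImplemented
	errLockWaitTimeout = errors.New("lock wait timeout exceeded")
)

// checkLockWait mirrors the decision above: once ResolveLocks reports a
// blocking lock that is still alive (msBeforeTxnExpired > 0), either fail
// fast, keep waiting forever, or time out once the budget is spent.
func checkLockWait(msBeforeTxnExpired, lockWaitTime int64, waitStart time.Time) error {
	if msBeforeTxnExpired <= 0 {
		return nil // nothing blocks us; the caller retries the lock
	}
	switch lockWaitTime {
	case lockNoWait:
		return errNotImplemented // the 3.0 branch rejects nowait outright
	case lockAlwaysWait:
		return nil // keep waiting and let the caller retry
	default:
		// a finite wait was requested: give up once the budget is spent
		if time.Since(waitStart).Milliseconds() >= lockWaitTime {
			return errLockWaitTimeout
		}
		return nil
	}
}

func main() {
	start := time.Now().Add(-400 * time.Millisecond) // pretend 400ms already spent
	fmt.Println(checkLockWait(10, 300, start))       // -> lock wait timeout exceeded
}
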
34 changes: 34 additions & 0 deletions store/tikv/2pc_test.go
@@ -585,6 +585,40 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) {
	c.Assert(lockInfo.LockTtl-PessimisticLockTTL, Less, uint64(150))
}

+// TestAcquireFalseTimeoutLock tests acquiring a key which is a secondary key of another transaction.
+// The lock's own TTL is expired but the primary key is still alive due to heartbeats.
+func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) {
+	// k1 is the primary lock of txn1
+	k1 := kv.Key("k1")
+	// k2 is a secondary lock of txn1 and a key txn2 wants to lock
+	k2 := kv.Key("k2")
+
+	txn1 := s.begin(c)
+	txn1.SetOption(kv.Pessimistic, true)
+	// lock the primary key
+	err := txn1.LockKeys(context.Background(), nil, txn1.startTS, kv.LockAlwaysWait, time.Now(), k1)
+	c.Assert(err, IsNil)
+	// lock the secondary key
+	err = txn1.LockKeys(context.Background(), nil, txn1.startTS, kv.LockAlwaysWait, time.Now(), k2)
+	c.Assert(err, IsNil)
+
+	// Heartbeats will increase the TTL of the primary key
+
+	// wait until secondary key exceeds its own TTL
+	time.Sleep(time.Duration(PessimisticLockTTL) * time.Millisecond)
+	txn2 := s.begin(c)
+	txn2.SetOption(kv.Pessimistic, true)
+
+	// test for wait limited time (300ms)
+	startTime := time.Now()
+	err = txn2.LockKeys(context.Background(), nil, txn1.startTS, 300, time.Now(), k2)
+	elapsed := time.Now().Sub(startTime)
+	// cannot acquire lock in time thus error
+	c.Assert(err.Error(), Equals, ErrLockWaitTimeout.Error())
+	// it should return after about 300ms
+	c.Assert(elapsed, Less, 350*time.Millisecond)
+}

func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo {
	txn := s.begin(c)
	err := txn.Set(key, key)
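
The new test asserts on both axes: the error must be ErrLockWaitTimeout, and the call must return shortly after the requested 300ms budget rather than blocking until the blocker's TTL expires. A small, generic sketch of that two-sided timing assertion; boundedOp is a hypothetical stand-in for the bounded LockKeys call, not a repo function:

package main

import (
	"errors"
	"fmt"
	"time"
)

var errLockWaitTimeout = errors.New("lock wait timeout")

// boundedOp simulates an operation that waits on a held lock and gives up
// once its wait budget is spent, as LockKeys does with a finite lockWaitTime.
func boundedOp(budget time.Duration) error {
	time.Sleep(budget) // the lock never frees, so the whole budget is consumed
	return errLockWaitTimeout
}

func main() {
	start := time.Now()
	err := boundedOp(300 * time.Millisecond)
	elapsed := time.Since(start)
	// Two-sided check, as in the test: the right error, returned promptly.
	if errors.Is(err, errLockWaitTimeout) && elapsed < 350*time.Millisecond {
		fmt.Printf("timed out promptly after %v\n", elapsed)
	} else {
		fmt.Printf("unexpected: err=%v, elapsed=%v\n", err, elapsed)
	}
}
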
