diff --git a/ddl/db_integration_test.go b/ddl/db_integration_test.go
index 356fd112e488e..fcb69d7251bcb 100644
--- a/ddl/db_integration_test.go
+++ b/ddl/db_integration_test.go
@@ -1938,7 +1938,7 @@ func (s *testIntegrationSuite3) TestInsertIntoGeneratedColumnWithDefaultExpr(c *
 
 	// generated columns with default function is not allowed
 	tk.MustExec("create table t5 (a int default 10, b int as (a+1))")
-	assertErrorCode(c, tk, "insert into t5 values (20, default(a))", mysql.ErrBadGeneratedColumn)
+	tk.MustGetErrCode("insert into t5 values (20, default(a))", mysql.ErrBadGeneratedColumn)
 
 	tk.MustExec("drop table t1")
 	tk.MustExec("drop table t2")
diff --git a/executor/aggregate.go b/executor/aggregate.go
index 1663736c49804..1a23837e2d267 100644
--- a/executor/aggregate.go
+++ b/executor/aggregate.go
@@ -86,8 +86,9 @@ type HashAggFinalWorker struct {
 
 // AfFinalResult indicates aggregation functions final result.
 type AfFinalResult struct {
-	chk *chunk.Chunk
-	err error
+	chk        *chunk.Chunk
+	err        error
+	giveBackCh chan *chunk.Chunk
 }
 
 // HashAggExec deals with all the aggregate functions.
@@ -152,7 +153,6 @@ type HashAggExec struct {
 
 	finishCh         chan struct{}
 	finalOutputCh    chan *AfFinalResult
-	finalInputCh     chan *chunk.Chunk
 	partialOutputChs []chan *HashAggIntermData
 	inputCh          chan *HashAggInput
 	partialInputChs  []chan *chunk.Chunk
@@ -257,7 +257,6 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
 	partialConcurrency := sessionVars.HashAggPartialConcurrency
 	e.isChildReturnEmpty = true
 	e.finalOutputCh = make(chan *AfFinalResult, finalConcurrency)
-	e.finalInputCh = make(chan *chunk.Chunk, finalConcurrency)
 	e.inputCh = make(chan *HashAggInput, partialConcurrency)
 	e.finishCh = make(chan struct{}, 1)
 
@@ -302,10 +301,11 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
 			groupSet:            set.NewStringSet(),
 			inputCh:             e.partialOutputChs[i],
 			outputCh:            e.finalOutputCh,
-			finalResultHolderCh: e.finalInputCh,
+			finalResultHolderCh: make(chan *chunk.Chunk, 1),
 			rowBuffer:           make([]types.Datum, 0, e.Schema().Len()),
 			mutableRow:          chunk.MutRowFromTypes(retTypes(e)),
 		}
+		e.finalWorkers[i].finalResultHolderCh <- newFirstChunk(e)
 	}
 }
 
@@ -490,14 +490,14 @@ func (w *HashAggFinalWorker) getFinalResult(sctx sessionctx.Context) {
 			result.SetNumVirtualRows(result.NumRows() + 1)
 		}
 		if result.IsFull() {
-			w.outputCh <- &AfFinalResult{chk: result}
+			w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh}
 			result, finished = w.receiveFinalResultHolder()
 			if finished {
 				return
 			}
 		}
 	}
-	w.outputCh <- &AfFinalResult{chk: result}
+	w.outputCh <- &AfFinalResult{chk: result, giveBackCh: w.finalResultHolderCh}
 }
 
 func (w *HashAggFinalWorker) receiveFinalResultHolder() (*chunk.Chunk, bool) {
@@ -626,28 +626,26 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error
 	if e.executed {
 		return nil
 	}
-	for !chk.IsFull() {
-		e.finalInputCh <- chk
+	for {
 		result, ok := <-e.finalOutputCh
-		if !ok { // all finalWorkers exited
+		if !ok {
 			e.executed = true
-			if chk.NumRows() > 0 { // but there are some data left
-				return nil
-			}
 			if e.isChildReturnEmpty && e.defaultVal != nil {
 				chk.Append(e.defaultVal, 0, 1)
 			}
-			e.isChildReturnEmpty = false
 			return nil
 		}
		if result.err != nil {
 			return result.err
 		}
+		chk.SwapColumns(result.chk)
+		result.chk.Reset()
+		result.giveBackCh <- result.chk
 		if chk.NumRows() > 0 {
 			e.isChildReturnEmpty = false
+			return nil
 		}
 	}
-	return nil
 }
 
 // unparallelExec executes hash aggregation algorithm in single thread.
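The executor/aggregate.go hunks above replace the shared finalInputCh with a per-worker, one-slot finalResultHolderCh and return each consumed chunk to its owner through giveBackCh. The standalone sketch below (not part of the patch) illustrates that recycling pattern in isolation; resultBuf, finalResult, and worker are illustrative stand-ins for chunk.Chunk, AfFinalResult, and HashAggFinalWorker, not the real TiDB types.

package main

import "fmt"

// resultBuf stands in for chunk.Chunk: a reusable container the workers fill.
type resultBuf struct{ rows []int }

// finalResult mirrors the shape AfFinalResult takes after this patch: the data plus
// the channel through which the consumer returns the emptied buffer to its owner.
type finalResult struct {
	buf        *resultBuf
	giveBackCh chan *resultBuf
}

// worker plays the role of a final worker: it owns a one-slot holder channel that is
// pre-filled with a single buffer, so it can only produce the next batch after the
// consumer has handed the previous buffer back.
func worker(id int, outputCh chan<- finalResult, done chan<- struct{}) {
	holder := make(chan *resultBuf, 1)
	holder <- &resultBuf{}
	for batch := 0; batch < 3; batch++ {
		buf := <-holder // blocks until the consumer gives the buffer back
		buf.rows = append(buf.rows, id*10+batch)
		outputCh <- finalResult{buf: buf, giveBackCh: holder}
	}
	done <- struct{}{}
}

func main() {
	const workers = 2
	outputCh := make(chan finalResult, workers)
	done := make(chan struct{})
	for i := 0; i < workers; i++ {
		go worker(i, outputCh, done)
	}
	// Close outputCh once every worker has finished, similar in spirit to how
	// finalOutputCh is closed after all final workers exit.
	go func() {
		for i := 0; i < workers; i++ {
			<-done
		}
		close(outputCh)
	}()
	// The consumer drains the data, resets the buffer, and gives it back, which is
	// what parallelExec now does with SwapColumns + Reset + giveBackCh.
	for res := range outputCh {
		fmt.Println("got rows:", res.buf.rows)
		res.buf.rows = res.buf.rows[:0]
		res.giveBackCh <- res.buf
	}
}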
diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go
index fca05420430ad..d5f1b80a25944 100644
--- a/executor/aggregate_test.go
+++ b/executor/aggregate_test.go
@@ -566,6 +566,20 @@ func (s *testSuite1) TestOnlyFullGroupBy(c *C) {
 	c.Assert(terror.ErrorEqual(err, plannercore.ErrAmbiguous), IsTrue, Commentf("err %v", err))
 }
 
+func (s *testSuite1) TestIssue13652(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("use test")
+	tk.MustExec("set sql_mode = 'ONLY_FULL_GROUP_BY'")
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("create table t(a real)")
+	tk.MustQuery("select a from t group by (a)")
+	tk.MustQuery("select a from t group by ((a))")
+	tk.MustQuery("select a from t group by +a")
+	tk.MustQuery("select a from t group by ((+a))")
+	_, err := tk.Exec("select a from t group by (-a)")
+	c.Assert(err.Error(), Equals, "[planner:1055]Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column 'test.t.a' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by")
+}
+
 func (s *testSuite1) TestHaving(c *C) {
 	tk := testkit.NewTestKitWithInit(c, s.store)
 
diff --git a/executor/executor_required_rows_test.go b/executor/executor_required_rows_test.go
index 5883a904c09e9..804ccec90031d 100644
--- a/executor/executor_required_rows_test.go
+++ b/executor/executor_required_rows_test.go
@@ -675,67 +675,6 @@ func (s *testExecSuite) TestStreamAggRequiredRows(c *C) {
 	}
 }
 
-func (s *testExecSuite) TestHashAggParallelRequiredRows(c *C) {
-	maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
-	testCases := []struct {
-		totalRows      int
-		aggFunc        string
-		requiredRows   []int
-		expectedRows   []int
-		expectedRowsDS []int
-		gen            func(valType *types.FieldType) interface{}
-	}{
-		{
-			totalRows:      maxChunkSize,
-			aggFunc:        ast.AggFuncSum,
-			requiredRows:   []int{1, 2, 3, 4, 5, 6, 7},
-			expectedRows:   []int{1, 2, 3, 4, 5, 6, 7},
-			expectedRowsDS: []int{maxChunkSize, 0},
-			gen:            divGenerator(1),
-		},
-		{
-			totalRows:      maxChunkSize * 3,
-			aggFunc:        ast.AggFuncAvg,
-			requiredRows:   []int{1, 3},
-			expectedRows:   []int{1, 2},
-			expectedRowsDS: []int{maxChunkSize, maxChunkSize, maxChunkSize, 0},
-			gen:            divGenerator(maxChunkSize),
-		},
-		{
-			totalRows:      maxChunkSize * 3,
-			aggFunc:        ast.AggFuncAvg,
-			requiredRows:   []int{maxChunkSize, maxChunkSize},
-			expectedRows:   []int{maxChunkSize, maxChunkSize / 2},
-			expectedRowsDS: []int{maxChunkSize, maxChunkSize, maxChunkSize, 0},
-			gen:            divGenerator(2),
-		},
-	}
-
-	for _, hasDistinct := range []bool{false, true} {
-		for _, testCase := range testCases {
-			sctx := defaultCtx()
-			ctx := context.Background()
-			ds := newRequiredRowsDataSourceWithGenerator(sctx, testCase.totalRows, testCase.expectedRowsDS, testCase.gen)
-			childCols := ds.Schema().Columns
-			schema := expression.NewSchema(childCols...)
-			groupBy := []expression.Expression{childCols[1]}
-			aggFunc, err := aggregation.NewAggFuncDesc(sctx, testCase.aggFunc, []expression.Expression{childCols[0]}, hasDistinct)
-			c.Assert(err, IsNil)
-			aggFuncs := []*aggregation.AggFuncDesc{aggFunc}
-			exec := buildHashAggExecutor(sctx, ds, schema, aggFuncs, groupBy)
-			c.Assert(exec.Open(ctx), IsNil)
-			chk := newFirstChunk(exec)
-			for i := range testCase.requiredRows {
-				chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
-				c.Assert(exec.Next(ctx, chk), IsNil)
-				c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
-			}
-			c.Assert(exec.Close(), IsNil)
-			c.Assert(ds.checkNumNextCalled(), IsNil)
-		}
-	}
-}
-
 func (s *testExecSuite) TestMergeJoinRequiredRows(c *C) {
 	justReturn1 := func(valType *types.FieldType) interface{} {
 		switch valType.Tp {
diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go
index a79f29a08fa88..489abead9afbf 100644
--- a/planner/core/logical_plan_builder.go
+++ b/planner/core/logical_plan_builder.go
@@ -1737,14 +1737,15 @@ func (b *PlanBuilder) checkOnlyFullGroupByWithGroupClause(p LogicalPlan, sel *as
 	gbyExprs := make([]ast.ExprNode, 0, len(sel.Fields.Fields))
 	schema := p.Schema()
 	for _, byItem := range sel.GroupBy.Items {
-		if colExpr, ok := byItem.Expr.(*ast.ColumnNameExpr); ok {
+		expr := getInnerFromParenthesesAndUnaryPlus(byItem.Expr)
+		if colExpr, ok := expr.(*ast.ColumnNameExpr); ok {
 			col, err := schema.FindColumn(colExpr.Name)
 			if err != nil || col == nil {
 				continue
 			}
 			gbyCols[col] = struct{}{}
 		} else {
-			gbyExprs = append(gbyExprs, byItem.Expr)
+			gbyExprs = append(gbyExprs, expr)
 		}
 	}
 
diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go
index 0f07bb6863027..fff7fbed84c43 100644
--- a/store/tikv/2pc.go
+++ b/store/tikv/2pc.go
@@ -815,35 +815,30 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
 			if err1 != nil {
 				return errors.Trace(err1)
 			}
-			// Check lock conflict error for nowait, if nowait set and key locked by others,
-			// report error immediately and do no more resolve locks.
-			// if the lock left behind whose related txn is already committed or rollbacked,
-			// (eg secondary locks not committed or rollbacked yet)
-			// we cant return "nowait conflict" directly
-			if lock.LockType == pb.Op_PessimisticLock {
-				if action.lockWaitTime == kv.LockNoWait {
-					// 3.0 release not supported yet
-					return kv.ErrNotImplemented
-				} else if action.lockWaitTime == kv.LockAlwaysWait {
-					// do nothing but keep wait
-				} else {
-					// the lockWaitTime is set, check the lock wait timeout or not
-					// the pessimistic lock found could be invalid locks which is timeout but not recycled yet
-					if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) {
-						if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime {
-							return ErrLockWaitTimeout
-						}
-					}
-				}
-			}
 			locks = append(locks, lock)
 		}
 		// Because we already waited on tikv, no need to Backoff here.
-		_, err = c.store.lockResolver.ResolveLocks(bo, locks)
+		msBeforeTxnExpired, err := c.store.lockResolver.ResolveLocks(bo, locks)
 		if err != nil {
 			return errors.Trace(err)
 		}
 
+		// If msBeforeTxnExpired is not zero, it means there are still locks blocking us acquiring
+		// the pessimistic lock. We should return timeout error if necessary.
+		if msBeforeTxnExpired > 0 {
+			if action.lockWaitTime == kv.LockNoWait {
+				// 3.0 release not supported yet
+				return kv.ErrNotImplemented
+			} else if action.lockWaitTime == kv.LockAlwaysWait {
+				// do nothing but keep wait
+			} else {
+				// the lockWaitTime is set, we should return wait timeout if we are still blocked by a lock
+				if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime {
+					return ErrLockWaitTimeout
+				}
+			}
+		}
+
 		// Handle the killed flag when waiting for the pessimistic lock.
 		// When a txn runs into LockKeys() and backoff here, it has no chance to call
 		// executor.Next() and check the killed flag.
diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go
index 065a00e0a1e55..52989752881b9 100644
--- a/store/tikv/2pc_test.go
+++ b/store/tikv/2pc_test.go
@@ -585,6 +585,40 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) {
 	c.Assert(lockInfo.LockTtl-PessimisticLockTTL, Less, uint64(150))
 }
 
+// TestAcquireFalseTimeoutLock tests acquiring a key which is a secondary key of another transaction.
+// The lock's own TTL is expired but the primary key is still alive due to heartbeats.
+func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) {
+	// k1 is the primary lock of txn1
+	k1 := kv.Key("k1")
+	// k2 is a secondary lock of txn1 and a key txn2 wants to lock
+	k2 := kv.Key("k2")
+
+	txn1 := s.begin(c)
+	txn1.SetOption(kv.Pessimistic, true)
+	// lock the primary key
+	err := txn1.LockKeys(context.Background(), nil, txn1.startTS, kv.LockAlwaysWait, time.Now(), k1)
+	c.Assert(err, IsNil)
+	// lock the secondary key
+	err = txn1.LockKeys(context.Background(), nil, txn1.startTS, kv.LockAlwaysWait, time.Now(), k2)
+	c.Assert(err, IsNil)
+
+	// Heartbeats will increase the TTL of the primary key
+
+	// wait until secondary key exceeds its own TTL
+	time.Sleep(time.Duration(PessimisticLockTTL) * time.Millisecond)
+	txn2 := s.begin(c)
+	txn2.SetOption(kv.Pessimistic, true)
+
+	// test for wait limited time (300ms)
+	startTime := time.Now()
+	err = txn2.LockKeys(context.Background(), nil, txn1.startTS, 300, time.Now(), k2)
+	elapsed := time.Now().Sub(startTime)
+	// cannot acquire lock in time thus error
+	c.Assert(err.Error(), Equals, ErrLockWaitTimeout.Error())
+	// it should return after about 300ms
+	c.Assert(elapsed, Less, 350*time.Millisecond)
+}
+
 func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo {
 	txn := s.begin(c)
 	err := txn.Set(key, key)
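The new branch in store/tikv/2pc.go only fires when ResolveLocks reports a positive msBeforeTxnExpired, i.e. a live lock is still blocking the pessimistic lock request. The standalone sketch below (not part of the patch) restates that decision in isolation; decideOnBlockedLock and the lockNoWait/lockAlwaysWait sentinels are illustrative stand-ins, not the real kv.LockNoWait/kv.LockAlwaysWait values.

package main

import (
	"errors"
	"fmt"
	"time"
)

// Illustrative sentinels; the real constants live in TiDB's kv package.
const (
	lockAlwaysWait int64 = 0
	lockNoWait     int64 = -1
)

var (
	errNotImplemented  = errors.New("nowait is not supported in this release")
	errLockWaitTimeout = errors.New("lock wait timeout exceeded")
)

// decideOnBlockedLock sketches the check the patched handleSingleBatch performs when
// msBeforeTxnExpired > 0: fail fast for NoWait, keep waiting for AlwaysWait, and
// otherwise time out once the caller-supplied budget in milliseconds has elapsed.
func decideOnBlockedLock(msBeforeTxnExpired, lockWaitTime int64, lockWaitStartTime time.Time) error {
	if msBeforeTxnExpired <= 0 {
		return nil // nothing is blocking us, so no timeout decision is needed
	}
	switch lockWaitTime {
	case lockNoWait:
		return errNotImplemented
	case lockAlwaysWait:
		return nil // keep waiting and retrying
	default:
		if time.Since(lockWaitStartTime).Milliseconds() >= lockWaitTime {
			return errLockWaitTimeout
		}
		return nil // budget not exhausted yet, retry the lock request
	}
}

func main() {
	started := time.Now().Add(-400 * time.Millisecond) // pretend we have already waited 400ms
	fmt.Println(decideOnBlockedLock(10, 300, started))            // lock wait timeout exceeded
	fmt.Println(decideOnBlockedLock(10, lockAlwaysWait, started)) // <nil>
	fmt.Println(decideOnBlockedLock(0, 300, started))             // <nil>: no blocking lock left
}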