ddl: remove unused copReqSenderPool and related structures (#55302)
ref #54436
tangenta authored Aug 8, 2024
1 parent 0cfa66f commit cf2c703
Showing 12 changed files with 175 additions and 340 deletions.
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
@@ -281,6 +281,7 @@ go_test(
         "//pkg/disttask/framework/proto",
         "//pkg/disttask/framework/scheduler",
         "//pkg/disttask/framework/storage",
+        "//pkg/disttask/operator",
         "//pkg/domain",
         "//pkg/domain/infosync",
         "//pkg/errctx",
5 changes: 3 additions & 2 deletions pkg/ddl/backfilling_dist_scheduler.go
@@ -294,7 +294,7 @@ func generateNonPartitionPlan(
        return true, nil
    }

-   regionBatch := calculateRegionBatch(len(recordRegionMetas), instanceCnt, !useCloud)
+   regionBatch := CalculateRegionBatch(len(recordRegionMetas), instanceCnt, !useCloud)

    for i := 0; i < len(recordRegionMetas); i += regionBatch {
        end := i + regionBatch
@@ -329,7 +329,8 @@ func generateNonPartitionPlan(
    return subTaskMetas, nil
 }

-func calculateRegionBatch(totalRegionCnt int, instanceCnt int, useLocalDisk bool) int {
+// CalculateRegionBatch is exported for test.
+func CalculateRegionBatch(totalRegionCnt int, instanceCnt int, useLocalDisk bool) int {
    failpoint.Inject("mockRegionBatch", func(val failpoint.Value) {
        failpoint.Return(val.(int))
    })
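Note: the mockRegionBatch failpoint above lets a test pin the batch size regardless of the inputs. A minimal usage sketch, assuming the usual imports (testing, testify/require, pingcap/failpoint, pkg/ddl) and the standard TiDB failpoint path convention of import path plus failpoint name; this test is hypothetical and not part of the diff:

func TestMockRegionBatch(t *testing.T) {
    // Arm the failpoint so CalculateRegionBatch returns a fixed value.
    fp := "github.com/pingcap/tidb/pkg/ddl/mockRegionBatch"
    require.NoError(t, failpoint.Enable(fp, `return(16)`))
    defer func() { require.NoError(t, failpoint.Disable(fp)) }()
    // With the failpoint armed, the region and instance counts are ignored.
    require.Equal(t, 16, ddl.CalculateRegionBatch(1000, 8, false))
}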
12 changes: 6 additions & 6 deletions pkg/ddl/backfilling_dist_scheduler_test.go
@@ -113,19 +113,19 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) {

 func TestCalculateRegionBatch(t *testing.T) {
    // Test calculate in cloud storage.
-   batchCnt := ddl.CalculateRegionBatchForTest(100, 8, false)
+   batchCnt := ddl.CalculateRegionBatch(100, 8, false)
    require.Equal(t, 13, batchCnt)
-   batchCnt = ddl.CalculateRegionBatchForTest(2, 8, false)
+   batchCnt = ddl.CalculateRegionBatch(2, 8, false)
    require.Equal(t, 1, batchCnt)
-   batchCnt = ddl.CalculateRegionBatchForTest(8, 8, false)
+   batchCnt = ddl.CalculateRegionBatch(8, 8, false)
    require.Equal(t, 1, batchCnt)

    // Test calculate in local storage.
-   batchCnt = ddl.CalculateRegionBatchForTest(100, 8, true)
+   batchCnt = ddl.CalculateRegionBatch(100, 8, true)
    require.Equal(t, 13, batchCnt)
-   batchCnt = ddl.CalculateRegionBatchForTest(2, 8, true)
+   batchCnt = ddl.CalculateRegionBatch(2, 8, true)
    require.Equal(t, 1, batchCnt)
-   batchCnt = ddl.CalculateRegionBatchForTest(24, 8, true)
+   batchCnt = ddl.CalculateRegionBatch(24, 8, true)
    require.Equal(t, 3, batchCnt)
 }

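For reference, the expected values in TestCalculateRegionBatch are all consistent with a ceiling division of regions across instances. A sketch that reproduces the assertions above, derived from the test values only; the real CalculateRegionBatch also distinguishes local from cloud storage, which these inputs do not differentiate:

// calcRegionBatchSketch distributes totalRegionCnt regions across
// instanceCnt instances, rounding up, with a floor of one region.
func calcRegionBatchSketch(totalRegionCnt, instanceCnt int) int {
    batch := (totalRegionCnt + instanceCnt - 1) / instanceCnt // ceil(total/instances)
    if batch < 1 {
        batch = 1
    }
    return batch
}

For example, (100+8-1)/8 = 13 and (24+8-1)/8 = 3, matching the test's expectations.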
4 changes: 2 additions & 2 deletions pkg/ddl/bench_test.go
@@ -53,7 +53,7 @@ func BenchmarkExtractDatumByOffsets(b *testing.B) {
    endKey := startKey.PrefixNext()
    txn, err := store.Begin()
    require.NoError(b, err)
-   copChunk := ddl.FetchChunk4Test(copCtx, tbl.(table.PhysicalTable), startKey, endKey, store, 10)
+   copChunk, err := FetchChunk4Test(copCtx, tbl.(table.PhysicalTable), startKey, endKey, store, 10)
    require.NoError(b, err)
    require.NoError(b, txn.Rollback())
@@ -66,7 +66,7 @@ func BenchmarkExtractDatumByOffsets(b *testing.B) {

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
-       ddl.ExtractDatumByOffsetsForTest(tk.Session().GetExprCtx().GetEvalCtx(), row, offsets, c.ExprColumnInfos, handleDataBuf)
+       ddl.ExtractDatumByOffsets(tk.Session().GetExprCtx().GetEvalCtx(), row, offsets, c.ExprColumnInfos, handleDataBuf)
    }
 }

70 changes: 32 additions & 38 deletions pkg/ddl/export_test.go
@@ -12,74 +12,68 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-package ddl
+package ddl_test

 import (
    "context"
    "time"

    "github.com/ngaut/pools"
+   "github.com/pingcap/tidb/pkg/ddl"
    "github.com/pingcap/tidb/pkg/ddl/copr"
    "github.com/pingcap/tidb/pkg/ddl/session"
+   "github.com/pingcap/tidb/pkg/ddl/testutil"
+   "github.com/pingcap/tidb/pkg/disttask/operator"
    "github.com/pingcap/tidb/pkg/errctx"
    "github.com/pingcap/tidb/pkg/expression"
    "github.com/pingcap/tidb/pkg/kv"
-   "github.com/pingcap/tidb/pkg/sessionctx/variable"
    "github.com/pingcap/tidb/pkg/table"
    "github.com/pingcap/tidb/pkg/types"
    "github.com/pingcap/tidb/pkg/util/chunk"
    "github.com/pingcap/tidb/pkg/util/mock"
 )

-type resultChanForTest struct {
-   ch chan IndexRecordChunk
-}
-
-func (r *resultChanForTest) AddTask(rs IndexRecordChunk) {
-   r.ch <- rs
-}
-
 func FetchChunk4Test(copCtx copr.CopContext, tbl table.PhysicalTable, startKey, endKey kv.Key, store kv.Storage,
-   batchSize int) *chunk.Chunk {
-   variable.SetDDLReorgBatchSize(int32(batchSize))
-   task := &reorgBackfillTask{
-       id:            1,
-       startKey:      startKey,
-       endKey:        endKey,
-       physicalTable: tbl,
-   }
-   taskCh := make(chan *reorgBackfillTask, 5)
-   resultCh := make(chan IndexRecordChunk, 5)
+   batchSize int) (*chunk.Chunk, error) {
    resPool := pools.NewResourcePool(func() (pools.Resource, error) {
        ctx := mock.NewContext()
        ctx.Store = store
        return ctx, nil
    }, 8, 8, 0)
    sessPool := session.NewSessionPool(resPool)
-   pool := newCopReqSenderPool(context.Background(), copCtx, store, taskCh, sessPool, nil)
-   pool.chunkSender = &resultChanForTest{ch: resultCh}
-   pool.adjustSize(1)
-   pool.tasksCh <- task
-   rs := <-resultCh
-   close(taskCh)
-   pool.close(false)
-   sessPool.Close()
-   return rs.Chunk
+   srcChkPool := make(chan *chunk.Chunk, 10)
+   for i := 0; i < 10; i++ {
+       srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, batchSize)
+   }
+   opCtx := ddl.NewLocalOperatorCtx(context.Background(), 1)
+   src := testutil.NewOperatorTestSource(ddl.TableScanTask{1, startKey, endKey})
+   scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 1, nil)
+   sink := testutil.NewOperatorTestSink[ddl.IndexRecordChunk]()
+
+   operator.Compose[ddl.TableScanTask](src, scanOp)
+   operator.Compose[ddl.IndexRecordChunk](scanOp, sink)
+
+   pipeline := operator.NewAsyncPipeline(src, scanOp, sink)
+   err := pipeline.Execute()
+   if err != nil {
+       return nil, err
+   }
+   err = pipeline.Close()
+   if err != nil {
+       return nil, err
+   }
+
+   results := sink.Collect()
+   return results[0].Chunk, nil
 }

 func ConvertRowToHandleAndIndexDatum(
    ctx expression.EvalContext,
    handleDataBuf, idxDataBuf []types.Datum,
    row chunk.Row, copCtx copr.CopContext, idxID int64) (kv.Handle, []types.Datum, error) {
    c := copCtx.GetBase()
-   idxData := extractDatumByOffsets(ctx, row, copCtx.IndexColumnOutputOffsets(idxID), c.ExprColumnInfos, idxDataBuf)
-   handleData := extractDatumByOffsets(ctx, row, c.HandleOutputOffsets, c.ExprColumnInfos, handleDataBuf)
-   handle, err := buildHandle(handleData, c.TableInfo, c.PrimaryKeyInfo, time.Local, errctx.StrictNoWarningContext)
+   idxData := ddl.ExtractDatumByOffsets(ctx, row, copCtx.IndexColumnOutputOffsets(idxID), c.ExprColumnInfos, idxDataBuf)
+   handleData := ddl.ExtractDatumByOffsets(ctx, row, c.HandleOutputOffsets, c.ExprColumnInfos, handleDataBuf)
+   handle, err := ddl.BuildHandle(handleData, c.TableInfo, c.PrimaryKeyInfo, time.Local, errctx.StrictNoWarningContext)
    return handle, idxData, err
 }
-
-// ExtractDatumByOffsetsForTest is used for test.
-var ExtractDatumByOffsetsForTest = extractDatumByOffsets
-
-// CalculateRegionBatchForTest is used for test.
-var CalculateRegionBatchForTest = calculateRegionBatch
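The rewritten FetchChunk4Test drives the scan through the generic pkg/disttask/operator pipeline instead of the deleted copReqSenderPool. The channel-wiring idea behind operator.Compose is roughly the following; this is a hypothetical sketch of the pattern only, and the real package's types and signatures differ:

// dataCh carries values of one type between two adjacent operators.
type dataCh[T any] struct{ ch chan T }

// withSink is implemented by an operator that emits T.
type withSink[T any] interface{ setSink(*dataCh[T]) }

// withSource is implemented by an operator that consumes T.
type withSource[T any] interface{ setSource(*dataCh[T]) }

// compose links a producer to a consumer through one shared typed channel,
// which is why a dedicated pool is no longer needed to shuttle
// TableScanTask and IndexRecordChunk values between goroutines.
func compose[T any](up withSink[T], down withSource[T]) {
    c := &dataCh[T]{ch: make(chan T)}
    up.setSink(c)
    down.setSource(c)
}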
6 changes: 3 additions & 3 deletions pkg/ddl/index.go
@@ -1710,20 +1710,20 @@ func writeChunkToLocal(
        restoreDataBuf = make([]types.Datum, len(c.HandleOutputOffsets))
    }
    for row := iter.Begin(); row != iter.End(); row = iter.Next() {
-       handleDataBuf := extractDatumByOffsets(ectx, row, c.HandleOutputOffsets, c.ExprColumnInfos, handleDataBuf)
+       handleDataBuf := ExtractDatumByOffsets(ectx, row, c.HandleOutputOffsets, c.ExprColumnInfos, handleDataBuf)
        if restore {
            // restoreDataBuf should not truncate index values.
            for i, datum := range handleDataBuf {
                restoreDataBuf[i] = *datum.Clone()
            }
        }
-       h, err := buildHandle(handleDataBuf, c.TableInfo, c.PrimaryKeyInfo, loc, errCtx)
+       h, err := BuildHandle(handleDataBuf, c.TableInfo, c.PrimaryKeyInfo, loc, errCtx)
        if err != nil {
            return 0, nil, errors.Trace(err)
        }
        for i, index := range indexes {
            idxID := index.Meta().ID
-           idxDataBuf = extractDatumByOffsets(ectx,
+           idxDataBuf = ExtractDatumByOffsets(ectx,
                row, copCtx.IndexColumnOutputOffsets(idxID), c.ExprColumnInfos, idxDataBuf)
            idxData := idxDataBuf[:len(index.Meta().Columns)]
            var rsData []types.Datum
(6 more changed files not shown)
