From 32660d087f5268ff35f603fb73dd625fc377b2e9 Mon Sep 17 00:00:00 2001 From: feitian124 Date: Sat, 2 Oct 2021 23:03:26 +0800 Subject: [PATCH 1/8] statistics: migrate test-infra to testify for sample_test.go --- statistics/sample_test.go | 414 ++++++++++++++++++++------------------ 1 file changed, 217 insertions(+), 197 deletions(-) diff --git a/statistics/sample_test.go b/statistics/sample_test.go index 7553cd6f3cfae..a3682a1928fc7 100644 --- a/statistics/sample_test.go +++ b/statistics/sample_test.go @@ -16,25 +16,35 @@ package statistics import ( "math/rand" + "testing" "time" - . "github.com/pingcap/check" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/sqlexec" + "github.com/stretchr/testify/require" ) -var _ = Suite(&testSampleSuite{}) - type testSampleSuite struct { count int rs sqlexec.RecordSet } -func (s *testSampleSuite) SetUpSuite(c *C) { +func TestSample(t *testing.T) { + s := createTestSampleSuite() + t.Run("SubTestCollectColumnStats", SubTestCollectColumnStats(s)) + t.Run("SubTestMergeSampleCollector", SubTestMergeSampleCollector(s)) + t.Run("SubTestCollectorProtoConversion", SubTestCollectorProtoConversion(s)) + t.Run("SubTestWeightedSampling", SubTestWeightedSampling(s)) + t.Run("SubTestDistributedWeightedSampling", SubTestDistributedWeightedSampling(s)) + t.Run("SubTestBuildStatsOnRowSample", SubTestBuildStatsOnRowSample(s)) +} + +func createTestSampleSuite() *testSampleSuite { + s := new(testSampleSuite) s.count = 10000 rs := &recordSet{ data: make([]types.Datum, s.count), @@ -54,87 +64,95 @@ func (s *testSampleSuite) SetUpSuite(c *C) { rs.data[i].SetInt64(rs.data[i].GetInt64() + 2) } s.rs = rs + return s } -func (s *testSampleSuite) TestCollectColumnStats(c *C) { - sc := mock.NewContext().GetSessionVars().StmtCtx - builder := SampleBuilder{ - Sc: sc, - RecordSet: s.rs, - ColLen: 1, - PkBuilder: NewSortedBuilder(sc, 256, 1, types.NewFieldType(mysql.TypeLonglong), Version2), - MaxSampleSize: 10000, - MaxBucketSize: 256, - MaxFMSketchSize: 1000, - CMSketchWidth: 2048, - CMSketchDepth: 8, - Collators: make([]collate.Collator, 1), - ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, +func SubTestCollectColumnStats(s *testSampleSuite) func(*testing.T) { + return func(t *testing.T) { + sc := mock.NewContext().GetSessionVars().StmtCtx + builder := SampleBuilder{ + Sc: sc, + RecordSet: s.rs, + ColLen: 1, + PkBuilder: NewSortedBuilder(sc, 256, 1, types.NewFieldType(mysql.TypeLonglong), Version2), + MaxSampleSize: 10000, + MaxBucketSize: 256, + MaxFMSketchSize: 1000, + CMSketchWidth: 2048, + CMSketchDepth: 8, + Collators: make([]collate.Collator, 1), + ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, + } + require.Nil(t, s.rs.Close()) + collectors, pkBuilder, err := builder.CollectColumnStats() + require.NoError(t, err) + + require.Equal(t, int64(s.count), collectors[0].NullCount+collectors[0].Count) + require.Equal(t, int64(6232), collectors[0].FMSketch.NDV()) + require.Equal(t, uint64(collectors[0].Count), collectors[0].CMSketch.TotalCount()) + require.Equal(t, int64(s.count), pkBuilder.Count) + require.Equal(t, int64(s.count), pkBuilder.Hist().NDV) } - c.Assert(s.rs.Close(), IsNil) - collectors, pkBuilder, err := builder.CollectColumnStats() - c.Assert(err, IsNil) - c.Assert(collectors[0].NullCount+collectors[0].Count, Equals, int64(s.count)) - c.Assert(collectors[0].FMSketch.NDV(), Equals, int64(6232)) - c.Assert(collectors[0].CMSketch.TotalCount(), Equals, uint64(collectors[0].Count)) - c.Assert(pkBuilder.Count, Equals, int64(s.count)) - c.Assert(pkBuilder.Hist().NDV, Equals, int64(s.count)) } -func (s *testSampleSuite) TestMergeSampleCollector(c *C) { - builder := SampleBuilder{ - Sc: mock.NewContext().GetSessionVars().StmtCtx, - RecordSet: s.rs, - ColLen: 2, - MaxSampleSize: 1000, - MaxBucketSize: 256, - MaxFMSketchSize: 1000, - CMSketchWidth: 2048, - CMSketchDepth: 8, - Collators: make([]collate.Collator, 2), - ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong), types.NewFieldType(mysql.TypeLonglong)}, +func SubTestMergeSampleCollector(s *testSampleSuite) func(*testing.T) { + return func(t *testing.T) { + builder := SampleBuilder{ + Sc: mock.NewContext().GetSessionVars().StmtCtx, + RecordSet: s.rs, + ColLen: 2, + MaxSampleSize: 1000, + MaxBucketSize: 256, + MaxFMSketchSize: 1000, + CMSketchWidth: 2048, + CMSketchDepth: 8, + Collators: make([]collate.Collator, 2), + ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong), types.NewFieldType(mysql.TypeLonglong)}, + } + require.Nil(t, s.rs.Close()) + sc := &stmtctx.StatementContext{TimeZone: time.Local} + collectors, pkBuilder, err := builder.CollectColumnStats() + require.NoError(t, err) + require.Nil(t, pkBuilder) + require.Equal(t, 2, len(collectors)) + collectors[0].IsMerger = true + collectors[0].MergeSampleCollector(sc, collectors[1]) + require.Equal(t, int64(9280), collectors[0].FMSketch.NDV()) + require.Equal(t, 1000, len(collectors[0].Samples)) + require.Equal(t, int64(1000), collectors[0].NullCount) + require.Equal(t, int64(19000), collectors[0].Count) + require.Equal(t, uint64(collectors[0].Count), collectors[0].CMSketch.TotalCount()) } - c.Assert(s.rs.Close(), IsNil) - sc := &stmtctx.StatementContext{TimeZone: time.Local} - collectors, pkBuilder, err := builder.CollectColumnStats() - c.Assert(err, IsNil) - c.Assert(pkBuilder, IsNil) - c.Assert(len(collectors), Equals, 2) - collectors[0].IsMerger = true - collectors[0].MergeSampleCollector(sc, collectors[1]) - c.Assert(collectors[0].FMSketch.NDV(), Equals, int64(9280)) - c.Assert(len(collectors[0].Samples), Equals, 1000) - c.Assert(collectors[0].NullCount, Equals, int64(1000)) - c.Assert(collectors[0].Count, Equals, int64(19000)) - c.Assert(collectors[0].CMSketch.TotalCount(), Equals, uint64(collectors[0].Count)) } -func (s *testSampleSuite) TestCollectorProtoConversion(c *C) { - builder := SampleBuilder{ - Sc: mock.NewContext().GetSessionVars().StmtCtx, - RecordSet: s.rs, - ColLen: 2, - MaxSampleSize: 10000, - MaxBucketSize: 256, - MaxFMSketchSize: 1000, - CMSketchWidth: 2048, - CMSketchDepth: 8, - Collators: make([]collate.Collator, 2), - ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong), types.NewFieldType(mysql.TypeLonglong)}, - } - c.Assert(s.rs.Close(), IsNil) - collectors, pkBuilder, err := builder.CollectColumnStats() - c.Assert(err, IsNil) - c.Assert(pkBuilder, IsNil) - for _, collector := range collectors { - p := SampleCollectorToProto(collector) - s := SampleCollectorFromProto(p) - c.Assert(collector.Count, Equals, s.Count) - c.Assert(collector.NullCount, Equals, s.NullCount) - c.Assert(collector.CMSketch.TotalCount(), Equals, s.CMSketch.TotalCount()) - c.Assert(collector.FMSketch.NDV(), Equals, s.FMSketch.NDV()) - c.Assert(collector.TotalSize, Equals, s.TotalSize) - c.Assert(len(collector.Samples), Equals, len(s.Samples)) +func SubTestCollectorProtoConversion(s *testSampleSuite) func(*testing.T) { + return func(t *testing.T) { + builder := SampleBuilder{ + Sc: mock.NewContext().GetSessionVars().StmtCtx, + RecordSet: s.rs, + ColLen: 2, + MaxSampleSize: 10000, + MaxBucketSize: 256, + MaxFMSketchSize: 1000, + CMSketchWidth: 2048, + CMSketchDepth: 8, + Collators: make([]collate.Collator, 2), + ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong), types.NewFieldType(mysql.TypeLonglong)}, + } + require.Nil(t, s.rs.Close()) + collectors, pkBuilder, err := builder.CollectColumnStats() + require.NoError(t, err) + require.Nil(t, pkBuilder) + for _, collector := range collectors { + p := SampleCollectorToProto(collector) + s := SampleCollectorFromProto(p) + require.Equal(t, s.Count, collector.Count) + require.Equal(t, s.NullCount, collector.NullCount) + require.Equal(t, s.CMSketch.TotalCount(), collector.CMSketch.TotalCount()) + require.Equal(t, s.FMSketch.NDV(), collector.FMSketch.NDV()) + require.Equal(t, s.TotalSize, collector.TotalSize) + require.Equal(t, len(s.Samples), len(collector.Samples)) + } } } @@ -167,63 +185,21 @@ func (s *testSampleSuite) recordSetForDistributedSamplingTest(size, batch int) [ return sets } -func (s *testSampleSuite) TestWeightedSampling(c *C) { - sampleNum := int64(20) - rowNum := 100 - loopCnt := 1000 - rs := s.recordSetForWeightSamplingTest(rowNum) - sc := mock.NewContext().GetSessionVars().StmtCtx - // The loop which is commented out is used for stability test. - // This test can run 800 times in a row without any failure. - // for x := 0; x < 800; x++ { - itemCnt := make([]int, rowNum) - for loopI := 0; loopI < loopCnt; loopI++ { - builder := &ReservoirRowSampleBuilder{ - Sc: sc, - RecordSet: rs, - ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, - Collators: make([]collate.Collator, 1), - ColGroups: nil, - MaxSampleSize: int(sampleNum), - MaxFMSketchSize: 1000, - Rng: rand.New(rand.NewSource(time.Now().UnixNano())), - } - collector, err := builder.Collect() - c.Assert(err, IsNil) - for i := 0; i < collector.MaxSampleSize; i++ { - a := collector.Samples[i].Columns[0].GetInt64() - itemCnt[a]++ - } - c.Assert(rs.Close(), IsNil) - } - expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) - delta := 0.5 - for _, cnt := range itemCnt { - if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { - c.Assert(false, IsTrue, Commentf("The frequency %v is exceed the Chernoff Bound", cnt)) - } - } - // } -} - -func (s *testSampleSuite) TestDistributedWeightedSampling(c *C) { - sampleNum := int64(10) - rowNum := 100 - loopCnt := 1500 - batch := 5 - sets := s.recordSetForDistributedSamplingTest(rowNum, batch) - sc := mock.NewContext().GetSessionVars().StmtCtx - // The loop which is commented out is used for stability test. - // This test can run 800 times in a row without any failure. - // for x := 0; x < 800; x++ { - itemCnt := make([]int, rowNum) - for loopI := 1; loopI < loopCnt; loopI++ { - rootRowCollector := NewReservoirRowSampleCollector(int(sampleNum), 1) - rootRowCollector.FMSketches = append(rootRowCollector.FMSketches, NewFMSketch(1000)) - for i := 0; i < batch; i++ { +func SubTestWeightedSampling(s *testSampleSuite) func(*testing.T) { + return func(t *testing.T) { + sampleNum := int64(20) + rowNum := 100 + loopCnt := 1000 + rs := s.recordSetForWeightSamplingTest(rowNum) + sc := mock.NewContext().GetSessionVars().StmtCtx + // The loop which is commented out is used for stability test. + // This test can run 800 times in a row without any failure. + // for x := 0; x < 800; x++ { + itemCnt := make([]int, rowNum) + for loopI := 0; loopI < loopCnt; loopI++ { builder := &ReservoirRowSampleBuilder{ Sc: sc, - RecordSet: sets[i], + RecordSet: rs, ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, Collators: make([]collate.Collator, 1), ColGroups: nil, @@ -232,77 +208,121 @@ func (s *testSampleSuite) TestDistributedWeightedSampling(c *C) { Rng: rand.New(rand.NewSource(time.Now().UnixNano())), } collector, err := builder.Collect() - c.Assert(err, IsNil) - rootRowCollector.MergeCollector(collector) - c.Assert(sets[i].Close(), IsNil) + require.NoError(t, err) + for i := 0; i < collector.MaxSampleSize; i++ { + a := collector.Samples[i].Columns[0].GetInt64() + itemCnt[a]++ + } + require.Nil(t, rs.Close()) } - for _, sample := range rootRowCollector.Samples { - itemCnt[sample.Columns[0].GetInt64()]++ + expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) + delta := 0.5 + for _, cnt := range itemCnt { + if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { + require.Truef(t, false, "The frequency %v is exceed the Chernoff Bound", cnt) + } } } - expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) - delta := 0.5 - for _, cnt := range itemCnt { - if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { - c.Assert(false, IsTrue, Commentf("the frequency %v is exceed the Chernoff Bound", cnt)) +} + +func SubTestDistributedWeightedSampling(s *testSampleSuite) func(*testing.T) { + return func(t *testing.T) { + sampleNum := int64(10) + rowNum := 100 + loopCnt := 1500 + batch := 5 + sets := s.recordSetForDistributedSamplingTest(rowNum, batch) + sc := mock.NewContext().GetSessionVars().StmtCtx + // The loop which is commented out is used for stability test. + // This test can run 800 times in a row without any failure. + // for x := 0; x < 800; x++ { + itemCnt := make([]int, rowNum) + for loopI := 1; loopI < loopCnt; loopI++ { + rootRowCollector := NewReservoirRowSampleCollector(int(sampleNum), 1) + rootRowCollector.FMSketches = append(rootRowCollector.FMSketches, NewFMSketch(1000)) + for i := 0; i < batch; i++ { + builder := &ReservoirRowSampleBuilder{ + Sc: sc, + RecordSet: sets[i], + ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, + Collators: make([]collate.Collator, 1), + ColGroups: nil, + MaxSampleSize: int(sampleNum), + MaxFMSketchSize: 1000, + Rng: rand.New(rand.NewSource(time.Now().UnixNano())), + } + collector, err := builder.Collect() + require.NoError(t, err) + rootRowCollector.MergeCollector(collector) + require.Nil(t, sets[i].Close()) + } + for _, sample := range rootRowCollector.Samples { + itemCnt[sample.Columns[0].GetInt64()]++ + } + } + expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) + delta := 0.5 + for _, cnt := range itemCnt { + if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { + require.Truef(t, false, "the frequency %v is exceed the Chernoff Bound", cnt) + } } } - // } } -func (s *testSampleSuite) TestBuildStatsOnRowSample(c *C) { - ctx := mock.NewContext() - sketch := NewFMSketch(1000) - data := make([]*SampleItem, 0, 8) - for i := 1; i <= 1000; i++ { - d := types.NewIntDatum(int64(i)) - err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) - c.Assert(err, IsNil) - data = append(data, &SampleItem{Value: d}) - } - for i := 1; i < 10; i++ { - d := types.NewIntDatum(int64(2)) - err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) - c.Assert(err, IsNil) - data = append(data, &SampleItem{Value: d}) - } - for i := 1; i < 7; i++ { - d := types.NewIntDatum(int64(4)) - err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) - c.Assert(err, IsNil) - data = append(data, &SampleItem{Value: d}) - } - for i := 1; i < 5; i++ { - d := types.NewIntDatum(int64(7)) - err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) - c.Assert(err, IsNil) - data = append(data, &SampleItem{Value: d}) - } - for i := 1; i < 3; i++ { - d := types.NewIntDatum(int64(11)) - err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) - c.Assert(err, IsNil) - data = append(data, &SampleItem{Value: d}) - } - collector := &SampleCollector{ - Samples: data, - NullCount: 0, - Count: int64(len(data)), - FMSketch: sketch, - TotalSize: int64(len(data)) * 8, +func SubTestBuildStatsOnRowSample(s *testSampleSuite) func(*testing.T) { + return func(t *testing.T) { + ctx := mock.NewContext() + sketch := NewFMSketch(1000) + data := make([]*SampleItem, 0, 8) + for i := 1; i <= 1000; i++ { + d := types.NewIntDatum(int64(i)) + err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) + require.NoError(t, err) + data = append(data, &SampleItem{Value: d}) + } + for i := 1; i < 10; i++ { + d := types.NewIntDatum(int64(2)) + err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) + require.NoError(t, err) + data = append(data, &SampleItem{Value: d}) + } + for i := 1; i < 7; i++ { + d := types.NewIntDatum(int64(4)) + err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) + require.NoError(t, err) + data = append(data, &SampleItem{Value: d}) + } + for i := 1; i < 5; i++ { + d := types.NewIntDatum(int64(7)) + err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) + require.NoError(t, err) + data = append(data, &SampleItem{Value: d}) + } + for i := 1; i < 3; i++ { + d := types.NewIntDatum(int64(11)) + err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) + require.NoError(t, err) + data = append(data, &SampleItem{Value: d}) + } + collector := &SampleCollector{ + Samples: data, + NullCount: 0, + Count: int64(len(data)), + FMSketch: sketch, + TotalSize: int64(len(data)) * 8, + } + tp := types.NewFieldType(mysql.TypeLonglong) + hist, topN, err := BuildHistAndTopN(ctx, 5, 4, 1, collector, tp, true) + require.Nilf(t, err, "%+v", err) + topNStr, err := topN.DecodedString(ctx, []byte{tp.Tp}) + require.NoError(t, err) + require.Equal(t, "TopN{length: 4, [(2, 10), (4, 7), (7, 5), (11, 3)]}", topNStr) + require.Equal(t, "column:1 ndv:1000 totColSize:8168\n"+ + "num: 200 lower_bound: 1 upper_bound: 204 repeats: 1 ndv: 0\n"+ + "num: 200 lower_bound: 205 upper_bound: 404 repeats: 1 ndv: 0\n"+ + "num: 200 lower_bound: 405 upper_bound: 604 repeats: 1 ndv: 0\n"+ + "num: 200 lower_bound: 605 upper_bound: 804 repeats: 1 ndv: 0\n"+ + "num: 196 lower_bound: 805 upper_bound: 1000 repeats: 1 ndv: 0", hist.ToString(0)) } - tp := types.NewFieldType(mysql.TypeLonglong) - hist, topN, err := BuildHistAndTopN(ctx, 5, 4, 1, collector, tp, true) - c.Assert(err, IsNil, Commentf("%+v", err)) - topNStr, err := topN.DecodedString(ctx, []byte{tp.Tp}) - c.Assert(err, IsNil) - c.Assert(topNStr, Equals, "TopN{length: 4, [(2, 10), (4, 7), (7, 5), (11, 3)]}") - c.Assert(hist.ToString(0), Equals, "column:1 ndv:1000 totColSize:8168\n"+ - "num: 200 lower_bound: 1 upper_bound: 204 repeats: 1 ndv: 0\n"+ - "num: 200 lower_bound: 205 upper_bound: 404 repeats: 1 ndv: 0\n"+ - "num: 200 lower_bound: 405 upper_bound: 604 repeats: 1 ndv: 0\n"+ - "num: 200 lower_bound: 605 upper_bound: 804 repeats: 1 ndv: 0\n"+ - "num: 196 lower_bound: 805 upper_bound: 1000 repeats: 1 ndv: 0", - ) - } From 931dc0b8b865d8b2c1c7d16d82fafa1a24c138f5 Mon Sep 17 00:00:00 2001 From: feitian124 Date: Sat, 2 Oct 2021 23:08:48 +0800 Subject: [PATCH 2/8] rename --- statistics/{sample_test.go => sample_serial_test.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename statistics/{sample_test.go => sample_serial_test.go} (100%) diff --git a/statistics/sample_test.go b/statistics/sample_serial_test.go similarity index 100% rename from statistics/sample_test.go rename to statistics/sample_serial_test.go From b44adde3959192bab095774fb04f74357cd4bd5f Mon Sep 17 00:00:00 2001 From: feitian124 Date: Tue, 5 Oct 2021 08:59:15 +0800 Subject: [PATCH 3/8] Update statistics/sample_serial_test.go Co-authored-by: Ping Yu --- statistics/sample_serial_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/statistics/sample_serial_test.go b/statistics/sample_serial_test.go index a3682a1928fc7..c04b2f75e2e79 100644 --- a/statistics/sample_serial_test.go +++ b/statistics/sample_serial_test.go @@ -118,7 +118,7 @@ func SubTestMergeSampleCollector(s *testSampleSuite) func(*testing.T) { collectors[0].IsMerger = true collectors[0].MergeSampleCollector(sc, collectors[1]) require.Equal(t, int64(9280), collectors[0].FMSketch.NDV()) - require.Equal(t, 1000, len(collectors[0].Samples)) + require.Len(t, collectors[0].Samples, 1000) require.Equal(t, int64(1000), collectors[0].NullCount) require.Equal(t, int64(19000), collectors[0].Count) require.Equal(t, uint64(collectors[0].Count), collectors[0].CMSketch.TotalCount()) From 6d1918392765d562496ecf301b30021a38d657eb Mon Sep 17 00:00:00 2001 From: feitian124 Date: Tue, 5 Oct 2021 09:00:14 +0800 Subject: [PATCH 4/8] Update statistics/sample_serial_test.go Co-authored-by: Ping Yu --- statistics/sample_serial_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/statistics/sample_serial_test.go b/statistics/sample_serial_test.go index c04b2f75e2e79..09579db0192f1 100644 --- a/statistics/sample_serial_test.go +++ b/statistics/sample_serial_test.go @@ -114,7 +114,7 @@ func SubTestMergeSampleCollector(s *testSampleSuite) func(*testing.T) { collectors, pkBuilder, err := builder.CollectColumnStats() require.NoError(t, err) require.Nil(t, pkBuilder) - require.Equal(t, 2, len(collectors)) + require.Len(t, collectors, 2) collectors[0].IsMerger = true collectors[0].MergeSampleCollector(sc, collectors[1]) require.Equal(t, int64(9280), collectors[0].FMSketch.NDV()) From 3995d76968561ea75bcf4adaec94e23c9263008e Mon Sep 17 00:00:00 2001 From: feitian124 Date: Tue, 5 Oct 2021 20:59:33 +0800 Subject: [PATCH 5/8] mv 3 tests to separate parallal test file as they do not depend on testSampleSuite --- statistics/sample_serial_test.go | 177 +------------------------- statistics/sample_test.go | 207 +++++++++++++++++++++++++++++++ 2 files changed, 208 insertions(+), 176 deletions(-) create mode 100644 statistics/sample_test.go diff --git a/statistics/sample_serial_test.go b/statistics/sample_serial_test.go index 09579db0192f1..94f30b11f32f4 100644 --- a/statistics/sample_serial_test.go +++ b/statistics/sample_serial_test.go @@ -15,7 +15,6 @@ package statistics import ( - "math/rand" "testing" "time" @@ -33,14 +32,11 @@ type testSampleSuite struct { rs sqlexec.RecordSet } -func TestSample(t *testing.T) { +func TestSampleSerial(t *testing.T) { s := createTestSampleSuite() t.Run("SubTestCollectColumnStats", SubTestCollectColumnStats(s)) t.Run("SubTestMergeSampleCollector", SubTestMergeSampleCollector(s)) t.Run("SubTestCollectorProtoConversion", SubTestCollectorProtoConversion(s)) - t.Run("SubTestWeightedSampling", SubTestWeightedSampling(s)) - t.Run("SubTestDistributedWeightedSampling", SubTestDistributedWeightedSampling(s)) - t.Run("SubTestBuildStatsOnRowSample", SubTestBuildStatsOnRowSample(s)) } func createTestSampleSuite() *testSampleSuite { @@ -155,174 +151,3 @@ func SubTestCollectorProtoConversion(s *testSampleSuite) func(*testing.T) { } } } - -func (s *testSampleSuite) recordSetForWeightSamplingTest(size int) *recordSet { - r := &recordSet{ - data: make([]types.Datum, 0, size), - count: size, - } - for i := 0; i < size; i++ { - r.data = append(r.data, types.NewIntDatum(int64(i))) - } - r.setFields(mysql.TypeLonglong) - return r -} - -func (s *testSampleSuite) recordSetForDistributedSamplingTest(size, batch int) []*recordSet { - sets := make([]*recordSet, 0, batch) - batchSize := size / batch - for i := 0; i < batch; i++ { - r := &recordSet{ - data: make([]types.Datum, 0, batchSize), - count: batchSize, - } - for j := 0; j < size/batch; j++ { - r.data = append(r.data, types.NewIntDatum(int64(j+batchSize*i))) - } - r.setFields(mysql.TypeLonglong) - sets = append(sets, r) - } - return sets -} - -func SubTestWeightedSampling(s *testSampleSuite) func(*testing.T) { - return func(t *testing.T) { - sampleNum := int64(20) - rowNum := 100 - loopCnt := 1000 - rs := s.recordSetForWeightSamplingTest(rowNum) - sc := mock.NewContext().GetSessionVars().StmtCtx - // The loop which is commented out is used for stability test. - // This test can run 800 times in a row without any failure. - // for x := 0; x < 800; x++ { - itemCnt := make([]int, rowNum) - for loopI := 0; loopI < loopCnt; loopI++ { - builder := &ReservoirRowSampleBuilder{ - Sc: sc, - RecordSet: rs, - ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, - Collators: make([]collate.Collator, 1), - ColGroups: nil, - MaxSampleSize: int(sampleNum), - MaxFMSketchSize: 1000, - Rng: rand.New(rand.NewSource(time.Now().UnixNano())), - } - collector, err := builder.Collect() - require.NoError(t, err) - for i := 0; i < collector.MaxSampleSize; i++ { - a := collector.Samples[i].Columns[0].GetInt64() - itemCnt[a]++ - } - require.Nil(t, rs.Close()) - } - expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) - delta := 0.5 - for _, cnt := range itemCnt { - if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { - require.Truef(t, false, "The frequency %v is exceed the Chernoff Bound", cnt) - } - } - } -} - -func SubTestDistributedWeightedSampling(s *testSampleSuite) func(*testing.T) { - return func(t *testing.T) { - sampleNum := int64(10) - rowNum := 100 - loopCnt := 1500 - batch := 5 - sets := s.recordSetForDistributedSamplingTest(rowNum, batch) - sc := mock.NewContext().GetSessionVars().StmtCtx - // The loop which is commented out is used for stability test. - // This test can run 800 times in a row without any failure. - // for x := 0; x < 800; x++ { - itemCnt := make([]int, rowNum) - for loopI := 1; loopI < loopCnt; loopI++ { - rootRowCollector := NewReservoirRowSampleCollector(int(sampleNum), 1) - rootRowCollector.FMSketches = append(rootRowCollector.FMSketches, NewFMSketch(1000)) - for i := 0; i < batch; i++ { - builder := &ReservoirRowSampleBuilder{ - Sc: sc, - RecordSet: sets[i], - ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, - Collators: make([]collate.Collator, 1), - ColGroups: nil, - MaxSampleSize: int(sampleNum), - MaxFMSketchSize: 1000, - Rng: rand.New(rand.NewSource(time.Now().UnixNano())), - } - collector, err := builder.Collect() - require.NoError(t, err) - rootRowCollector.MergeCollector(collector) - require.Nil(t, sets[i].Close()) - } - for _, sample := range rootRowCollector.Samples { - itemCnt[sample.Columns[0].GetInt64()]++ - } - } - expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) - delta := 0.5 - for _, cnt := range itemCnt { - if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { - require.Truef(t, false, "the frequency %v is exceed the Chernoff Bound", cnt) - } - } - } -} - -func SubTestBuildStatsOnRowSample(s *testSampleSuite) func(*testing.T) { - return func(t *testing.T) { - ctx := mock.NewContext() - sketch := NewFMSketch(1000) - data := make([]*SampleItem, 0, 8) - for i := 1; i <= 1000; i++ { - d := types.NewIntDatum(int64(i)) - err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) - require.NoError(t, err) - data = append(data, &SampleItem{Value: d}) - } - for i := 1; i < 10; i++ { - d := types.NewIntDatum(int64(2)) - err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) - require.NoError(t, err) - data = append(data, &SampleItem{Value: d}) - } - for i := 1; i < 7; i++ { - d := types.NewIntDatum(int64(4)) - err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) - require.NoError(t, err) - data = append(data, &SampleItem{Value: d}) - } - for i := 1; i < 5; i++ { - d := types.NewIntDatum(int64(7)) - err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) - require.NoError(t, err) - data = append(data, &SampleItem{Value: d}) - } - for i := 1; i < 3; i++ { - d := types.NewIntDatum(int64(11)) - err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) - require.NoError(t, err) - data = append(data, &SampleItem{Value: d}) - } - collector := &SampleCollector{ - Samples: data, - NullCount: 0, - Count: int64(len(data)), - FMSketch: sketch, - TotalSize: int64(len(data)) * 8, - } - tp := types.NewFieldType(mysql.TypeLonglong) - hist, topN, err := BuildHistAndTopN(ctx, 5, 4, 1, collector, tp, true) - require.Nilf(t, err, "%+v", err) - topNStr, err := topN.DecodedString(ctx, []byte{tp.Tp}) - require.NoError(t, err) - require.Equal(t, "TopN{length: 4, [(2, 10), (4, 7), (7, 5), (11, 3)]}", topNStr) - require.Equal(t, "column:1 ndv:1000 totColSize:8168\n"+ - "num: 200 lower_bound: 1 upper_bound: 204 repeats: 1 ndv: 0\n"+ - "num: 200 lower_bound: 205 upper_bound: 404 repeats: 1 ndv: 0\n"+ - "num: 200 lower_bound: 405 upper_bound: 604 repeats: 1 ndv: 0\n"+ - "num: 200 lower_bound: 605 upper_bound: 804 repeats: 1 ndv: 0\n"+ - "num: 196 lower_bound: 805 upper_bound: 1000 repeats: 1 ndv: 0", hist.ToString(0)) - } -} diff --git a/statistics/sample_test.go b/statistics/sample_test.go new file mode 100644 index 0000000000000..f9052fb4c67ea --- /dev/null +++ b/statistics/sample_test.go @@ -0,0 +1,207 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +import ( + "math/rand" + "testing" + "time" + + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/collate" + "github.com/pingcap/tidb/util/mock" + "github.com/stretchr/testify/require" +) + +func TestSample(t *testing.T) { + t.Run("SubTestWeightedSampling", SubTestWeightedSampling()) + t.Run("SubTestDistributedWeightedSampling", SubTestDistributedWeightedSampling()) + t.Run("SubTestBuildStatsOnRowSample", SubTestBuildStatsOnRowSample()) +} + +func recordSetForWeightSamplingTest(size int) *recordSet { + r := &recordSet{ + data: make([]types.Datum, 0, size), + count: size, + } + for i := 0; i < size; i++ { + r.data = append(r.data, types.NewIntDatum(int64(i))) + } + r.setFields(mysql.TypeLonglong) + return r +} + +func recordSetForDistributedSamplingTest(size, batch int) []*recordSet { + sets := make([]*recordSet, 0, batch) + batchSize := size / batch + for i := 0; i < batch; i++ { + r := &recordSet{ + data: make([]types.Datum, 0, batchSize), + count: batchSize, + } + for j := 0; j < size/batch; j++ { + r.data = append(r.data, types.NewIntDatum(int64(j+batchSize*i))) + } + r.setFields(mysql.TypeLonglong) + sets = append(sets, r) + } + return sets +} + +func SubTestWeightedSampling() func(*testing.T) { + return func(t *testing.T) { + t.Parallel() + sampleNum := int64(20) + rowNum := 100 + loopCnt := 1000 + rs := recordSetForWeightSamplingTest(rowNum) + sc := mock.NewContext().GetSessionVars().StmtCtx + // The loop which is commented out is used for stability test. + // This test can run 800 times in a row without any failure. + // for x := 0; x < 800; x++ { + itemCnt := make([]int, rowNum) + for loopI := 0; loopI < loopCnt; loopI++ { + builder := &ReservoirRowSampleBuilder{ + Sc: sc, + RecordSet: rs, + ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, + Collators: make([]collate.Collator, 1), + ColGroups: nil, + MaxSampleSize: int(sampleNum), + MaxFMSketchSize: 1000, + Rng: rand.New(rand.NewSource(time.Now().UnixNano())), + } + collector, err := builder.Collect() + require.NoError(t, err) + for i := 0; i < collector.MaxSampleSize; i++ { + a := collector.Samples[i].Columns[0].GetInt64() + itemCnt[a]++ + } + require.Nil(t, rs.Close()) + } + expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) + delta := 0.5 + for _, cnt := range itemCnt { + if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { + require.Truef(t, false, "The frequency %v is exceed the Chernoff Bound", cnt) + } + } + } +} + +func SubTestDistributedWeightedSampling() func(*testing.T) { + return func(t *testing.T) { + t.Parallel() + sampleNum := int64(10) + rowNum := 100 + loopCnt := 1500 + batch := 5 + sets := recordSetForDistributedSamplingTest(rowNum, batch) + sc := mock.NewContext().GetSessionVars().StmtCtx + // The loop which is commented out is used for stability test. + // This test can run 800 times in a row without any failure. + // for x := 0; x < 800; x++ { + itemCnt := make([]int, rowNum) + for loopI := 1; loopI < loopCnt; loopI++ { + rootRowCollector := NewReservoirRowSampleCollector(int(sampleNum), 1) + rootRowCollector.FMSketches = append(rootRowCollector.FMSketches, NewFMSketch(1000)) + for i := 0; i < batch; i++ { + builder := &ReservoirRowSampleBuilder{ + Sc: sc, + RecordSet: sets[i], + ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, + Collators: make([]collate.Collator, 1), + ColGroups: nil, + MaxSampleSize: int(sampleNum), + MaxFMSketchSize: 1000, + Rng: rand.New(rand.NewSource(time.Now().UnixNano())), + } + collector, err := builder.Collect() + require.NoError(t, err) + rootRowCollector.MergeCollector(collector) + require.Nil(t, sets[i].Close()) + } + for _, sample := range rootRowCollector.Samples { + itemCnt[sample.Columns[0].GetInt64()]++ + } + } + expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) + delta := 0.5 + for _, cnt := range itemCnt { + if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { + require.Truef(t, false, "the frequency %v is exceed the Chernoff Bound", cnt) + } + } + } +} + +func SubTestBuildStatsOnRowSample() func(*testing.T) { + return func(t *testing.T) { + t.Parallel() + ctx := mock.NewContext() + sketch := NewFMSketch(1000) + data := make([]*SampleItem, 0, 8) + for i := 1; i <= 1000; i++ { + d := types.NewIntDatum(int64(i)) + err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) + require.NoError(t, err) + data = append(data, &SampleItem{Value: d}) + } + for i := 1; i < 10; i++ { + d := types.NewIntDatum(int64(2)) + err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) + require.NoError(t, err) + data = append(data, &SampleItem{Value: d}) + } + for i := 1; i < 7; i++ { + d := types.NewIntDatum(int64(4)) + err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) + require.NoError(t, err) + data = append(data, &SampleItem{Value: d}) + } + for i := 1; i < 5; i++ { + d := types.NewIntDatum(int64(7)) + err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) + require.NoError(t, err) + data = append(data, &SampleItem{Value: d}) + } + for i := 1; i < 3; i++ { + d := types.NewIntDatum(int64(11)) + err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) + require.NoError(t, err) + data = append(data, &SampleItem{Value: d}) + } + collector := &SampleCollector{ + Samples: data, + NullCount: 0, + Count: int64(len(data)), + FMSketch: sketch, + TotalSize: int64(len(data)) * 8, + } + tp := types.NewFieldType(mysql.TypeLonglong) + hist, topN, err := BuildHistAndTopN(ctx, 5, 4, 1, collector, tp, true) + require.Nilf(t, err, "%+v", err) + topNStr, err := topN.DecodedString(ctx, []byte{tp.Tp}) + require.NoError(t, err) + require.Equal(t, "TopN{length: 4, [(2, 10), (4, 7), (7, 5), (11, 3)]}", topNStr) + require.Equal(t, "column:1 ndv:1000 totColSize:8168\n"+ + "num: 200 lower_bound: 1 upper_bound: 204 repeats: 1 ndv: 0\n"+ + "num: 200 lower_bound: 205 upper_bound: 404 repeats: 1 ndv: 0\n"+ + "num: 200 lower_bound: 405 upper_bound: 604 repeats: 1 ndv: 0\n"+ + "num: 200 lower_bound: 605 upper_bound: 804 repeats: 1 ndv: 0\n"+ + "num: 196 lower_bound: 805 upper_bound: 1000 repeats: 1 ndv: 0", hist.ToString(0)) + } +} From 96bf2ea126e61cecaa4add0e47e7ab2b9c21c2e9 Mon Sep 17 00:00:00 2001 From: feitian124 Date: Tue, 5 Oct 2021 21:31:42 +0800 Subject: [PATCH 6/8] use require.InEpsilonf --- statistics/sample_test.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/statistics/sample_test.go b/statistics/sample_test.go index f9052fb4c67ea..a32600ba84c46 100644 --- a/statistics/sample_test.go +++ b/statistics/sample_test.go @@ -93,11 +93,8 @@ func SubTestWeightedSampling() func(*testing.T) { require.Nil(t, rs.Close()) } expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) - delta := 0.5 for _, cnt := range itemCnt { - if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { - require.Truef(t, false, "The frequency %v is exceed the Chernoff Bound", cnt) - } + require.InEpsilonf(t, expFrequency, float64(cnt), 0.5, "The frequency %v is exceed the Chernoff Bound", cnt) } } } @@ -139,11 +136,8 @@ func SubTestDistributedWeightedSampling() func(*testing.T) { } } expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) - delta := 0.5 for _, cnt := range itemCnt { - if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { - require.Truef(t, false, "the frequency %v is exceed the Chernoff Bound", cnt) - } + require.InEpsilonf(t, expFrequency, float64(cnt), 0.5, "The frequency %v is exceed the Chernoff Bound", cnt) } } } From b39f8d0e267f197b0401a025567f1af4b0092ba9 Mon Sep 17 00:00:00 2001 From: feitian124 Date: Tue, 5 Oct 2021 23:03:17 +0800 Subject: [PATCH 7/8] Revert "use require.InEpsilonf" This reverts commit 96bf2ea126e61cecaa4add0e47e7ab2b9c21c2e9. --- statistics/sample_test.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/statistics/sample_test.go b/statistics/sample_test.go index a32600ba84c46..f9052fb4c67ea 100644 --- a/statistics/sample_test.go +++ b/statistics/sample_test.go @@ -93,8 +93,11 @@ func SubTestWeightedSampling() func(*testing.T) { require.Nil(t, rs.Close()) } expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) + delta := 0.5 for _, cnt := range itemCnt { - require.InEpsilonf(t, expFrequency, float64(cnt), 0.5, "The frequency %v is exceed the Chernoff Bound", cnt) + if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { + require.Truef(t, false, "The frequency %v is exceed the Chernoff Bound", cnt) + } } } } @@ -136,8 +139,11 @@ func SubTestDistributedWeightedSampling() func(*testing.T) { } } expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) + delta := 0.5 for _, cnt := range itemCnt { - require.InEpsilonf(t, expFrequency, float64(cnt), 0.5, "The frequency %v is exceed the Chernoff Bound", cnt) + if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { + require.Truef(t, false, "the frequency %v is exceed the Chernoff Bound", cnt) + } } } } From d9c4ff4cda389be8f70149dc509c4450b3342584 Mon Sep 17 00:00:00 2001 From: feitian124 Date: Thu, 7 Oct 2021 11:13:39 +0800 Subject: [PATCH 8/8] do not use sub test --- statistics/sample_test.go | 246 ++++++++++++++++++-------------------- 1 file changed, 117 insertions(+), 129 deletions(-) diff --git a/statistics/sample_test.go b/statistics/sample_test.go index f9052fb4c67ea..082d7d8016e83 100644 --- a/statistics/sample_test.go +++ b/statistics/sample_test.go @@ -26,12 +26,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestSample(t *testing.T) { - t.Run("SubTestWeightedSampling", SubTestWeightedSampling()) - t.Run("SubTestDistributedWeightedSampling", SubTestDistributedWeightedSampling()) - t.Run("SubTestBuildStatsOnRowSample", SubTestBuildStatsOnRowSample()) -} - func recordSetForWeightSamplingTest(size int) *recordSet { r := &recordSet{ data: make([]types.Datum, 0, size), @@ -61,22 +55,64 @@ func recordSetForDistributedSamplingTest(size, batch int) []*recordSet { return sets } -func SubTestWeightedSampling() func(*testing.T) { - return func(t *testing.T) { - t.Parallel() - sampleNum := int64(20) - rowNum := 100 - loopCnt := 1000 - rs := recordSetForWeightSamplingTest(rowNum) - sc := mock.NewContext().GetSessionVars().StmtCtx - // The loop which is commented out is used for stability test. - // This test can run 800 times in a row without any failure. - // for x := 0; x < 800; x++ { - itemCnt := make([]int, rowNum) - for loopI := 0; loopI < loopCnt; loopI++ { +func TestWeightedSampling(t *testing.T) { + t.Parallel() + sampleNum := int64(20) + rowNum := 100 + loopCnt := 1000 + rs := recordSetForWeightSamplingTest(rowNum) + sc := mock.NewContext().GetSessionVars().StmtCtx + // The loop which is commented out is used for stability test. + // This test can run 800 times in a row without any failure. + // for x := 0; x < 800; x++ { + itemCnt := make([]int, rowNum) + for loopI := 0; loopI < loopCnt; loopI++ { + builder := &ReservoirRowSampleBuilder{ + Sc: sc, + RecordSet: rs, + ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, + Collators: make([]collate.Collator, 1), + ColGroups: nil, + MaxSampleSize: int(sampleNum), + MaxFMSketchSize: 1000, + Rng: rand.New(rand.NewSource(time.Now().UnixNano())), + } + collector, err := builder.Collect() + require.NoError(t, err) + for i := 0; i < collector.MaxSampleSize; i++ { + a := collector.Samples[i].Columns[0].GetInt64() + itemCnt[a]++ + } + require.Nil(t, rs.Close()) + } + expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) + delta := 0.5 + for _, cnt := range itemCnt { + if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { + require.Truef(t, false, "The frequency %v is exceed the Chernoff Bound", cnt) + } + } +} + +func TestDistributedWeightedSampling(t *testing.T) { + t.Parallel() + sampleNum := int64(10) + rowNum := 100 + loopCnt := 1500 + batch := 5 + sets := recordSetForDistributedSamplingTest(rowNum, batch) + sc := mock.NewContext().GetSessionVars().StmtCtx + // The loop which is commented out is used for stability test. + // This test can run 800 times in a row without any failure. + // for x := 0; x < 800; x++ { + itemCnt := make([]int, rowNum) + for loopI := 1; loopI < loopCnt; loopI++ { + rootRowCollector := NewReservoirRowSampleCollector(int(sampleNum), 1) + rootRowCollector.FMSketches = append(rootRowCollector.FMSketches, NewFMSketch(1000)) + for i := 0; i < batch; i++ { builder := &ReservoirRowSampleBuilder{ Sc: sc, - RecordSet: rs, + RecordSet: sets[i], ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, Collators: make([]collate.Collator, 1), ColGroups: nil, @@ -86,122 +122,74 @@ func SubTestWeightedSampling() func(*testing.T) { } collector, err := builder.Collect() require.NoError(t, err) - for i := 0; i < collector.MaxSampleSize; i++ { - a := collector.Samples[i].Columns[0].GetInt64() - itemCnt[a]++ - } - require.Nil(t, rs.Close()) + rootRowCollector.MergeCollector(collector) + require.Nil(t, sets[i].Close()) } - expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) - delta := 0.5 - for _, cnt := range itemCnt { - if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { - require.Truef(t, false, "The frequency %v is exceed the Chernoff Bound", cnt) - } + for _, sample := range rootRowCollector.Samples { + itemCnt[sample.Columns[0].GetInt64()]++ } } -} - -func SubTestDistributedWeightedSampling() func(*testing.T) { - return func(t *testing.T) { - t.Parallel() - sampleNum := int64(10) - rowNum := 100 - loopCnt := 1500 - batch := 5 - sets := recordSetForDistributedSamplingTest(rowNum, batch) - sc := mock.NewContext().GetSessionVars().StmtCtx - // The loop which is commented out is used for stability test. - // This test can run 800 times in a row without any failure. - // for x := 0; x < 800; x++ { - itemCnt := make([]int, rowNum) - for loopI := 1; loopI < loopCnt; loopI++ { - rootRowCollector := NewReservoirRowSampleCollector(int(sampleNum), 1) - rootRowCollector.FMSketches = append(rootRowCollector.FMSketches, NewFMSketch(1000)) - for i := 0; i < batch; i++ { - builder := &ReservoirRowSampleBuilder{ - Sc: sc, - RecordSet: sets[i], - ColsFieldType: []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, - Collators: make([]collate.Collator, 1), - ColGroups: nil, - MaxSampleSize: int(sampleNum), - MaxFMSketchSize: 1000, - Rng: rand.New(rand.NewSource(time.Now().UnixNano())), - } - collector, err := builder.Collect() - require.NoError(t, err) - rootRowCollector.MergeCollector(collector) - require.Nil(t, sets[i].Close()) - } - for _, sample := range rootRowCollector.Samples { - itemCnt[sample.Columns[0].GetInt64()]++ - } - } - expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) - delta := 0.5 - for _, cnt := range itemCnt { - if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { - require.Truef(t, false, "the frequency %v is exceed the Chernoff Bound", cnt) - } + expFrequency := float64(sampleNum) * float64(loopCnt) / float64(rowNum) + delta := 0.5 + for _, cnt := range itemCnt { + if float64(cnt) < expFrequency/(1+delta) || float64(cnt) > expFrequency*(1+delta) { + require.Truef(t, false, "the frequency %v is exceed the Chernoff Bound", cnt) } } } -func SubTestBuildStatsOnRowSample() func(*testing.T) { - return func(t *testing.T) { - t.Parallel() - ctx := mock.NewContext() - sketch := NewFMSketch(1000) - data := make([]*SampleItem, 0, 8) - for i := 1; i <= 1000; i++ { - d := types.NewIntDatum(int64(i)) - err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) - require.NoError(t, err) - data = append(data, &SampleItem{Value: d}) - } - for i := 1; i < 10; i++ { - d := types.NewIntDatum(int64(2)) - err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) - require.NoError(t, err) - data = append(data, &SampleItem{Value: d}) - } - for i := 1; i < 7; i++ { - d := types.NewIntDatum(int64(4)) - err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) - require.NoError(t, err) - data = append(data, &SampleItem{Value: d}) - } - for i := 1; i < 5; i++ { - d := types.NewIntDatum(int64(7)) - err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) - require.NoError(t, err) - data = append(data, &SampleItem{Value: d}) - } - for i := 1; i < 3; i++ { - d := types.NewIntDatum(int64(11)) - err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) - require.NoError(t, err) - data = append(data, &SampleItem{Value: d}) - } - collector := &SampleCollector{ - Samples: data, - NullCount: 0, - Count: int64(len(data)), - FMSketch: sketch, - TotalSize: int64(len(data)) * 8, - } - tp := types.NewFieldType(mysql.TypeLonglong) - hist, topN, err := BuildHistAndTopN(ctx, 5, 4, 1, collector, tp, true) - require.Nilf(t, err, "%+v", err) - topNStr, err := topN.DecodedString(ctx, []byte{tp.Tp}) +func TestBuildStatsOnRowSample(t *testing.T) { + t.Parallel() + ctx := mock.NewContext() + sketch := NewFMSketch(1000) + data := make([]*SampleItem, 0, 8) + for i := 1; i <= 1000; i++ { + d := types.NewIntDatum(int64(i)) + err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) require.NoError(t, err) - require.Equal(t, "TopN{length: 4, [(2, 10), (4, 7), (7, 5), (11, 3)]}", topNStr) - require.Equal(t, "column:1 ndv:1000 totColSize:8168\n"+ - "num: 200 lower_bound: 1 upper_bound: 204 repeats: 1 ndv: 0\n"+ - "num: 200 lower_bound: 205 upper_bound: 404 repeats: 1 ndv: 0\n"+ - "num: 200 lower_bound: 405 upper_bound: 604 repeats: 1 ndv: 0\n"+ - "num: 200 lower_bound: 605 upper_bound: 804 repeats: 1 ndv: 0\n"+ - "num: 196 lower_bound: 805 upper_bound: 1000 repeats: 1 ndv: 0", hist.ToString(0)) + data = append(data, &SampleItem{Value: d}) + } + for i := 1; i < 10; i++ { + d := types.NewIntDatum(int64(2)) + err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) + require.NoError(t, err) + data = append(data, &SampleItem{Value: d}) + } + for i := 1; i < 7; i++ { + d := types.NewIntDatum(int64(4)) + err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) + require.NoError(t, err) + data = append(data, &SampleItem{Value: d}) + } + for i := 1; i < 5; i++ { + d := types.NewIntDatum(int64(7)) + err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) + require.NoError(t, err) + data = append(data, &SampleItem{Value: d}) + } + for i := 1; i < 3; i++ { + d := types.NewIntDatum(int64(11)) + err := sketch.InsertValue(ctx.GetSessionVars().StmtCtx, d) + require.NoError(t, err) + data = append(data, &SampleItem{Value: d}) + } + collector := &SampleCollector{ + Samples: data, + NullCount: 0, + Count: int64(len(data)), + FMSketch: sketch, + TotalSize: int64(len(data)) * 8, } + tp := types.NewFieldType(mysql.TypeLonglong) + hist, topN, err := BuildHistAndTopN(ctx, 5, 4, 1, collector, tp, true) + require.Nilf(t, err, "%+v", err) + topNStr, err := topN.DecodedString(ctx, []byte{tp.Tp}) + require.NoError(t, err) + require.Equal(t, "TopN{length: 4, [(2, 10), (4, 7), (7, 5), (11, 3)]}", topNStr) + require.Equal(t, "column:1 ndv:1000 totColSize:8168\n"+ + "num: 200 lower_bound: 1 upper_bound: 204 repeats: 1 ndv: 0\n"+ + "num: 200 lower_bound: 205 upper_bound: 404 repeats: 1 ndv: 0\n"+ + "num: 200 lower_bound: 405 upper_bound: 604 repeats: 1 ndv: 0\n"+ + "num: 200 lower_bound: 605 upper_bound: 804 repeats: 1 ndv: 0\n"+ + "num: 196 lower_bound: 805 upper_bound: 1000 repeats: 1 ndv: 0", hist.ToString(0)) }