diff --git a/.codecov.yml b/.codecov.yml index e5a5e37e24f..ce3ecdb7c09 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -37,3 +37,4 @@ ignore: - "*.toml" - "*.md" - "docs/.*" + - "testing_utils/.*" diff --git a/Makefile b/Makefile index 46ac48b374e..ec6f14ce9a8 100644 --- a/Makefile +++ b/Makefile @@ -32,7 +32,7 @@ MAC := "Darwin" PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto|ticdc\/tests|integration' PACKAGES := $$($(PACKAGE_LIST)) PACKAGE_DIRECTORIES := $(PACKAGE_LIST) | sed 's|github.com/pingcap/$(PROJECT)/||' -FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor') +FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor' | grep -vE 'kv_gen') CDC_PKG := github.com/pingcap/ticdc FAILPOINT_DIR := $$(for p in $(PACKAGES); do echo $${p\#"github.com/pingcap/$(PROJECT)/"}|grep -v "github.com/pingcap/$(PROJECT)"; done) FAILPOINT := bin/failpoint-ctl @@ -105,6 +105,8 @@ integration_test_build: check_failpoint_ctl -coverpkg=github.com/pingcap/ticdc/... \ -o bin/cdc.test github.com/pingcap/ticdc \ || { $(FAILPOINT_DISABLE); exit 1; } + $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./main.go \ + || { $(FAILPOINT_DISABLE); exit 1; } $(FAILPOINT_DISABLE) integration_test: integration_test_mysql @@ -143,8 +145,8 @@ check: check-copyright fmt lint check-static tidy errdoc coverage: GO111MODULE=off go get github.com/wadey/gocovmerge - gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out" - grep -vE ".*.pb.go|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" "$(TEST_DIR)/cov.unit.out" > "$(TEST_DIR)/unit_cov.out" + gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out" + grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" "$(TEST_DIR)/cov.unit.out" > "$(TEST_DIR)/unit_cov.out" ifeq ("$(JenkinsCI)", "1") GO111MODULE=off go get github.com/mattn/goveralls @goveralls -coverprofile=$(TEST_DIR)/all_cov.out -service=jenkins-ci -repotoken $(COVERALLS_TOKEN) @@ -156,7 +158,7 @@ else endif check-static: tools/bin/golangci-lint - tools/bin/golangci-lint run --timeout 10m0s + tools/bin/golangci-lint run --timeout 10m0s --skip-files kv_gen clean: go clean -i ./... diff --git a/cdc/metrics.go b/cdc/metrics.go index a9863f87886..b36944682e1 100644 --- a/cdc/metrics.go +++ b/cdc/metrics.go @@ -14,6 +14,7 @@ package cdc import ( + "github.com/pingcap/ticdc/cdc/puller/sorter" "time" "github.com/pingcap/ticdc/cdc/entry" @@ -37,5 +38,6 @@ func init() { puller.InitMetrics(registry) sink.InitMetrics(registry) entry.InitMetrics(registry) + sorter.InitMetrics(registry) initProcessorMetrics(registry) } diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 2043ce66c5a..7f50659bd43 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -36,6 +36,7 @@ type SortEngine string const ( SortInMemory SortEngine = "memory" SortInFile SortEngine = "file" + SortUnified SortEngine = "unified" ) // FeedState represents the running state of a changefeed diff --git a/cdc/model/kv.go b/cdc/model/kv.go index 88f40947736..e9aef629cfa 100644 --- a/cdc/model/kv.go +++ b/cdc/model/kv.go @@ -11,6 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:generate msgp + package model import ( @@ -32,6 +34,7 @@ const ( // RegionFeedEvent from the kv layer. // Only one of the event will be setted. +//msgp:ignore RegionFeedEvent type RegionFeedEvent struct { Val *RawKVEntry Resolved *ResolvedSpan @@ -53,6 +56,7 @@ func (e *RegionFeedEvent) GetValue() interface{} { // ResolvedSpan guarantees all the KV value event // with commit ts less than ResolvedTs has been emitted. +//msgp:ignore ResolvedSpan type ResolvedSpan struct { Span regionspan.ComparableSpan ResolvedTs uint64 @@ -60,18 +64,18 @@ type ResolvedSpan struct { // RawKVEntry notify the KV operator type RawKVEntry struct { - OpType OpType - Key []byte + OpType OpType `msg:"op_type"` + Key []byte `msg:"key"` // nil for delete type - Value []byte + Value []byte `msg:"value"` // nil for insert type - OldValue []byte - StartTs uint64 + OldValue []byte `msg:"old_value"` + StartTs uint64 `msg:"start_ts"` // Commit or resolved TS - CRTs uint64 + CRTs uint64 `msg:"crts"` // Additonal debug info - RegionID uint64 + RegionID uint64 `msg:"region_id"` } func (v *RawKVEntry) String() string { diff --git a/cdc/model/kv_gen.go b/cdc/model/kv_gen.go new file mode 100644 index 00000000000..d78e50d3604 --- /dev/null +++ b/cdc/model/kv_gen.go @@ -0,0 +1,320 @@ +package model + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *OpType) DecodeMsg(dc *msgp.Reader) (err error) { + { + var zb0001 int + zb0001, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = OpType(zb0001) + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z OpType) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteInt(int(z)) + if err != nil { + err = msgp.WrapError(err) + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z OpType) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendInt(o, int(z)) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *OpType) UnmarshalMsg(bts []byte) (o []byte, err error) { + { + var zb0001 int + zb0001, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = OpType(zb0001) + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z OpType) Msgsize() (s int) { + s = msgp.IntSize + return +} + +// DecodeMsg implements msgp.Decodable +func (z *RawKVEntry) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "op_type": + { + var zb0002 int + zb0002, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "OpType") + return + } + z.OpType = OpType(zb0002) + } + case "key": + z.Key, err = dc.ReadBytes(z.Key) + if err != nil { + err = msgp.WrapError(err, "Key") + return + } + case "value": + z.Value, err = dc.ReadBytes(z.Value) + if err != nil { + err = msgp.WrapError(err, "Value") + return + } + case "old_value": + z.OldValue, err = dc.ReadBytes(z.OldValue) + if err != nil { + err = msgp.WrapError(err, "OldValue") + return + } + case "start_ts": + z.StartTs, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "StartTs") + return + } + case "crts": + z.CRTs, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "CRTs") + return + } + case "region_id": + z.RegionID, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "RegionID") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *RawKVEntry) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 7 + // write "op_type" + err = en.Append(0x87, 0xa7, 0x6f, 0x70, 0x5f, 0x74, 0x79, 0x70, 0x65) + if err != nil { + return + } + err = en.WriteInt(int(z.OpType)) + if err != nil { + err = msgp.WrapError(err, "OpType") + return + } + // write "key" + err = en.Append(0xa3, 0x6b, 0x65, 0x79) + if err != nil { + return + } + err = en.WriteBytes(z.Key) + if err != nil { + err = msgp.WrapError(err, "Key") + return + } + // write "value" + err = en.Append(0xa5, 0x76, 0x61, 0x6c, 0x75, 0x65) + if err != nil { + return + } + err = en.WriteBytes(z.Value) + if err != nil { + err = msgp.WrapError(err, "Value") + return + } + // write "old_value" + err = en.Append(0xa9, 0x6f, 0x6c, 0x64, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65) + if err != nil { + return + } + err = en.WriteBytes(z.OldValue) + if err != nil { + err = msgp.WrapError(err, "OldValue") + return + } + // write "start_ts" + err = en.Append(0xa8, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteUint64(z.StartTs) + if err != nil { + err = msgp.WrapError(err, "StartTs") + return + } + // write "crts" + err = en.Append(0xa4, 0x63, 0x72, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteUint64(z.CRTs) + if err != nil { + err = msgp.WrapError(err, "CRTs") + return + } + // write "region_id" + err = en.Append(0xa9, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64) + if err != nil { + return + } + err = en.WriteUint64(z.RegionID) + if err != nil { + err = msgp.WrapError(err, "RegionID") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *RawKVEntry) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 7 + // string "op_type" + o = append(o, 0x87, 0xa7, 0x6f, 0x70, 0x5f, 0x74, 0x79, 0x70, 0x65) + o = msgp.AppendInt(o, int(z.OpType)) + // string "key" + o = append(o, 0xa3, 0x6b, 0x65, 0x79) + o = msgp.AppendBytes(o, z.Key) + // string "value" + o = append(o, 0xa5, 0x76, 0x61, 0x6c, 0x75, 0x65) + o = msgp.AppendBytes(o, z.Value) + // string "old_value" + o = append(o, 0xa9, 0x6f, 0x6c, 0x64, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65) + o = msgp.AppendBytes(o, z.OldValue) + // string "start_ts" + o = append(o, 0xa8, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x73) + o = msgp.AppendUint64(o, z.StartTs) + // string "crts" + o = append(o, 0xa4, 0x63, 0x72, 0x74, 0x73) + o = msgp.AppendUint64(o, z.CRTs) + // string "region_id" + o = append(o, 0xa9, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64) + o = msgp.AppendUint64(o, z.RegionID) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *RawKVEntry) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "op_type": + { + var zb0002 int + zb0002, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "OpType") + return + } + z.OpType = OpType(zb0002) + } + case "key": + z.Key, bts, err = msgp.ReadBytesBytes(bts, z.Key) + if err != nil { + err = msgp.WrapError(err, "Key") + return + } + case "value": + z.Value, bts, err = msgp.ReadBytesBytes(bts, z.Value) + if err != nil { + err = msgp.WrapError(err, "Value") + return + } + case "old_value": + z.OldValue, bts, err = msgp.ReadBytesBytes(bts, z.OldValue) + if err != nil { + err = msgp.WrapError(err, "OldValue") + return + } + case "start_ts": + z.StartTs, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "StartTs") + return + } + case "crts": + z.CRTs, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "CRTs") + return + } + case "region_id": + z.RegionID, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "RegionID") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *RawKVEntry) Msgsize() (s int) { + s = 1 + 8 + msgp.IntSize + 4 + msgp.BytesPrefixSize + len(z.Key) + 6 + msgp.BytesPrefixSize + len(z.Value) + 10 + msgp.BytesPrefixSize + len(z.OldValue) + 9 + msgp.Uint64Size + 5 + msgp.Uint64Size + 10 + msgp.Uint64Size + return +} diff --git a/cdc/model/kv_gen_test.go b/cdc/model/kv_gen_test.go new file mode 100644 index 00000000000..dbf9aefd376 --- /dev/null +++ b/cdc/model/kv_gen_test.go @@ -0,0 +1,123 @@ +package model + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalRawKVEntry(t *testing.T) { + v := RawKVEntry{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgRawKVEntry(b *testing.B) { + v := RawKVEntry{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgRawKVEntry(b *testing.B) { + v := RawKVEntry{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalRawKVEntry(b *testing.B) { + v := RawKVEntry{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeRawKVEntry(t *testing.T) { + v := RawKVEntry{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeRawKVEntry Msgsize() is inaccurate") + } + + vn := RawKVEntry{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeRawKVEntry(b *testing.B) { + v := RawKVEntry{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeRawKVEntry(b *testing.B) { + v := RawKVEntry{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cdc/model/mounter.go b/cdc/model/mounter.go index 0368a4abe12..43dd8e890de 100644 --- a/cdc/model/mounter.go +++ b/cdc/model/mounter.go @@ -37,7 +37,7 @@ func NewPolymorphicEvent(rawKV *RawKVEntry) *PolymorphicEvent { StartTs: rawKV.StartTs, CRTs: rawKV.CRTs, RawKV: rawKV, - finished: make(chan struct{}), + finished: nil, } } @@ -56,6 +56,13 @@ func (e *PolymorphicEvent) RegionID() uint64 { return e.RawKV.RegionID } +// SetUpFinishedChan creates an internal channel to support PrepareFinished and WaitPrepare +func (e *PolymorphicEvent) SetUpFinishedChan() { + if e.finished == nil { + e.finished = make(chan struct{}) + } +} + // PrepareFinished marks the prepare process is finished // In prepare process, Mounter will translate raw KV to row data func (e *PolymorphicEvent) PrepareFinished() { diff --git a/cdc/model/mounter_test.go b/cdc/model/mounter_test.go index fc915cf375e..27926959fd4 100644 --- a/cdc/model/mounter_test.go +++ b/cdc/model/mounter_test.go @@ -55,6 +55,7 @@ func (s *mounterSuite) TestPolymorphicEventPrepare(c *check.C) { c.Assert(polyEvent.WaitPrepare(ctx), check.IsNil) polyEvent = NewPolymorphicEvent(&RawKVEntry{OpType: OpTypePut}) + polyEvent.SetUpFinishedChan() var wg sync.WaitGroup wg.Add(1) go func() { @@ -67,6 +68,7 @@ func (s *mounterSuite) TestPolymorphicEventPrepare(c *check.C) { cctx, cancel := context.WithCancel(ctx) polyEvent = NewPolymorphicEvent(&RawKVEntry{OpType: OpTypePut}) + polyEvent.SetUpFinishedChan() cancel() err := polyEvent.WaitPrepare(cctx) c.Assert(err, check.Equals, context.Canceled) diff --git a/cdc/processor.go b/cdc/processor.go index 4f42f71d18b..d4ae8660c40 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/ticdc/cdc/kv" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/cdc/puller" + psorter "github.com/pingcap/ticdc/cdc/puller/sorter" "github.com/pingcap/ticdc/cdc/sink" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/filter" @@ -705,7 +706,7 @@ func (p *processor) globalStatusWorker(ctx context.Context) error { select { case <-ctx.Done(): return - case p.output <- model.NewResolvedPolymorphicEvent(0, lastResolvedTs): + case p.output <- model.NewResolvedPolymorphicEvent(0, globalResolvedTs): // regionID = 0 means the event is produced by TiCDC } } @@ -881,6 +882,7 @@ func (p *processor) syncResolved(ctx context.Context) error { resolvedTs = localResolvedTs } if row.CRTs <= resolvedTs { + _ = row.WaitPrepare(ctx) log.Fatal("The CRTs must be greater than the resolvedTs", zap.String("model", "processor"), zap.String("changefeed", p.changefeedID), @@ -991,7 +993,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo switch p.changefeed.Engine { case model.SortInMemory: sorterImpl = puller.NewEntrySorter() - case model.SortInFile: + case model.SortInFile, model.SortUnified: err := util.IsDirAndWritable(p.changefeed.SortDir) if err != nil { if os.IsNotExist(errors.Cause(err)) { @@ -1005,7 +1007,13 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo return nil } } - sorterImpl = puller.NewFileSorter(p.changefeed.SortDir) + + if p.changefeed.Engine == model.SortInFile { + sorterImpl = puller.NewFileSorter(p.changefeed.SortDir) + } else { + // Unified Sorter + sorterImpl = psorter.NewUnifiedSorter(p.changefeed.SortDir, tableName, util.CaptureAddrFromCtx(ctx)) + } default: p.errCh <- cerror.ErrUnknownSortEngine.GenWithStackByArgs(p.changefeed.Engine) return nil @@ -1109,6 +1117,17 @@ func (p *processor) sorterConsume( if pEvent == nil { continue } + + pEvent.SetUpFinishedChan() + select { + case <-ctx.Done(): + if errors.Cause(ctx.Err()) != context.Canceled { + p.errCh <- ctx.Err() + } + return + case p.mounter.Input() <- pEvent: + } + if pEvent.RawKV != nil && pEvent.RawKV.OpType == model.OpTypeResolved { atomic.StoreUint64(pResolvedTs, pEvent.CRTs) lastResolvedTs = pEvent.CRTs @@ -1165,14 +1184,6 @@ func (p *processor) pullerConsume( } pEvent := model.NewPolymorphicEvent(rawKV) sorter.AddEntry(ctx, pEvent) - select { - case <-ctx.Done(): - if errors.Cause(ctx.Err()) != context.Canceled { - p.errCh <- ctx.Err() - } - return - case p.mounter.Input() <- pEvent: - } } } } diff --git a/cdc/puller/file_sorter.go b/cdc/puller/file_sorter.go index 10fc85429b7..f74f2571178 100644 --- a/cdc/puller/file_sorter.go +++ b/cdc/puller/file_sorter.go @@ -193,15 +193,8 @@ func flushEventsToFile(ctx context.Context, fullpath string, entries []*model.Po dataBuf := new(bytes.Buffer) var dataLen [8]byte for _, entry := range entries { - err := entry.WaitPrepare(ctx) - if err != nil { - return 0, errors.Trace(err) - } - if entry.Row == nil { - continue - } dataBuf.Reset() - err = msgpack.NewEncoder(dataBuf).Encode(entry) + err := msgpack.NewEncoder(dataBuf).Encode(entry) if err != nil { return 0, cerror.WrapError(cerror.ErrFileSorterEncode, err) } @@ -247,9 +240,19 @@ type sortItem struct { type sortHeap []*sortItem -func (h sortHeap) Len() int { return len(h) } -func (h sortHeap) Less(i, j int) bool { return h[i].entry.CRTs < h[j].entry.CRTs } -func (h sortHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h sortHeap) Len() int { return len(h) } +func (h sortHeap) Less(i, j int) bool { + if h[i].entry.CRTs == h[j].entry.CRTs { + if h[j].entry.RawKV != nil && h[j].entry.RawKV.OpType == model.OpTypeResolved && h[i].entry.RawKV.OpType != model.OpTypeResolved { + return true + } + if h[i].entry.RawKV != nil && h[i].entry.RawKV.OpType == model.OpTypeDelete && h[j].entry.RawKV.OpType != model.OpTypeDelete { + return true + } + } + return h[i].entry.CRTs < h[j].entry.CRTs +} +func (h sortHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h *sortHeap) Push(x interface{}) { *h = append(*h, x.(*sortItem)) } diff --git a/cdc/puller/sorter/backend.go b/cdc/puller/sorter/backend.go new file mode 100644 index 00000000000..4554ffc7e69 --- /dev/null +++ b/cdc/puller/sorter/backend.go @@ -0,0 +1,39 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sorter + +import "github.com/pingcap/ticdc/cdc/model" + +type backEnd interface { + reader() (backEndReader, error) + writer() (backEndWriter, error) + free() error +} + +type backEndReader interface { + readNext() (*model.PolymorphicEvent, error) + resetAndClose() error +} + +type backEndWriter interface { + writeNext(event *model.PolymorphicEvent) error + writtenCount() int + dataSize() uint64 + flushAndClose() error +} + +type serializerDeserializer interface { + marshal(event *model.PolymorphicEvent, bytes []byte) ([]byte, error) + unmarshal(event *model.PolymorphicEvent, bytes []byte) ([]byte, error) +} diff --git a/cdc/puller/sorter/backend_pool.go b/cdc/puller/sorter/backend_pool.go new file mode 100644 index 00000000000..9fdffabe3a9 --- /dev/null +++ b/cdc/puller/sorter/backend_pool.go @@ -0,0 +1,175 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sorter + +import ( + "context" + "fmt" + "os" + "reflect" + "runtime/debug" + "sync" + "sync/atomic" + "time" + "unsafe" + + "github.com/mackerelio/go-osstat/memory" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/config" + "go.uber.org/zap" +) + +var ( + pool *backEndPool // this is the singleton instance of backEndPool + poolMu sync.Mutex // this mutex is for delayed initialization of `pool` only +) + +type backEndPool struct { + memoryUseEstimate int64 + onDiskDataSize int64 + fileNameCounter uint64 + memPressure int32 + cache [256]unsafe.Pointer + dir string +} + +func newBackEndPool(dir string, captureAddr string) *backEndPool { + ret := &backEndPool{ + memoryUseEstimate: 0, + fileNameCounter: 0, + dir: dir, + } + + go func() { + ticker := time.NewTicker(5 * time.Second) + + metricSorterInMemoryDataSizeGauge := sorterInMemoryDataSizeGauge.WithLabelValues(captureAddr) + metricSorterOnDiskDataSizeGauge := sorterOnDiskDataSizeGauge.WithLabelValues(captureAddr) + metricSorterOpenFileCountGauge := sorterOpenFileCountGauge.WithLabelValues(captureAddr) + + for { + <-ticker.C + + metricSorterInMemoryDataSizeGauge.Set(float64(atomic.LoadInt64(&ret.memoryUseEstimate))) + metricSorterOnDiskDataSizeGauge.Set(float64(atomic.LoadInt64(&ret.onDiskDataSize))) + metricSorterOpenFileCountGauge.Set(float64(atomic.LoadInt64(&openFDCount))) + + // update memPressure + m, err := memory.Get() + if err != nil { + failpoint.Inject("sorterDebug", func() { + log.Fatal("unified sorter: getting system memory usage failed", zap.Error(err)) + }) + + log.Warn("unified sorter: getting system memory usage failed", zap.Error(err)) + } + + memPressure := m.Used * 100 / m.Total + atomic.StoreInt32(&ret.memPressure, int32(memPressure)) + if memPressure > 50 { + log.Debug("unified sorter: high memory pressure", zap.Uint64("memPressure", memPressure), + zap.Int64("usedBySorter", atomic.LoadInt64(&ret.memoryUseEstimate))) + // Increase GC frequency to avoid necessary OOM + debug.SetGCPercent(10) + } else { + debug.SetGCPercent(100) + } + + // garbage collect temporary files in batches + freedCount := 0 + for i := range ret.cache { + ptr := &ret.cache[i] + innerPtr := atomic.SwapPointer(ptr, nil) + if innerPtr == nil { + continue + } + backEnd := (*fileBackEnd)(innerPtr) + err := backEnd.free() + if err != nil { + log.Warn("Cannot remove temporary file for sorting", zap.String("file", backEnd.fileName), zap.Error(err)) + } else { + log.Info("Temporary file removed", zap.String("file", backEnd.fileName)) + freedCount += 1 + } + if freedCount >= 16 { + freedCount = 0 + break + } + } + } + }() + + return ret +} + +func (p *backEndPool) alloc(ctx context.Context) (backEnd, error) { + sorterConfig := config.GetSorterConfig() + if atomic.LoadInt64(&p.memoryUseEstimate) < int64(sorterConfig.MaxMemoryConsumption) && + atomic.LoadInt32(&p.memPressure) < int32(sorterConfig.MaxMemoryPressure) { + + ret := newMemoryBackEnd() + return ret, nil + } + + for i := range p.cache { + ptr := &p.cache[i] + ret := atomic.SwapPointer(ptr, nil) + if ret != nil { + return (*fileBackEnd)(ret), nil + } + } + + fname := fmt.Sprintf("%s/sort-%d-%d", p.dir, os.Getpid(), atomic.AddUint64(&p.fileNameCounter, 1)) + log.Debug("Unified Sorter: trying to create file backEnd", zap.String("filename", fname)) + + ret, err := newFileBackEnd(fname, &msgPackGenSerde{}) + if err != nil { + return nil, errors.Trace(err) + } + + return ret, nil +} + +func (p *backEndPool) dealloc(backEnd backEnd) error { + switch b := backEnd.(type) { + case *memoryBackEnd: + // Let GC do its job + return nil + case *fileBackEnd: + failpoint.Inject("sorterDebug", func() { + if atomic.LoadInt32(&b.borrowed) != 0 { + log.Warn("Deallocating a fileBackEnd in use", zap.String("filename", b.fileName)) + failpoint.Return(nil) + } + }) + for i := range p.cache { + ptr := &p.cache[i] + if atomic.CompareAndSwapPointer(ptr, nil, unsafe.Pointer(b)) { + return nil + } + } + // Cache is full. + err := b.free() + if err != nil { + return errors.Trace(err) + } + + return nil + default: + log.Fatal("backEndPool: unexpected backEnd type to be deallocated", zap.Reflect("type", reflect.TypeOf(backEnd))) + } + return nil +} diff --git a/cdc/puller/sorter/file_backend.go b/cdc/puller/sorter/file_backend.go new file mode 100644 index 00000000000..9b2ad3e15b7 --- /dev/null +++ b/cdc/puller/sorter/file_backend.go @@ -0,0 +1,345 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sorter + +import ( + "bufio" + "encoding/binary" + "io" + "os" + "sync/atomic" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + "go.uber.org/zap" +) + +const ( + fileBufferSize = 1 * 1024 * 1024 // 1MB + magic = 0xbeefbeef +) + +var ( + openFDCount int64 +) + +type fileBackEnd struct { + fileName string + serde serializerDeserializer + borrowed int32 + size int64 +} + +func newFileBackEnd(fileName string, serde serializerDeserializer) (*fileBackEnd, error) { + f, err := os.Create(fileName) + if err != nil { + return nil, errors.Trace(err) + } + + err = f.Close() + if err != nil { + return nil, errors.Trace(err) + } + + log.Debug("new FileSorterBackEnd created", zap.String("filename", fileName)) + return &fileBackEnd{ + fileName: fileName, + serde: serde, + borrowed: 0, + }, nil +} + +func (f *fileBackEnd) reader() (backEndReader, error) { + fd, err := os.OpenFile(f.fileName, os.O_RDONLY, 0644) + if err != nil { + return nil, errors.Trace(err) + } + + atomic.AddInt64(&openFDCount, 1) + + var totalSize int64 + failpoint.Inject("sorterDebug", func() { + info, err := fd.Stat() + if err != nil { + failpoint.Return(nil, errors.Trace(err)) + } + totalSize = info.Size() + }) + + failpoint.Inject("sorterDebug", func() { + if atomic.SwapInt32(&f.borrowed, 1) != 0 { + log.Fatal("fileBackEnd: already borrowed", zap.String("fileName", f.fileName)) + } + }) + + return &fileBackEndReader{ + backEnd: f, + f: fd, + reader: bufio.NewReaderSize(fd, fileBufferSize), + totalSize: totalSize, + }, nil +} + +func (f *fileBackEnd) writer() (backEndWriter, error) { + fd, err := os.OpenFile(f.fileName, os.O_TRUNC|os.O_RDWR, 0644) + if err != nil { + return nil, errors.Trace(err) + } + + atomic.AddInt64(&openFDCount, 1) + + failpoint.Inject("sorterDebug", func() { + if atomic.SwapInt32(&f.borrowed, 1) != 0 { + log.Fatal("fileBackEnd: already borrowed", zap.String("fileName", f.fileName)) + } + }) + + return &fileBackEndWriter{ + backEnd: f, + f: fd, + writer: bufio.NewWriterSize(fd, fileBufferSize), + }, nil +} + +func (f *fileBackEnd) free() error { + failpoint.Inject("sorterDebug", func() { + if atomic.LoadInt32(&f.borrowed) != 0 { + log.Fatal("fileBackEnd: trying to free borrowed file", zap.String("fileName", f.fileName)) + } + }) + + err := os.Remove(f.fileName) + if err != nil { + failpoint.Inject("sorterDebug", func() { + failpoint.Return(errors.Trace(err)) + }) + // ignore this error in production to provide some resilience + log.Warn("fileBackEnd: failed to remove file", zap.Error(err)) + } + + return nil +} + +type fileBackEndReader struct { + backEnd *fileBackEnd + f *os.File + reader *bufio.Reader + rawBytesBuf []byte + isEOF bool + + // debug only fields + readBytes int64 + totalSize int64 +} + +func (r *fileBackEndReader) readNext() (*model.PolymorphicEvent, error) { + if r.isEOF { + // guaranteed EOF idempotency + return nil, nil + } + + var m uint32 + err := binary.Read(r.reader, binary.LittleEndian, &m) + if err != nil { + if err == io.EOF { + r.isEOF = true + return nil, nil + } + return nil, errors.Trace(err) + } + + if m != magic { + log.Fatal("fileSorterBackEnd: wrong magic. Damaged file or bug?", zap.Uint32("magic", m)) + } + + var size uint32 + err = binary.Read(r.reader, binary.LittleEndian, &size) + if err != nil { + return nil, errors.Trace(err) + } + + if cap(r.rawBytesBuf) < int(size) { + r.rawBytesBuf = make([]byte, size) + } else { + r.rawBytesBuf = r.rawBytesBuf[:size] + } + + // short reads are possible with bufio, hence the need for io.ReadFull + n, err := io.ReadFull(r.reader, r.rawBytesBuf) + if err != nil { + return nil, errors.Trace(err) + } + + if n != int(size) { + return nil, errors.Errorf("fileSorterBackEnd: expected %d bytes, actually read %d bytes", size, n) + } + + event := new(model.PolymorphicEvent) + _, err = r.backEnd.serde.unmarshal(event, r.rawBytesBuf) + if err != nil { + return nil, errors.Trace(err) + } + + failpoint.Inject("sorterDebug", func() { + r.readBytes += int64(4 + 4 + int(size)) + if r.readBytes > r.totalSize { + log.Fatal("fileSorterBackEnd: read more bytes than expected, check concurrent use of file", + zap.String("fileName", r.backEnd.fileName)) + } + }) + + return event, nil +} + +func (r *fileBackEndReader) resetAndClose() error { + defer func() { + // fail-fast for double-close + r.f = nil + + atomic.AddInt64(&pool.onDiskDataSize, -r.backEnd.size) + + failpoint.Inject("sorterDebug", func() { + atomic.StoreInt32(&r.backEnd.borrowed, 0) + }) + + }() + + if r.f == nil { + failpoint.Inject("sorterDebug", func() { + log.Fatal("Double closing of file", zap.String("filename", r.backEnd.fileName)) + }) + log.Warn("Double closing of file", zap.String("filename", r.backEnd.fileName)) + return nil + } + + err := r.f.Truncate(0) + if err != nil { + failpoint.Inject("sorterDebug", func() { + info, err1 := r.f.Stat() + if err1 != nil { + failpoint.Return(errors.Trace(err)) + } + + log.Info("file debug info", zap.String("filename", info.Name()), + zap.Int64("size", info.Size())) + + failpoint.Return(nil) + }) + log.Warn("fileBackEndReader: could not truncate file", zap.Error(err)) + } + + err = r.f.Close() + if err != nil { + failpoint.Inject("sorterDebug", func() { + failpoint.Return(errors.Trace(err)) + }) + log.Warn("fileBackEndReader: could not close file", zap.Error(err)) + return nil + } + + atomic.AddInt64(&openFDCount, -1) + + return nil +} + +type fileBackEndWriter struct { + backEnd *fileBackEnd + f *os.File + writer *bufio.Writer + rawBytesBuf []byte + + bytesWritten int64 + eventsWritten int64 +} + +func (w *fileBackEndWriter) writeNext(event *model.PolymorphicEvent) error { + var err error + w.rawBytesBuf, err = w.backEnd.serde.marshal(event, w.rawBytesBuf) + if err != nil { + return errors.Trace(err) + } + + size := len(w.rawBytesBuf) + if size == 0 { + log.Fatal("fileSorterBackEnd: serialized to empty byte array. Bug?") + } + + err = binary.Write(w.writer, binary.LittleEndian, uint32(magic)) + if err != nil { + return errors.Trace(err) + } + + err = binary.Write(w.writer, binary.LittleEndian, uint32(size)) + if err != nil { + return errors.Trace(err) + } + + // short writes are possible with bufio + offset := 0 + for offset < size { + n, err := w.writer.Write(w.rawBytesBuf[offset:]) + if err != nil { + return errors.Trace(err) + } + offset += n + } + if offset != size { + return errors.Errorf("fileSorterBackEnd: expected to write %d bytes, actually wrote %d bytes", size, offset) + } + + w.eventsWritten++ + w.bytesWritten += int64(size) + return nil +} + +func (w *fileBackEndWriter) writtenCount() int { + return int(w.bytesWritten) +} + +func (w *fileBackEndWriter) dataSize() uint64 { + return uint64(w.eventsWritten) +} + +func (w *fileBackEndWriter) flushAndClose() error { + defer func() { + // fail-fast for double-close + w.f = nil + }() + + err := w.writer.Flush() + if err != nil { + return errors.Trace(err) + } + + err = w.f.Close() + if err != nil { + failpoint.Inject("sorterDebug", func() { + failpoint.Return(errors.Trace(err)) + }) + log.Warn("fileBackEndReader: could not close file", zap.Error(err)) + return nil + } + + atomic.AddInt64(&openFDCount, -1) + w.backEnd.size = w.bytesWritten + atomic.AddInt64(&pool.onDiskDataSize, w.bytesWritten) + + failpoint.Inject("sorterDebug", func() { + atomic.StoreInt32(&w.backEnd.borrowed, 0) + }) + + return nil +} diff --git a/cdc/puller/sorter/heap.go b/cdc/puller/sorter/heap.go new file mode 100644 index 00000000000..52d55e631b2 --- /dev/null +++ b/cdc/puller/sorter/heap.go @@ -0,0 +1,48 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sorter + +import "github.com/pingcap/ticdc/cdc/model" + +type sortItem struct { + entry *model.PolymorphicEvent + data interface{} +} + +type sortHeap []*sortItem + +func (h sortHeap) Len() int { return len(h) } +func (h sortHeap) Less(i, j int) bool { + if h[i].entry.CRTs == h[j].entry.CRTs { + if h[j].entry.RawKV.OpType == model.OpTypeResolved && h[i].entry.RawKV.OpType != model.OpTypeResolved { + return true + } + if h[i].entry.RawKV.OpType == model.OpTypeDelete && h[j].entry.RawKV.OpType != model.OpTypeDelete { + return true + } + } + return h[i].entry.CRTs < h[j].entry.CRTs +} +func (h sortHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *sortHeap) Push(x interface{}) { + *h = append(*h, x.(*sortItem)) +} +func (h *sortHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + old[n-1] = nil + *h = old[0 : n-1] + return x +} diff --git a/cdc/puller/sorter/heap_sorter.go b/cdc/puller/sorter/heap_sorter.go new file mode 100644 index 00000000000..bc6aab7b6aa --- /dev/null +++ b/cdc/puller/sorter/heap_sorter.go @@ -0,0 +1,251 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sorter + +import ( + "container/heap" + "context" + "github.com/pingcap/ticdc/pkg/util" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" + "go.uber.org/zap" +) + +const ( + flushRateLimitPerSecond = 10 +) + +type flushTask struct { + taskID int + heapSorterID int + backend backEnd + reader backEndReader + tsLowerBound uint64 + maxResolvedTs uint64 + finished chan error + dealloc func() error + dataSize int64 + lastTs uint64 // for debugging TODO remove +} + +type heapSorter struct { + id int + taskCounter int + inputCh chan *model.PolymorphicEvent + outputCh chan *flushTask + heap sortHeap +} + +func newHeapSorter(id int, out chan *flushTask) *heapSorter { + return &heapSorter{ + id: id, + inputCh: make(chan *model.PolymorphicEvent, 1024*1024), + outputCh: out, + heap: make(sortHeap, 0, 65536), + } +} + +// flush should only be called within the main loop in run(). +func (h *heapSorter) flush(ctx context.Context, maxResolvedTs uint64) error { + captureAddr := util.CaptureAddrFromCtx(ctx) + changefeedID := util.ChangefeedIDFromCtx(ctx) + _, tableName := util.TableIDFromCtx(ctx) + sorterFlushCountHistogram.WithLabelValues(captureAddr, changefeedID, tableName).Observe(float64(h.heap.Len())) + + isEmptyFlush := h.heap.Len() == 0 + if isEmptyFlush { + return nil + } + var ( + backEnd backEnd + lowerBound uint64 + ) + + if !isEmptyFlush { + var err error + backEnd, err = pool.alloc(ctx) + if err != nil { + return errors.Trace(err) + } + + lowerBound = h.heap[0].entry.CRTs + } + + task := &flushTask{ + taskID: h.taskCounter, + heapSorterID: h.id, + backend: backEnd, + tsLowerBound: lowerBound, + maxResolvedTs: maxResolvedTs, + finished: make(chan error, 2), + } + h.taskCounter++ + + var oldHeap sortHeap + if !isEmptyFlush { + task.dealloc = func() error { + if task.backend != nil { + task.backend = nil + return pool.dealloc(backEnd) + } + return nil + } + oldHeap = h.heap + h.heap = make(sortHeap, 0, 65536) + } else { + task.dealloc = func() error { + return nil + } + } + + log.Debug("Unified Sorter new flushTask", + zap.String("table", tableNameFromCtx(ctx)), + zap.Int("heap-id", task.heapSorterID), + zap.Uint64("resolvedTs", task.maxResolvedTs)) + + go func() { + if isEmptyFlush { + return + } + backEndFinal := backEnd + writer, err := backEnd.writer() + if err != nil { + if backEndFinal != nil { + _ = task.dealloc() + } + task.finished <- errors.Trace(err) + return + } + + defer func() { + // handle errors (or aborts) gracefully to prevent resource leaking (especially FD's) + if writer != nil { + _ = writer.flushAndClose() + } + if backEndFinal != nil { + _ = task.dealloc() + } + close(task.finished) + }() + + for oldHeap.Len() > 0 { + select { + case <-ctx.Done(): + task.finished <- ctx.Err() + default: + } + + event := heap.Pop(&oldHeap).(*sortItem).entry + err := writer.writeNext(event) + if err != nil { + task.finished <- errors.Trace(err) + return + } + } + + dataSize := writer.dataSize() + atomic.StoreInt64(&task.dataSize, int64(dataSize)) + eventCount := writer.writtenCount() + + writer1 := writer + writer = nil + err = writer1.flushAndClose() + if err != nil { + task.finished <- errors.Trace(err) + return + } + + backEndFinal = nil + task.finished <- nil // DO NOT access `task` beyond this point in this function + log.Debug("Unified Sorter flushTask finished", + zap.Int("heap-id", task.heapSorterID), + zap.String("table", tableNameFromCtx(ctx)), + zap.Uint64("resolvedTs", task.maxResolvedTs), + zap.Uint64("data-size", dataSize), + zap.Int("size", eventCount)) + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case h.outputCh <- task: + } + return nil +} + +func (h *heapSorter) run(ctx context.Context) error { + var ( + maxResolved uint64 + heapSizeBytesEstimate int64 + rateCounter int + ) + + rateTicker := time.NewTicker(1 * time.Second) + defer rateTicker.Stop() + + flushTicker := time.NewTicker(5 * time.Second) + defer flushTicker.Stop() + + sorterConfig := config.GetSorterConfig() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case event := <-h.inputCh: + heap.Push(&h.heap, &sortItem{entry: event}) + isResolvedEvent := event.RawKV != nil && event.RawKV.OpType == model.OpTypeResolved + + if isResolvedEvent { + if event.RawKV.CRTs < maxResolved { + log.Fatal("ResolvedTs regression, bug?", zap.Uint64("event-resolvedTs", event.RawKV.CRTs), + zap.Uint64("max-resolvedTs", maxResolved)) + } + maxResolved = event.RawKV.CRTs + } + + if event.RawKV.CRTs < maxResolved { + log.Fatal("Bad input to sorter", zap.Uint64("cur-ts", event.RawKV.CRTs), zap.Uint64("maxResolved", maxResolved)) + } + + // 5 * 8 is for the 5 fields in PolymorphicEvent + heapSizeBytesEstimate += event.RawKV.ApproximateSize() + 40 + needFlush := heapSizeBytesEstimate >= int64(sorterConfig.ChunkSizeLimit) || + (isResolvedEvent && rateCounter < flushRateLimitPerSecond) + + if needFlush { + rateCounter++ + err := h.flush(ctx, maxResolved) + if err != nil { + return errors.Trace(err) + } + heapSizeBytesEstimate = 0 + } + case <-flushTicker.C: + if rateCounter < flushRateLimitPerSecond { + err := h.flush(ctx, maxResolved) + if err != nil { + return errors.Trace(err) + } + heapSizeBytesEstimate = 0 + } + case <-rateTicker.C: + rateCounter = 0 + } + } +} diff --git a/cdc/puller/sorter/memory_backend.go b/cdc/puller/sorter/memory_backend.go new file mode 100644 index 00000000000..10ad504b62c --- /dev/null +++ b/cdc/puller/sorter/memory_backend.go @@ -0,0 +1,135 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sorter + +import ( + "go.uber.org/zap" + "sync/atomic" + + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" +) + +type memoryBackEnd struct { + events []*model.PolymorphicEvent + estimatedSize int64 + borrowed int32 +} + +func newMemoryBackEnd() *memoryBackEnd { + return &memoryBackEnd{} +} + +func (m *memoryBackEnd) reader() (backEndReader, error) { + failpoint.Inject("sorterDebug", func() { + if atomic.SwapInt32(&m.borrowed, 1) != 0 { + log.Fatal("memoryBackEnd: already borrowed") + } + }) + + return &memoryBackEndReader{ + backEnd: m, + readIndex: 0, + }, nil +} + +func (m *memoryBackEnd) writer() (backEndWriter, error) { + failpoint.Inject("sorterDebug", func() { + if atomic.SwapInt32(&m.borrowed, 1) != 0 { + log.Fatal("memoryBackEnd: already borrowed") + } + }) + + return &memoryBackEndWriter{backEnd: m}, nil +} + +func (m *memoryBackEnd) free() error { + failpoint.Inject("sorterDebug", func() { + if atomic.LoadInt32(&m.borrowed) != 0 { + log.Fatal("fileBackEnd: trying to free borrowed file") + } + }) + + return nil +} + +type memoryBackEndReader struct { + backEnd *memoryBackEnd + readIndex int +} + +func (r *memoryBackEndReader) readNext() (*model.PolymorphicEvent, error) { + // Check for "EOF" + if r.readIndex >= len(r.backEnd.events) { + return nil, nil + } + + ret := r.backEnd.events[r.readIndex] + r.readIndex++ + return ret, nil +} + +func (r *memoryBackEndReader) resetAndClose() error { + failpoint.Inject("sorterDebug", func() { + atomic.StoreInt32(&r.backEnd.borrowed, 0) + }) + + atomic.AddInt64(&pool.memoryUseEstimate, -r.backEnd.estimatedSize) + + return nil +} + +type memoryBackEndWriter struct { + backEnd *memoryBackEnd + bytesWritten int64 + // for debugging only + maxTs uint64 +} + +func (w *memoryBackEndWriter) writeNext(event *model.PolymorphicEvent) error { + w.backEnd.events = append(w.backEnd.events, event) + // 8 * 5 is for the 5 fields in PolymorphicEvent, each of which is thought of as a 64-bit pointer + w.bytesWritten += 8*5 + event.RawKV.ApproximateSize() + + failpoint.Inject("sorterDebug", func() { + if event.CRTs < w.maxTs { + log.Fatal("memoryBackEnd: ts regressed, bug?", + zap.Uint64("prev-ts", w.maxTs), + zap.Uint64("cur-ts", event.CRTs)) + } + w.maxTs = event.CRTs + }) + return nil +} + +func (w *memoryBackEndWriter) writtenCount() int { + return len(w.backEnd.events) +} + +// dataSize for the memoryBackEnd returns only an estimation, as there is no serialization taking place. +func (w *memoryBackEndWriter) dataSize() uint64 { + return uint64(w.bytesWritten) +} + +func (w *memoryBackEndWriter) flushAndClose() error { + failpoint.Inject("sorterDebug", func() { + atomic.StoreInt32(&w.backEnd.borrowed, 0) + }) + + w.backEnd.estimatedSize = w.bytesWritten + atomic.AddInt64(&pool.memoryUseEstimate, w.bytesWritten) + + return nil +} diff --git a/cdc/puller/sorter/merger.go b/cdc/puller/sorter/merger.go new file mode 100644 index 00000000000..acf250a896f --- /dev/null +++ b/cdc/puller/sorter/merger.go @@ -0,0 +1,377 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sorter + +import ( + "container/heap" + "context" + "github.com/pingcap/ticdc/pkg/util" + "go.uber.org/zap" + "math" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" +) + +func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out chan *model.PolymorphicEvent) error { + captureAddr := util.CaptureAddrFromCtx(ctx) + changefeedID := util.ChangefeedIDFromCtx(ctx) + _, tableName := util.TableIDFromCtx(ctx) + + metricSorterEventCount := sorterEventCount.MustCurryWith(map[string]string{ + "capture": captureAddr, + "changefeed": changefeedID, + "table": tableName}) + metricSorterResolvedTsGauge := sorterResolvedTsGauge.WithLabelValues(captureAddr, changefeedID, tableName) + metricSorterMergerStartTsGauge := sorterMergerStartTsGauge.WithLabelValues(captureAddr, changefeedID, tableName) + metricSorterMergeCountHistogram := sorterMergeCountHistogram.WithLabelValues(captureAddr, changefeedID, tableName) + + lastResolvedTs := make([]uint64, numSorters) + minResolvedTs := uint64(0) + + pendingSet := make(map[*flushTask]*model.PolymorphicEvent) + defer func() { + log.Info("Unified Sorter: merger exiting, cleaning up resources", zap.Int("pending-set-size", len(pendingSet))) + // clean up resources + for task := range pendingSet { + if task.reader != nil { + _ = printError(task.reader.resetAndClose()) + } + _ = printError(task.dealloc()) + } + }() + + lastOutputTs := uint64(0) + var lastEvent *model.PolymorphicEvent + var lastTask *flushTask + + sendResolvedEvent := func(ts uint64) error { + select { + case <-ctx.Done(): + return ctx.Err() + case out <- model.NewResolvedPolymorphicEvent(0, ts): + metricSorterEventCount.WithLabelValues("resolved").Inc() + metricSorterResolvedTsGauge.Set(float64(ts)) + return nil + } + } + + onMinResolvedTsUpdate := func() error { + metricSorterMergerStartTsGauge.Set(float64(minResolvedTs)) + + workingSet := make(map[*flushTask]struct{}) + sortHeap := new(sortHeap) + + defer func() { + // clean up + for task := range workingSet { + select { + case <-ctx.Done(): + break + case err := <-task.finished: + _ = printError(err) + } + + if task.reader != nil { + err := task.reader.resetAndClose() + task.reader = nil + _ = printError(err) + } + _ = printError(task.dealloc()) + } + }() + + for task, cache := range pendingSet { + if task.tsLowerBound > minResolvedTs { + // the condition above implies that for any event in task.backend, CRTs > minResolvedTs. + continue + } + var event *model.PolymorphicEvent + if cache != nil { + event = cache + } else { + var err error + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-task.finished: + if err != nil { + return errors.Trace(err) + } + } + + if task.reader == nil { + task.reader, err = task.backend.reader() + if err != nil { + return errors.Trace(err) + } + } + + event, err = task.reader.readNext() + if err != nil { + return errors.Trace(err) + } + + if event == nil { + log.Fatal("Unexpected end of backEnd data, bug?", + zap.Uint64("minResolvedTs", task.maxResolvedTs)) + } + } + + if event.CRTs > minResolvedTs { + pendingSet[task] = event + continue + } + + pendingSet[task] = nil + workingSet[task] = struct{}{} + + heap.Push(sortHeap, &sortItem{ + entry: event, + data: task, + }) + } + + resolvedTicker := time.NewTicker(1 * time.Second) + defer resolvedTicker.Stop() + + retire := func(task *flushTask) error { + delete(workingSet, task) + if pendingSet[task] != nil { + return nil + } + nextEvent, err := task.reader.readNext() + if err != nil { + _ = task.reader.resetAndClose() // prevents fd leak + task.reader = nil + return errors.Trace(err) + } + + if nextEvent == nil { + delete(pendingSet, task) + + err := task.reader.resetAndClose() + if err != nil { + return errors.Trace(err) + } + task.reader = nil + + err = task.dealloc() + if err != nil { + return errors.Trace(err) + } + } else { + pendingSet[task] = nextEvent + if nextEvent.CRTs < minResolvedTs { + log.Fatal("remaining event CRTs too small", zap.Uint64("next-ts", nextEvent.CRTs), zap.Uint64("minResolvedTs", minResolvedTs)) + } + } + return nil + } + + if sortHeap.Len() > 0 { + log.Debug("Unified Sorter: start merging", + zap.String("table", tableNameFromCtx(ctx)), + zap.Uint64("minResolvedTs", minResolvedTs)) + } + + counter := 0 + for sortHeap.Len() > 0 { + item := heap.Pop(sortHeap).(*sortItem) + task := item.data.(*flushTask) + event := item.entry + + if event.CRTs < task.lastTs { + log.Fatal("unified sorter: ts regressed in one backEnd, bug?", zap.Uint64("cur-ts", event.CRTs), zap.Uint64("last-ts", task.lastTs)) + } + task.lastTs = event.CRTs + + if event.RawKV != nil && event.RawKV.OpType != model.OpTypeResolved { + if event.CRTs < lastOutputTs { + for sortHeap.Len() > 0 { + item := heap.Pop(sortHeap).(*sortItem) + task := item.data.(*flushTask) + event := item.entry + log.Debug("dump", zap.Reflect("event", event), zap.Int("heap-id", task.heapSorterID)) + } + log.Fatal("unified sorter: output ts regressed, bug?", + zap.Int("counter", counter), + zap.Uint64("minResolvedTs", minResolvedTs), + zap.Int("cur-heap-id", task.heapSorterID), + zap.Int("cur-task-id", task.taskID), + zap.Uint64("cur-task-resolved", task.maxResolvedTs), + zap.Reflect("cur-event", event), + zap.Uint64("cur-ts", event.CRTs), + zap.Int("last-heap-id", lastTask.heapSorterID), + zap.Int("last-task-id", lastTask.taskID), + zap.Uint64("last-task-resolved", task.maxResolvedTs), + zap.Reflect("last-event", lastEvent), + zap.Uint64("last-ts", lastOutputTs), + zap.Int("sort-heap-len", sortHeap.Len())) + } + lastOutputTs = event.CRTs + lastEvent = event + lastTask = task + select { + case <-ctx.Done(): + return ctx.Err() + case out <- event: + metricSorterEventCount.WithLabelValues("kv").Inc() + } + } + counter += 1 + + select { + case <-resolvedTicker.C: + err := sendResolvedEvent(event.CRTs - 1) + if err != nil { + return errors.Trace(err) + } + default: + } + + event, err := task.reader.readNext() + if err != nil { + return errors.Trace(err) + } + + if event == nil { + // EOF + delete(workingSet, task) + delete(pendingSet, task) + + err := task.reader.resetAndClose() + if err != nil { + return errors.Trace(err) + } + task.reader = nil + + err = task.dealloc() + if err != nil { + return errors.Trace(err) + } + + continue + } + + if event.CRTs > minResolvedTs || (event.CRTs == minResolvedTs && event.RawKV.OpType == model.OpTypeResolved) { + // we have processed all events from this task that need to be processed in this merge + if event.CRTs > minResolvedTs || event.RawKV.OpType != model.OpTypeResolved { + pendingSet[task] = event + } + err := retire(task) + if err != nil { + return errors.Trace(err) + } + continue + } + + if counter%10 == 0 { + log.Debug("Merging progress", + zap.String("table", tableNameFromCtx(ctx)), + zap.Int("counter", counter)) + } + + heap.Push(sortHeap, &sortItem{ + entry: event, + data: task, + }) + } + + if len(workingSet) != 0 { + log.Fatal("unified sorter: merging ended prematurely, bug?", zap.Uint64("resolvedTs", minResolvedTs)) + } + + if counter > 0 { + log.Debug("Unified Sorter: merging ended", + zap.String("table", tableNameFromCtx(ctx)), + zap.Uint64("resolvedTs", minResolvedTs), zap.Int("count", counter)) + } + err := sendResolvedEvent(minResolvedTs) + if err != nil { + return errors.Trace(err) + } + + metricSorterMergeCountHistogram.Observe(float64(counter)) + + return nil + } + + resolveTicker := time.NewTicker(1 * time.Second) + defer resolveTicker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case task := <-in: + if task == nil { + log.Info("Merger input channel closed, exiting", + zap.String("table", tableNameFromCtx(ctx)), + zap.Uint64("max-output", minResolvedTs)) + return nil + } + + if task.backend != nil { + pendingSet[task] = nil + } // otherwise it is an empty flush + + if lastResolvedTs[task.heapSorterID] < task.maxResolvedTs { + lastResolvedTs[task.heapSorterID] = task.maxResolvedTs + } + + minTemp := uint64(math.MaxUint64) + for _, ts := range lastResolvedTs { + if minTemp > ts { + minTemp = ts + } + } + + if minTemp > minResolvedTs { + minResolvedTs = minTemp + err := onMinResolvedTsUpdate() + if err != nil { + return errors.Trace(err) + } + } + case <-resolveTicker.C: + err := sendResolvedEvent(minResolvedTs) + if err != nil { + return errors.Trace(err) + } + } + } +} + +func mergerCleanUp(in <-chan *flushTask) { + for task := range in { + _ = printError(task.dealloc()) + } +} + +// printError is a helper for tracing errors on function returns +func printError(err error) error { + if err != nil && errors.Cause(err) != context.Canceled && + errors.Cause(err) != context.DeadlineExceeded && + !strings.Contains(err.Error(), "context canceled") && + !strings.Contains(err.Error(), "context deadline exceeded") { + + log.Warn("Unified Sorter: Error detected", zap.Error(err)) + } + return err +} diff --git a/cdc/puller/sorter/metrics.go b/cdc/puller/sorter/metrics.go new file mode 100644 index 00000000000..8f1a1408dcb --- /dev/null +++ b/cdc/puller/sorter/metrics.go @@ -0,0 +1,90 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sorter + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + sorterEventCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "sorter", + Name: "sorter_event_count", + Help: "the number of events output by the sorter", + }, []string{"capture", "changefeed", "table", "type"}) + + sorterResolvedTsGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "sorter", + Name: "sorter_resolved_ts_gauge", + Help: "the resolved ts of the sorter", + }, []string{"capture", "changefeed", "table"}) + + sorterMergerStartTsGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "sorter", + Name: "sorter_merger_start_ts_gauge", + Help: "the start TS of each merge in the sorter", + }, []string{"capture", "changefeed", "table"}) + + sorterInMemoryDataSizeGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "sorter", + Name: "sorter_in_memory_data_size_gauge", + Help: "the amount of pending data stored in-memory by the sorter", + }, []string{"capture"}) + + sorterOnDiskDataSizeGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "sorter", + Name: "sorter_on_disk_data_size_gauge", + Help: "the amount of pending data stored on-disk by the sorter", + }, []string{"capture"}) + + sorterOpenFileCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "sorter", + Name: "sorter_open_file_count_gauge", + Help: "the number of open file descriptors held by the sorter", + }, []string{"capture"}) + + sorterFlushCountHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "sorter", + Name: "sorter_flush_count_histogram", + Help: "Bucketed histogram of the number of events in individual flushes performed by the sorter", + Buckets: prometheus.ExponentialBuckets(4, 4, 10), + }, []string{"capture", "changefeed", "table"}) + + sorterMergeCountHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "sorter", + Name: "sorter_merge_count_histogram", + Help: "Bucketed histogram of the number of events in individual merges performed by the sorter", + Buckets: prometheus.ExponentialBuckets(16, 4, 10), + }, []string{"capture", "changefeed", "table"}) +) + +// InitMetrics registers all metrics in this file +func InitMetrics(registry *prometheus.Registry) { + registry.MustRegister(sorterEventCount) + registry.MustRegister(sorterResolvedTsGauge) + registry.MustRegister(sorterMergerStartTsGauge) + registry.MustRegister(sorterInMemoryDataSizeGauge) + registry.MustRegister(sorterOnDiskDataSizeGauge) + registry.MustRegister(sorterOpenFileCountGauge) + registry.MustRegister(sorterFlushCountHistogram) + registry.MustRegister(sorterMergeCountHistogram) +} diff --git a/cdc/puller/sorter/serde.go b/cdc/puller/sorter/serde.go new file mode 100644 index 00000000000..66ec4c9971d --- /dev/null +++ b/cdc/puller/sorter/serde.go @@ -0,0 +1,43 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sorter + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/ticdc/cdc/model" +) + +type msgPackGenSerde struct { +} + +func (m *msgPackGenSerde) marshal(event *model.PolymorphicEvent, bytes []byte) ([]byte, error) { + bytes = bytes[:0] + return event.RawKV.MarshalMsg(bytes) +} + +func (m *msgPackGenSerde) unmarshal(event *model.PolymorphicEvent, bytes []byte) ([]byte, error) { + if event.RawKV == nil { + event.RawKV = new(model.RawKVEntry) + } + + bytes, err := event.RawKV.UnmarshalMsg(bytes) + if err != nil { + return nil, errors.Trace(err) + } + + event.StartTs = event.RawKV.StartTs + event.CRTs = event.RawKV.CRTs + + return bytes, nil +} diff --git a/cdc/puller/sorter/unified_sorter.go b/cdc/puller/sorter/unified_sorter.go new file mode 100644 index 00000000000..1a48b9e4829 --- /dev/null +++ b/cdc/puller/sorter/unified_sorter.go @@ -0,0 +1,165 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sorter + +import ( + "context" + "go.uber.org/zap" + "sync/atomic" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/util" + "golang.org/x/sync/errgroup" +) + +// UnifiedSorter provides both sorting in memory and in file. Memory pressure is used to determine which one to use. +type UnifiedSorter struct { + inputCh chan *model.PolymorphicEvent + outputCh chan *model.PolymorphicEvent + dir string + pool *backEndPool + tableName string // used only for debugging and tracing +} + +type ctxKey struct { +} + +// NewUnifiedSorter creates a new UnifiedSorter +func NewUnifiedSorter(dir string, tableName string, captureAddr string) *UnifiedSorter { + poolMu.Lock() + defer poolMu.Unlock() + + if pool == nil { + pool = newBackEndPool(dir, captureAddr) + } + + return &UnifiedSorter{ + inputCh: make(chan *model.PolymorphicEvent, 128000), + outputCh: make(chan *model.PolymorphicEvent, 128000), + dir: dir, + pool: pool, + tableName: tableName, + } +} + +// Run implements the EventSorter interface +func (s *UnifiedSorter) Run(ctx context.Context) error { + failpoint.Inject("sorterDebug", func() { + log.Info("sorterDebug: Running Unified Sorter in debug mode") + }) + + finish := util.MonitorCancelLatency(ctx, "Unified Sorter") + defer finish() + + valueCtx := context.WithValue(ctx, ctxKey{}, s) + + sorterConfig := config.GetSorterConfig() + numConcurrentHeaps := sorterConfig.NumConcurrentWorker + + errg, subctx := errgroup.WithContext(valueCtx) + heapSorterCollectCh := make(chan *flushTask, 4096) + // mergerCleanUp will consumer the remaining elements in heapSorterCollectCh to prevent any FD leak. + defer mergerCleanUp(heapSorterCollectCh) + + heapSorterErrg, subsubctx := errgroup.WithContext(subctx) + heapSorters := make([]*heapSorter, sorterConfig.NumConcurrentWorker) + for i := range heapSorters { + finalI := i + heapSorters[finalI] = newHeapSorter(finalI, heapSorterCollectCh) + heapSorterErrg.Go(func() error { + return printError(heapSorters[finalI].run(subsubctx)) + }) + } + + errg.Go(func() error { + // must wait for all writers to exit to close the channel. + defer close(heapSorterCollectCh) + return heapSorterErrg.Wait() + }) + + errg.Go(func() error { + return printError(runMerger(subctx, numConcurrentHeaps, heapSorterCollectCh, s.outputCh)) + }) + + errg.Go(func() error { + nextSorterID := 0 + for { + select { + case <-subctx.Done(): + return subctx.Err() + case event := <-s.inputCh: + if event.RawKV != nil && event.RawKV.OpType == model.OpTypeResolved { + // broadcast resolved events + for _, sorter := range heapSorters { + select { + case <-subctx.Done(): + return subctx.Err() + case sorter.inputCh <- event: + } + } + continue + } + + // dispatch a row changed event + targetID := nextSorterID % numConcurrentHeaps + nextSorterID++ + select { + case <-subctx.Done(): + return subctx.Err() + case heapSorters[targetID].inputCh <- event: + } + } + } + }) + + return printError(errg.Wait()) +} + +// AddEntry implements the EventSorter interface +func (s *UnifiedSorter) AddEntry(ctx context.Context, entry *model.PolymorphicEvent) { + select { + case <-ctx.Done(): + return + case s.inputCh <- entry: + } +} + +// Output implements the EventSorter interface +func (s *UnifiedSorter) Output() <-chan *model.PolymorphicEvent { + return s.outputCh +} + +// tableNameFromCtx is used for retrieving the table's name from a context within the Unified Sorter +func tableNameFromCtx(ctx context.Context) string { + if sorter, ok := ctx.Value(ctxKey{}).(*UnifiedSorter); ok { + return sorter.tableName + } + return "" +} + +func init() { + go func() { + ticker := time.NewTicker(5 * time.Second) + for range ticker.C { + failpoint.Inject("sorterDebug", func() { + log.Info("Unified Sorter: open file statistics", zap.Int64("num-open-fd", atomic.LoadInt64(&openFDCount))) + }) + } + }() + +} diff --git a/cdc/puller/sorter_test.go b/cdc/puller/sorter_test.go new file mode 100644 index 00000000000..ce5035e3b2f --- /dev/null +++ b/cdc/puller/sorter_test.go @@ -0,0 +1,199 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package puller + +import ( + "context" + "github.com/pingcap/failpoint" + sorter2 "github.com/pingcap/ticdc/cdc/puller/sorter" + "math" + "os" + "sync/atomic" + "time" + + "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + _ "net/http/pprof" +) + +const ( + numProducers = 16 +) + +type sorterSuite struct{} + +var _ = check.Suite(&sorterSuite{}) + +func generateMockRawKV(ts uint64) *model.RawKVEntry { + return &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: []byte{}, + Value: []byte{}, + OldValue: nil, + StartTs: ts - 5, + CRTs: ts, + RegionID: 0, + } +} + +func (s *sorterSuite) TestSorterBasic(c *check.C) { + config.SetSorterConfig(&config.SorterConfig{ + NumConcurrentWorker: 8, + ChunkSizeLimit: 1 * 1024 * 1024 * 1024, + MaxMemoryPressure: 60, + MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, + }) + + err := os.MkdirAll("./sorter", 0755) + c.Assert(err, check.IsNil) + sorter := sorter2.NewUnifiedSorter("./sorter", "test", "0.0.0.0:0") + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + testSorter(ctx, c, sorter, 10000) +} + +func (s *sorterSuite) TestSorterCancel(c *check.C) { + config.SetSorterConfig(&config.SorterConfig{ + NumConcurrentWorker: 8, + ChunkSizeLimit: 1 * 1024 * 1024 * 1024, + MaxMemoryPressure: 60, + MaxMemoryConsumption: 0, + }) + + err := os.MkdirAll("./sorter", 0755) + c.Assert(err, check.IsNil) + sorter := sorter2.NewUnifiedSorter("./sorter", "test", "0.0.0.0:0") + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + finishedCh := make(chan struct{}) + go func() { + testSorter(ctx, c, sorter, 10000000) + close(finishedCh) + }() + + after := time.After(20 * time.Second) + select { + case <-after: + c.FailNow() + case <-finishedCh: + } + + log.Info("Sorter successfully cancelled") +} + +func testSorter(ctx context.Context, c *check.C, sorter EventSorter, count int) { + err := failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/sorterDebug", "return(true)") + if err != nil { + log.Fatal("Could not enable failpoint", zap.Error(err)) + } + + ctx, cancel := context.WithCancel(ctx) + errg, ctx := errgroup.WithContext(ctx) + errg.Go(func() error { + return sorter.Run(ctx) + }) + + producerProgress := make([]uint64, numProducers) + + // launch the producers + for i := 0; i < numProducers; i++ { + finalI := i + errg.Go(func() error { + for j := 0; j < count; j++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + sorter.AddEntry(ctx, model.NewPolymorphicEvent(generateMockRawKV(uint64(j)<<5))) + if j%10000 == 0 { + atomic.StoreUint64(&producerProgress[finalI], uint64(j)<<5) + } + } + sorter.AddEntry(ctx, model.NewPolymorphicEvent(generateMockRawKV(uint64(count)<<5))) + atomic.StoreUint64(&producerProgress[finalI], uint64(count)<<5) + return nil + }) + } + + // launch the resolver + errg.Go(func() error { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + resolvedTs := uint64(math.MaxUint64) + for i := range producerProgress { + ts := atomic.LoadUint64(&producerProgress[i]) + if resolvedTs > ts { + resolvedTs = ts + } + } + sorter.AddEntry(ctx, model.NewResolvedPolymorphicEvent(0, resolvedTs)) + if resolvedTs == uint64(count)<<5 { + return nil + } + } + } + }) + + // launch the consumer + errg.Go(func() error { + counter := 0 + lastTs := uint64(0) + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case event := <-sorter.Output(): + if event.RawKV.OpType != model.OpTypeResolved { + if event.CRTs < lastTs { + panic("regressed") + } + lastTs = event.CRTs + counter += 1 + if counter%10000 == 0 { + log.Debug("Messages received", zap.Int("counter", counter)) + } + if counter >= numProducers*count { + log.Debug("Unified Sorter test successful") + cancel() + } + } + case <-ticker.C: + log.Debug("Consumer is alive") + } + } + }) + + err = errg.Wait() + if errors.Cause(err) == context.Canceled || errors.Cause(err) == context.DeadlineExceeded { + return + } + c.Assert(err, check.IsNil) +} diff --git a/cmd/server.go b/cmd/server.go index d192dc30a41..e854174a16f 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -15,6 +15,7 @@ package cmd import ( "context" + "github.com/pingcap/ticdc/pkg/config" "time" "github.com/pingcap/errors" @@ -35,6 +36,11 @@ var ( gcTTL int64 logFile string logLevel string + // variables for unified sorter + numConcurrentWorker int + chunkSizeLimit uint64 + maxMemoryPressure int + maxMemoryConsumption uint64 ownerFlushInterval time.Duration processorFlushInterval time.Duration @@ -58,6 +64,12 @@ func init() { serverCmd.Flags().StringVar(&logLevel, "log-level", "info", "log level (etc: debug|info|warn|error)") serverCmd.Flags().DurationVar(&ownerFlushInterval, "owner-flush-interval", time.Millisecond*200, "owner flushes changefeed status interval") serverCmd.Flags().DurationVar(&processorFlushInterval, "processor-flush-interval", time.Millisecond*100, "processor flushes task status interval") + + serverCmd.Flags().IntVar(&numConcurrentWorker, "sorter-num-concurrent-worker", 8, "sorter concurrency level") + serverCmd.Flags().Uint64Var(&chunkSizeLimit, "sorter-chunk-size-limit", 1024*1024*1024, "size of heaps for sorting") + serverCmd.Flags().IntVar(&maxMemoryPressure, "sorter-max-memory-percentage", 90, "system memory usage threshold for forcing in-disk sort") + serverCmd.Flags().Uint64Var(&maxMemoryConsumption, "sorter-max-memory-consumption", 16*1024*1024*1024, "maximum memory consumption of in-memory sort") + addSecurityFlags(serverCmd.Flags(), true /* isServer */) } @@ -72,6 +84,13 @@ func runEServer(cmd *cobra.Command, args []string) error { return errors.Annotate(err, "can not load timezone, Please specify the time zone through environment variable `TZ` or command line parameters `--tz`") } + config.SetSorterConfig(&config.SorterConfig{ + NumConcurrentWorker: numConcurrentWorker, + ChunkSizeLimit: chunkSizeLimit, + MaxMemoryPressure: maxMemoryPressure, + MaxMemoryConsumption: maxMemoryConsumption, + }) + version.LogVersionInfo() opts := []cdc.ServerOption{ cdc.PDEndpoints(serverPdAddr), diff --git a/go.mod b/go.mod index cb498576b12..b2421017000 100644 --- a/go.mod +++ b/go.mod @@ -24,22 +24,41 @@ require ( github.com/jmoiron/sqlx v1.2.0 github.com/klauspost/compress v1.11.1 // indirect github.com/linkedin/goavro/v2 v2.9.7 +<<<<<<< HEAD github.com/mattn/go-shellwords v1.0.3 github.com/pingcap/br v0.0.0-20200907090854-8a4cd9e0abd1 +======= + github.com/mackerelio/go-osstat v0.1.0 + github.com/mattn/go-colorable v0.1.7 // indirect + github.com/mattn/go-runewidth v0.0.9 // indirect + github.com/mattn/go-shellwords v1.0.3 + github.com/philhofer/fwd v1.0.0 // indirect + github.com/pingcap/br v0.0.0-20200923023944-7456456854e4 +>>>>>>> ad1354b... optimization: Unified Sorter (#972) github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 github.com/pingcap/errors v0.11.5-0.20200917111840-a15ef68f753d github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce github.com/pingcap/kvproto v0.0.0-20200909045102-2ac90648531b github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463 +<<<<<<< HEAD github.com/pingcap/parser v0.0.0-20200921063432-e220cfcfd026 github.com/pingcap/tidb v1.1.0-beta.0.20200921080130-30cfb6af225c +======= + github.com/pingcap/parser v0.0.0-20200924053142-5d7e8ebf605e + github.com/pingcap/tidb v1.1.0-beta.0.20200927065602-486e473a86e9 +>>>>>>> ad1354b... optimization: Unified Sorter (#972) github.com/pingcap/tidb-tools v4.0.6-0.20200828085514-03575b185007+incompatible github.com/prometheus/client_golang v1.5.1 github.com/r3labs/diff v1.1.0 github.com/spf13/cobra v1.0.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.6.1 +<<<<<<< HEAD github.com/tikv/pd v1.1.0-beta.0.20200907080620-6830f5bb92a2 +======= + github.com/tikv/pd v1.1.0-beta.0.20200907085700-5b04bec39b99 + github.com/tinylib/msgp v1.1.2 +>>>>>>> ad1354b... optimization: Unified Sorter (#972) github.com/uber-go/atomic v1.3.2 github.com/vmihailenco/msgpack/v5 v5.0.0-beta.1 go.etcd.io/etcd v0.5.0-alpha.5.0.20200425165423-262c93980547 diff --git a/go.sum b/go.sum index e7550816fdb..df12b86347e 100644 --- a/go.sum +++ b/go.sum @@ -377,6 +377,8 @@ github.com/lib/pq v1.1.1 h1:sJZmqHoEaY7f+NPP8pgLB/WxulyR3fewgCM2qaSlBb4= github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/linkedin/goavro/v2 v2.9.7 h1:Vd++Rb/RKcmNJjM0HP/JJFMEWa21eUBVKPYlKehOGrM= github.com/linkedin/goavro/v2 v2.9.7/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= +github.com/mackerelio/go-osstat v0.1.0 h1:e57QHeHob8kKJ5FhcXGdzx5O6Ktuc5RHMDIkeqhgkFA= +github.com/mackerelio/go-osstat v0.1.0/go.mod h1:1K3NeYLhMHPvzUu+ePYXtoB58wkaRpxZsGClZBJyIFw= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= @@ -451,6 +453,8 @@ github.com/pelletier/go-toml v1.3.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUr github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d h1:U+PMnTlV2tu7RuMK5etusZG3Cf+rpow5hqQByeCzJ2g= github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d/go.mod h1:lXfE4PvvTW5xOjO6Mba8zDPyw8M93B6AQ7frTGnMlA8= +github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ= +github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI= github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -647,8 +651,16 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfK github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +<<<<<<< HEAD github.com/tikv/pd v1.1.0-beta.0.20200907080620-6830f5bb92a2 h1:cC5v/gn9NdcmAlpBrWI5x3MiYmQcW2k7EHccg8837p4= github.com/tikv/pd v1.1.0-beta.0.20200907080620-6830f5bb92a2/go.mod h1:6OYi62ks7nFIBtWWpOjnngr5LNos4Hvi1BzArCWAlBc= +======= +github.com/tikv/pd v1.1.0-beta.0.20200818122340-ef1a4e920b2f/go.mod h1:mwZ3Lip1YXgtgBx6blADUPMxrqPGCfwABlreDzuJul8= +github.com/tikv/pd v1.1.0-beta.0.20200907085700-5b04bec39b99 h1:p2EYnx1jl7VJ5AuYoit0bDxGndYmsiUquDSAph5Ao1Q= +github.com/tikv/pd v1.1.0-beta.0.20200907085700-5b04bec39b99/go.mod h1:h0GTvNPZrjA06ToexaL13DxlzAvm/6kkJWz12baD68M= +github.com/tinylib/msgp v1.1.2 h1:gWmO7n0Ys2RBEb7GPYB9Ujq8Mk5p2U08lRnmMcGy6BQ= +github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= +>>>>>>> ad1354b... optimization: Unified Sorter (#972) github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= @@ -731,7 +743,6 @@ go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.12.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= -go.uber.org/zap v1.15.0 h1:ZZCA22JRF2gQE5FoNmhmrf7jeJJ2uhqDUNRYKm8dvmM= go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM= go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= @@ -748,9 +759,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E= golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -811,9 +820,7 @@ golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= -golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc h1:zK/HqS5bZxDptfPJNq8v7vJfXtkU7r9TLIoSr1bXaP4= golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= -golang.org/x/net v0.0.0-20200904194848-62affa334b73 h1:MXfv8rhZWmFeqX3GNZRsd6vOLoaCHjYEX3qkRo3YBUA= golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201022231255-08b38378de70 h1:Z6x4N9mAi4oF0TbHweCsH618MO6OI6UFgV0FP5n0wBY= golang.org/x/net v0.0.0-20201022231255-08b38378de70/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -840,6 +847,7 @@ golang.org/x/sys v0.0.0-20181228144115-9a3f9b0469bb/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190410235845-0ad05ae3009d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -862,7 +870,6 @@ golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200819171115-d785dc25833f h1:KJuwZVtZBVzDmEDtB2zro9CXkD9O0dpCv4o2LHbQIAw= golang.org/x/sys v0.0.0-20200819171115-d785dc25833f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -969,7 +976,6 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c/go.mod h1:3HH7i1SgMqlzxCcBmUHW657sD4Kvv9sC3HpL3YukzwA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/integration/framework/avro/kafka_single_table.go b/integration/framework/avro/kafka_single_table.go index 5395d674195..ca8b36ea6a0 100644 --- a/integration/framework/avro/kafka_single_table.go +++ b/integration/framework/avro/kafka_single_table.go @@ -17,13 +17,12 @@ import ( "bytes" "database/sql" "fmt" - "io/ioutil" - "net/http" - "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "go.uber.org/zap" + "io/ioutil" + "net/http" + "time" "github.com/pingcap/ticdc/integration/framework" ) diff --git a/pkg/config/sorter.go b/pkg/config/sorter.go new file mode 100644 index 00000000000..ef76c980cd5 --- /dev/null +++ b/pkg/config/sorter.go @@ -0,0 +1,47 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import "sync" + +// SorterConfig represents sorter config for a changefeed +type SorterConfig struct { + // number of concurrent heap sorts + NumConcurrentWorker int `toml:"num-concurrent-workers" json:"num-concurrent-workers"` + // maximum size for a heap + ChunkSizeLimit uint64 `toml:"chunk-size-limit" json:"chunk-size-limit"` + // the maximum memory use percentage that allows in-memory sorting + MaxMemoryPressure int `toml:"max-memory-pressure" json:"max-memory-pressure"` + // the maximum memory consumption allowed for in-memory sorting + MaxMemoryConsumption uint64 `toml:"max-memory-consumption" json:"max-memory-consumption"` +} + +var ( + sorterConfig *SorterConfig + mu sync.Mutex +) + +// GetSorterConfig returns the process-local sorter config +func GetSorterConfig() *SorterConfig { + mu.Lock() + defer mu.Unlock() + return sorterConfig +} + +// SetSorterConfig sets the process-local sorter config +func SetSorterConfig(config *SorterConfig) { + mu.Lock() + defer mu.Unlock() + sorterConfig = config +} diff --git a/pkg/util/cancel_monitor.go b/pkg/util/cancel_monitor.go new file mode 100644 index 00000000000..d94e4e0a60d --- /dev/null +++ b/pkg/util/cancel_monitor.go @@ -0,0 +1,49 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "context" + "github.com/pingcap/log" + "go.uber.org/zap" + "time" +) + +// MonitorCancelLatency monitors the latency from ctx being cancelled and the returned function being called +func MonitorCancelLatency(ctx context.Context, identifier string) func() { + finishedCh := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + log.Debug("MonitorCancelLatency: Cancelled", zap.String("identifier", identifier)) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + elapsed := 0 + for { + select { + case <-finishedCh: + log.Debug("MonitorCancelLatency: Monitored routine exited", zap.String("identifier", identifier)) + return + case <-ticker.C: + elapsed++ + log.Warn("MonitorCancelLatency: Cancellation is taking too long", zap.String("identifier", identifier), zap.Int("elapsed seconds", elapsed), zap.Error(ctx.Err())) + } + } + case <-finishedCh: + } + }() + return func() { + close(finishedCh) + } +} diff --git a/scripts/check-copyright.sh b/scripts/check-copyright.sh index c9d7802236e..7e482c62d58 100755 --- a/scripts/check-copyright.sh +++ b/scripts/check-copyright.sh @@ -1,6 +1,6 @@ copyright="// Copyright $(date '+%Y') PingCAP, Inc." -result=$(find ./ -name "*.go" | grep -vE '.pb.go|vendor/|leaktest.go' | while read file_path; do +result=$(find ./ -name "*.go" | grep -vE '.pb.go|vendor/|leaktest.go|kv_gen' | while read file_path; do head=`cat "${file_path}" | head -n 1` if [ "$head" != "$copyright" ];then echo "${file_path}" diff --git a/testing_utils/many_sorters_test/many_sorters.go b/testing_utils/many_sorters_test/many_sorters.go new file mode 100644 index 00000000000..0acfbc2428e --- /dev/null +++ b/testing_utils/many_sorters_test/many_sorters.go @@ -0,0 +1,150 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "fmt" + "math/rand" + "net/http" + "os" + "strings" + "sync/atomic" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/cdc/puller" + pullerSorter "github.com/pingcap/ticdc/cdc/puller/sorter" + "github.com/pingcap/ticdc/pkg/config" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +var sorterDir = flag.String("dir", "./sorter", "temporary directory used for sorting") +var numSorters = flag.Int("num-sorters", 256, "number of instances of sorters") +var numEvents = flag.Int("num-events-per-sorter", 10000, "number of events sent to a sorter") +var percentageResolves = flag.Int("percentage-resolve-events", 70, "percentage of resolved events") + +func main() { + flag.Parse() + log.SetLevel(zap.DebugLevel) + err := failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/sorterDebug", "return(true)") + if err != nil { + log.Fatal("Could not enable failpoint", zap.Error(err)) + } + + config.SetSorterConfig(&config.SorterConfig{ + NumConcurrentWorker: 4, + ChunkSizeLimit: 1 * 1024 * 1024 * 1024, + MaxMemoryPressure: 60, + MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, + }) + + go func() { + _ = http.ListenAndServe("localhost:6060", nil) + }() + + err = os.MkdirAll(*sorterDir, 0755) + if err != nil { + log.Error("sorter_stress_test:", zap.Error(err)) + } + + sorters := make([]puller.EventSorter, *numSorters) + ctx0, cancel := context.WithCancel(context.Background()) + errg, ctx := errgroup.WithContext(ctx0) + + var finishCount int32 + for i := 0; i < *numSorters; i++ { + sorters[i] = pullerSorter.NewUnifiedSorter(*sorterDir, fmt.Sprintf("test-%d", i), "0.0.0.0:0") + finalI := i + + // run sorter + errg.Go(func() error { + return printError(sorters[finalI].Run(ctx)) + }) + + // run producer + errg.Go(func() error { + for j := 0; j < *numEvents; j++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + ev := generateEvent(uint64(finalI), uint64(j<<5)) + sorters[finalI].AddEntry(ctx, ev) + } + sorters[finalI].AddEntry(ctx, model.NewResolvedPolymorphicEvent(uint64(finalI), uint64(((*numEvents)<<5)+1))) + return nil + }) + + // run consumer + errg.Go(func() error { + for { + var ev *model.PolymorphicEvent + select { + case <-ctx.Done(): + return ctx.Err() + case ev = <-sorters[finalI].Output(): + } + + if ev.CRTs == uint64(((*numEvents)<<5)+1) { + log.Info("Sorter finished", zap.Int("sorter-id", finalI)) + if atomic.AddInt32(&finishCount, 1) == int32(*numSorters) { + log.Info("Many Sorters test finished, cancelling all goroutines") + cancel() + } + return nil + } + } + }) + } + + _ = printError(errg.Wait()) + if atomic.LoadInt32(&finishCount) == int32(*numSorters) { + log.Info("Test was successful!") + } +} + +func generateEvent(region uint64, ts uint64) *model.PolymorphicEvent { + r := rand.Int() % 100 + if r < *percentageResolves { + return model.NewResolvedPolymorphicEvent(region, ts) + } + return model.NewPolymorphicEvent(&model.RawKVEntry{ + OpType: model.OpTypePut, + Key: []byte("keykeykey"), + Value: []byte("valuevaluevalue"), + OldValue: nil, + StartTs: ts - 5, + CRTs: ts, + RegionID: region, + }) +} + +// printError is a helper for tracing errors on function returns +func printError(err error) error { + if err != nil && errors.Cause(err) != context.Canceled && + errors.Cause(err) != context.DeadlineExceeded && + !strings.Contains(err.Error(), "context canceled") && + !strings.Contains(err.Error(), "context deadline exceeded") { + + log.Warn("Unified Sorter: Error detected", zap.Error(err)) + } + return err +} diff --git a/testing_utils/sorter_stress_test/sorter_stress.go b/testing_utils/sorter_stress_test/sorter_stress.go new file mode 100644 index 00000000000..2f058397a6f --- /dev/null +++ b/testing_utils/sorter_stress_test/sorter_stress.go @@ -0,0 +1,159 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "math/rand" + "net/http" + _ "net/http/pprof" + "os" + "strings" + + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/cdc/puller" + pullerSorter "github.com/pingcap/ticdc/cdc/puller/sorter" + "github.com/pingcap/ticdc/pkg/config" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +var sorterDir = flag.String("dir", "./sorter", "temporary directory used for sorting") +var numBatches = flag.Int("num-batches", 256, "number of batches of ordered events") +var msgsPerBatch = flag.Int("num-messages-per-batch", 102400, "number of events in a batch") +var bytesPerMsg = flag.Int("bytes-per-message", 1024, "number of bytes in an event") + +func main() { + flag.Parse() + log.SetLevel(zap.DebugLevel) + err := failpoint.Enable("github.com/pingcap/ticdc/cdc/puller/sorter/sorterDebug", "return(true)") + if err != nil { + log.Fatal("Could not enable failpoint", zap.Error(err)) + } + + config.SetSorterConfig(&config.SorterConfig{ + NumConcurrentWorker: 8, + ChunkSizeLimit: 1 * 1024 * 1024 * 1024, + MaxMemoryPressure: 60, + MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, + }) + + go func() { + _ = http.ListenAndServe("localhost:6060", nil) + }() + + err = os.MkdirAll(*sorterDir, 0755) + if err != nil { + log.Error("sorter_stress_test:", zap.Error(err)) + } + + sorter := pullerSorter.NewUnifiedSorter(*sorterDir, "test", "0.0.0.0:0") + + ctx1, cancel := context.WithCancel(context.Background()) + + eg, ctx := errgroup.WithContext(ctx1) + + eg.Go(func() error { + return sorter.Run(ctx) + }) + + // launch the consumer + eg.Go(func() error { + counter := 0 + lastTs := uint64(0) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case event := <-sorter.Output(): + if event.RawKV.OpType != model.OpTypeResolved { + if event.CRTs < lastTs { + panic("regressed") + } + lastTs = event.CRTs + counter += 1 + if counter%10000 == 0 { + log.Debug("Messages received", zap.Int("counter", counter)) + } + if counter >= *numBatches**msgsPerBatch { + log.Debug("Unified Sorter test successful") + cancel() + return nil + } + } + } + } + }) + + eg1 := errgroup.Group{} + for i := 0; i < *numBatches; i++ { + eg1.Go(func() error { + generateGroup(ctx, sorter) + return nil + }) + } + + err = eg1.Wait() + if err != nil { + log.Error("sorter_stress_test:", zap.Error(err)) + } + + sorter.AddEntry(ctx, model.NewResolvedPolymorphicEvent(0, uint64((*msgsPerBatch<<5)+256))) + + err = eg.Wait() + if err != nil { + if strings.Contains(err.Error(), "context canceled") { + return + } + log.Error("sorter_stress_test:", zap.Error(err)) + } +} + +func generateGroup(ctx context.Context, sorter puller.EventSorter) { + for i := 0; i < *msgsPerBatch; i++ { + ts := (i << 5) + rand.Intn(256) + event := model.NewPolymorphicEvent(newMockRawKV(uint64(ts))) + sorter.AddEntry(ctx, event) + } +} + +var ( + key = []byte(randSeq(10)) + value = []byte(randSeq(*bytesPerMsg)) +) + +func newMockRawKV(ts uint64) *model.RawKVEntry { + return &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: key, + Value: value, + OldValue: nil, + StartTs: ts - 5, + CRTs: ts, + RegionID: 0, + } +} + +var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +func randSeq(n int) string { + b := make([]rune, n) + for i := range b { + b[i] = letters[rand.Intn(len(letters))] + } + return string(b) +} diff --git a/tests/_utils/cleanup_process b/tests/_utils/cleanup_process index bb54566a04f..2777ae572aa 100755 --- a/tests/_utils/cleanup_process +++ b/tests/_utils/cleanup_process @@ -8,7 +8,7 @@ killall $process || true counter=0 while [ $counter -lt $retry_count ]; do - pgrep $process > /dev/null 2>&1 + pgrep -x $process > /dev/null 2>&1 ret=$? if [ "$ret" != "0" ]; then echo "process $process already exit" diff --git a/tests/_utils/test_prepare b/tests/_utils/test_prepare index 21382b213b8..6be9a0625cc 100644 --- a/tests/_utils/test_prepare +++ b/tests/_utils/test_prepare @@ -41,3 +41,4 @@ DOWN_TIKV_STATUS_PORT=${DOWN_TIKV_STATUS_PORT:-21180} TLS_TIKV_HOST=${TLS_TIKV_HOST:-127.0.0.1} TLS_TIKV_PORT=${TLS_TIKV_PORT:-22160} TLS_TIKV_STATUS_PORT=${TLS_TIKV_STATUS_PORT:-22180} + diff --git a/tests/file_sort/conf/diff_config.toml b/tests/unified_sorter/conf/diff_config.toml similarity index 94% rename from tests/file_sort/conf/diff_config.toml rename to tests/unified_sorter/conf/diff_config.toml index 5124e9d3bd5..9dc1558cba8 100644 --- a/tests/file_sort/conf/diff_config.toml +++ b/tests/unified_sorter/conf/diff_config.toml @@ -10,7 +10,7 @@ fix-sql-file = "fix.sql" # tables need to check. [[check-tables]] - schema = "file_sort" + schema = "unified_sort" tables = ["~usertable.*"] [[source-db]] diff --git a/tests/file_sort/conf/workload b/tests/unified_sorter/conf/workload similarity index 90% rename from tests/file_sort/conf/workload rename to tests/unified_sorter/conf/workload index 1763faf2338..4c7f6583e40 100644 --- a/tests/file_sort/conf/workload +++ b/tests/unified_sorter/conf/workload @@ -1,5 +1,5 @@ threadcount=10 -recordcount=15000 +recordcount=45000 operationcount=0 workload=core diff --git a/tests/file_sort/run.sh b/tests/unified_sorter/run.sh similarity index 50% rename from tests/file_sort/run.sh rename to tests/unified_sorter/run.sh index 1b669bb89b8..be4d51af0a5 100755 --- a/tests/file_sort/run.sh +++ b/tests/unified_sorter/run.sh @@ -5,7 +5,7 @@ set -e CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) source $CUR/../_utils/test_prepare WORK_DIR=$OUT_DIR/$TEST_NAME -CDC_BINARY=cdc.test +CDC_BINARY=cdc SINK_TYPE=$1 CDC_COUNT=3 @@ -19,45 +19,47 @@ function run() { cd $WORK_DIR start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) - run_sql "CREATE DATABASE file_sort;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=file_sort - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info" + run_sql "CREATE DATABASE unified_sort;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=unified_sort + + export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/puller/sorter/sorterDebug=return(true)' + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY TOPIC_NAME="ticdc-sink-retry-test-$RANDOM" case $SINK_TYPE in - kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";; + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&max-message-bytes=102400";; *) SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1";; esac - sort_dir="$WORK_DIR/file_sort_cache" + sort_dir="$WORK_DIR/unified_sort_cache" mkdir $sort_dir - run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --sort-engine="file" --sort-dir="$sort_dir" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --sort-engine="unified" --sort-dir="$sort_dir" if [ "$SINK_TYPE" == "kafka" ]; then run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" fi # Add a check table to reduce check time, or if we check data with sync diff # directly, there maybe a lot of diff data at first because of the incremental scan - run_sql "CREATE table file_sort.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - check_table_exists "file_sort.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - check_table_exists "file_sort.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + run_sql "CREATE table unified_sort.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "unified_sort.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "unified_sort.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - run_sql "truncate table file_sort.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "truncate table unified_sort.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - run_sql "CREATE table file_sort.check2(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - check_table_exists "file_sort.check2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + run_sql "CREATE table unified_sort.check2(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "unified_sort.check2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=file_sort - run_sql "CREATE table file_sort.check3(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - check_table_exists "file_sort.check3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=unified_sort + run_sql "CREATE table unified_sort.check3(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "unified_sort.check3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - run_sql "create table file_sort.USERTABLE2 like file_sort.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "insert into file_sort.USERTABLE2 select * from file_sort.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "create table file_sort.check4(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - check_table_exists "file_sort.USERTABLE2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - check_table_exists "file_sort.check4" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + run_sql "create table unified_sort.USERTABLE2 like unified_sort.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "insert into unified_sort.USERTABLE2 select * from unified_sort.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "create table unified_sort.check4(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "unified_sort.USERTABLE2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "unified_sort.check4" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml