Commit
Merge branch 'master' into changeOptimistBehavior
lichunzhu authored May 16, 2022
2 parents 6dd59dd + 46136be commit 91b30ab
Showing 12 changed files with 84 additions and 121 deletions.
26 changes: 6 additions & 20 deletions .github/pull_request_template.md
@@ -1,5 +1,6 @@
<!--
Thank you for contributing to TiDB-CDC! Please read MD's [CONTRIBUTING](https://github.com/pingcap/tidb-cdc/blob/master/CONTRIBUTING.md) document **BEFORE** filing this PR.
Thank you for contributing to TiFlow!
Please read MD's [CONTRIBUTING](https://github.com/pingcap/tiflow/blob/master/CONTRIBUTING.md) document **BEFORE** filing this PR.
-->

### What problem does this PR solve?
@@ -19,37 +20,22 @@ Issue Number: close #xxx

### Check List <!--REMOVE the items that are not applicable-->

Tests <!-- At least one of them must be included. -->
#### Tests <!-- At least one of them must be included. -->

- Unit test
- Integration test
- Manual test (add detailed scripts or steps below)
- No code

Code changes
#### Questions <!-- Authors should answer these questions and reviewers should consider these questions. -->

- Has exported function/method change
- Has exported variable/fields change
- Has interface methods change
- Has persistent data change
##### Will it cause performance regression or break compatibility?

Side effects

- Possible performance regression
- Increased code complexity
- Breaking backward compatibility

Related changes

- Need to cherry-pick to the release branch
- Need to update the documentation
- Need to update key monitor metrics in both TiCDC document and official document
##### Do you need to update user documentation, design documentation or monitoring documentation?

### Release note <!-- bugfixes or new feature need a release note -->

```release-note
Please add a release note.
Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.
If you don't think this PR needs a release note then fill it with `None`.
```
10 changes: 9 additions & 1 deletion README.md
@@ -8,7 +8,15 @@

## Introduction

**TiFlow** is a unified data replication platform around [TiDB](https://docs.pingcap.com/tidb/stable), including two main components: DM and TiCDC. DM supports full data migration and incremental data replication from MySQL/MariaDB into [TiDB](https://docs.pingcap.com/tidb/stable). TiCDC supports replicating change data to various downstreams, including MySQL protocol-compatible databases, message queues via the open TiCDC protocol and other systems such as local file storage. More details can be found in [DM README](./README_DM.md) and [TiCDC README](./README_TiCDC.md).
**TiFlow** is a unified data replication platform around [TiDB](https://docs.pingcap.com/tidb/stable),
including two main components:

* DM supports full data migration and incremental data replication from MySQL/MariaDB
into [TiDB](https://docs.pingcap.com/tidb/stable).
* TiCDC supports replicating change data to various downstreams, including MySQL protocol-compatible databases
and [Kafka](https://kafka.apache.org/).

More details can be found in [DM README](./README_DM.md) and [TiCDC README](./README_TiCDC.md).

## License

15 changes: 9 additions & 6 deletions README_TiCDC.md
@@ -1,6 +1,8 @@
# TiCDC

**TiCDC** is [TiDB](https://docs.pingcap.com/tidb/stable)'s change data capture framework. It supports replicating change data to various downstreams, including MySQL protocol-compatible databases, message queues via the open CDC protocol and other systems such as local file storage.
**TiCDC** is [TiDB](https://docs.pingcap.com/tidb/stable)'s change data capture framework.
It supports replicating change data to various downstreams, including MySQL protocol-compatible databases
and [Kafka](https://kafka.apache.org/).

## Architecture

@@ -23,21 +25,22 @@ See a detailed introduction to [the TiCDC architecture](https://docs.pingcap.com
To check the source code, run test cases and build binaries, you can simply run:

```bash
$ make
$ make cdc
$ make test
```

Note that TiCDC supports building with Go version `Go >= 1.16`.
Note that TiCDC supports building with Go version `Go >= 1.18`.

When TiCDC is built successfully, you can find binary in the `bin` directory. Instructions for unit test and integration
test can be found in [Running tests](./tests/integration_tests/README.md).

## Deployment

You can setup a CDC cluster for replication test manually as following:
You can set up a CDC cluster for replication test manually as follows:

1. Setup a TiDB cluster.
2. Start a CDC cluster, which contains one or more CDC servers. The command to start on CDC server is `cdc server --pd http://10.0.10.25:2379`, where `http://10.0.10.25:2379` is the client-url of pd-server.
1. Set up a TiDB cluster.
2. Start a CDC cluster, which contains one or more CDC servers. The command to start one CDC server
is `cdc server --pd http://10.0.10.25:2379`, where `http://10.0.10.25:2379` is the client-url of pd-server.
3. Start a replication changefeed by `cdc cli changefeed create --pd http://10.0.10.25:2379 --start-ts 413105904441098240 --sink-uri mysql://root:123456@127.0.0.1:3306/`. The TSO is TiDB's `timestamp oracle`. If it is not provided or set to zero, the TSO of the start time will be used. Currently, we support only MySQL protocol-compatible databases as downstream sinks, and will add more sink types in the future.

For details, see [Deploy TiCDC](https://docs.pingcap.com/tidb/stable/deploy-ticdc).
9 changes: 2 additions & 7 deletions cdc/processor/pipeline/sink_test.go
@@ -54,15 +54,10 @@ func (c *mockFlowController) GetConsumption() uint64 {
return 0
}

func (s *mockSink) Init(tableID model.TableID) error {
func (s *mockSink) AddTable(tableID model.TableID) error {
return nil
}

func (s *mockSink) TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error) {
_ = s.EmitRowChangedEvents(ctx, rows...)
return true, nil
}

func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
s.received = append(s.received, struct {
@@ -93,7 +88,7 @@ func (s *mockSink) Close(ctx context.Context) error {
return nil
}

func (s *mockSink) Barrier(ctx context.Context, tableID model.TableID) error {
func (s *mockSink) RemoveTable(ctx context.Context, tableID model.TableID) error {
return nil
}

13 changes: 2 additions & 11 deletions cdc/sink/black_hole.go
@@ -37,19 +37,10 @@ type blackHoleSink struct {
lastAccumulated uint64
}

// Init table sink resources
func (b *blackHoleSink) Init(tableID model.TableID) error {
func (b *blackHoleSink) AddTable(tableID model.TableID) error {
return nil
}

func (b *blackHoleSink) TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error) {
err := b.EmitRowChangedEvents(ctx, rows...)
if err != nil {
return false, err
}
return true, nil
}

func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
log.Debug("BlockHoleSink: EmitRowChangedEvents", zap.Any("row", row))
@@ -87,6 +78,6 @@ func (b *blackHoleSink) Close(ctx context.Context) error {
return nil
}

func (b *blackHoleSink) Barrier(ctx context.Context, tableID model.TableID) error {
func (b *blackHoleSink) RemoveTable(ctx context.Context, tableID model.TableID) error {
return nil
}
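
The same rename runs through every sink implementation in this commit: `Init` becomes `AddTable`, `Barrier` becomes `RemoveTable`, and the `TryEmitRowChangedEvents` wrapper is dropped. Below is a hypothetical, reduced sketch of the resulting table-level sink surface, inferred only from the signatures visible in this diff; the real interface in `cdc/sink` may declare more methods, and the type aliases are stand-ins for the `cdc/model` types.

```go
// Sketch only: method set inferred from this commit's renames; TableID and
// RowChangedEvent are placeholder aliases for the cdc/model types.
package sink

import "context"

type TableID = int64

type RowChangedEvent struct{}

// tableSink is a hypothetical reduced view of the sink interface after the
// Init/Barrier -> AddTable/RemoveTable rename.
type tableSink interface {
	// AddTable (formerly Init) resets any per-table state left over from a
	// previous dispatch of the same table.
	AddTable(tableID TableID) error
	// EmitRowChangedEvents buffers or forwards row changes for a table.
	EmitRowChangedEvents(ctx context.Context, rows ...*RowChangedEvent) error
	// FlushRowChangedEvents ensures data up to resolvedTs is written
	// downstream and returns the table's new checkpoint timestamp.
	FlushRowChangedEvents(ctx context.Context, tableID TableID, resolvedTs uint64) (uint64, error)
	// RemoveTable (formerly Barrier) releases the table once its buffered
	// data has been flushed.
	RemoveTable(ctx context.Context, tableID TableID) error
	// Close releases the sink's resources.
	Close(ctx context.Context) error
}
```
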
30 changes: 14 additions & 16 deletions cdc/sink/mq/mq.go
@@ -129,19 +129,7 @@ func newMqSink(
return s, nil
}

// TryEmitRowChangedEvents just calls EmitRowChangedEvents internally,
// it still blocking in current implementation.
// TODO(dongmen): We should make this method truly non-blocking after we remove buffer sink
func (k *mqSink) TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error) {
err := k.EmitRowChangedEvents(ctx, rows...)
if err != nil {
return false, err
}
return true, nil
}

// Init table sink resources
func (k *mqSink) Init(tableID model.TableID) error {
func (k *mqSink) AddTable(tableID model.TableID) error {
// We need to clean up the old values of the table,
// otherwise when the table is dispatched back again,
// it may read the old values.
@@ -155,6 +143,8 @@ func (k *mqSink) Init(tableID model.TableID) error {
return nil
}

// EmitRowChangedEvents emits row changed events to the flush worker by partition.
// Concurrency Note: This method is thread-safe.
func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
rowsCount := 0
for _, row := range rows {
@@ -187,7 +177,10 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha
return nil
}

// FlushRowChangedEvents is thread-safety
// FlushRowChangedEvents asynchronously ensures
// that the data before the resolvedTs has been
// successfully written downstream.
// FlushRowChangedEvents is thread-safe.
func (k *mqSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
var checkpointTs uint64
v, ok := k.tableCheckpointTsMap.Load(tableID)
@@ -241,6 +234,9 @@ func (k *mqSink) flushTsToWorker(ctx context.Context, resolvedTs model.Ts) error
return nil
}

// EmitCheckpointTs emits the checkpointTs to
// default topic or the topics of all tables.
// Concurrency Note: EmitCheckpointTs is thread-safe.
func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64, tables []model.TableName) error {
encoder := k.encoderBuilder.Build()
msg, err := encoder.EncodeCheckpointEvent(ts)
@@ -281,6 +277,8 @@ func (k *mqSink) EmitCheckpointTs(ctx context.Context, ts uint64, tables []model
return nil
}

// EmitDDLEvent sends a DDL event to the default topic or the table's corresponding topic.
// Concurrency Note: EmitDDLEvent is thread-safe.
func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
if k.filter.ShouldIgnoreDDLEvent(ddl.StartTs, ddl.Type, ddl.TableInfo.Schema, ddl.TableInfo.Table) {
log.Info(
@@ -339,8 +337,8 @@ func (k *mqSink) Close(ctx context.Context) error {
return nil
}

func (k *mqSink) Barrier(cxt context.Context, tableID model.TableID) error {
// Barrier does nothing because FlushRowChangedEvents in mq sink has flushed
func (k *mqSink) RemoveTable(cxt context.Context, tableID model.TableID) error {
// RemoveTable does nothing because FlushRowChangedEvents in mq sink had flushed
// all buffered events by force.
return nil
}
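
The `AddTable` comment above (old per-table values must be cleared in case the table is dispatched back again) implies a per-table lifecycle driven by the caller. The following is a hypothetical sketch of that call order, written against a minimal stand-in interface rather than the actual processor pipeline code; `replicateTable` and the types it uses are not real tiflow identifiers.

```go
// Sketch only: the call order is inferred from this commit's comments and
// signatures, not copied from the processor code.
package sinksketch

import "context"

type TableID = int64

type RowChangedEvent struct{}

type tableSink interface {
	AddTable(tableID TableID) error
	EmitRowChangedEvents(ctx context.Context, rows ...*RowChangedEvent) error
	FlushRowChangedEvents(ctx context.Context, tableID TableID, resolvedTs uint64) (uint64, error)
	RemoveTable(ctx context.Context, tableID TableID) error
}

// replicateTable drives one table through the assumed lifecycle: reset any
// leftover state, emit the buffered rows, flush everything up to resolvedTs,
// then release the table.
func replicateTable(
	ctx context.Context, s tableSink, id TableID,
	rows []*RowChangedEvent, resolvedTs uint64,
) (uint64, error) {
	if err := s.AddTable(id); err != nil { // clears stale state from a previous dispatch
		return 0, err
	}
	if err := s.EmitRowChangedEvents(ctx, rows...); err != nil {
		return 0, err
	}
	checkpointTs, err := s.FlushRowChangedEvents(ctx, id, resolvedTs)
	if err != nil {
		return 0, err
	}
	// RemoveTable returns once the table's buffered data has been flushed.
	return checkpointTs, s.RemoveTable(ctx, id)
}
```
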
22 changes: 9 additions & 13 deletions cdc/sink/mysql/mysql.go
@@ -219,24 +219,18 @@ func NewMySQLSink(
return sink, nil
}

// TryEmitRowChangedEvents just calls EmitRowChangedEvents internally.
func (s *mysqlSink) TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error) {
_ = s.EmitRowChangedEvents(ctx, rows...)
return true, nil
}

// EmitRowChangedEvents appends row changed events to the txn cache.
// Concurrency Note: EmitRowChangedEvents is thread-safe.
func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
count := s.txnCache.Append(s.filter, rows...)
s.statistics.AddRowsCount(count)
return nil
}

// FlushRowChangedEvents will flush all received events, we don't allow mysql
// sink to receive events before resolving
// FlushRowChangedEvents will flush all received events,
// we do not write data downstream until we receive resolvedTs.
// Concurrency Note: FlushRowChangedEvents is thread-safe.
func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
// Since CDC does not guarantee exactly once semantic, it won't cause any problem
// here even if the table was moved or removed.
// ref: https://github.com/pingcap/tiflow/pull/4356#discussion_r787405134
v, ok := s.tableMaxResolvedTs.Load(tableID)
if !ok || v.(uint64) < resolvedTs {
s.tableMaxResolvedTs.Store(tableID, resolvedTs)
Expand Down Expand Up @@ -296,6 +290,8 @@ func (s *mysqlSink) EmitCheckpointTs(_ context.Context, ts uint64, _ []model.Tab
return nil
}

// EmitDDLEvent executes DDL event.
// Concurrency Note: EmitDDLEvent is thread-safe.
func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
if s.filter.ShouldIgnoreDDLEvent(ddl.StartTs, ddl.Type, ddl.TableInfo.Schema, ddl.TableInfo.Table) {
log.Info(
@@ -507,7 +503,7 @@ func (s *mysqlSink) dispatchAndExecTxns(ctx context.Context, txnsGroup map[model
s.notifyAndWaitExec(ctx)
}

func (s *mysqlSink) Init(tableID model.TableID) error {
func (s *mysqlSink) AddTable(tableID model.TableID) error {
s.cleanTableResource(tableID)
return nil
}
@@ -539,7 +535,7 @@ func (s *mysqlSink) Close(ctx context.Context) error {
return cerror.WrapError(cerror.ErrMySQLConnectionError, err)
}

func (s *mysqlSink) Barrier(ctx context.Context, tableID model.TableID) error {
func (s *mysqlSink) RemoveTable(ctx context.Context, tableID model.TableID) error {
defer s.cleanTableResource(tableID)

warnDuration := 3 * time.Minute
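
The `FlushRowChangedEvents` hunk above also shows the bookkeeping used for per-table watermarks: a `sync.Map` keyed by table ID whose stored value only moves forward. Here is a minimal standalone sketch of that pattern; it is not the actual `mysqlSink` code, and the type and function names are hypothetical.

```go
// Sketch only: a forward-only per-table watermark map, mirroring the
// tableMaxResolvedTs handling seen in FlushRowChangedEvents above.
package main

import (
	"fmt"
	"sync"
)

type tableID = int64

// watermarks records the highest resolved timestamp seen for each table.
type watermarks struct {
	m sync.Map // tableID -> uint64
}

// Advance stores resolvedTs only if it is newer than the recorded value.
// The Load-then-Store pair is not atomic; as a comment in the diff notes,
// replication is not exactly-once, so a rare lost update here is tolerated.
func (w *watermarks) Advance(id tableID, resolvedTs uint64) {
	v, ok := w.m.Load(id)
	if !ok || v.(uint64) < resolvedTs {
		w.m.Store(id, resolvedTs)
	}
}

// Get returns the recorded watermark, or zero for an unknown table.
func (w *watermarks) Get(id tableID) uint64 {
	if v, ok := w.m.Load(id); ok {
		return v.(uint64)
	}
	return 0
}

func main() {
	var w watermarks
	w.Advance(1, 413105904441098240)
	w.Advance(1, 100) // stale resolvedTs, ignored
	fmt.Println(w.Get(1)) // 413105904441098240
}
```
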
4 changes: 2 additions & 2 deletions cdc/sink/mysql/mysql_test.go
@@ -1268,7 +1268,7 @@ func TestNewMySQLSinkExecDML(t *testing.T) {
}, retry.WithBackoffBaseDelay(20), retry.WithMaxTries(10), retry.WithIsRetryableErr(cerror.IsRetryableError))
require.Nil(t, err)

err = sink.Barrier(ctx, 2)
err = sink.RemoveTable(ctx, 2)
require.Nil(t, err)
v, ok := sink.tableMaxResolvedTs.Load(2)
require.False(t, ok)
@@ -1900,7 +1900,7 @@ func TestCleanTableResource(t *testing.T) {
s.tableMaxResolvedTs.Store(tblID, uint64(2))
_, ok := s.txnCache.unresolvedTxns[tblID]
require.True(t, ok)
require.Nil(t, s.Init(tblID))
require.Nil(t, s.AddTable(tblID))
_, ok = s.txnCache.unresolvedTxns[tblID]
require.False(t, ok)
_, ok = s.tableCheckpointTs.Load(tblID)
9 changes: 2 additions & 7 deletions cdc/sink/mysql/simple_mysql_tester.go
@@ -97,15 +97,10 @@ func NewSimpleMySQLSink(
return sink, nil
}

// Init table sink resources
func (s *simpleMySQLSink) Init(tableID model.TableID) error {
func (s *simpleMySQLSink) AddTable(tableID model.TableID) error {
return nil
}

func (s *simpleMySQLSink) TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error) {
return true, nil
}

// EmitRowChangedEvents sends Row Changed Event to Sink
// EmitRowChangedEvents may write rows to downstream directly;
func (s *simpleMySQLSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
@@ -213,7 +208,7 @@ func (s *simpleMySQLSink) Close(ctx context.Context) error {
return s.db.Close()
}

func (s *simpleMySQLSink) Barrier(ctx context.Context, tableID model.TableID) error {
func (s *simpleMySQLSink) RemoveTable(ctx context.Context, tableID model.TableID) error {
return nil
}
