title | summary |
---|---|
Replicate Data to Storage Services | Learn how to replicate data to storage services using TiCDC, and learn about the storage path of the replicated data. |
Starting from TiDB v6.5.0, TiCDC supports saving row change events to storage services, including Amazon S3, GCS, Azure Blob Storage, and NFS. This document describes how to create a changefeed that replicates incremental data to such storage services using TiCDC, and how data is stored.
Run the following command to create a changefeed task:
cdc cli changefeed create \
--server=http://10.0.10.25:8300 \
--sink-uri="s3://logbucket/storage_test?protocol=canal-json" \
--changefeed-id="simple-replication-task"
The output is as follows:
Info: {"upstream_id":7171388873935111376,"namespace":"default","id":"simple-replication-task","sink_uri":"s3://logbucket/storage_test?protocol=canal-json","create_time":"2022-11-29T18:52:05.566016967+08:00","start_ts":437706850431664129,"engine":"unified","config":{"case_sensitive":true,"enable_old_value":true,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":false,"sync_point_interval":600000000000,"sync_point_retention":86400000000000,"filter":{"rules":["*.*"],"event_filters":null},"mounter":{"worker_num":16},"sink":{"protocol":"canal-json","schema_registry":"","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false},"column_selectors":null,"transaction_atomicity":"none","encoder_concurrency":16,"terminator":"\r\n","date_separator":"none","enable_partition_separator":false},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"storage":""}},"state":"normal","creator_version":"v6.5.0-master-dirty"}
- --changefeed-id: The ID of the changefeed. The format must match the ^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$ regular expression. If this ID is not specified, TiCDC automatically generates a UUID (the version 4 format) as the ID.
- --sink-uri: The downstream address of the changefeed. For details, see Configure sink URI.
- --start-ts: The starting TSO of the changefeed. TiCDC starts pulling data from this TSO. The default value is the current time.
- --target-ts: The ending TSO of the changefeed. TiCDC stops pulling data at this TSO. The default value is empty, which means that TiCDC does not automatically stop pulling data.
- --config: The configuration file of the changefeed. For details, see TiCDC changefeed configuration parameters.
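The ID rule above can be checked before calling the CLI. The following is a minimal Python sketch, not part of TiCDC: the function names are hypothetical, and only the regular expression and the UUID version 4 fallback come from the description above.

```python
import re
import uuid

# Regular expression quoted in the --changefeed-id description.
CHANGEFEED_ID_PATTERN = re.compile(r"[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*")


def is_valid_changefeed_id(changefeed_id: str) -> bool:
    """Return True if the whole ID matches the documented pattern."""
    # fullmatch anchors both ends, which mirrors the ^...$ in the documented regex.
    return CHANGEFEED_ID_PATTERN.fullmatch(changefeed_id) is not None


def fallback_changefeed_id() -> str:
    """Illustrate the documented fallback: a version 4 UUID used as the ID."""
    return str(uuid.uuid4())
```

For example, `is_valid_changefeed_id("simple-replication-task")` returns `True`, while an ID that contains an underscore or starts with a hyphen is rejected.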
This section describes how to configure Sink URI for storage services, including Amazon S3, GCS, Azure Blob Storage, and NFS. Sink URI is used to specify the connection information of the TiCDC target system. The format is as follows:
[scheme]://[host]/[path]?[query_parameters]
For [query_parameters] in the URI, the following parameters can be configured:
Parameter | Description | Default value | Value range |
---|---|---|---|
worker-count | Concurrency for saving data changes to cloud storage in the downstream. | 16 | [1, 512] |
flush-interval | Interval for saving data changes to cloud storage in the downstream. | 5s | [2s, 10m] |
file-size | A data change file is stored to cloud storage if the number of bytes exceeds the value of this parameter. | 67108864 | [1048576, 536870912] |
protocol | The protocol format of the messages sent to the downstream. | N/A | canal-json and csv |
enable-tidb-extension | When protocol is set to canal-json and enable-tidb-extension is set to true, TiCDC sends WATERMARK events and adds the TiDB extension field to Canal-JSON messages. | false | false and true |
Note:
Data change files are saved to the downstream when either flush-interval or file-size meets the requirements. The protocol parameter is mandatory. If TiCDC does not receive this parameter when creating a changefeed, the CDC:ErrSinkUnknownProtocol error is returned.
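To make the parameter ranges concrete, here is a hedged Python sketch that assembles a sink URI and rejects values outside the documented ranges. The helper name and its arguments are hypothetical, and TiCDC performs its own validation; this sketch only mirrors the table above. For simplicity it accepts flush-interval as whole seconds.

```python
from typing import Optional
from urllib.parse import urlencode

SUPPORTED_PROTOCOLS = ("canal-json", "csv")


def build_storage_sink_uri(
    scheme: str,
    location: str,
    protocol: str,
    worker_count: Optional[int] = None,
    flush_interval_seconds: Optional[int] = None,
    file_size: Optional[int] = None,
) -> str:
    """Build [scheme]://[host]/[path]?[query_parameters] for a storage sink."""
    # protocol is mandatory and limited to the two documented formats.
    if protocol not in SUPPORTED_PROTOCOLS:
        raise ValueError(f"unsupported protocol: {protocol!r}")
    params = {"protocol": protocol}
    if worker_count is not None:
        if not 1 <= worker_count <= 512:
            raise ValueError("worker-count must be in [1, 512]")
        params["worker-count"] = str(worker_count)
    if flush_interval_seconds is not None:
        # [2s, 10m] expressed in seconds.
        if not 2 <= flush_interval_seconds <= 600:
            raise ValueError("flush-interval must be in [2s, 10m]")
        params["flush-interval"] = f"{flush_interval_seconds}s"
    if file_size is not None:
        if not 1048576 <= file_size <= 536870912:
            raise ValueError("file-size must be in [1048576, 536870912]")
        params["file-size"] = str(file_size)
    return f"{scheme}://{location}?{urlencode(params)}"
```

Calling `build_storage_sink_uri("s3", "bucket/prefix", "canal-json")` yields a URI of the same shape as the Amazon S3 example in this section.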
The following is an example configuration for Amazon S3:
--sink-uri="s3://bucket/prefix?protocol=canal-json"
The following is an example configuration for GCS:
--sink-uri="gcs://bucket/prefix?protocol=canal-json"
The following is an example configuration for Azure Blob Storage:
--sink-uri="azure://bucket/prefix?protocol=canal-json"
Tip:
The URI parameters of Amazon S3, GCS, and Azure Blob Storage in TiCDC are the same as their URI parameters in BR. For details, see Backup storage URI format.
The following is an example configuration for NFS:
--sink-uri="file:///my-directory/prefix?protocol=canal-json"
This section describes the storage path structure of data change records, metadata, and DDL events.
Data change records are saved to the following path:
{scheme}://{prefix}/{schema}/{table}/{table-version-separator}/{partition-separator}/{date-separator}/CDC{num}.{extension}
- scheme: specifies the storage type, for example, s3, gcs, azure, or file.
- prefix: specifies the user-defined parent directory, for example, s3://bucket/bbb/ccc.
- schema: specifies the schema name, for example, s3://bucket/bbb/ccc/test.
- table: specifies the table name, for example, s3://bucket/bbb/ccc/test/table1.
- table-version-separator: specifies the separator that separates the path by the table version, for example, s3://bucket/bbb/ccc/test/table1/9999.
- partition-separator: specifies the separator that separates the path by the table partition, for example, s3://bucket/bbb/ccc/test/table1/9999/20.
- date-separator: classifies the files by the transaction commit date. Value options are:
    - none: no date-separator. For example, all files with test.table1 version being 9999 are saved to s3://bucket/bbb/ccc/test/table1/9999.
    - year: the separator is the year of the transaction commit date, for example, s3://bucket/bbb/ccc/test/table1/9999/2022.
    - month: the separator is the year and month of the transaction commit date, for example, s3://bucket/bbb/ccc/test/table1/9999/2022-01.
    - day: the separator is the year, month, and day of the transaction commit date, for example, s3://bucket/bbb/ccc/test/table1/9999/2022-01-02.
- num: saves the serial number of the file that records the data change, for example, s3://bucket/bbb/ccc/test/table1/9999/2022-01-02/CDC000005.csv.
- extension: specifies the extension of the file. TiDB v6.5.0 supports the CSV and Canal-JSON formats.
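The path layout above can be sketched as a small Python helper, which may be useful when a downstream consumer needs to locate files. The function names are hypothetical, and the six-digit zero padding of the serial number is an assumption inferred from the CDC000005.csv example rather than a documented rule.

```python
from datetime import date
from typing import Optional


def date_separator_value(commit_date: date, mode: str) -> Optional[str]:
    """Return the date path segment for the given date-separator mode."""
    if mode == "none":
        return None
    if mode == "year":
        return f"{commit_date.year:04d}"
    if mode == "month":
        return f"{commit_date.year:04d}-{commit_date.month:02d}"
    if mode == "day":
        return commit_date.isoformat()
    raise ValueError(f"unknown date-separator: {mode!r}")


def data_file_path(
    base: str,
    schema: str,
    table: str,
    table_version: int,
    num: int,
    extension: str,
    partition: Optional[int] = None,
    commit_date: Optional[date] = None,
    date_separator: str = "none",
) -> str:
    """Compose the path of a data change file from its components."""
    parts = [base.rstrip("/"), schema, table, str(table_version)]
    # The partition segment is included only when one is given.
    if partition is not None:
        parts.append(str(partition))
    if date_separator != "none":
        if commit_date is None:
            raise ValueError("commit_date is required for this date-separator")
        parts.append(date_separator_value(commit_date, date_separator))
    # Assumption: the serial number is zero-padded to six digits.
    parts.append(f"CDC{num:06d}.{extension}")
    return "/".join(parts)
```

With `base="s3://bucket/bbb/ccc"`, schema `test`, table `table1`, version `9999`, a commit date of 2022-01-02, `date_separator="day"`, and serial number 5, the helper reproduces the CSV path used in the num example.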
Note:
The table version changes in the following three cases:
- After a DDL operation is performed, the table version is the TSO when the DDL is executed in the upstream TiDB. However, the change of the table version does not mean the change of the table schema. For example, adding a comment to a column does not cause the schema.json file content to change.
- The changefeed process restarts. The table version is the checkpoint TSO when the process restarts. When there are many tables and the process restarts, it takes a long time to traverse all directories and find the position where each table was written last time. Therefore, data is written to a new directory with the version being the checkpoint TSO, instead of to the earlier directory.
- After a table scheduling occurs, the table version is the changefeed checkpoint TSO when the table is scheduled to the current node.
An index file is used to prevent written data from being overwritten by mistake. It is stored in the same path as the data change records.
{scheme}://{prefix}/{schema}/{table}/{table-version-separator}/{partition-separator}/{date-separator}/CDC.index
The index file records the largest file name used in the current directory. For example:
CDC000005.csv
In this example, the files CDC000001.csv through CDC000004.csv in this directory are occupied. When a table scheduling or node restart occurs in the TiCDC cluster, the new node reads the index file and determines if CDC000005.csv is occupied. If it is not occupied, the new node writes the file starting from CDC000005.csv. If it is occupied, it starts writing from CDC000006.csv, which prevents overwriting data written by other nodes.
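The decision a new node makes after reading the index file can be illustrated with the following Python sketch. It is not TiCDC's implementation: the function name and the `is_occupied` callback are hypothetical, and starting from 1 when no index content exists is an assumption.

```python
import re
from typing import Callable, Optional

# Matches names such as CDC000005.csv and captures the serial number.
DATA_FILE_NAME = re.compile(r"CDC(\d+)\.(\w+)")


def next_file_number(
    index_content: Optional[str],
    is_occupied: Callable[[str], bool],
) -> int:
    """Pick the serial number a new node should start writing from."""
    # Assumption: with no index content, writing starts from file 1.
    if index_content is None or not index_content.strip():
        return 1
    recorded_name = index_content.strip()
    match = DATA_FILE_NAME.fullmatch(recorded_name)
    if match is None:
        raise ValueError(f"unexpected index content: {recorded_name!r}")
    recorded_number = int(match.group(1))
    # If the recorded file already exists, skip past it to avoid overwriting
    # data written by another node; otherwise reuse the recorded number.
    if is_occupied(recorded_name):
        return recorded_number + 1
    return recorded_number
```

Here `is_occupied` stands in for whatever existence check the storage backend offers, for example an object lookup in a bucket or a file-exists test on NFS.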
Metadata is saved in the following path:
{scheme}://{prefix}/metadata
Metadata is a JSON-formatted file, for example:
{
"checkpoint-ts":433305438660591626
}
- checkpoint-ts: Transactions with commit-ts smaller than checkpoint-ts are written to the target storage in the downstream.
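A consumer can use the metadata file to decide which transactions are safe to read. The Python sketch below parses the file and applies the rule above. The function names are hypothetical. The conversion to wall-clock time relies on the TiDB TSO layout (physical milliseconds in the high bits, an 18-bit logical counter in the low bits), which is not described in this document.

```python
import json
from datetime import datetime, timezone

# TiDB TSO layout: the low 18 bits are a logical counter,
# the remaining high bits are a Unix timestamp in milliseconds.
TSO_LOGICAL_BITS = 18


def parse_checkpoint_ts(metadata_text: str) -> int:
    """Extract checkpoint-ts from the content of the metadata file."""
    return int(json.loads(metadata_text)["checkpoint-ts"])


def is_written_downstream(commit_ts: int, checkpoint_ts: int) -> bool:
    """Apply the documented rule: commit-ts smaller than checkpoint-ts."""
    return commit_ts < checkpoint_ts


def tso_physical_time(tso: int) -> datetime:
    """Convert the physical part of a TSO to a UTC datetime."""
    physical_ms = tso >> TSO_LOGICAL_BITS
    return datetime.fromtimestamp(physical_ms / 1000, tz=timezone.utc)
```

For instance, `tso_physical_time(parse_checkpoint_ts(text))` gives an approximate wall-clock time up to which data has been replicated, where `text` is the content of the metadata file.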
When DDL events cause the table version to change, TiCDC switches to a new path to write data change records. For example, when the version of test.table1 changes from 9999 to 10000, data will be written to the path s3://bucket/bbb/ccc/test/table1/10000/2022-01-02/CDC000001.csv. In addition, when DDL events occur, TiCDC generates a schema.json file to save the table schema information.
Table schema information is saved in the following path:
{scheme}://{prefix}/{schema}/{table}/{table-version-separator}/schema.json
The following is a schema.json file:
{
"Table":"table1",
"Schema":"test",
"Version":1,
"TableVersion":10000,
"Query": "ALTER TABLE test.table1 ADD OfficeLocation blob(20)",
"TableColumns":[
{
"ColumnName":"Id",
"ColumnType":"INT",
"ColumnNullable":"false",
"ColumnIsPk":"true"
},
{
"ColumnName":"LastName",
"ColumnType":"CHAR",
"ColumnLength":"20"
},
{
"ColumnName":"FirstName",
"ColumnType":"VARCHAR",
"ColumnLength":"30"
},
{
"ColumnName":"HireDate",
"ColumnType":"DATETIME"
},
{
"ColumnName":"OfficeLocation",
"ColumnType":"BLOB",
"ColumnLength":"20"
}
],
"TableColumnsTotal":"5"
}
- Table: Table name.
- Schema: Schema name.
- Version: Protocol version of the storage sink.
- TableVersion: Table version.
- Query: DDL statement.
- TableColumns: An array of one or more maps, each of which describes a column in the source table.
    - ColumnName: Column name.
    - ColumnType: Column type. For details, see Data type.
    - ColumnLength: Column length. For details, see Data type.
    - ColumnPrecision: Column precision. For details, see Data type.
    - ColumnScale: The number of digits following the decimal point (the scale). For details, see Data type.
    - ColumnNullable: The column can be NULL when the value of this option is true.
    - ColumnIsPk: The column is part of the primary key when the value of this option is true.
- TableColumnsTotal: The size of the TableColumns array.
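As an illustration of how a consumer might read this file, the following Python sketch loads a schema.json document, cross-checks TableColumnsTotal against the TableColumns array, and lists the primary key columns. The function names are hypothetical. The sketch assumes, as in the example above, that TableColumnsTotal and the ColumnIsPk flag are stored as strings.

```python
import json
from typing import Any, Dict, List


def load_table_schema(schema_text: str) -> Dict[str, Any]:
    """Parse schema.json content and verify the declared column count."""
    schema = json.loads(schema_text)
    # TableColumnsTotal appears as a string in the example, so convert it.
    declared_total = int(schema["TableColumnsTotal"])
    actual_total = len(schema["TableColumns"])
    if declared_total != actual_total:
        raise ValueError(
            f"TableColumnsTotal is {declared_total}, "
            f"but TableColumns has {actual_total} entries"
        )
    return schema


def primary_key_columns(schema: Dict[str, Any]) -> List[str]:
    """Return the names of columns whose ColumnIsPk flag is "true"."""
    return [
        column["ColumnName"]
        for column in schema["TableColumns"]
        if column.get("ColumnIsPk") == "true"
    ]
```

Applied to the example file above, `primary_key_columns` would return a list that contains only `Id`, because that is the only column carrying the ColumnIsPk flag.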
This section describes the data types used in the schema.json file. The data types are defined as T(M[, D]). For details, see Data Types.
Integer types in TiDB are defined as IT[(M)] [UNSIGNED], where:
- IT is the integer type, which can be TINYINT, SMALLINT, MEDIUMINT, INT, BIGINT, or BIT.
- M is the display width of the type.
Integer types are defined as follows in schema.json:
{
"ColumnName":"COL1",
"ColumnType":"{IT} [UNSIGNED]",
"ColumnPrecision":"{M}"
}
Decimal types in TiDB are defined as DT[(M,D)][UNSIGNED], where:
- DT is the decimal type, which can be FLOAT, DOUBLE, DECIMAL, or NUMERIC.
- M is the precision of the data type, or the total number of digits.
- D is the number of digits following the decimal point.
Decimal types are defined as follows in schema.json:
{
"ColumnName":"COL1",
"ColumnType":"{DT} [UNSIGNED]",
"ColumnPrecision":"{M}",
"ColumnScale":"{D}"
}
Date types in TiDB are defined as DT, where:
- DT is the date type, which can be DATE or YEAR.
The date types are defined as follows in schema.json:
{
"ColumnName":"COL1",
"ColumnType":"{DT}"
}
The time types in TiDB are defined as TT[(M)], where:
- TT is the time type, which can be TIME, DATETIME, or TIMESTAMP.
- M is the precision of seconds in the range from 0 to 6.
The time types are defined as follows in schema.json:
{
"ColumnName":"COL1",
"ColumnType":"{TT}",
"ColumnScale":"{M}"
}
The string types in TiDB are defined as ST[(M)], where:
- ST is the string type, which can be CHAR, VARCHAR, TEXT, BINARY, BLOB, or JSON.
- M is the maximum length of the string.
The string types are defined as follows in schema.json:
{
"ColumnName":"COL1",
"ColumnType":"{ST}",
"ColumnLength":"{M}"
}
The Enum and Set types are defined as follows in schema.json:
{
"ColumnName":"COL1",
"ColumnType":"{ENUM/SET}"
}
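The per-type rules in this section can be summarized in one Python sketch that maps a column definition to its schema.json entry. The function name and arguments are hypothetical, and the sketch only mirrors the templates shown above; it is not how TiCDC generates the file. Numeric attributes are emitted as strings to match the schema.json example.

```python
from typing import Dict, Optional

INTEGER_TYPES = {"TINYINT", "SMALLINT", "MEDIUMINT", "INT", "BIGINT", "BIT"}
DECIMAL_TYPES = {"FLOAT", "DOUBLE", "DECIMAL", "NUMERIC"}
DATE_TYPES = {"DATE", "YEAR"}
TIME_TYPES = {"TIME", "DATETIME", "TIMESTAMP"}
STRING_TYPES = {"CHAR", "VARCHAR", "TEXT", "BINARY", "BLOB", "JSON"}
ENUM_SET_TYPES = {"ENUM", "SET"}


def column_entry(
    name: str,
    type_name: str,
    m: Optional[int] = None,
    d: Optional[int] = None,
    unsigned: bool = False,
) -> Dict[str, str]:
    """Build a TableColumns entry following the templates in this section."""
    base = type_name.upper()
    entry = {"ColumnName": name, "ColumnType": base}
    if base in INTEGER_TYPES or base in DECIMAL_TYPES:
        if unsigned:
            entry["ColumnType"] = f"{base} UNSIGNED"
        # M is the display width (integers) or the precision (decimals).
        if m is not None:
            entry["ColumnPrecision"] = str(m)
        if base in DECIMAL_TYPES and d is not None:
            entry["ColumnScale"] = str(d)
    elif base in TIME_TYPES:
        # For time types, M (fractional seconds precision) goes to ColumnScale.
        if m is not None:
            entry["ColumnScale"] = str(m)
    elif base in STRING_TYPES:
        if m is not None:
            entry["ColumnLength"] = str(m)
    elif base in DATE_TYPES or base in ENUM_SET_TYPES:
        pass  # Only ColumnName and ColumnType are recorded.
    else:
        raise ValueError(f"type not covered by this sketch: {type_name!r}")
    return entry
```

For example, `column_entry("LastName", "CHAR", m=20)` produces the same three keys as the LastName column in the schema.json example.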