Skip to content

Commit

Permalink
[Feature][Connector-v2] Support schema evolution for Oracle connector
Browse files Browse the repository at this point in the history
  • Loading branch information
dailai committed Nov 5, 2024
1 parent 019af39 commit 510cc0a
Show file tree
Hide file tree
Showing 40 changed files with 3,116 additions and 272 deletions.
101 changes: 98 additions & 3 deletions docs/en/concept/schema-evolution.md
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
# Schema evolution
Schema Evolution means that the schema of a data table can be changed and the data synchronization task can automatically adapt to the changes of the new table structure without any other operations.
Now we only support the operation about `add column``drop column``rename column` and `modify column` of the table in CDC source. This feature is only support zeta engine at now.
Now we only support the operation about `add column``drop column``rename column` and `modify column` of the table in CDC source. This feature is only support zeta engine at now.


## Supported connectors

### Source
[Mysql-CDC](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/MySQL-CDC.md)
[Oracle-CDC](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Oracle-CDC.md)

### Sink
[Jdbc-Mysql](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)

Note: The schema evolution is not support the transform at now. The schema evolution of different types of databases(Oracle-CDC -> Jdbc-Mysql)is currently not supported the default value of the column in ddl.

Note: The schema evolution is not support the transform at now.
When you use the Oracle-CDC,you can not use the username named `SYS` or `SYSTEM` to modify the table schema, otherwise the ddl event will be filtered out which can lead to the schema evolution not working.
Otherwise, If your table name start with `ORA_TEMP_` will also has the same problem.

## Enable schema evolution
Schema evolution is disabled by default in CDC source. You need configure `debezium.include.schema.changes = true` which is only supported in MySQL-CDC to enable it.
Schema evolution is disabled by default in CDC source. You need configure `debezium.include.schema.changes = true` which is only supported in CDC to enable it. When you use Oracle-CDC with schema-evolution enabled, you must specify `redo_log_catalog` as `log.mining.strategy` in the `debezium` attribute.

## Examples

Expand Down Expand Up @@ -56,3 +62,92 @@ sink {
}
}
```

### Oracle-cdc -> Jdbc-Oracle
```
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
Oracle-CDC {
result_table_name = "customers"
username = "dbzuser"
password = "dbz"
database-names = ["ORCLCDB"]
schema-names = ["DEBEZIUM"]
table-names = ["ORCLCDB.DEBEZIUM.FULL_TYPES"]
base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
debezium {
include.schema.changes = true
log.mining.strategy = redo_log_catalog
}
}
}
sink {
Jdbc {
source_table_name = "customers"
driver = "oracle.jdbc.driver.OracleDriver"
url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
user = "dbzuser"
password = "dbz"
generate_sink_sql = true
database = "ORCLCDB"
table = "DEBEZIUM.FULL_TYPES_SINK"
batch_size = 1
primary_keys = ["ID"]
connection.pool.size = 1
}
}
```

### Oracle-cdc -> Jdbc-Mysql
```
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
Oracle-CDC {
result_table_name = "customers"
username = "dbzuser"
password = "dbz"
database-names = ["ORCLCDB"]
schema-names = ["DEBEZIUM"]
table-names = ["ORCLCDB.DEBEZIUM.FULL_TYPES"]
base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
debezium {
include.schema.changes = true
log.mining.strategy = redo_log_catalog
}
}
}
sink {
jdbc {
source_table_name = "customers"
url = "jdbc:mysql://oracle-host:3306/oracle_sink"
driver = "com.mysql.cj.jdbc.Driver"
user = "st_user_sink"
password = "mysqlpw"
generate_sink_sql = true
# You need to configure both database and table
database = oracle_sink
table = oracle_cdc_2_mysql_sink_table
primary_keys = ["ID"]
}
}
```
97 changes: 95 additions & 2 deletions docs/zh/concept/schema-evolution.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@

###
[Mysql-CDC](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/MySQL-CDC.md)
[Oracle-CDC](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/source/Oracle-CDC.md)

### 目标
[Jdbc-Mysql](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)

注意: 目前模式演进不支持transform.
注意: 目前模式演进不支持transform。不同类型数据库(Oracle-CDC -> Jdbc-Mysql)的模式演进目前不支持ddl中列的默认值。

当你使用Oracle-CDC时,你不能使用用户名`SYS``SYSTEM`来修改表结构,否则ddl事件将被过滤,这可能导致模式演进不起作用;
另外,如果你的表名以`ORA_TEMP_`开头,也会有相同的问题。

## 启用Schema evolution功能
在CDC源连接器中模式演进默认是关闭的。你需要在CDC连接器中配置`debezium.include.schema.changes = true`来启用它。
在CDC源连接器中模式演进默认是关闭的。你需要在CDC连接器中配置`debezium.include.schema.changes = true`来启用它。当你使用Oracle-CDC并且启用schema-evolution时,你必须将`debezium`属性中的`log.mining.strategy`指定为`redo_log_catalog`

## 示例

Expand All @@ -38,6 +42,7 @@ source {
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
debezium = {
include.schema.changes = true
log.mining.strategy = redo_log_catalog
}
}
}
Expand All @@ -57,3 +62,91 @@ sink {
}
}
```

### Oracle-cdc -> Jdbc-Oracle
```
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
Oracle-CDC {
result_table_name = "customers"
username = "dbzuser"
password = "dbz"
database-names = ["ORCLCDB"]
schema-names = ["DEBEZIUM"]
table-names = ["ORCLCDB.DEBEZIUM.FULL_TYPES"]
base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
debezium {
include.schema.changes = true
log.mining.strategy = redo_log_catalog
}
}
}
sink {
Jdbc {
source_table_name = "customers"
driver = "oracle.jdbc.driver.OracleDriver"
url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
user = "dbzuser"
password = "dbz"
generate_sink_sql = true
database = "ORCLCDB"
table = "DEBEZIUM.FULL_TYPES_SINK"
batch_size = 1
primary_keys = ["ID"]
connection.pool.size = 1
}
}
```

### Oracle-cdc -> Jdbc-Mysql
```
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
Oracle-CDC {
result_table_name = "customers"
username = "dbzuser"
password = "dbz"
database-names = ["ORCLCDB"]
schema-names = ["DEBEZIUM"]
table-names = ["ORCLCDB.DEBEZIUM.FULL_TYPES"]
base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
source.reader.close.timeout = 120000
connection.pool.size = 1
debezium {
include.schema.changes = true
}
}
}
sink {
jdbc {
source_table_name = "customers"
url = "jdbc:mysql://oracle-host:3306/oracle_sink"
driver = "com.mysql.cj.jdbc.Driver"
user = "st_user_sink"
password = "mysqlpw"
generate_sink_sql = true
# You need to configure both database and table
database = oracle_sink
table = oracle_cdc_2_mysql_sink_table
primary_keys = ["ID"]
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,19 @@ private SeaTunnelRowType applyChangeColumn(
String oldColumn = changeColumnEvent.getOldColumn();
int oldColumnIndex = dataType.indexOf(oldColumn);

// The operation of rename column which only has the name of old column and the name of new
// column,
// so we need to fill the data type which is the same as the old column.
SeaTunnelDataType<?> fieldType = dataType.getFieldType(oldColumnIndex);
Column column = changeColumnEvent.getColumn();
if (column.getDataType() == null) {
column = column.copy(fieldType);
}

return applyModifyColumn(
dataType,
oldColumnIndex,
changeColumnEvent.getColumn(),
column,
changeColumnEvent.isFirst(),
changeColumnEvent.getAfterColumn());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

package org.apache.seatunnel.connectors.cdc.base.schema;

import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;

Expand All @@ -25,17 +31,24 @@
import org.apache.kafka.connect.source.SourceRecord;

import com.google.common.collect.Lists;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.HistoryRecord;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Objects;

@Slf4j
public abstract class AbstractSchemaChangeResolver implements SchemaChangeResolver {

protected static final List<String> SUPPORT_DDL = Lists.newArrayList("ALTER TABLE");

protected JdbcSourceConfig jdbcSourceConfig;
protected final JdbcSourceConfig jdbcSourceConfig;
@Setter protected transient DdlParser ddlParser;
@Setter protected transient Tables tables;
@Setter protected String sourceDialectName;

public AbstractSchemaChangeResolver(JdbcSourceConfig jdbcSourceConfig) {
this.jdbcSourceConfig = jdbcSourceConfig;
Expand All @@ -55,4 +68,39 @@ public boolean support(SourceRecord record) {
.map(String::toUpperCase)
.anyMatch(prefix -> ddl.toUpperCase().contains(prefix));
}

@Override
public SchemaChangeEvent resolve(SourceRecord record, SeaTunnelDataType dataType) {
TablePath tablePath = SourceRecordUtils.getTablePath(record);
String ddl = SourceRecordUtils.getDdl(record);
if (Objects.isNull(ddlParser)) {
this.ddlParser = createDdlParser(tablePath);
}
if (Objects.isNull(tables)) {
this.tables = new Tables();
}
ddlParser.setCurrentDatabase(tablePath.getDatabaseName());
ddlParser.setCurrentSchema(tablePath.getSchemaName());
// Parse DDL statement using Debezium's Antlr parser
ddlParser.parse(ddl, tables);
List<AlterTableColumnEvent> parsedEvents = getAndClearParsedEvents();
parsedEvents.forEach(e -> e.setSourceDialectName(getSourceDialectName()));
AlterTableColumnsEvent alterTableColumnsEvent =
new AlterTableColumnsEvent(
TableIdentifier.of(
StringUtils.EMPTY,
tablePath.getDatabaseName(),
tablePath.getSchemaName(),
tablePath.getTableName()),
parsedEvents);
alterTableColumnsEvent.setStatement(ddl);
alterTableColumnsEvent.setSourceDialectName(getSourceDialectName());
return parsedEvents.isEmpty() ? null : alterTableColumnsEvent;
}

protected abstract DdlParser createDdlParser(TablePath tablePath);

protected abstract List<AlterTableColumnEvent> getAndClearParsedEvents();

protected abstract String getSourceDialectName();
}
Loading

0 comments on commit 510cc0a

Please sign in to comment.