diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index f0dd52dbaa7..f128f6b4b21 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -33,7 +33,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it. | user | String | No | - | | password | String | No | - | | query | String | No | - | -| driver_type | String | No | - | +| compatible_mode | String | No | - | | database | String | No | - | | table | String | No | - | | primary_keys | Array | No | - | @@ -70,9 +70,9 @@ The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost/tes Use this sql write upstream input datas to database. e.g `INSERT ...` -### driver_type [string] +### compatible_mode [string] -Use this field to represent the OceanBase driver. e.g 'mysql' +The compatible mode of database, required when the database supports multiple compatible modes. For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'. ### database [string] diff --git a/docs/en/connector-v2/sink/OceanBase.md b/docs/en/connector-v2/sink/OceanBase.md new file mode 100644 index 00000000000..054228b49b9 --- /dev/null +++ b/docs/en/connector-v2/sink/OceanBase.md @@ -0,0 +1,186 @@ +# OceanBase + +> JDBC OceanBase Sink Connector + +## Support Those Engines + +> Spark
+> Flink
+> Seatunnel Zeta
+ +## Key Features + +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [x] [cdc](../../concept/connector-v2-features.md) + +## Description + +Write data through jdbc. Support Batch mode and Streaming mode, support concurrent writing, support exactly-once semantics. + +## Supported DataSource Info + +| Datasource | Supported versions | Driver | Url | Maven | +|------------|--------------------------------|---------------------------|--------------------------------------|-------------------------------------------------------------------------------| +| OceanBase | All OceanBase server versions. | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2883/test | [Download](https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client) | + +## Database Dependency + +> Please download the support list corresponding to 'Maven' and copy it to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/' working directory
+> For example: cp oceanbase-client-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/ + +## Data Type Mapping + +### Mysql Mode + +| Mysql Data type | Seatunnel Data type | +|-----------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| BIT(1)
INT UNSIGNED | BOOLEAN | +| TINYINT
TINYINT UNSIGNED
SMALLINT
SMALLINT UNSIGNED
MEDIUMINT
MEDIUMINT UNSIGNED
INT
INTEGER
YEAR | INT | +| INT UNSIGNED
INTEGER UNSIGNED
BIGINT | BIGINT | +| BIGINT UNSIGNED | DECIMAL(20,0) | +| DECIMAL(x,y)(Get the designated column's specified column size.<38) | DECIMAL(x,y) | +| DECIMAL(x,y)(Get the designated column's specified column size.>38) | DECIMAL(38,18) | +| DECIMAL UNSIGNED | DECIMAL((Get the designated column's specified column size)+1,
(Gets the designated column's number of digits to right of the decimal point.))) | +| FLOAT
FLOAT UNSIGNED | FLOAT | +| DOUBLE
DOUBLE UNSIGNED | DOUBLE | +| CHAR
VARCHAR
TINYTEXT
MEDIUMTEXT
TEXT
LONGTEXT
JSON | STRING | +| DATE | DATE | +| TIME | TIME | +| DATETIME
TIMESTAMP | TIMESTAMP | +| TINYBLOB
MEDIUMBLOB
BLOB
LONGBLOB
BINARY
VARBINAR
BIT(n) | BYTES | +| GEOMETRY
UNKNOWN | Not supported yet | + +### Oracle Mode + +| Oracle Data type | Seatunnel Data type | +|-----------------------------------------------------------|---------------------| +| Number(p), p <= 9 | INT | +| Number(p), p <= 18 | BIGINT | +| Number(p), p > 18 | DECIMAL(38,18) | +| REAL
BINARY_FLOAT | FLOAT | +| BINARY_DOUBLE | DOUBLE | +| CHAR
NCHAR
NVARCHAR2
NCLOB
CLOB
ROWID | STRING | +| DATE | DATE | +| TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE | TIMESTAMP | +| BLOB
RAW
LONG RAW
BFILE | BYTES | +| UNKNOWN | Not supported yet | + +## Sink Options + +| Name | Type | Required | Default | Description | +|-------------------------------------------|---------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:oceanbase://localhost:2883/test | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source, should be `com.oceanbase.jdbc.Driver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | +| compatible_mode | String | Yes | - | The compatible mode of OceanBase, can be 'mysql' or 'oracle'. | +| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | +| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | +| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `batch_interval_ms`
, the data will be flushed into the database | +| batch_interval_ms | Int | No | 1000 | For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the database | +| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | +| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | +| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | +| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | +| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | + +### Tips + +> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks. + +## Task Example + +### Simple: + +> This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends it to JDBC Sink. FakeSource generates a total of 16 rows of data (row.num=16), with each row having two fields, name (string type) and age (int type). The final target table is test_table will also be 16 rows of data in the table. Before run this job, you need create database test and table test_table in your mysql. And if you have not yet installed and deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy SeaTunnel. And then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job. + +``` +# Defining the runtime environment +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + parallelism = 1 + result_table_name = "fake" + row.num = 16 + schema = { + fields { + name = "string" + age = "int" + } + } + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/category/source-v2 +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/category/transform-v2 +} + +sink { + jdbc { + url = "jdbc:oceanbase://localhost:2883/test" + driver = "com.oceanbase.jdbc.Driver" + user = "root" + password = "123456" + compatible_mode = "mysql" + query = "insert into test_table(name,age) values(?,?)" + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +} +``` + +### Generate Sink SQL + +> This example not need to write complex sql statements, you can configure the database name table name to automatically generate add statements for you + +``` +sink { + jdbc { + url = "jdbc:oceanbase://localhost:2883/test" + driver = "com.oceanbase.jdbc.Driver" + user = "root" + password = "123456" + compatible_mode = "mysql" + # Automatically generate sql statements based on database table names + generate_sink_sql = true + database = test + table = test_table + } +} +``` + +### CDC(Change Data Capture) Event + +> CDC change data is also supported by us In this case, you need config database, table and primary_keys. + +``` +sink { + jdbc { + url = "jdbc:oceanbase://localhost:3306/test" + driver = "com.oceanbase.jdbc.Driver" + user = "root" + password = "123456" + compatible_mode = "mysql" + generate_sink_sql = true + # You need to configure both database and table + database = test + table = sink_table + primary_keys = ["id","name"] + } +} +``` + diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index 7723f8f1590..a324316e594 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -35,7 +35,7 @@ supports query SQL and can achieve projection effect. | user | String | No | - | | password | String | No | - | | query | String | Yes | - | -| driver_type | String | Yes | - | +| compatible_mode | String | No | - | | connection_check_timeout_sec | Int | No | 30 | | partition_column | String | No | - | | partition_upper_bound | Long | No | - | @@ -64,9 +64,9 @@ The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost/tes Query statement -### driver_type [string] +### compatible_mode [string] -Use this field to represent the OceanBase driver. e.g 'mysql' +The compatible mode of database, required when the database supports multiple compatible modes. For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'. ### connection_check_timeout_sec [int] diff --git a/docs/en/connector-v2/source/OceanBase.md b/docs/en/connector-v2/source/OceanBase.md new file mode 100644 index 00000000000..d237db2a22b --- /dev/null +++ b/docs/en/connector-v2/source/OceanBase.md @@ -0,0 +1,168 @@ +# OceanBase + +> JDBC OceanBase Source Connector + +## Support Those Engines + +> Spark
+> Flink
+> Seatunnel Zeta
+ +## Key Features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) +- [x] [exactly-once](../../concept/connector-v2-features.md) +- [x] [column projection](../../concept/connector-v2-features.md) +- [x] [parallelism](../../concept/connector-v2-features.md) +- [x] [support user-defined split](../../concept/connector-v2-features.md) + +## Description + +Read external data source data through JDBC. + +## Supported DataSource Info + +| Datasource | Supported versions | Driver | Url | Maven | +|------------|--------------------------------|---------------------------|--------------------------------------|-------------------------------------------------------------------------------| +| OceanBase | All OceanBase server versions. | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2883/test | [Download](https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client) | + +## Database Dependency + +> Please download the support list corresponding to 'Maven' and copy it to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/' working directory
+> For example: cp oceanbase-client-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/ + +## Data Type Mapping + +### Mysql Mode + +| Mysql Data type | Seatunnel Data type | +|-----------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| BIT(1)
INT UNSIGNED | BOOLEAN | +| TINYINT
TINYINT UNSIGNED
SMALLINT
SMALLINT UNSIGNED
MEDIUMINT
MEDIUMINT UNSIGNED
INT
INTEGER
YEAR | INT | +| INT UNSIGNED
INTEGER UNSIGNED
BIGINT | BIGINT | +| BIGINT UNSIGNED | DECIMAL(20,0) | +| DECIMAL(x,y)(Get the designated column's specified column size.<38) | DECIMAL(x,y) | +| DECIMAL(x,y)(Get the designated column's specified column size.>38) | DECIMAL(38,18) | +| DECIMAL UNSIGNED | DECIMAL((Get the designated column's specified column size)+1,
(Gets the designated column's number of digits to right of the decimal point.))) | +| FLOAT
FLOAT UNSIGNED | FLOAT | +| DOUBLE
DOUBLE UNSIGNED | DOUBLE | +| CHAR
VARCHAR
TINYTEXT
MEDIUMTEXT
TEXT
LONGTEXT
JSON | STRING | +| DATE | DATE | +| TIME | TIME | +| DATETIME
TIMESTAMP | TIMESTAMP | +| TINYBLOB
MEDIUMBLOB
BLOB
LONGBLOB
BINARY
VARBINAR
BIT(n) | BYTES | +| GEOMETRY
UNKNOWN | Not supported yet | + +### Oracle Mode + +| Oracle Data type | Seatunnel Data type | +|-----------------------------------------------------------|---------------------| +| Number(p), p <= 9 | INT | +| Number(p), p <= 18 | BIGINT | +| Number(p), p > 18 | DECIMAL(38,18) | +| REAL
BINARY_FLOAT | FLOAT | +| BINARY_DOUBLE | DOUBLE | +| CHAR
NCHAR
NVARCHAR2
NCLOB
CLOB
ROWID | STRING | +| DATE | DATE | +| TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE | TIMESTAMP | +| BLOB
RAW
LONG RAW
BFILE | BYTES | +| UNKNOWN | Not supported yet | + +## Source Options + +| Name | Type | Required | Default | Description | +|------------------------------|--------|----------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:oceanbase://localhost:2883/test | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source, should be `com.oceanbase.jdbc.Driver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| compatible_mode | String | Yes | - | The compatible mode of OceanBase, can be 'mysql' or 'oracle'. | +| query | String | Yes | - | Query statement | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete | +| partition_column | String | No | - | The column name for parallelism's partition, only support numeric type primary key and only support single column. | +| partition_lower_bound | Long | No | - | The partition_column min value for scan, if not set SeaTunnel will query database get min value. | +| partition_upper_bound | Long | No | - | The partition_column max value for scan, if not set SeaTunnel will query database get max value. | +| partition_num | Int | No | job parallelism | The number of partition count, only support positive integer. Default value is job parallelism. | +| fetch_size | Int | No | 0 | For queries that return a large number of objects, you can configure
the row fetch size used in the query to improve performance by
reducing the number database hits required to satisfy the selection criteria.
Zero means use jdbc default value. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | + +### Tips + +> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks. + +## Task Example + +### Simple: + +``` +env { + execution.parallelism = 2 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = "com.oceanbase.jdbc.Driver" + url = "jdbc:oceanbase://localhost:2883/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + user = "root" + password = "" + compatible_mode = "mysql" + query = "select * from source" + } +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Console {} +} +``` + +### Parallel: + +> Read your query table in parallel with the shard field you configured and the shard data. You can do this if you want to read the whole table + +``` +source { + Jdbc { + driver = "com.oceanbase.jdbc.Driver" + url = "jdbc:oceanbase://localhost:2883/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + user = "root" + password = "" + compatible_mode = "mysql" + query = "select * from source" + # Parallel sharding reads fields + partition_column = "id" + # Number of fragments + partition_num = 10 + } +} +``` + +### Parallel Boundary: + +> It is more efficient to read your data source according to the upper and lower boundaries you configured + +``` +source { + Jdbc { + driver = "com.oceanbase.jdbc.Driver" + url = "jdbc:oceanbase://localhost:2883/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + user = "root" + password = "" + compatible_mode = "mysql" + query = "select * from source" + partition_column = "id" + partition_num = 10 + # Read start boundary + partition_lower_bound = 1 + # Read end boundary + partition_upper_bound = 500 + } +} +``` + diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java index db2366ae673..8fd427263e6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java @@ -27,7 +27,7 @@ public class JdbcConnectionConfig implements Serializable { public String url; public String driverName; - public String driverType; + public String compatibleMode; public int connectionCheckTimeoutSeconds = JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue(); public int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue(); @@ -49,7 +49,7 @@ public class JdbcConnectionConfig implements Serializable { public static JdbcConnectionConfig of(ReadonlyConfig config) { JdbcConnectionConfig.Builder builder = JdbcConnectionConfig.builder(); builder.url(config.get(JdbcOptions.URL)); - builder.driverType(config.get(JdbcOptions.DRIVER_TYPE)); + builder.compatibleMode(config.get(JdbcOptions.COMPATIBLE_MODE)); builder.driverName(config.get(JdbcOptions.DRIVER)); builder.autoCommit(config.get(JdbcOptions.AUTO_COMMIT)); builder.maxRetries(config.get(JdbcOptions.MAX_RETRIES)); @@ -75,8 +75,8 @@ public String getDriverName() { return driverName; } - public String getDriverType() { - return driverType; + public String getCompatibleMode() { + return compatibleMode; } public boolean isAutoCommit() { @@ -126,7 +126,7 @@ public static JdbcConnectionConfig.Builder builder() { public static final class Builder { private String url; private String driverName; - private String driverType; + private String compatibleMode; private int connectionCheckTimeoutSeconds = JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue(); private int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue(); @@ -152,8 +152,8 @@ public Builder driverName(String driverName) { return this; } - public Builder driverType(String driverType) { - this.driverType = driverType; + public Builder compatibleMode(String compatibleMode) { + this.compatibleMode = compatibleMode; return this; } @@ -217,7 +217,7 @@ public JdbcConnectionConfig build() { jdbcConnectionConfig.batchSize = this.batchSize; jdbcConnectionConfig.batchIntervalMs = this.batchIntervalMs; jdbcConnectionConfig.driverName = this.driverName; - jdbcConnectionConfig.driverType = this.driverType; + jdbcConnectionConfig.compatibleMode = this.compatibleMode; jdbcConnectionConfig.maxRetries = this.maxRetries; jdbcConnectionConfig.password = this.password; jdbcConnectionConfig.connectionCheckTimeoutSeconds = this.connectionCheckTimeoutSeconds; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java index 77b32bc95ec..24ae0580f32 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java @@ -36,8 +36,12 @@ public interface JdbcOptions { .intType() .defaultValue(30) .withDescription("connection check time second"); - Option DRIVER_TYPE = - Options.key("driver_type").stringType().noDefaultValue().withDescription("driver_type"); + Option COMPATIBLE_MODE = + Options.key("compatible_mode") + .stringType() + .noDefaultValue() + .withDescription( + "The compatible mode of database, required when the database supports multiple compatible modes. For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'."); Option MAX_RETRIES = Options.key("max_retries").intType().defaultValue(0).withDescription("max_retired"); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java index c748ff71951..00130b32acc 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java @@ -33,7 +33,7 @@ public class JdbcSourceConfig implements Serializable { private JdbcConnectionConfig jdbcConnectionConfig; public String query; - public String driverType; + public String compatibleMode; private String partitionColumn; private BigDecimal partitionUpperBound; private BigDecimal partitionLowerBound; @@ -45,7 +45,7 @@ public static JdbcSourceConfig of(ReadonlyConfig config) { builder.jdbcConnectionConfig(JdbcConnectionConfig.of(config)); builder.query(config.get(JdbcOptions.QUERY)); builder.fetchSize(config.get(JdbcOptions.FETCH_SIZE)); - config.getOptional(JdbcOptions.DRIVER_TYPE).ifPresent(builder::driverType); + config.getOptional(JdbcOptions.COMPATIBLE_MODE).ifPresent(builder::compatibleMode); config.getOptional(JdbcOptions.PARTITION_COLUMN).ifPresent(builder::partitionColumn); config.getOptional(JdbcOptions.PARTITION_UPPER_BOUND) .ifPresent(builder::partitionUpperBound); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java index 94b4f9bcd3c..3baa38e3eed 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java @@ -41,10 +41,10 @@ public interface JdbcDialectFactory { /** * Create a {@link JdbcDialect} instance based on the driver type. * - * @param driverType The driver type + * @param compatibleMode The compatible mode * @return a new instance of {@link JdbcDialect} */ - default JdbcDialect create(String driverType) { + default JdbcDialect create(String compatibleMode) { return create(); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java index 9f4166ed021..b49df35ff3f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java @@ -40,12 +40,12 @@ private JdbcDialectLoader() {} * Loads the unique JDBC Dialect that can handle the given database url. * * @param url A database URL. - * @param driverType The driver type. + * @param compatibleMode The compatible mode. * @throws IllegalStateException if the loader cannot find exactly one dialect that can * unambiguously process the given database URL. * @return The loaded dialect. */ - public static JdbcDialect load(String url, String driverType) { + public static JdbcDialect load(String url, String compatibleMode) { ClassLoader cl = Thread.currentThread().getContextClassLoader(); List foundFactories = discoverFactories(cl); @@ -90,7 +90,7 @@ public static JdbcDialect load(String url, String driverType) { .collect(Collectors.joining("\n")))); } - return matchingFactories.get(0).create(driverType); + return matchingFactories.get(0).create(compatibleMode); } private static List discoverFactories(ClassLoader classLoader) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java index 836ece8601e..66df84205ed 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java @@ -36,12 +36,12 @@ public boolean acceptsURL(String url) { @Override public JdbcDialect create() { throw new UnsupportedOperationException( - "Can't create JdbcDialect without driver type for OceanBase"); + "Can't create JdbcDialect without compatible mode for OceanBase"); } @Override - public JdbcDialect create(@Nonnull String driverType) { - if ("oracle".equalsIgnoreCase(driverType)) { + public JdbcDialect create(@Nonnull String compatibleMode) { + if ("oracle".equalsIgnoreCase(compatibleMode)) { return new OracleDialect(); } return new MysqlDialect(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index 4527c14f5ff..4666eae1e51 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -107,7 +107,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException { this.dialect = JdbcDialectLoader.load( jdbcSinkConfig.getJdbcConnectionConfig().getUrl(), - jdbcSinkConfig.getJdbcConnectionConfig().getDriverType()); + jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode()); this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java index b738b364fe7..98fb2435129 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java @@ -85,7 +85,7 @@ public TableSink createSink(TableFactoryContext context) { JdbcDialect dialect = JdbcDialectLoader.load( sinkConfig.getJdbcConnectionConfig().getUrl(), - sinkConfig.getJdbcConnectionConfig().getDriverType()); + sinkConfig.getJdbcConnectionConfig().getCompatibleMode()); return () -> new JdbcSink( options, diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java index a02cbc88605..732892b21d6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java @@ -101,7 +101,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException { this.jdbcDialect = JdbcDialectLoader.load( jdbcSourceConfig.getJdbcConnectionConfig().getUrl(), - jdbcSourceConfig.getJdbcConnectionConfig().getDriverType()); + jdbcSourceConfig.getJdbcConnectionConfig().getCompatibleMode()); try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection()) { this.typeInfo = initTableField(connection); this.partitionParameter = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java index dfd7164dc54..54900af4693 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java @@ -86,7 +86,7 @@ TableSource createSource(TableFactoryContext context) { JdbcDialect dialect = JdbcDialectLoader.load( config.getJdbcConnectionConfig().getUrl(), - config.getJdbcConnectionConfig().getDriverType()); + config.getJdbcConnectionConfig().getCompatibleMode()); TableSchema tableSchema = catalogTable.getTableSchema(); SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType(); Optional partitionParameter = diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf index 992d02556e7..b33ed611855 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf @@ -34,7 +34,7 @@ source { user = root password = "" query = "select * from source" - driver_type = "mysql" + compatible_mode = "mysql" } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf index 3d85c1e5f98..53d6337198c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf @@ -34,7 +34,7 @@ source { user = "root" password = "" query = "SELECT * FROM source" - driver_type = "oracle" + compatible_mode = "oracle" } }