Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Jdbc-Connector] Add OceanBase Connector #4626

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/en/connector-v2/sink/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
| user | String | No | - |
| password | String | No | - |
| query | String | No | - |
| driver_type | String | No | - |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

| database | String | No | - |
| table | String | No | - |
| primary_keys | Array | No | - |
Expand Down Expand Up @@ -68,6 +69,10 @@ The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost/tes

Use this sql write upstream input datas to database. e.g `INSERT ...`

### driver_type [string]

Use this field to represent the OceanBase driver. e.g 'mysql'

### database [string]

Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
Expand Down
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ supports query SQL and can achieve projection effect.
| user | String | No | - |
| password | String | No | - |
| query | String | Yes | - |
| driver_type | String | Yes | - |
| connection_check_timeout_sec | Int | No | 30 |
| partition_column | String | No | - |
| partition_upper_bound | Long | No | - |
Expand Down Expand Up @@ -63,6 +64,10 @@ The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost/tes

Query statement

### driver_type [string]

Use this field to represent the OceanBase driver. e.g 'mysql'

### connection_check_timeout_sec [int]

The time in seconds to wait for the database operation used to validate the connection to complete.
Expand Down
12 changes: 11 additions & 1 deletion seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
<redshift.version>2.1.0.9</redshift.version>
<saphana.version>2.14.7</saphana.version>
<vertica.version>12.0.3-0</vertica.version>
<oceanbase.version>2.4.2</oceanbase.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -129,6 +130,12 @@
<version>${vertica.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>oceanbase-client</artifactId>
<version>${oceanbase.version}</version>
<scope>provided</scope>
</dependency>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should not be imported directly due to the lgpl license https://github.com/oceanbase/obconnector-j

</dependencies>
</dependencyManagement>

Expand All @@ -144,7 +151,10 @@
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>

<dependency>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

<groupId>com.oceanbase</groupId>
<artifactId>oceanbase-client</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public AbstractJdbcCatalog(
this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
this.defaultUrl = urlInfo.getOrigin();
this.suffix = urlInfo.getSuffix();

this.jdbcDialect = JdbcDialectLoader.load(this.baseUrl, Optional.empty());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class JdbcConnectionConfig implements Serializable {

public String url;
public String driverName;
public String driverType;
public int connectionCheckTimeoutSeconds =
JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue();
public int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue();
Expand All @@ -48,6 +49,7 @@ public class JdbcConnectionConfig implements Serializable {
public static JdbcConnectionConfig of(ReadonlyConfig config) {
JdbcConnectionConfig.Builder builder = JdbcConnectionConfig.builder();
builder.url(config.get(JdbcOptions.URL));
builder.driverType(config.get(JdbcOptions.DRIVER_TYPE));
builder.driverName(config.get(JdbcOptions.DRIVER));
builder.autoCommit(config.get(JdbcOptions.AUTO_COMMIT));
builder.maxRetries(config.get(JdbcOptions.MAX_RETRIES));
Expand All @@ -73,6 +75,10 @@ public String getDriverName() {
return driverName;
}

public Optional<String> getDriverType() {
return Optional.ofNullable(driverType);
}

public boolean isAutoCommit() {
return autoCommit;
}
Expand Down Expand Up @@ -120,6 +126,7 @@ public static JdbcConnectionConfig.Builder builder() {
public static final class Builder {
private String url;
private String driverName;
private String driverType;
private int connectionCheckTimeoutSeconds =
JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue();
private int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue();
Expand All @@ -145,6 +152,11 @@ public Builder driverName(String driverName) {
return this;
}

public Builder driverType(String driverType) {
this.driverType = driverType;
return this;
}

public Builder connectionCheckTimeoutSeconds(int connectionCheckTimeoutSeconds) {
this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
return this;
Expand Down Expand Up @@ -205,6 +217,7 @@ public JdbcConnectionConfig build() {
jdbcConnectionConfig.batchSize = this.batchSize;
jdbcConnectionConfig.batchIntervalMs = this.batchIntervalMs;
jdbcConnectionConfig.driverName = this.driverName;
jdbcConnectionConfig.driverType = this.driverType;
jdbcConnectionConfig.maxRetries = this.maxRetries;
jdbcConnectionConfig.password = this.password;
jdbcConnectionConfig.connectionCheckTimeoutSeconds = this.connectionCheckTimeoutSeconds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public interface JdbcOptions {
.intType()
.defaultValue(30)
.withDescription("connection check time second");
Option<String> DRIVER_TYPE =
Options.key("driver_type").stringType().noDefaultValue().withDescription("driver_type");

Option<Integer> MAX_RETRIES =
Options.key("max_retries").intType().defaultValue(0).withDescription("max_retired");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class JdbcSourceConfig implements Serializable {

private JdbcConnectionConfig jdbcConnectionConfig;
public String query;
public String driverType;
private String partitionColumn;
private Long partitionUpperBound;
private Long partitionLowerBound;
Expand All @@ -43,6 +44,7 @@ public static JdbcSourceConfig of(ReadonlyConfig config) {
builder.jdbcConnectionConfig(JdbcConnectionConfig.of(config));
builder.query(config.get(JdbcOptions.QUERY));
builder.fetchSize(config.get(JdbcOptions.FETCH_SIZE));
config.getOptional(JdbcOptions.DRIVER_TYPE).ifPresent(builder::driverType);
config.getOptional(JdbcOptions.PARTITION_COLUMN).ifPresent(builder::partitionColumn);
config.getOptional(JdbcOptions.PARTITION_UPPER_BOUND)
.ifPresent(builder::partitionUpperBound);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;

import java.util.Optional;

/**
* A factory to create a specific {@link JdbcDialect}
*
Expand All @@ -33,7 +35,7 @@ public interface JdbcDialectFactory {
* @return <code>true</code> if this dialect understands the given URL; <code>false</code>
* otherwise.
*/
boolean acceptsURL(String url);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the driverType param shoud be passed in the create method below, and we can use a new default method to keep the other dialects unchanged.

default JdbcDialect create(String driverType) {
    return create();
}

boolean acceptsURL(String url, Optional<String> driverTye);

/** @return Creates a new instance of the {@link JdbcDialect}. */
JdbcDialect create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
Expand All @@ -44,7 +45,7 @@ private JdbcDialectLoader() {}
* unambiguously process the given database URL.
* @return The loaded dialect.
*/
public static JdbcDialect load(String url) {
public static JdbcDialect load(String url, Optional<String> driverTye) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
List<JdbcDialectFactory> foundFactories = discoverFactories(cl);

Expand All @@ -57,7 +58,9 @@ public static JdbcDialect load(String url) {
}

final List<JdbcDialectFactory> matchingFactories =
foundFactories.stream().filter(f -> f.acceptsURL(url)).collect(Collectors.toList());
foundFactories.stream()
.filter(f -> f.acceptsURL(url, driverTye))
.collect(Collectors.toList());

if (matchingFactories.isEmpty()) {
throw new JdbcConnectorException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@

import com.google.auto.service.AutoService;

import java.util.Optional;

/** Factory for {@link DB2Dialect}. */
@AutoService(JdbcDialectFactory.class)
public class DB2DialectFactory implements JdbcDialectFactory {

@Override
public boolean acceptsURL(String url) {
public boolean acceptsURL(String url, Optional<String> driverTye) {
return url.startsWith("jdbc:db2:");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@

import com.google.auto.service.AutoService;

import java.util.Optional;

/** Factory for {@link DmdbDialect}. */
@AutoService(JdbcDialectFactory.class)
public class DmdbDialectFactory implements JdbcDialectFactory {

@Override
public boolean acceptsURL(String url) {
public boolean acceptsURL(String url, Optional<String> driverTye) {
return url.startsWith("jdbc:dm:");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@

import com.google.auto.service.AutoService;

import java.util.Optional;

@AutoService(JdbcDialectFactory.class)
public class Gbase8aDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
public boolean acceptsURL(String url, Optional<String> driverTye) {
return url.startsWith("jdbc:gbase:");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import com.google.auto.service.AutoService;
import lombok.NonNull;

import java.util.Optional;

@AutoService(JdbcDialectFactory.class)
public class GreenplumDialectFactory implements JdbcDialectFactory {

@Override
public boolean acceptsURL(@NonNull String url) {
public boolean acceptsURL(@NonNull String url, Optional<String> driverTye) {
// Support greenplum native driver: com.pivotal.jdbc.GreenplumDriver
return url.startsWith("jdbc:pivotal:greenplum:");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@

import com.google.auto.service.AutoService;

import java.util.Optional;

/** Factory for {@link MysqlDialect}. */
@AutoService(JdbcDialectFactory.class)
public class MySqlDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
public boolean acceptsURL(String url, Optional<String> driverTye) {
return url.startsWith("jdbc:mysql:");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;

import com.google.auto.service.AutoService;

import java.util.Optional;

@AutoService(JdbcDialectFactory.class)
public class OceanBaseMySqlDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url, Optional<String> driverTye) {
return url.startsWith("jdbc:oceanbase:")
&& driverTye.isPresent()
&& driverTye.get().equalsIgnoreCase("mysql");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add example into docs

}

@Override
public JdbcDialect create() {
return new MysqlDialect();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can merge the two dialect classes of OceanBase and return the JdbcDialect by the driverType param here.

public JdbcDialect create(@Nonnull String driverType) {
    if ("mysql".equalsIgnoreCase(driverType)) {
        return new MysqlDialect();
    }
    return new OracleDialect();
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;

import com.google.auto.service.AutoService;

import java.util.Optional;

@AutoService(JdbcDialectFactory.class)
public class OceanBaseOracleDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url, Optional<String> driverTye) {
return url.startsWith("jdbc:oceanbase:")
&& driverTye.isPresent()
&& driverTye.get().equalsIgnoreCase("oracle");
}

@Override
public JdbcDialect create() {
return new OracleDialect();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@

import com.google.auto.service.AutoService;

import java.util.Optional;

/** Factory for {@link OracleDialect}. */
@AutoService(JdbcDialectFactory.class)
public class OracleDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
public boolean acceptsURL(String url, Optional<String> driverTye) {
return url.startsWith("jdbc:oracle:thin:");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import com.google.auto.service.AutoService;
import lombok.NonNull;

import java.util.Optional;

@AutoService(JdbcDialectFactory.class)
public class PhoenixDialectFactory implements JdbcDialectFactory {

@Override
public boolean acceptsURL(@NonNull String url) {
public boolean acceptsURL(@NonNull String url, Optional<String> driverTye) {
return url.startsWith("jdbc:phoenix:");
}

Expand Down
Loading