Skip to content

Commit

Permalink
[api-draft][connector] Add SeaTunnel jdbc source (apache#2048)
Browse files Browse the repository at this point in the history
* Add SeaTunnel jdbc sink (apache#1946)

* Add license head

* fix checkStyle err
  • Loading branch information
ic4y authored and lhyundeadsoul committed Jun 28, 2022
1 parent cdf0f1e commit 393c8ea
Show file tree
Hide file tree
Showing 35 changed files with 2,040 additions and 69 deletions.
2 changes: 2 additions & 0 deletions seatunnel-connectors/plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,5 @@ seatunnel.sink.Kafka = seatunnel-connector-seatunnel-kafka
seatunnel.source.Http = seatunnel-connector-seatunnel-http
seatunnel.source.Socket = seatunnel-connector-seatunnel-socket
seatunnel.sink.Hive = seatunnel-connector-seatunnel-hive
seatunnel.source.Jdbc = seatunnel-connector-seatunnel-jdbc
seatunnel.sink.Jdbc = seatunnel-connector-seatunnel-jdbc
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.config;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import java.io.Serializable;

public class JdbcConfig implements Serializable {

public static final String URL = "url";

public static final String DRIVER = "driver";

public static final String CONNECTION_CHECK_TIMEOUT_SEC = "connection_check_timeout_sec";

public static final String MAX_RETRIES = "max_retries";

public static final String USER = "user";

public static final String PASSWORD = "password";

public static final String QUERY = "query";

public static final String PARALLELISM = "parallelism";


public static final String BATCH_SIZE = "batch_size";

public static final String BATCH_INTERVAL_MS = "batch_interval_ms";


public static final String IS_EXACTLY_ONCE = "is_exactly_once";

public static final String XA_DATA_SOURCE_CLASS_NAME = "xa_data_source_class_name";


public static final String MAX_COMMIT_ATTEMPTS = "max_commit_attempts";

public static final String TRANSACTION_TIMEOUT_SEC = "transaction_timeout_sec";


//source config
public static final String PARTITION_COLUMN = "partition_column";
public static final String PARTITION_UPPER_BOUND = "partition_upper_bound";
public static final String PARTITION_LOWER_BOUND = "partition_lower_bound";

public static JdbcConnectionOptions buildJdbcConnectionOptions(Config config) {

JdbcConnectionOptions jdbcOptions = new JdbcConnectionOptions();
jdbcOptions.url = config.getString(JdbcConfig.URL);
jdbcOptions.driverName = config.getString(JdbcConfig.DRIVER);
if (config.hasPath(JdbcConfig.USER)) {
jdbcOptions.username = config.getString(JdbcConfig.USER);
}
if (config.hasPath(JdbcConfig.PASSWORD)) {
jdbcOptions.password = config.getString(JdbcConfig.PASSWORD);
}
jdbcOptions.query = config.getString(JdbcConfig.QUERY);

if (config.hasPath(JdbcConfig.MAX_RETRIES)) {
jdbcOptions.maxRetries = config.getInt(JdbcConfig.MAX_RETRIES);
}
if (config.hasPath(JdbcConfig.CONNECTION_CHECK_TIMEOUT_SEC)) {
jdbcOptions.connectionCheckTimeoutSeconds = config.getInt(JdbcConfig.CONNECTION_CHECK_TIMEOUT_SEC);
}
if (config.hasPath(JdbcConfig.BATCH_SIZE)) {
jdbcOptions.batchSize = config.getInt(JdbcConfig.BATCH_SIZE);
}
if (config.hasPath(JdbcConfig.BATCH_INTERVAL_MS)) {
jdbcOptions.batchIntervalMs = config.getInt(JdbcConfig.BATCH_INTERVAL_MS);
}

if (config.hasPath(JdbcConfig.IS_EXACTLY_ONCE)) {
jdbcOptions.xaDataSourceClassName = config.getString(JdbcConfig.XA_DATA_SOURCE_CLASS_NAME);
if (config.hasPath(JdbcConfig.MAX_COMMIT_ATTEMPTS)) {
jdbcOptions.maxCommitAttempts = config.getInt(JdbcConfig.MAX_COMMIT_ATTEMPTS);
}
if (config.hasPath(JdbcConfig.TRANSACTION_TIMEOUT_SEC)) {
jdbcOptions.transactionTimeoutSec = config.getInt(JdbcConfig.TRANSACTION_TIMEOUT_SEC);
}
}
return jdbcOptions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.config;

import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.buildJdbcConnectionOptions;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;

@Data
@AllArgsConstructor
public class JdbcSinkOptions implements Serializable {
private JdbcConnectionOptions jdbcConnectionOptions;
private boolean isExactlyOnce;

public JdbcSinkOptions(Config config) {
this.jdbcConnectionOptions = buildJdbcConnectionOptions(config);
if (config.hasPath(JdbcConfig.IS_EXACTLY_ONCE) && config.getBoolean(JdbcConfig.IS_EXACTLY_ONCE)) {
this.isExactlyOnce = true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.config;

import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConfig.buildJdbcConnectionOptions;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;
import java.util.Optional;

@Data
@AllArgsConstructor
public class JdbcSourceOptions implements Serializable {
private JdbcConnectionOptions jdbcConnectionOptions;
private String partitionColumn;
private Long partitionUpperBound;
private Long partitionLowerBound;

private Integer parallelism;

public JdbcSourceOptions(Config config) {
this.jdbcConnectionOptions = buildJdbcConnectionOptions(config);
if (config.hasPath(JdbcConfig.PARTITION_COLUMN)) {
this.partitionColumn = config.getString(JdbcConfig.PARTITION_COLUMN);
}
if (config.hasPath(JdbcConfig.PARTITION_UPPER_BOUND)) {
this.partitionUpperBound = config.getLong(JdbcConfig.PARTITION_UPPER_BOUND);
}
if (config.hasPath(JdbcConfig.PARTITION_LOWER_BOUND)) {
this.partitionLowerBound = config.getLong(JdbcConfig.PARTITION_LOWER_BOUND);
}
if (config.hasPath(JdbcConfig.PARALLELISM)) {
this.parallelism = config.getInt(JdbcConfig.PARALLELISM);
}
}

public JdbcConnectionOptions getJdbcConnectionOptions() {
return jdbcConnectionOptions;
}

public Optional<String> getPartitionColumn() {
return Optional.ofNullable(partitionColumn);
}

public Optional<Long> getPartitionUpperBound() {
return Optional.ofNullable(partitionUpperBound);
}

public Optional<Long> getPartitionLowerBound() {
return Optional.ofNullable(partitionLowerBound);
}

public Optional<Integer> getParallelism() {
return Optional.ofNullable(parallelism);
}
}
Loading

0 comments on commit 393c8ea

Please sign in to comment.