Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][connector-v2] add tablestore source and sink #3309

Merged
merged 33 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e94a14d
add tablestore
liugddx Nov 5, 2022
d036fc6
fix unable to release link problem
liugddx Nov 6, 2022
77c9ad4
remove schema
liugddx Nov 6, 2022
77dd396
add doc
liugddx Nov 6, 2022
cbf1a6d
fix convert error
liugddx Nov 7, 2022
29c6897
fix deadlink error
liugddx Nov 7, 2022
6e608a0
fix code style error
liugddx Nov 7, 2022
53b5abb
fix code style error
liugddx Nov 7, 2022
8359747
revert table sink
liugddx Nov 7, 2022
0785f3b
fix deadlink
liugddx Nov 7, 2022
0de05f5
add tablestore sink
liugddx Nov 8, 2022
7a0593f
Merge remote-tracking branch 'upstream/dev' into feature-tablestore
liugddx Nov 9, 2022
72f75d4
fix type error
liugddx Nov 9, 2022
51a56a7
add tablestore sink
liugddx Nov 9, 2022
b736f23
fix tablestore sink bug
liugddx Nov 9, 2022
448a619
fix tablestore sink bug
liugddx Nov 9, 2022
a86225b
Merge remote-tracking branch 'upstream/dev' into feature-tablestore
liugddx Nov 9, 2022
f594236
Merge remote-tracking branch 'upstream/dev' into feature-tablestore
liugddx Nov 11, 2022
40d2018
fix cr error
liugddx Nov 11, 2022
5896145
fix cr error
liugddx Nov 11, 2022
d5a839a
fix cr error
liugddx Nov 12, 2022
7e84bd7
add optional params
liugddx Nov 12, 2022
adfc22e
Merge remote-tracking branch 'upstream/dev' into feature-tablestore
liugddx Nov 14, 2022
5df8159
fix cr error
liugddx Nov 14, 2022
de19951
Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org…
liugddx Nov 14, 2022
1be80a9
Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org…
liugddx Nov 14, 2022
3e5c3eb
Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org…
liugddx Nov 14, 2022
f2e4c6b
Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org…
liugddx Nov 14, 2022
c2883a5
Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org…
liugddx Nov 14, 2022
557fed0
Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org…
liugddx Nov 14, 2022
6ff5e15
Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org…
liugddx Nov 14, 2022
d76fa8a
Update seatunnel-connectors-v2/connector-tablestore/src/main/java/org…
liugddx Nov 14, 2022
ec0786c
fix cr error
liugddx Nov 14, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions docs/en/connector-v2/sink/Tablestore.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Tablestore

> Tablestore sink connector

## Description

Write data to `Tablestore`

## Key features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [schema projection](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|----------------- | ------ |----------| ------------- |
| end_point | string | yes | - |
| instance_name | string | yes | - |
| access_key_id | string | yes | - |
| access_key_secret| string | yes | - |
| table | string | yes | - |
| primary_keys | array | yes | - |
| batch_size | string | no | 25 |
| batch_interval_ms| string | no | 1000 |
| common-options | config | no | - |

### end_point [string]

endPoint to write to Tablestore.

### instanceName [string]

The instanceName of Tablestore.

### access_key_id [string]

The access id of Tablestore.

### access_key_secret [string]

The access secret of Tablestore.

### table [string]

The table of Tablestore.

### primaryKeys [array]

The primaryKeys of Tablestore.

### common options [ config ]

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.

## Example

```bash
Tablestore {
end_point = "xxxx"
instance_name = "xxxx"
access_key_id = "xxxx"
access_key_secret = "xxxx"
table = "sink"
primary_keys = ["pk_1","pk_2","pk_3","pk_4"]
}
```

## Changelog

### next version

- Add Tablestore Sink Connector

25 changes: 14 additions & 11 deletions docs/en/connector-v2/source/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,18 @@ in parallel according to the concurrency of tasks.

there are some reference value for params above.

| datasource | driver | url | maven |
|------------|----------------------------------------------|--------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql |
| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
| oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
| gbase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar |
| starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 |
| datasource | driver | url | maven |
|------------|------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql |
| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
| oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
| gbase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar |
| starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
| db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 |
| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | "jdbc:ots:http s://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc |

## Example

Expand Down Expand Up @@ -145,6 +146,8 @@ parallel:
- [Feature] Support StarRocks JDBC Source ([3060](https://github.com/apache/incubator-seatunnel/pull/3060))
- [Feature] Support GBase8a JDBC Source ([3026](https://github.com/apache/incubator-seatunnel/pull/3026))
- [Feature] Support DB2 JDBC Source ([2410](https://github.com/apache/incubator-seatunnel/pull/2410))

### next version

- [BugFix] Fix jdbc split bug ([3220](https://github.com/apache/incubator-seatunnel/pull/3220))
- [Feature] Support Tablestore Source ([3309](https://github.com/apache/incubator-seatunnel/pull/3309))
3 changes: 2 additions & 1 deletion plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,5 @@ seatunnel.sink.Cassandra = connector-cassandra
seatunnel.sink.StarRocks = connector-starrocks
seatunnel.source.MyHours = connector-http-myhours
seatunnel.sink.InfluxDB = connector-influxdb
seatunnel.source.GoogleSheets = connector-google-sheets
seatunnel.source.GoogleSheets = connector-google-sheets
seatunnel.sink.Tablestore = connector-tablestore
20 changes: 19 additions & 1 deletion seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
<oracle.version>12.2.0.1</oracle.version>
<db2.version>db2jcc4</db2.version>
<tablestore.version>5.13.9</tablestore.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -84,11 +85,24 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore-jdbc</artifactId>
<version>${tablestore.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>
</dependencyManagement>

<dependencies>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
Expand Down Expand Up @@ -119,6 +133,10 @@
<groupId>com.ibm.db2.jcc</groupId>
<artifactId>db2jcc</artifactId>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>tablestore-jdbc</artifactId>
</dependency>
</dependencies>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;

import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

/**
Expand Down Expand Up @@ -67,4 +69,9 @@ default PreparedStatement creatPreparedStatement(Connection connection, String q
return statement;
}

default ResultSetMetaData getResultSetMetaData(Connection conn, JdbcSourceOptions jdbcSourceOptions) throws SQLException {
PreparedStatement ps = conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery());
return ps.getMetaData();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore;

import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

public class TablestoreDialect implements JdbcDialect {
@Override
public String dialectName() {
return "Tablestore";
}

@Override
public JdbcRowConverter getRowConverter() {
return new TablestoreJdbcRowConverter();
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new TablestoreTypeMapper();
}

@Override
public PreparedStatement creatPreparedStatement(Connection connection, String queryTemplate, int fetchSize) throws SQLException {
PreparedStatement statement = connection.prepareStatement(queryTemplate);
statement.setFetchSize(fetchSize);
return statement;
}

@Override
public ResultSetMetaData getResultSetMetaData(Connection conn, JdbcSourceOptions jdbcSourceOptions) throws SQLException {
PreparedStatement ps = conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery());
return ps.executeQuery().getMetaData();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import com.google.auto.service.AutoService;

/**
* Factory for {@link TablestoreDialect}.
*/

@AutoService(JdbcDialectFactory.class)
public class TablestoreDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:ots:https:");
}

@Override
public JdbcDialect create() {
return new TablestoreDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

public class TablestoreJdbcRowConverter extends AbstractJdbcRowConverter {

@Override
public String converterName() {
return "Tablestore";
}

@Override
public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
return super.toInternal(rs, metaData, typeInfo);
}
}
Loading