-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature][Connector-V2][JDBC] support sqlite Source & Sink #3089
Changes from 3 commits
04e2395
cf1ddc4
2981c1a
0643e4a
f3de959
6f3b792
e4bf558
45701f3
7bd5125
e587104
049f6c7
803dec9
ce5ddf0
eb321b0
0118847
7ef5bfe
d5ed310
a861767
8fbaad8
dfd660c
93468b5
18cf506
70494ae
03fba9f
976c5a6
6197196
d03bf4d
5ad4b4c
fa00b72
5c0b7e8
f4d4d33
3204610
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,7 +53,6 @@ public class JdbcConfig implements Serializable { | |
|
||
public static final String TRANSACTION_TIMEOUT_SEC = "transaction_timeout_sec"; | ||
|
||
|
||
//source config | ||
public static final String PARTITION_COLUMN = "partition_column"; | ||
public static final String PARTITION_UPPER_BOUND = "partition_upper_bound"; | ||
|
@@ -95,6 +94,7 @@ public static JdbcConnectionOptions buildJdbcConnectionOptions(Config config) { | |
jdbcOptions.transactionTimeoutSec = config.getInt(JdbcConfig.TRANSACTION_TIMEOUT_SEC); | ||
} | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please revert it. |
||
return jdbcOptions; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite; | ||
|
||
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; | ||
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; | ||
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; | ||
|
||
public class SqliteDialect implements JdbcDialect { | ||
@Override | ||
public String dialectName() { | ||
return "Sqlite"; | ||
} | ||
|
||
@Override | ||
public JdbcRowConverter getRowConverter() { | ||
return new SqliteJdbcRowConverter(); | ||
} | ||
|
||
@Override | ||
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { | ||
return new SqliteTypeMapper(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite; | ||
|
||
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; | ||
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; | ||
|
||
import com.google.auto.service.AutoService; | ||
|
||
/** | ||
* Factory for {@link SqliteDialect}. | ||
*/ | ||
|
||
@AutoService(JdbcDialectFactory.class) | ||
public class SqliteDialectFactory implements JdbcDialectFactory { | ||
@Override | ||
public boolean acceptsURL(String url) { | ||
return url.startsWith("jdbc:sqlite:"); | ||
} | ||
|
||
@Override | ||
public JdbcDialect create() { | ||
return new SqliteDialect(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite; | ||
|
||
import org.apache.seatunnel.api.table.type.SeaTunnelRow; | ||
import org.apache.seatunnel.api.table.type.SeaTunnelRowType; | ||
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; | ||
|
||
import java.sql.ResultSet; | ||
import java.sql.ResultSetMetaData; | ||
import java.sql.SQLException; | ||
|
||
public class SqliteJdbcRowConverter extends AbstractJdbcRowConverter { | ||
|
||
@Override | ||
public String converterName() { | ||
return "Sqlite"; | ||
} | ||
|
||
@Override | ||
public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException { | ||
return super.toInternal(rs, metaData, typeInfo); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite; | ||
|
||
import org.apache.seatunnel.api.table.type.BasicType; | ||
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; | ||
import org.apache.seatunnel.api.table.type.SeaTunnelDataType; | ||
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; | ||
|
||
import lombok.extern.slf4j.Slf4j; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.sql.ResultSetMetaData; | ||
import java.sql.SQLException; | ||
|
||
@Slf4j | ||
public class SqliteTypeMapper implements JdbcDialectTypeMapper { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(SqliteTypeMapper.class); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Useless code |
||
|
||
// ============================data types===================== | ||
|
||
private static final String SQLITE_UNKNOWN = "UNKNOWN"; | ||
private static final String SQLITE_BIT = "BIT"; | ||
private static final String SQLITE_BOOLEAN = "BOOLEAN"; | ||
|
||
// -------------------------integer---------------------------- | ||
private static final String SQLITE_TINYINT = "TINYINT"; | ||
private static final String SQLITE_TINYINT_UNSIGNED = "TINYINT UNSIGNED"; | ||
private static final String SQLITE_SMALLINT = "SMALLINT"; | ||
private static final String SQLITE_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED"; | ||
private static final String SQLITE_MEDIUMINT = "MEDIUMINT"; | ||
private static final String SQLITE_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED"; | ||
private static final String SQLITE_INT = "INT"; | ||
private static final String SQLITE_INT_UNSIGNED = "INT UNSIGNED"; | ||
private static final String SQLITE_INTEGER = "INTEGER"; | ||
private static final String SQLITE_INTEGER_UNSIGNED = "INTEGER UNSIGNED"; | ||
private static final String SQLITE_BIGINT = "BIGINT"; | ||
private static final String SQLITE_BIGINT_UNSIGNED = "BIGINT UNSIGNED"; | ||
private static final String SQLITE_DECIMAL = "DECIMAL"; | ||
private static final String SQLITE_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED"; | ||
private static final String SQLITE_FLOAT = "FLOAT"; | ||
private static final String SQLITE_FLOAT_UNSIGNED = "FLOAT UNSIGNED"; | ||
private static final String SQLITE_DOUBLE = "DOUBLE"; | ||
private static final String SQLITE_DOUBLE_PRECISION = "DOUBLE PRECISION"; | ||
private static final String SQLITE_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED"; | ||
private static final String SQLITE_NUMERIC = "NUMERIC"; | ||
private static final String SQLITE_REAL = "REAL"; | ||
|
||
// -------------------------text---------------------------- | ||
private static final String SQLITE_CHAR = "CHAR"; | ||
private static final String SQLITE_CHARACTER = "CHARACTER"; | ||
private static final String SQLITE_VARYING_CHARACTER = "VARYING_CHARACTER"; | ||
private static final String SQLITE_NATIVE_CHARACTER = "NATIVE_CHARACTER"; | ||
private static final String SQLITE_NCHAR = "NCHAR"; | ||
private static final String SQLITE_VARCHAR = "VARCHAR"; | ||
private static final String SQLITE_LONGVARCHAR = "LONGVARCHAR"; | ||
private static final String SQLITE_LONGNVARCHAR = "LONGNVARCHAR"; | ||
private static final String SQLITE_NVARCHAR = "NVARCHAR"; | ||
private static final String SQLITE_TINYTEXT = "TINYTEXT"; | ||
private static final String SQLITE_MEDIUMTEXT = "MEDIUMTEXT"; | ||
private static final String SQLITE_TEXT = "TEXT"; | ||
private static final String SQLITE_LONGTEXT = "LONGTEXT"; | ||
private static final String SQLITE_JSON = "JSON"; | ||
private static final String SQLITE_CLOB = "CLOB"; | ||
|
||
// ------------------------------time(text)------------------------- | ||
private static final String SQLITE_DATE = "DATE"; | ||
private static final String SQLITE_DATETIME = "DATETIME"; | ||
private static final String SQLITE_TIME = "TIME"; | ||
private static final String SQLITE_TIMESTAMP = "TIMESTAMP"; | ||
|
||
// ------------------------------blob------------------------- | ||
private static final String SQLITE_TINYBLOB = "TINYBLOB"; | ||
private static final String SQLITE_MEDIUMBLOB = "MEDIUMBLOB"; | ||
private static final String SQLITE_BLOB = "BLOB"; | ||
private static final String SQLITE_LONGBLOB = "LONGBLOB"; | ||
private static final String SQLITE_BINARY = "BINARY"; | ||
private static final String SQLITE_VARBINARY = "VARBINARY"; | ||
private static final String SQLITE_LONGVARBINARY = "LONGVARBINARY"; | ||
|
||
@Override | ||
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException { | ||
String columnTypeName = metadata.getColumnTypeName(colIndex).toUpperCase().trim(); | ||
switch (columnTypeName) { | ||
case SQLITE_BIT: | ||
case SQLITE_BOOLEAN: | ||
return BasicType.BOOLEAN_TYPE; | ||
case SQLITE_TINYINT: | ||
case SQLITE_TINYINT_UNSIGNED: | ||
case SQLITE_SMALLINT: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We also support short type, you can use it to map smallint and tinyint. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thx, I'll test it. |
||
case SQLITE_SMALLINT_UNSIGNED: | ||
case SQLITE_MEDIUMINT: | ||
case SQLITE_MEDIUMINT_UNSIGNED: | ||
case SQLITE_INT: | ||
case SQLITE_INTEGER: | ||
return BasicType.INT_TYPE; | ||
case SQLITE_INT_UNSIGNED: | ||
case SQLITE_INTEGER_UNSIGNED: | ||
case SQLITE_BIGINT: | ||
case SQLITE_BIGINT_UNSIGNED: | ||
case SQLITE_NUMERIC: | ||
return BasicType.LONG_TYPE; | ||
case SQLITE_DECIMAL: | ||
case SQLITE_DECIMAL_UNSIGNED: | ||
case SQLITE_DOUBLE: | ||
case SQLITE_DOUBLE_PRECISION: | ||
case SQLITE_REAL: | ||
return BasicType.DOUBLE_TYPE; | ||
case SQLITE_FLOAT: | ||
return BasicType.FLOAT_TYPE; | ||
case SQLITE_FLOAT_UNSIGNED: | ||
LOG.warn("{} will probably cause value overflow.", SQLITE_FLOAT_UNSIGNED); | ||
return BasicType.FLOAT_TYPE; | ||
case SQLITE_DOUBLE_UNSIGNED: | ||
LOG.warn("{} will probably cause value overflow.", SQLITE_DOUBLE_UNSIGNED); | ||
return BasicType.DOUBLE_TYPE; | ||
case SQLITE_CHARACTER: | ||
case SQLITE_VARYING_CHARACTER: | ||
case SQLITE_NATIVE_CHARACTER: | ||
case SQLITE_NVARCHAR: | ||
case SQLITE_NCHAR: | ||
case SQLITE_LONGNVARCHAR: | ||
case SQLITE_LONGVARCHAR: | ||
case SQLITE_CLOB: | ||
case SQLITE_CHAR: | ||
case SQLITE_TINYTEXT: | ||
case SQLITE_MEDIUMTEXT: | ||
case SQLITE_TEXT: | ||
case SQLITE_VARCHAR: | ||
case SQLITE_JSON: | ||
case SQLITE_LONGTEXT: | ||
|
||
case SQLITE_DATE: | ||
case SQLITE_TIME: | ||
case SQLITE_DATETIME: | ||
case SQLITE_TIMESTAMP: | ||
return BasicType.STRING_TYPE; | ||
|
||
case SQLITE_TINYBLOB: | ||
case SQLITE_MEDIUMBLOB: | ||
case SQLITE_BLOB: | ||
case SQLITE_LONGBLOB: | ||
case SQLITE_VARBINARY: | ||
case SQLITE_BINARY: | ||
case SQLITE_LONGVARBINARY: | ||
return PrimitiveByteArrayType.INSTANCE; | ||
|
||
//Doesn't support yet | ||
case SQLITE_UNKNOWN: | ||
default: | ||
final String jdbcColumnName = metadata.getColumnName(colIndex); | ||
throw new UnsupportedOperationException( | ||
String.format( | ||
"Doesn't support SQLite type '%s' on column '%s' yet.", | ||
columnTypeName, jdbcColumnName)); | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Driver right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done