Skip to content

Commit

Permalink
[Fix][Connector-tdengine] Fix sql exception and concurrentmodifyexcep…
Browse files Browse the repository at this point in the history
…tion when connect to taos and read data
  • Loading branch information
alextinng authored Aug 8, 2024
1 parent 064fcad commit a18fca8
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE;
import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.TIMEZONE;
import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.UPPER_BOUND;
import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL;
import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME;

@Data
Expand All @@ -54,7 +53,10 @@ public class TDengineSourceConfig implements Serializable {

public static TDengineSourceConfig buildSourceConfig(Config pluginConfig) {
TDengineSourceConfig tdengineSourceConfig = new TDengineSourceConfig();
tdengineSourceConfig.setUrl(pluginConfig.hasPath(URL) ? pluginConfig.getString(URL) : null);
tdengineSourceConfig.setUrl(
pluginConfig.hasPath(ConfigNames.URL)
? pluginConfig.getString(ConfigNames.URL)
: null);
tdengineSourceConfig.setDatabase(
pluginConfig.hasPath(DATABASE) ? pluginConfig.getString(DATABASE) : null);
tdengineSourceConfig.setStable(
Expand All @@ -69,6 +71,7 @@ public static TDengineSourceConfig buildSourceConfig(Config pluginConfig) {
pluginConfig.hasPath(LOWER_BOUND) ? pluginConfig.getString(LOWER_BOUND) : null);
tdengineSourceConfig.setTimezone(
pluginConfig.hasPath(TIMEZONE) ? pluginConfig.getString(TIMEZONE) : "UTC");

return tdengineSourceConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.commons.lang3.ArrayUtils;

import com.google.auto.service.AutoService;
import com.taosdata.jdbc.TSDBDriver;
import lombok.SneakyThrows;

import java.sql.Connection;
Expand All @@ -49,6 +50,7 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD;
Expand Down Expand Up @@ -127,42 +129,36 @@ private StableMetadata getStableMetadata(TDengineSourceConfig config) throws SQL
List<String> fieldNames = new ArrayList<>();
List<SeaTunnelDataType<?>> fieldTypes = new ArrayList<>();

String jdbcUrl =
String.join(
"",
config.getUrl(),
config.getDatabase(),
"?user=",
config.getUsername(),
"&password=",
config.getPassword());
String jdbcUrl = String.join("", config.getUrl(), config.getDatabase());

// check td driver whether exist and if not, try to register
checkDriverExist(jdbcUrl);
try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
try (Statement statement = conn.createStatement()) {

Properties properties = new Properties();
properties.put(TSDBDriver.PROPERTY_KEY_USER, config.getUsername());
properties.put(TSDBDriver.PROPERTY_KEY_PASSWORD, config.getPassword());
String metaSQL =
String.format(
"select table_name from information_schema.ins_tables where db_name = '%s' and stable_name='%s'",
config.getDatabase(), config.getStable());
try (Connection conn = DriverManager.getConnection(jdbcUrl, properties);
Statement statement = conn.createStatement();
ResultSet metaResultSet =
statement.executeQuery(
"desc " + config.getDatabase() + "." + config.getStable());
while (metaResultSet.next()) {
if (timestampFieldName == null) {
timestampFieldName = metaResultSet.getString(1);
}
fieldNames.add(metaResultSet.getString(1));
fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2)));
String.format(
"desc %s.%s", config.getDatabase(), config.getStable()));
ResultSet subTableNameResultSet = statement.executeQuery(metaSQL)) {
while (metaResultSet.next()) {
if (timestampFieldName == null) {
timestampFieldName = metaResultSet.getString(1);
}
fieldNames.add(metaResultSet.getString(1));
fieldTypes.add(TDengineTypeMapper.mapping(metaResultSet.getString(2)));
}
try (Statement statement = conn.createStatement()) {
String metaSQL =
"select table_name from information_schema.ins_tables where db_name = '"
+ config.getDatabase()
+ "' and stable_name='"
+ config.getStable()
+ "';";
ResultSet subTableNameResultSet = statement.executeQuery(metaSQL);
while (subTableNameResultSet.next()) {
String subTableName = subTableNameResultSet.getString(1);
subTableNames.add(subTableName);
}

while (subTableNameResultSet.next()) {
String subTableName = subTableNameResultSet.getString(1);
subTableNames.add(subTableName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,13 @@

package org.apache.seatunnel.connectors.seatunnel.tdengine.source;

import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;

import org.apache.commons.lang3.StringUtils;

import com.google.common.collect.Sets;
import com.taosdata.jdbc.TSDBDriver;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -39,84 +35,76 @@
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;

import static org.apache.seatunnel.connectors.seatunnel.tdengine.utils.TDengineUtil.checkDriverExist;

@Slf4j
public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengineSourceSplit> {

private static final long THREAD_WAIT_TIME = 500L;

private final TDengineSourceConfig config;

private final Set<TDengineSourceSplit> sourceSplits;
private final Deque<TDengineSourceSplit> sourceSplits;

private final Context context;

private Connection conn;

private volatile boolean noMoreSplit;

public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) {
this.config = config;
this.sourceSplits = Sets.newHashSet();
this.sourceSplits = new ConcurrentLinkedDeque<>();
this.context = readerContext;
}

@Override
public void pollNext(Collector<SeaTunnelRow> collector) throws InterruptedException {
if (sourceSplits.isEmpty()) {
Thread.sleep(THREAD_WAIT_TIME);
return;
}
synchronized (collector.getCheckpointLock()) {
sourceSplits.forEach(
split -> {
try {
read(split, collector);
} catch (Exception e) {
throw new TDengineConnectorException(
CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
"TDengine split read error",
e);
}
});
}

if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
// signal to the source that we have reached the end of the data.
log.info("Closed the bounded TDengine source");
context.signalNoMoreElement();
log.info("polling new split from queue!");
TDengineSourceSplit split = sourceSplits.poll();
if (Objects.nonNull(split)) {
log.info(
"starting run new split {}, query sql: {}!",
split.splitId(),
split.getQuery());
try {
read(split, collector);
} catch (Exception e) {
throw new TDengineConnectorException(
CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
"TDengine split read error",
e);
}
} else if (noMoreSplit && sourceSplits.isEmpty()) {
// signal to the source that we have reached the end of the data.
log.info("Closed the bounded TDengine source");
context.signalNoMoreElement();
} else {
Thread.sleep(1000L);
}
}
}

@Override
public void open() {
String jdbcUrl =
StringUtils.join(
config.getUrl(),
config.getDatabase(),
"?user=",
config.getUsername(),
"&password=",
config.getPassword());
Properties connProps = new Properties();
// todo: when TSDBDriver.PROPERTY_KEY_BATCH_LOAD set to "true",
// there is a exception : Caused by: java.sql.SQLException: can't create connection with
// server
// under docker network env
// @bobo (tdengine)
connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "false");
String jdbcUrl = config.getUrl();

Properties properties = new Properties();
properties.put(TSDBDriver.PROPERTY_KEY_USER, config.getUsername());
properties.put(TSDBDriver.PROPERTY_KEY_PASSWORD, config.getPassword());

try {
// check td driver whether exist and if not, try to register
checkDriverExist(jdbcUrl);
conn = DriverManager.getConnection(jdbcUrl, connProps);
conn = DriverManager.getConnection(jdbcUrl, properties);
} catch (SQLException e) {
throw new TDengineConnectorException(
CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
"get TDengine connection failed:" + jdbcUrl);
"get TDengine connection failed:" + jdbcUrl,
e);
}
}

Expand All @@ -135,8 +123,8 @@ public void close() {
}

private void read(TDengineSourceSplit split, Collector<SeaTunnelRow> output) throws Exception {
try (Statement statement = conn.createStatement()) {
final ResultSet resultSet = statement.executeQuery(split.getQuery());
try (Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(split.getQuery())) {
ResultSetMetaData meta = resultSet.getMetaData();

while (resultSet.next()) {
Expand All @@ -151,6 +139,8 @@ private void read(TDengineSourceSplit split, Collector<SeaTunnelRow> output) thr
}

private Object convertDataType(Object object) {
if (Objects.isNull(object)) return null;

if (Timestamp.class.equals(object.getClass())) {
return ((Timestamp) object).toLocalDateTime();
} else if (byte[].class.equals(object.getClass())) {
Expand All @@ -171,7 +161,8 @@ public void addSplits(List<TDengineSourceSplit> splits) {

@Override
public void handleNoMoreSplits() {
// do nothing
log.info("no more split accepted!");
noMoreSplit = true;
}

@Override
Expand Down
Loading

0 comments on commit a18fca8

Please sign in to comment.