[Issue 12998][io] enhance clickhouse cluster sink support #12999
@@ -41,7 +41,6 @@
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>${clickhouse-jdbc.version}</version>
- <scope>runtime</scope>
Why remove the runtime scope here?
We need to use ru.yandex.clickhouse.BalancedClickhouseDataSource directly, so the dependency has to be on the compile classpath.
@nlu90 Would you please help review this PR?
...io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
tableName = jdbcSinkConfig.getTableName();
tableId = JdbcUtils.getTableId(ckConnection, tableName);
// Init PreparedStatement include insert, delete, update
initStatement();

int timeoutMs = jdbcSinkConfig.getTimeoutMs();
batchSize = jdbcSinkConfig.getBatchSize();
incomingList = Lists.newArrayList();
swapList = Lists.newArrayList();
isFlushing = new AtomicBoolean(false);

flushExecutor = Executors.newScheduledThreadPool(1);
flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
This code duplicates what is already in JdbcAbstractSink. Try to reuse it instead of copying it.
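To illustrate the reuse the reviewer is asking for, here is a minimal sketch in which the batching and timed-flush state lives once in a base class and a concrete sink only supplies the write step. `BatchingSinkSketch` and `writeBatch` are hypothetical names chosen for this example; this is not the actual JdbcAbstractSink code, only the same idea built on the fields shown in the snippet above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical base class: owns the batching state so subclasses do not copy it.
abstract class BatchingSinkSketch<T> {
    private final int batchSize;
    private List<T> incomingList = new ArrayList<>();
    private List<T> swapList = new ArrayList<>();
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);
    private final ScheduledExecutorService flushExecutor = Executors.newScheduledThreadPool(1);

    BatchingSinkSketch(int batchSize, long timeoutMs) {
        this.batchSize = batchSize;
        // Flush periodically so a partially filled batch does not wait forever.
        flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
    }

    // The only piece a concrete sink has to provide (assumed hook name).
    protected abstract void writeBatch(List<T> batch);

    public void write(T record) {
        int size;
        synchronized (this) {
            incomingList.add(record);
            size = incomingList.size();
        }
        if (size >= batchSize) {
            flush();
        }
    }

    protected void flush() {
        if (!isFlushing.compareAndSet(false, true)) {
            return; // another flush is already running
        }
        try {
            synchronized (this) {
                // Swap the lists so writers can keep appending while we write.
                List<T> tmp = swapList;
                swapList = incomingList;
                incomingList = tmp;
            }
            if (!swapList.isEmpty()) {
                writeBatch(new ArrayList<>(swapList));
                swapList.clear();
            }
        } finally {
            isFlushing.set(false);
        }
    }

    public void close() {
        flushExecutor.shutdown();
        try {
            flushExecutor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        flush(); // write whatever is still buffered
    }
}
```

With this shape, a ClickHouse-specific sink would extend the base class and implement only the write step, so the timer, the lists, and the flushing flag are never redeclared.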
@@ -43,13 +44,13 @@
@Slf4j
public abstract class JdbcAbstractSink<T> implements Sink<T> {
// ----- Runtime fields
- private JdbcSinkConfig jdbcSinkConfig;
+ protected JdbcSinkConfig jdbcSinkConfig;
Most of the fields can stay private if the code is reused properly.
The PR has had no activity for 30 days; marked with the Stale label.
This will be supported directly in ClickHouse/clickhouse-java#894 (0.3.3) by:
String connString = "jdbc:ch://server1,server2,server3/database"
        + "?load_balancing_policy=random&health_check_interval=5000&failover=2";
Closing and waiting for a version bump...
If you do want to continue this patch, I suggest:
diff --git a/pulsar-io/jdbc/clickhouse/pom.xml b/pulsar-io/jdbc/clickhouse/pom.xml
index 82c5983bb2..50092e749d 100644
--- a/pulsar-io/jdbc/clickhouse/pom.xml
+++ b/pulsar-io/jdbc/clickhouse/pom.xml
@@ -41,7 +41,6 @@
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>${clickhouse-jdbc.version}</version>
- <scope>runtime</scope>
</dependency>
</dependencies>
diff --git a/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
index 1dde785292..2437ed108c 100644
--- a/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
+++ b/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
@@ -18,8 +18,13 @@
*/
package org.apache.pulsar.io.jdbc;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.ClickHouseDriver;
@Connector(
name = "jdbc-clickhouse",
@@ -28,5 +33,9 @@ import org.apache.pulsar.io.core.annotations.IOType;
configClass = JdbcSinkConfig.class
)
public class ClickHouseJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
-
+ @Override
+ protected Connection createConnection(String jdbcUrl, Properties properties) throws SQLException {
+ final BalancedClickhouseDataSource ds = new BalancedClickhouseDataSource(jdbcUrl, properties);
+ return ds.getConnection();
+ }
}
diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 4586fcebcf..cbfa9b82c9 100644
--- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -94,7 +94,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
properties.setProperty("password", password);
}
- connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), properties);
+ connection = createConnection(jdbcSinkConfig.getJdbcUrl(), properties);
connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions());
log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit());
@@ -114,6 +114,10 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
}
}
+ protected Connection createConnection(String jdbcUrl, Properties properties) throws SQLException {
+ return DriverManager.getConnection(jdbcUrl, properties);
+ }
+
private void initStatement() throws Exception {
List<String> keyList = getListFromConfig(jdbcSinkConfig.getKey());
List<String> nonKeyList = getListFromConfig(jdbcSinkConfig.getNonKey());
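The suggested patch relies on an overridable factory method, so the base sink keeps its fields private and a subclass only changes how the connection is obtained. The sketch below shows that pattern with JDK classes only. `AbstractSinkSketch` and `RecordingSinkSketch` are hypothetical stand-ins, and the recording proxy takes the place of a real load-balanced data source so that the example runs without a ClickHouse driver.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical, trimmed-down stand-in for the abstract JDBC sink.
abstract class AbstractSinkSketch {
    private Connection connection; // stays private; subclasses never touch it

    public void open(String jdbcUrl, Properties properties, boolean useTransactions)
            throws SQLException {
        connection = createConnection(jdbcUrl, properties);
        connection.setAutoCommit(!useTransactions);
    }

    // Default behaviour; override to plug in a different connection source.
    protected Connection createConnection(String jdbcUrl, Properties properties)
            throws SQLException {
        return DriverManager.getConnection(jdbcUrl, properties);
    }
}

// Hypothetical subclass: a recording fake stands in for a load-balanced data source.
class RecordingSinkSketch extends AbstractSinkSketch {
    final List<String> calls = new ArrayList<>();

    @Override
    protected Connection createConnection(String jdbcUrl, Properties properties) {
        calls.add("createConnection(" + jdbcUrl + ")");
        // Dynamic proxy that logs every Connection method the base class invokes.
        return (Connection) Proxy.newProxyInstance(
                RecordingSinkSketch.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, args) -> {
                    calls.add(method.getName() + (args == null ? "" : Arrays.toString(args)));
                    return null; // sufficient for the void methods used in this sketch
                });
    }
}
```

Because the hook is the only extension point, the base class can still apply shared setup such as the auto-commit setting to whatever connection the subclass returns.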
Fixes #12998
Motivation
The IO module should support load-balanced ClickHouse connections.
Modifications