diff --git a/databend-client/pom.xml b/databend-client/pom.xml index 6119dfff..bd17f322 100644 --- a/databend-client/pom.xml +++ b/databend-client/pom.xml @@ -6,12 +6,12 @@ com.databend databend-base - 0.3.0 + 0.3.1 ../pom.xml com.databend databend-client - 0.3.0 + 0.3.1 diff --git a/databend-client/src/main/java/com/databend/client/DatabendClientV1.java b/databend-client/src/main/java/com/databend/client/DatabendClientV1.java index 6f623b50..ee151767 100644 --- a/databend-client/src/main/java/com/databend/client/DatabendClientV1.java +++ b/databend-client/src/main/java/com/databend/client/DatabendClientV1.java @@ -94,7 +94,7 @@ public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings sett } } - public static List dicoverNodes(OkHttpClient httpClient, ClientSettings settings) { + public static List discoverNodes(OkHttpClient httpClient, ClientSettings settings) { requireNonNull(httpClient, "httpClient is null"); requireNonNull(settings, "settings is null"); requireNonNull(settings.getHost(), "settings.host is null"); diff --git a/databend-client/src/main/java/com/databend/client/DiscoveryNode.java b/databend-client/src/main/java/com/databend/client/DiscoveryNode.java index 7a08c524..6689e137 100644 --- a/databend-client/src/main/java/com/databend/client/DiscoveryNode.java +++ b/databend-client/src/main/java/com/databend/client/DiscoveryNode.java @@ -30,6 +30,9 @@ public DiscoveryNode( this.address = address; } + public static DiscoveryNode create(String address) { + return new DiscoveryNode(address); + } // add builder @JsonProperty diff --git a/databend-client/src/test/java/com/databend/client/TestClientIT.java b/databend-client/src/test/java/com/databend/client/TestClientIT.java index 5f356871..25b6df5c 100644 --- a/databend-client/src/test/java/com/databend/client/TestClientIT.java +++ b/databend-client/src/test/java/com/databend/client/TestClientIT.java @@ -102,7 +102,7 @@ public void testDiscoverNodes() { Map additionalHeaders = new HashMap<>(); additionalHeaders.put(X_Databend_Query_ID, expectedUUID); ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS); - List nodes = DatabendClientV1.dicoverNodes(client, settings); + List nodes = DatabendClientV1.discoverNodes(client, settings); Assert.assertFalse(nodes.isEmpty()); for (DiscoveryNode node : nodes) { System.out.println(node.getAddress()); @@ -119,7 +119,7 @@ public void testDiscoverNodesUnSupported() { additionalHeaders.put("~mock.unsupported.discovery", "true"); ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS); try { - DatabendClientV1.dicoverNodes(client, settings); + DatabendClientV1.discoverNodes(client, settings); Assert.fail("Expected exception was not thrown"); } catch (Exception e) { System.out.println(e.getMessage()); diff --git a/databend-jdbc/pom.xml b/databend-jdbc/pom.xml index 17d8b8bd..997f98b9 100644 --- a/databend-jdbc/pom.xml +++ b/databend-jdbc/pom.xml @@ -6,12 +6,12 @@ com.databend databend-base - 0.3.0 + 0.3.1 ../pom.xml com.databend databend-jdbc - 0.3.0 + 0.3.1 @@ -24,7 +24,7 @@ com.databend databend-client - 0.3.0 + 0.3.1 com.squareup.okhttp3 diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/ConnectionProperties.java b/databend-jdbc/src/main/java/com/databend/jdbc/ConnectionProperties.java index 391f936a..4be05383 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/ConnectionProperties.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/ConnectionProperties.java @@ -24,7 +24,7 @@ public final class ConnectionProperties { public static final ConnectionProperty MAX_FAILOVER_RETRY = new MaxFailoverRetry(); public static final ConnectionProperty LOAD_BALANCING_POLICY = new LoadBalancingPolicy(); public static final ConnectionProperty AUTO_DISCOVERY = new AutoDiscovery(); - + public static final ConnectionProperty ENABLE_MOCK = new EnableMock(); public static final ConnectionProperty DATABASE = new Database(); public static final ConnectionProperty ACCESS_TOKEN = new AccessToken(); @@ -162,6 +162,12 @@ public AutoDiscovery() { } } + private static class EnableMock extends AbstractConnectionProperty { + public EnableMock() { + super("enable_mock", Optional.of("false"), NOT_REQUIRED, ALLOWED, BOOLEAN_CONVERTER); + } + } + private static class AccessToken extends AbstractConnectionProperty { public AccessToken() { diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java index 5fde3d4c..d61afc0f 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java @@ -602,6 +602,10 @@ public boolean copyPurge() { return this.driverUri.copyPurge(); } + public boolean isAutoDiscovery() { + return this.autoDiscovery; + } + public String warehouse() { return this.driverUri.getWarehouse(); } @@ -718,9 +722,9 @@ DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws ClientSettings s = sb.build(); logger.log(Level.FINE, "retry " + i + " times to execute query: " + sql + " on " + s.getHost()); // discover new hosts in need. -// if (this.autoDiscovery) { -// -// } + if (this.autoDiscovery) { + tryAutoDiscovery(httpClient, s); + } return new DatabendClientV1(httpClient, sql, s, this); } catch (RuntimeException e1) { e = e1; @@ -728,7 +732,33 @@ DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws throw new SQLException("Error executing query: " + "SQL: " + sql + " " + e1.getMessage() + " cause: " + e1.getCause(), e1); } } - throw new SQLException("Failover Retry Error executing query after" + getMaxFailoverRetries() + "failover retry: " + "SQL: " + sql + " " + e.getMessage() + " cause: " + e.getCause(), e); + throw new SQLException("Failover Retry Error executing query after " + getMaxFailoverRetries() + " failover retry: " + "SQL: " + sql + " " + e.getMessage() + " cause: " + e.getCause(), e); + } + + /** + * Try to auto discovery the databend nodes it will log exceptions when auto discovery failed and not affect real query execution + * + * @param client the http client to query on + * @param settings the client settings to use + */ + void tryAutoDiscovery(OkHttpClient client, ClientSettings settings) { + if (this.autoDiscovery) { + if (this.driverUri.enableMock()) { + settings.getAdditionalHeaders().put("~mock.unsupported.discovery", "true"); + } + DatabendNodes nodes = this.driverUri.getNodes(); + if (nodes != null && nodes.needDiscovery()) { + try { + nodes.discoverUris(client, settings); + } catch (UnsupportedOperationException e) { + logger.log(Level.WARNING, "Current Query Node do not support auto discovery, close the functionality: " + e.getMessage()); + this.autoDiscovery = false; + } catch (Exception e) { + logger.log(Level.FINE, "Error auto discovery: " + " cause: " + e.getCause() + " message: " + e.getMessage()); + } + } + } + } DatabendClient startQuery(String sql) throws SQLException { diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendDriverUri.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendDriverUri.java index 864e8ea0..8e740262 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendDriverUri.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendDriverUri.java @@ -57,6 +57,7 @@ public final class DatabendDriverUri { private final Integer connectionTimeout; private final Integer maxFailoverRetry; private final boolean autoDiscovery; + private final boolean enableMock; private final Integer queryTimeout; private final Integer socketTimeout; private final Integer waitTimeSecs; @@ -73,6 +74,7 @@ private DatabendDriverUri(String url, Properties driverProperties) this.useSecureConnection = SSL.getValue(properties).orElse(false); this.useVerify = USE_VERIFY.getValue(properties).orElse(false); this.debug = DEBUG.getValue(properties).orElse(false); + this.enableMock = ENABLE_MOCK.getValue(properties).orElse(false); this.strNullAsNull = STRNULL_AS_NULL.getValue(properties).orElse(true); this.warehouse = WAREHOUSE.getValue(properties).orElse(""); this.sslmode = SSL_MODE.getValue(properties).orElse("disable"); @@ -117,7 +119,7 @@ private static void initDatabase(URI uri, Map uriProperties) thr uriProperties.put(DATABASE.getKey(), db); } - private static List canonicalizeUris(List uris, boolean isSSLSecured, String sslmode) throws SQLException { + public static List canonicalizeUris(List uris, boolean isSSLSecured, String sslmode) throws SQLException { List finalUris = new ArrayList<>(); for (URI uri : uris) { finalUris.add(canonicalizeUri(uri, isSSLSecured, sslmode)); @@ -351,6 +353,10 @@ public boolean getDebug() { return debug; } + public boolean enableMock() { + return enableMock; + } + public String getSslmode() { return sslmode; } diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodeRouter.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodeRouter.java index 8e0d626e..dfa31bda 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodeRouter.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodeRouter.java @@ -24,11 +24,8 @@ public interface DatabendNodeRouter { /** * Discover all possible query uris through databend discovery api and update candidate node router list in need - * - * @return true if update operation executed, false otherwise - * Ref PR: - * https://github.com/datafuselabs/databend-jdbc/pull/264 - * https://github.com/datafuselabs/databend/pull/16353 */ - boolean discoverUris(OkHttpClient client, ClientSettings settings); + void discoverUris(OkHttpClient client, ClientSettings settings) throws UnsupportedOperationException; + + boolean needDiscovery(); } diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodes.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodes.java index 9c5c96a1..aba3a498 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodes.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodes.java @@ -3,16 +3,18 @@ import com.databend.client.ClientSettings; import com.databend.client.DatabendClientV1; import com.databend.client.DiscoveryNode; +import lombok.Setter; import okhttp3.OkHttpClient; import java.net.URI; import java.net.URISyntaxException; +import java.security.InvalidParameterException; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; +import java.util.logging.Logger; public class DatabendNodes implements DatabendNodeRouter { @@ -20,6 +22,9 @@ public class DatabendNodes implements DatabendNodeRouter { protected final AtomicInteger index; // keep track of latest discovery scheduled time protected final AtomicReference lastDiscoveryTime = new AtomicReference<>(0L); + private static final Logger logger = Logger.getLogger(DatabendNodes.class.getPackage().getName()); + @Setter + private boolean debug = false; // minimum time between discovery protected long discoveryInterval = 1000 * 60 * 5; protected DatabendClientLoadBalancingPolicy policy; @@ -64,28 +69,42 @@ public DatabendClientLoadBalancingPolicy getPolicy() { } @Override - public boolean discoverUris(OkHttpClient client, ClientSettings settings) { + public void discoverUris(OkHttpClient client, ClientSettings settings) throws UnsupportedOperationException { // do nothing if discovery interval is not reached Long lastDiscoveryTime = this.lastDiscoveryTime.get(); if (System.currentTimeMillis() - lastDiscoveryTime < discoveryInterval) { - return false; + return; } - List current_nodes = query_nodes_uris.get(); + List current_uris = query_nodes_uris.get(); if (!this.lastDiscoveryTime.compareAndSet(lastDiscoveryTime, System.currentTimeMillis())) { - return false; + return; } - - List new_nodes = DatabendClientV1.dicoverNodes(client, settings); - if (!new_nodes.isEmpty()) { - // convert new nodes using lambda - List new_uris = new_nodes.stream().map(node -> URI.create("http://" + node.getAddress())).collect(Collectors.toList()); - updateNodes(new_uris); - return true; + try { + List new_nodes = DatabendClientV1.discoverNodes(client, settings); + if (!new_nodes.isEmpty()) { + // convert new nodes using lambda + List new_uris = this.parseURI(new_nodes); + if (this.query_nodes_uris.compareAndSet(current_uris, new_uris)) { + java.util.logging.Level level = debug ? java.util.logging.Level.INFO : java.util.logging.Level.FINE; + // the log would only show that when truly updated the nodes + logger.log(level, "Automatic Discovery updated nodes: " + new_uris); + } + } + } catch (UnsupportedOperationException e) { + throw e; + } catch (RuntimeException e) { + logger.log(java.util.logging.Level.WARNING, "Error updating nodes: " + e.getMessage()); } - return false; + } - private List parseURI(List nodes) throws SQLException { + @Override + public boolean needDiscovery() { + Long lastDiscoveryTime = this.lastDiscoveryTime.get(); + return System.currentTimeMillis() - lastDiscoveryTime >= discoveryInterval; + } + + public List parseURI(List nodes) throws RuntimeException { String host = null; List uris = new ArrayList<>(); try { @@ -103,20 +122,20 @@ private List parseURI(List nodes) throws SQLException { } else if (hostAndPort.length == 1) { host = hostAndPort[0]; } else { - throw new SQLException("Invalid host and port, url: " + uri); + throw new InvalidParameterException("Invalid host and port, url: " + uri); } if (host == null || host.isEmpty()) { - throw new SQLException("Invalid host " + host); + throw new InvalidParameterException("Invalid host " + host); } uris.add(new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), uriPath, uriQuery, uriFragment)); } - + return DatabendDriverUri.canonicalizeUris(uris, this.useSecureConnection, this.sslmode); } catch (URISyntaxException e) { - throw new SQLException("Invalid URI", e.getMessage()); + throw new InvalidParameterException("Invalid URI " + e.getMessage()); + } catch (SQLException e) { + throw new RuntimeException("Error parsing URI " + e.getMessage()); } - - return uris; } public URI pickUri(String query_id) { diff --git a/databend-jdbc/src/test/java/com/databend/jdbc/TestBasicDriver.java b/databend-jdbc/src/test/java/com/databend/jdbc/TestBasicDriver.java index d247017f..b8e71167 100644 --- a/databend-jdbc/src/test/java/com/databend/jdbc/TestBasicDriver.java +++ b/databend-jdbc/src/test/java/com/databend/jdbc/TestBasicDriver.java @@ -9,6 +9,7 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; + import java.sql.Connection; import java.sql.Date; import java.sql.DriverManager; diff --git a/databend-jdbc/src/test/java/com/databend/jdbc/TestDatabendDatabaseMetaData.java b/databend-jdbc/src/test/java/com/databend/jdbc/TestDatabendDatabaseMetaData.java index 8e174219..33cbeea7 100644 --- a/databend-jdbc/src/test/java/com/databend/jdbc/TestDatabendDatabaseMetaData.java +++ b/databend-jdbc/src/test/java/com/databend/jdbc/TestDatabendDatabaseMetaData.java @@ -5,13 +5,7 @@ import org.testng.annotations.Test; import java.math.BigDecimal; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Types; +import java.sql.*; import java.util.ArrayList; import java.util.List; import java.util.Locale; diff --git a/databend-jdbc/src/test/java/com/databend/jdbc/TestDatabendParameterMetaData.java b/databend-jdbc/src/test/java/com/databend/jdbc/TestDatabendParameterMetaData.java index 3f62d924..caeb4e04 100644 --- a/databend-jdbc/src/test/java/com/databend/jdbc/TestDatabendParameterMetaData.java +++ b/databend-jdbc/src/test/java/com/databend/jdbc/TestDatabendParameterMetaData.java @@ -5,6 +5,7 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; + import java.sql.Connection; import java.sql.DriverManager; import java.sql.ParameterMetaData; diff --git a/databend-jdbc/src/test/java/com/databend/jdbc/TestFileTransfer.java b/databend-jdbc/src/test/java/com/databend/jdbc/TestFileTransfer.java index 4a9bed0f..3a4daf29 100644 --- a/databend-jdbc/src/test/java/com/databend/jdbc/TestFileTransfer.java +++ b/databend-jdbc/src/test/java/com/databend/jdbc/TestFileTransfer.java @@ -9,13 +9,9 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileWriter; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; import java.nio.file.Files; + import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; diff --git a/databend-jdbc/src/test/java/com/databend/jdbc/TestMultiHost.java b/databend-jdbc/src/test/java/com/databend/jdbc/TestMultiHost.java index d9b569ce..1c29d9fb 100644 --- a/databend-jdbc/src/test/java/com/databend/jdbc/TestMultiHost.java +++ b/databend-jdbc/src/test/java/com/databend/jdbc/TestMultiHost.java @@ -1,18 +1,26 @@ package com.databend.jdbc; +import com.databend.client.DiscoveryNode; + import org.testng.Assert; import org.testng.annotations.Test; +import java.net.URI; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; public class TestMultiHost { private final String DEFAULT_JDBC_URL = "jdbc:databend://localhost:8000,localhost:8002,localhost:8003/default"; private final String RANDOM_JDBC_URL = "jdbc:databend://localhost:8000,localhost:8002,localhost:8003/default?load_balancing_policy=random"; private final String ROUND_ROBIN_JDBC_URL = "jdbc:databend://localhost:8000,localhost:8002,localhost:8003/default?load_balancing_policy=round_robin"; private final String FAIL_OVER_JDBC_URL = "jdbc:databend://localhost:7222,localhost:7223,localhost:7224,localhost:8000/default?load_balancing_policy=round_robin&max_failover_retry=4"; + private final String AUTO_DISCOVERY_JDBC_URL = "jdbc:databend://localhost:8000/default?load_balancing_policy=round_robin&auto_discovery=true"; + private final String UNSUPPORT_AUTO_DISCOVERY_JDBC_URL = "jdbc:databend://localhost:8000/default?load_balancing_policy=round_robin&auto_discovery=true&enable_mock=true"; + private Connection createConnection(String url) throws SQLException { @@ -185,4 +193,80 @@ public void testFailOver() Assert.assertEquals(unknown, 0); Assert.assertEquals(node8000 + node8002 + node8003, 90); } + + @Test(groups = {"IT", "cluster"}) + public void testAutoDiscovery() + throws SQLException { + // try connect with three nodes 1000 times and count for each node + int node8000 = 0; + int node8002 = 0; + int node8003 = 0; + int unknown = 0; + try (Connection connection = createConnection(AUTO_DISCOVERY_JDBC_URL)) { + for (int i = 0; i < 30; i++) { + DatabendStatement statement = (DatabendStatement) connection.createStatement(); + // remove the effect setup commands + for (int j = 0; j < 3; j++) { + statement.execute("select value from system.configs where name = 'http_handler_port';"); + ResultSet r = statement.getResultSet(); + r.next(); + if (r.getInt(1) == 8000) { + node8000++; + } else if (r.getInt(1) == 8002) { + node8002++; + } else if (r.getInt(1) == 8003) { + node8003++; + } else { + unknown++; + } + } + } + } + System.out.println("node8000: " + node8000 + ", node8002: " + node8002 + ", node8003: " + node8003 + ", unknown: " + unknown); + + Assert.assertEquals(node8000, 31); + Assert.assertEquals(node8002, 30); + Assert.assertEquals(node8003, 29); + Assert.assertEquals(unknown, 0); + Assert.assertEquals(node8000 + node8002 + node8003, 90); + } + + @Test(groups = {"IT", "cluster"}) + public void testUnSupportedAutoDiscovery() + throws SQLException { + try (Connection connection = createConnection(UNSUPPORT_AUTO_DISCOVERY_JDBC_URL)) { + + DatabendStatement statement = (DatabendStatement) connection.createStatement(); + statement.execute("select value from system.configs where name = 'http_handler_port';"); + ResultSet r = statement.getResultSet(); + r.next(); + DatabendConnection dbc = (DatabendConnection) connection; + // automatically + Assert.assertFalse(dbc.isAutoDiscovery()); + } catch (SQLException e) { + // there should be no exception + Assert.fail("Should not throw exception"); + } + } + + @Test(groups = {"unit"}) + public void testAutoDiscoveryUriParsing() throws SQLException { + DatabendDriverUri uri = DatabendDriverUri.create("jdbc:databend://localhost:8000?ssl=true", null); + DatabendDriverUri uri2 = DatabendDriverUri.create("jdbc:databend://127.0.0.1:8000,127.0.0.1:8002,127.0.0.1:8003?ssl=true", null); + List uris2 = uri2.getNodes().getUris(); + + DatabendNodes nodes = uri.getNodes(); + List discoveryNodes = new ArrayList<>(); + discoveryNodes.add(DiscoveryNode.create("127.0.0.1:8000")); + discoveryNodes.add(DiscoveryNode.create("127.0.0.1:8002")); + discoveryNodes.add(DiscoveryNode.create("127.0.0.1:8003")); + List uris = nodes.parseURI(discoveryNodes); + for (URI u : uris) { + System.out.println(u); + } + Assert.assertEquals(uris.size(), 3); + Assert.assertEquals(uris2.size(), 3); + Assert.assertEquals(uris2, uris); + + } } diff --git a/pom.xml b/pom.xml index 5ce8bf19..cbe4b5f7 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ com.databend databend-base - 0.3.0 + 0.3.1 databend-base Databend pom