Skip to content

Commit

Permalink
feat: support passive auto discovery (#267)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhiHanZ authored Sep 6, 2024
1 parent 1b209a0 commit 19245ea
Show file tree
Hide file tree
Showing 16 changed files with 191 additions and 54 deletions.
4 changes: 2 additions & 2 deletions databend-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
<parent>
<groupId>com.databend</groupId>
<artifactId>databend-base</artifactId>
<version>0.3.0</version>
<version>0.3.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>com.databend</groupId>
<artifactId>databend-client</artifactId>
<version>0.3.0</version>
<version>0.3.1</version>

<properties>
<!--suppress UnresolvedMavenProperty -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public DatabendClientV1(OkHttpClient httpClient, String sql, ClientSettings sett
}
}

public static List<DiscoveryNode> dicoverNodes(OkHttpClient httpClient, ClientSettings settings) {
public static List<DiscoveryNode> discoverNodes(OkHttpClient httpClient, ClientSettings settings) {
requireNonNull(httpClient, "httpClient is null");
requireNonNull(settings, "settings is null");
requireNonNull(settings.getHost(), "settings.host is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public DiscoveryNode(
this.address = address;
}

public static DiscoveryNode create(String address) {
return new DiscoveryNode(address);
}
// add builder

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testDiscoverNodes() {
Map<String, String> additionalHeaders = new HashMap<>();
additionalHeaders.put(X_Databend_Query_ID, expectedUUID);
ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS);
List<DiscoveryNode> nodes = DatabendClientV1.dicoverNodes(client, settings);
List<DiscoveryNode> nodes = DatabendClientV1.discoverNodes(client, settings);
Assert.assertFalse(nodes.isEmpty());
for (DiscoveryNode node : nodes) {
System.out.println(node.getAddress());
Expand All @@ -119,7 +119,7 @@ public void testDiscoverNodesUnSupported() {
additionalHeaders.put("~mock.unsupported.discovery", "true");
ClientSettings settings = new ClientSettings(DATABEND_HOST, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), additionalHeaders, null, DEFAULT_RETRY_ATTEMPTS);
try {
DatabendClientV1.dicoverNodes(client, settings);
DatabendClientV1.discoverNodes(client, settings);
Assert.fail("Expected exception was not thrown");
} catch (Exception e) {
System.out.println(e.getMessage());
Expand Down
6 changes: 3 additions & 3 deletions databend-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
<parent>
<groupId>com.databend</groupId>
<artifactId>databend-base</artifactId>
<version>0.3.0</version>
<version>0.3.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>com.databend</groupId>
<artifactId>databend-jdbc</artifactId>
<version>0.3.0</version>
<version>0.3.1</version>

<properties>
<!--suppress UnresolvedMavenProperty -->
Expand All @@ -24,7 +24,7 @@
<dependency>
<groupId>com.databend</groupId>
<artifactId>databend-client</artifactId>
<version>0.3.0</version>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public final class ConnectionProperties {
public static final ConnectionProperty<Integer> MAX_FAILOVER_RETRY = new MaxFailoverRetry();
public static final ConnectionProperty<String> LOAD_BALANCING_POLICY = new LoadBalancingPolicy();
public static final ConnectionProperty<Boolean> AUTO_DISCOVERY = new AutoDiscovery();

public static final ConnectionProperty<Boolean> ENABLE_MOCK = new EnableMock();
public static final ConnectionProperty<String> DATABASE = new Database();
public static final ConnectionProperty<String> ACCESS_TOKEN = new AccessToken();

Expand Down Expand Up @@ -162,6 +162,12 @@ public AutoDiscovery() {
}
}

private static class EnableMock extends AbstractConnectionProperty<Boolean> {
public EnableMock() {
super("enable_mock", Optional.of("false"), NOT_REQUIRED, ALLOWED, BOOLEAN_CONVERTER);
}
}

private static class AccessToken
extends AbstractConnectionProperty<String> {
public AccessToken() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,10 @@ public boolean copyPurge() {
return this.driverUri.copyPurge();
}

public boolean isAutoDiscovery() {
return this.autoDiscovery;
}

public String warehouse() {
return this.driverUri.getWarehouse();
}
Expand Down Expand Up @@ -718,17 +722,43 @@ DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws
ClientSettings s = sb.build();
logger.log(Level.FINE, "retry " + i + " times to execute query: " + sql + " on " + s.getHost());
// discover new hosts in need.
// if (this.autoDiscovery) {
//
// }
if (this.autoDiscovery) {
tryAutoDiscovery(httpClient, s);
}
return new DatabendClientV1(httpClient, sql, s, this);
} catch (RuntimeException e1) {
e = e1;
} catch (Exception e1) {
throw new SQLException("Error executing query: " + "SQL: " + sql + " " + e1.getMessage() + " cause: " + e1.getCause(), e1);
}
}
throw new SQLException("Failover Retry Error executing query after" + getMaxFailoverRetries() + "failover retry: " + "SQL: " + sql + " " + e.getMessage() + " cause: " + e.getCause(), e);
throw new SQLException("Failover Retry Error executing query after " + getMaxFailoverRetries() + " failover retry: " + "SQL: " + sql + " " + e.getMessage() + " cause: " + e.getCause(), e);
}

/**
* Try to auto discovery the databend nodes it will log exceptions when auto discovery failed and not affect real query execution
*
* @param client the http client to query on
* @param settings the client settings to use
*/
void tryAutoDiscovery(OkHttpClient client, ClientSettings settings) {
if (this.autoDiscovery) {
if (this.driverUri.enableMock()) {
settings.getAdditionalHeaders().put("~mock.unsupported.discovery", "true");
}
DatabendNodes nodes = this.driverUri.getNodes();
if (nodes != null && nodes.needDiscovery()) {
try {
nodes.discoverUris(client, settings);
} catch (UnsupportedOperationException e) {
logger.log(Level.WARNING, "Current Query Node do not support auto discovery, close the functionality: " + e.getMessage());
this.autoDiscovery = false;
} catch (Exception e) {
logger.log(Level.FINE, "Error auto discovery: " + " cause: " + e.getCause() + " message: " + e.getMessage());
}
}
}

}

DatabendClient startQuery(String sql) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public final class DatabendDriverUri {
private final Integer connectionTimeout;
private final Integer maxFailoverRetry;
private final boolean autoDiscovery;
private final boolean enableMock;
private final Integer queryTimeout;
private final Integer socketTimeout;
private final Integer waitTimeSecs;
Expand All @@ -73,6 +74,7 @@ private DatabendDriverUri(String url, Properties driverProperties)
this.useSecureConnection = SSL.getValue(properties).orElse(false);
this.useVerify = USE_VERIFY.getValue(properties).orElse(false);
this.debug = DEBUG.getValue(properties).orElse(false);
this.enableMock = ENABLE_MOCK.getValue(properties).orElse(false);
this.strNullAsNull = STRNULL_AS_NULL.getValue(properties).orElse(true);
this.warehouse = WAREHOUSE.getValue(properties).orElse("");
this.sslmode = SSL_MODE.getValue(properties).orElse("disable");
Expand Down Expand Up @@ -117,7 +119,7 @@ private static void initDatabase(URI uri, Map<String, String> uriProperties) thr
uriProperties.put(DATABASE.getKey(), db);
}

private static List<URI> canonicalizeUris(List<URI> uris, boolean isSSLSecured, String sslmode) throws SQLException {
public static List<URI> canonicalizeUris(List<URI> uris, boolean isSSLSecured, String sslmode) throws SQLException {
List<URI> finalUris = new ArrayList<>();
for (URI uri : uris) {
finalUris.add(canonicalizeUri(uri, isSSLSecured, sslmode));
Expand Down Expand Up @@ -351,6 +353,10 @@ public boolean getDebug() {
return debug;
}

public boolean enableMock() {
return enableMock;
}

public String getSslmode() {
return sslmode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@ public interface DatabendNodeRouter {

/**
* Discover all possible query uris through databend discovery api and update candidate node router list in need
*
* @return true if update operation executed, false otherwise
* Ref PR:
* https://github.com/datafuselabs/databend-jdbc/pull/264
* https://github.com/datafuselabs/databend/pull/16353
*/
boolean discoverUris(OkHttpClient client, ClientSettings settings);
void discoverUris(OkHttpClient client, ClientSettings settings) throws UnsupportedOperationException;

boolean needDiscovery();
}
59 changes: 39 additions & 20 deletions databend-jdbc/src/main/java/com/databend/jdbc/DatabendNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,28 @@
import com.databend.client.ClientSettings;
import com.databend.client.DatabendClientV1;
import com.databend.client.DiscoveryNode;
import lombok.Setter;
import okhttp3.OkHttpClient;

import java.net.URI;
import java.net.URISyntaxException;
import java.security.InvalidParameterException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.logging.Logger;

public class DatabendNodes implements DatabendNodeRouter {

private AtomicReference<List<URI>> query_nodes_uris;
protected final AtomicInteger index;
// keep track of latest discovery scheduled time
protected final AtomicReference<Long> lastDiscoveryTime = new AtomicReference<>(0L);
private static final Logger logger = Logger.getLogger(DatabendNodes.class.getPackage().getName());
@Setter
private boolean debug = false;
// minimum time between discovery
protected long discoveryInterval = 1000 * 60 * 5;
protected DatabendClientLoadBalancingPolicy policy;
Expand Down Expand Up @@ -64,28 +69,42 @@ public DatabendClientLoadBalancingPolicy getPolicy() {
}

@Override
public boolean discoverUris(OkHttpClient client, ClientSettings settings) {
public void discoverUris(OkHttpClient client, ClientSettings settings) throws UnsupportedOperationException {
// do nothing if discovery interval is not reached
Long lastDiscoveryTime = this.lastDiscoveryTime.get();
if (System.currentTimeMillis() - lastDiscoveryTime < discoveryInterval) {
return false;
return;
}
List<URI> current_nodes = query_nodes_uris.get();
List<URI> current_uris = query_nodes_uris.get();
if (!this.lastDiscoveryTime.compareAndSet(lastDiscoveryTime, System.currentTimeMillis())) {
return false;
return;
}

List<DiscoveryNode> new_nodes = DatabendClientV1.dicoverNodes(client, settings);
if (!new_nodes.isEmpty()) {
// convert new nodes using lambda
List<URI> new_uris = new_nodes.stream().map(node -> URI.create("http://" + node.getAddress())).collect(Collectors.toList());
updateNodes(new_uris);
return true;
try {
List<DiscoveryNode> new_nodes = DatabendClientV1.discoverNodes(client, settings);
if (!new_nodes.isEmpty()) {
// convert new nodes using lambda
List<URI> new_uris = this.parseURI(new_nodes);
if (this.query_nodes_uris.compareAndSet(current_uris, new_uris)) {
java.util.logging.Level level = debug ? java.util.logging.Level.INFO : java.util.logging.Level.FINE;
// the log would only show that when truly updated the nodes
logger.log(level, "Automatic Discovery updated nodes: " + new_uris);
}
}
} catch (UnsupportedOperationException e) {
throw e;
} catch (RuntimeException e) {
logger.log(java.util.logging.Level.WARNING, "Error updating nodes: " + e.getMessage());
}
return false;

}

private List<URI> parseURI(List<DiscoveryNode> nodes) throws SQLException {
@Override
public boolean needDiscovery() {
Long lastDiscoveryTime = this.lastDiscoveryTime.get();
return System.currentTimeMillis() - lastDiscoveryTime >= discoveryInterval;
}

public List<URI> parseURI(List<com.databend.client.DiscoveryNode> nodes) throws RuntimeException {
String host = null;
List<URI> uris = new ArrayList<>();
try {
Expand All @@ -103,20 +122,20 @@ private List<URI> parseURI(List<DiscoveryNode> nodes) throws SQLException {
} else if (hostAndPort.length == 1) {
host = hostAndPort[0];
} else {
throw new SQLException("Invalid host and port, url: " + uri);
throw new InvalidParameterException("Invalid host and port, url: " + uri);
}
if (host == null || host.isEmpty()) {
throw new SQLException("Invalid host " + host);
throw new InvalidParameterException("Invalid host " + host);
}

uris.add(new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), uriPath, uriQuery, uriFragment));
}

return DatabendDriverUri.canonicalizeUris(uris, this.useSecureConnection, this.sslmode);
} catch (URISyntaxException e) {
throw new SQLException("Invalid URI", e.getMessage());
throw new InvalidParameterException("Invalid URI " + e.getMessage());
} catch (SQLException e) {
throw new RuntimeException("Error parsing URI " + e.getMessage());
}

return uris;
}

public URI pickUri(String query_id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;


import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,7 @@
import org.testng.annotations.Test;

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;


import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ParameterMetaData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,9 @@
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.*;
import java.nio.file.Files;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
Expand Down
Loading

0 comments on commit 19245ea

Please sign in to comment.