-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support passive auto discovery #267
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -602,6 +602,10 @@ public boolean copyPurge() { | |
return this.driverUri.copyPurge(); | ||
} | ||
|
||
public boolean isAutoDiscovery() { | ||
return this.autoDiscovery; | ||
} | ||
|
||
public String warehouse() { | ||
return this.driverUri.getWarehouse(); | ||
} | ||
|
@@ -718,17 +722,43 @@ DatabendClient startQueryWithFailover(String sql, StageAttachment attach) throws | |
ClientSettings s = sb.build(); | ||
logger.log(Level.FINE, "retry " + i + " times to execute query: " + sql + " on " + s.getHost()); | ||
// discover new hosts in need. | ||
// if (this.autoDiscovery) { | ||
// | ||
// } | ||
if (this.autoDiscovery) { | ||
tryAutoDiscovery(httpClient, s); | ||
} | ||
return new DatabendClientV1(httpClient, sql, s, this); | ||
} catch (RuntimeException e1) { | ||
e = e1; | ||
} catch (Exception e1) { | ||
throw new SQLException("Error executing query: " + "SQL: " + sql + " " + e1.getMessage() + " cause: " + e1.getCause(), e1); | ||
} | ||
} | ||
throw new SQLException("Failover Retry Error executing query after" + getMaxFailoverRetries() + "failover retry: " + "SQL: " + sql + " " + e.getMessage() + " cause: " + e.getCause(), e); | ||
throw new SQLException("Failover Retry Error executing query after " + getMaxFailoverRetries() + " failover retry: " + "SQL: " + sql + " " + e.getMessage() + " cause: " + e.getCause(), e); | ||
} | ||
|
||
/** | ||
* Try to auto discovery the databend nodes it will log exceptions when auto discovery failed and not affect real query execution | ||
* | ||
* @param client the http client to query on | ||
* @param settings the client settings to use | ||
*/ | ||
void tryAutoDiscovery(OkHttpClient client, ClientSettings settings) { | ||
if (this.autoDiscovery) { | ||
if (this.driverUri.enableMock()) { | ||
settings.getAdditionalHeaders().put("~mock.unsupported.discovery", "true"); | ||
} | ||
DatabendNodes nodes = this.driverUri.getNodes(); | ||
if (nodes != null && nodes.needDiscovery()) { | ||
try { | ||
nodes.discoverUris(client, settings); | ||
} catch (UnsupportedOperationException e) { | ||
logger.log(Level.WARNING, "Current Query Node do not support auto discovery, close the functionality: " + e.getMessage()); | ||
this.autoDiscovery = false; | ||
} catch (Exception e) { | ||
logger.log(Level.FINE, "Error auto discovery: " + " cause: " + e.getCause() + " message: " + e.getMessage()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. first time to find out FINE log level, what's it's normal use case? (not an issue at all, just curious 👀) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. info level will print problem by default, I think it is similar to DEBUG level on golang |
||
} | ||
} | ||
} | ||
|
||
} | ||
|
||
DatabendClient startQuery(String sql) throws SQLException { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,23 +3,28 @@ | |
import com.databend.client.ClientSettings; | ||
import com.databend.client.DatabendClientV1; | ||
import com.databend.client.DiscoveryNode; | ||
import lombok.Setter; | ||
import okhttp3.OkHttpClient; | ||
|
||
import java.net.URI; | ||
import java.net.URISyntaxException; | ||
import java.security.InvalidParameterException; | ||
import java.sql.SQLException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.stream.Collectors; | ||
import java.util.logging.Logger; | ||
|
||
public class DatabendNodes implements DatabendNodeRouter { | ||
|
||
private AtomicReference<List<URI>> query_nodes_uris; | ||
protected final AtomicInteger index; | ||
// keep track of latest discovery scheduled time | ||
protected final AtomicReference<Long> lastDiscoveryTime = new AtomicReference<>(0L); | ||
private static final Logger logger = Logger.getLogger(DatabendNodes.class.getPackage().getName()); | ||
@Setter | ||
private boolean debug = false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i guess if there is only one setter, define a |
||
// minimum time between discovery | ||
protected long discoveryInterval = 1000 * 60 * 5; | ||
protected DatabendClientLoadBalancingPolicy policy; | ||
|
@@ -64,28 +69,42 @@ public DatabendClientLoadBalancingPolicy getPolicy() { | |
} | ||
|
||
@Override | ||
public boolean discoverUris(OkHttpClient client, ClientSettings settings) { | ||
public void discoverUris(OkHttpClient client, ClientSettings settings) throws UnsupportedOperationException { | ||
// do nothing if discovery interval is not reached | ||
Long lastDiscoveryTime = this.lastDiscoveryTime.get(); | ||
if (System.currentTimeMillis() - lastDiscoveryTime < discoveryInterval) { | ||
return false; | ||
return; | ||
} | ||
List<URI> current_nodes = query_nodes_uris.get(); | ||
List<URI> current_uris = query_nodes_uris.get(); | ||
if (!this.lastDiscoveryTime.compareAndSet(lastDiscoveryTime, System.currentTimeMillis())) { | ||
return false; | ||
return; | ||
} | ||
|
||
List<DiscoveryNode> new_nodes = DatabendClientV1.dicoverNodes(client, settings); | ||
if (!new_nodes.isEmpty()) { | ||
// convert new nodes using lambda | ||
List<URI> new_uris = new_nodes.stream().map(node -> URI.create("http://" + node.getAddress())).collect(Collectors.toList()); | ||
updateNodes(new_uris); | ||
return true; | ||
try { | ||
List<DiscoveryNode> new_nodes = DatabendClientV1.discoverNodes(client, settings); | ||
if (!new_nodes.isEmpty()) { | ||
// convert new nodes using lambda | ||
List<URI> new_uris = this.parseURI(new_nodes); | ||
if (this.query_nodes_uris.compareAndSet(current_uris, new_uris)) { | ||
java.util.logging.Level level = debug ? java.util.logging.Level.INFO : java.util.logging.Level.FINE; | ||
// the log would only show that when truly updated the nodes | ||
logger.log(level, "Automatic Discovery updated nodes: " + new_uris); | ||
} | ||
} | ||
} catch (UnsupportedOperationException e) { | ||
throw e; | ||
} catch (RuntimeException e) { | ||
logger.log(java.util.logging.Level.WARNING, "Error updating nodes: " + e.getMessage()); | ||
} | ||
return false; | ||
|
||
} | ||
|
||
private List<URI> parseURI(List<DiscoveryNode> nodes) throws SQLException { | ||
@Override | ||
public boolean needDiscovery() { | ||
Long lastDiscoveryTime = this.lastDiscoveryTime.get(); | ||
return System.currentTimeMillis() - lastDiscoveryTime >= discoveryInterval; | ||
} | ||
|
||
public List<URI> parseURI(List<com.databend.client.DiscoveryNode> nodes) throws RuntimeException { | ||
String host = null; | ||
List<URI> uris = new ArrayList<>(); | ||
try { | ||
|
@@ -103,20 +122,20 @@ private List<URI> parseURI(List<DiscoveryNode> nodes) throws SQLException { | |
} else if (hostAndPort.length == 1) { | ||
host = hostAndPort[0]; | ||
} else { | ||
throw new SQLException("Invalid host and port, url: " + uri); | ||
throw new InvalidParameterException("Invalid host and port, url: " + uri); | ||
} | ||
if (host == null || host.isEmpty()) { | ||
throw new SQLException("Invalid host " + host); | ||
throw new InvalidParameterException("Invalid host " + host); | ||
} | ||
|
||
uris.add(new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), uriPath, uriQuery, uriFragment)); | ||
} | ||
|
||
return DatabendDriverUri.canonicalizeUris(uris, this.useSecureConnection, this.sslmode); | ||
} catch (URISyntaxException e) { | ||
throw new SQLException("Invalid URI", e.getMessage()); | ||
throw new InvalidParameterException("Invalid URI " + e.getMessage()); | ||
} catch (SQLException e) { | ||
throw new RuntimeException("Error parsing URI " + e.getMessage()); | ||
} | ||
|
||
return uris; | ||
} | ||
|
||
public URI pickUri(String query_id) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
neat mock 👍