Skip to content

Commit

Permalink
add ping in getConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
hantmac committed Jan 31, 2024
1 parent c291393 commit 7b359f0
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@
@ThreadSafe
public class DatabendClientV1
implements DatabendClient {
private static final String USER_AGENT_VALUE = DatabendClientV1.class.getSimpleName() +
public static final String USER_AGENT_VALUE = DatabendClientV1.class.getSimpleName() +
"/" +
firstNonNull(DatabendClientV1.class.getPackage().getImplementationVersion(), "jvm-unknown");
private static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8");
private static final JsonCodec<QueryResults> QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class);
public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8");
public static final JsonCodec<QueryResults> QUERY_RESULTS_CODEC = jsonCodec(QueryResults.class);

private static final String QUERY_PATH = "/v1/query";
public static final String QUERY_PATH = "/v1/query";
private static final long MAX_MATERIALIZED_JSON_RESPONSE_SIZE = 128 * 1024;
private final OkHttpClient httpClient;
private final String query;
Expand Down Expand Up @@ -222,7 +222,7 @@ public boolean isRunning() {
}

@Override
public Map<String, String> getAdditionalHeaders() {
public Map<String, String> getAdditionalHeaders() {
return additonalHeaders;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ private Object column(int index)
}
Object value = null;
value = row.get().get(index - 1);
if (value.toString().equals("NULL")) {
if (value == null || value.toString().equals("NULL")) {
wasNull.set(value == null);
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public final class ConnectionProperties {
public static final ConnectionProperty<String> USER = new User();
public static final ConnectionProperty<String> PASSWORD = new Password();
public static final ConnectionProperty<Boolean> SSL = new Ssl();
public static final ConnectionProperty<Boolean> STRNULL_AS_NULL = new StrNullAsNull();
public static final ConnectionProperty<String> WAREHOUSE = new Warehouse();
public static final ConnectionProperty<String> SSL_MODE = new SSLMode();
static final ConnectionProperty<String> TENANT = new Tenant();
Expand All @@ -39,6 +40,7 @@ public final class ConnectionProperties {
.add(USER)
.add(PASSWORD)
.add(SSL)
.add(STRNULL_AS_NULL)
.add(WAREHOUSE)
.add(SSL_MODE)
.add(TENANT)
Expand Down Expand Up @@ -84,6 +86,13 @@ public Ssl() {
}
}

public static class StrNullAsNull
extends AbstractConnectionProperty<Boolean> {
public StrNullAsNull() {
super("strnullasnull", Optional.of("true"), NOT_REQUIRED, ALLOWED, BOOLEAN_CONVERTER);
}
}

private static class Database
extends AbstractConnectionProperty<String> {
public Database() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package com.databend.jdbc;

import com.databend.client.ClientSettings;
import com.databend.client.DatabendClient;
import com.databend.client.DatabendClientV1;
import com.databend.client.DatabendSession;
import com.databend.client.PaginationOptions;
import com.databend.client.StageAttachment;
import com.databend.client.*;
import com.databend.jdbc.annotation.NotImplemented;
import com.databend.jdbc.cloud.DatabendCopyParams;
import com.databend.jdbc.cloud.DatabendPresignClient;
import com.databend.jdbc.cloud.DatabendPresignClientV1;
import com.fasterxml.jackson.core.JsonProcessingException;
import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -43,7 +40,9 @@
import java.util.logging.Logger;

import static com.databend.client.ClientSettings.*;
import static com.databend.client.DatabendClientV1.*;
import static com.google.common.base.Preconditions.checkState;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.util.Collections.newSetFromMap;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -492,6 +491,10 @@ public String warehouse() {
return this.driverUri.getWarehouse();
}

public Boolean strNullAsNull() {
return this.driverUri.getStrNullAsNull();
}

public String tenant() {
return this.driverUri.getTenant();
}
Expand All @@ -516,6 +519,56 @@ public URI getURI() {
return this.httpUri;
}

public void PingDatabendClientV1() throws IOException {
PaginationOptions options = getPaginationOptions();
Map<String, String> additionalHeaders = setAdditionalHeaders();
additionalHeaders.put(X_Databend_Query_ID, UUID.randomUUID().toString());
ClientSettings settings = new ClientSettings.Builder().
setQueryTimeoutSecs(this.driverUri.getQueryTimeout()).
setConnectionTimeout(this.driverUri.getConnectionTimeout()).
setSocketTimeout(this.driverUri.getSocketTimeout()).
setSession(this.session.get()).
setHost(this.getURI().toString()).
setAdditionalHeaders(additionalHeaders).
setPaginationOptions(options).build();
String query = "select 1";
HttpUrl url = HttpUrl.get(settings.getHost());
if (url == null) {
throw new IllegalArgumentException("Invalid host: " + settings.getHost());
}
QueryRequest req = QueryRequest.builder().setSession(settings.getSession()).setStageAttachment(settings.getStageAttachment()).setPaginationOptions(settings.getPaginationOptions()).setSql(query).build();
String reqString = req.toString();
if (reqString == null || reqString.isEmpty()) {
throw new IllegalArgumentException("Invalid request: " + req);
}
url = url.newBuilder().encodedPath(QUERY_PATH).build();
Request.Builder builder = new Request.Builder()
.url(url)
.header("User-Agent", USER_AGENT_VALUE)
.header("Accept", "application/json")
.header("Content-Type", "application/json");
if (settings.getAdditionalHeaders() != null) {
settings.getAdditionalHeaders().forEach(builder::addHeader);
}
Request request = builder.post(okhttp3.RequestBody.create(MEDIA_TYPE_JSON, reqString)).build();
executePing(request);
}


public void executePing(Request request) throws IOException {
requireNonNull(request, "request is null");
try {
JsonResponse<QueryResults> response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request, OptionalLong.empty());
if ((response.getStatusCode() < 400)) {
return;
} else {
throw new IOException("failed to ping databend server");
}
} catch (RuntimeException e) {
throw new IOException(e);
}
}

// TODO(zhihanz): session property push down
DatabendClient startQuery(String sql) throws SQLException {
PaginationOptions options = getPaginationOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public final class DatabendDriverUri {
private final Properties properties;
private final URI uri;
private final boolean useSecureConnection;
private final boolean strNullAsNull;
private final String warehouse;
private final String sslmode;
private final String tenant;
Expand All @@ -62,6 +63,7 @@ private DatabendDriverUri(String url, Properties driverProperties)
Map.Entry<URI, Map<String, String>> uriAndProperties = parse(url);
this.properties = mergeProperties(uriAndProperties.getKey(), uriAndProperties.getValue(), driverProperties);
this.useSecureConnection = SSL.getValue(properties).orElse(false);
this.strNullAsNull = STRNULL_AS_NULL.getValue(properties).orElse(true);
this.warehouse = WAREHOUSE.getValue(properties).orElse("");
this.sslmode = SSL_MODE.getValue(properties).orElse("disable");
this.tenant = TENANT.getValue(properties).orElse("");
Expand Down Expand Up @@ -258,6 +260,10 @@ public String getWarehouse() {
return warehouse;
}

public boolean getStrNullAsNull() {
return strNullAsNull;
}

public String getSslmode() {
return sslmode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import okhttp3.OkHttpClient;

import java.io.Closeable;
import java.io.IOException;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverPropertyInfo;
Expand Down Expand Up @@ -67,6 +68,13 @@ public Connection connect(String url, Properties info)

OkHttpClient.Builder builder = httpClient.newBuilder();
uri.setupClient(builder);
DatabendConnection connection = new DatabendConnection(uri, builder.build());
// ping the server host
try {
connection.PingDatabendClientV1();
} catch (IOException e) {
throw new RuntimeException(e);
}

return new DatabendConnection(uri, builder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ public void testFull() throws SQLException {
props.setProperty("connection_time", "15");
props.setProperty("warehouse", "test");
props.setProperty("tenant", "tenant1");
props.setProperty("strnullasnull", String.valueOf(false));
DatabendDriverUri uri = DatabendDriverUri.create("jdbc:databend://u1@localhost:33101/db1?password=p1&database=db2&tenant=tenant1&warehouse=test&null_display=null&connection_timeout=15&socket_timeout=15&presigned_url_disabled=true&wait_time_secs=1&max_rows_in_buffer=10&max_rows_per_page=5", props);

Assert.assertEquals(uri.getProperties().get("user"), "u1");
Expand All @@ -204,5 +205,6 @@ public void testFull() throws SQLException {
Assert.assertEquals(uri.getMaxRowsPerPage().intValue(), 7);
Assert.assertFalse(uri.presignedUrlDisabled().booleanValue());
Assert.assertEquals("null", uri.nullDisplay().toString());
Assert.assertEquals(false, uri.getStrNullAsNull());
}
}

0 comments on commit 7b359f0

Please sign in to comment.