Skip to content

Commit

Permalink
Merge pull request #286 from youngsofun/cookie
Browse files Browse the repository at this point in the history
feat: support temp table.
  • Loading branch information
hantmac authored Nov 19, 2024
2 parents 875f4e9 + db1db35 commit 044b5d1
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .github/actions/setup_databend_cluster/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ inputs:
version:
description: "query and meta service version"
required: true
default: "1.2.629-nightly"
default: "1.2.655-nightly"
target:
description: ""
required: true
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- uses: ./.github/actions/setup_databend_cluster
timeout-minutes: 15
with:
version: '1.2.647-nightly'
version: '1.2.655-nightly'
target: 'x86_64-unknown-linux-gnu'

- name: Test with conn to node 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class DatabendSession {
// txn
private String txnState;
private Boolean needSticky;
private Boolean needKeepAlive;

private Map<String, Object> additionalProperties = new HashMap<>();

Expand All @@ -50,16 +51,18 @@ public DatabendSession(
@JsonProperty("database") String database,
@JsonProperty("settings") Map<String, String> settings,
@JsonProperty("txn_state") String txnState,
@JsonProperty("need_sticky") Boolean needSticky) {
this.database = database;
@JsonProperty("need_sticky") Boolean needSticky,
@JsonProperty("need_keep_alive") Boolean needKeepAlive) {
this.database = database;
this.settings = settings;
this.txnState = txnState;
this.needSticky = needSticky != null ? needSticky : false;
this.needKeepAlive = needKeepAlive != null ? needKeepAlive: false;
}

// default
public static DatabendSession createDefault() {
return new DatabendSession(DEFAULT_DATABASE, null, null, false);
return new DatabendSession(DEFAULT_DATABASE, null, null, false, false);
}

public static Builder builder() {
Expand Down Expand Up @@ -87,6 +90,11 @@ public Boolean getNeedSticky() {
return needSticky;
}

@JsonProperty("need_keep_alive")
public Boolean getNeedKeepAlive() {
return needKeepAlive;
}

@JsonAnyGetter
public Map<String, Object> getAdditionalProperties() {
return additionalProperties;
Expand Down Expand Up @@ -153,7 +161,7 @@ public void setAutoCommit(boolean autoCommit) {
}

public DatabendSession build() {
return new DatabendSession(database, settings, txnState, false);
return new DatabendSession(database, settings, txnState, false, false);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.databend.client;

import okhttp3.Cookie;
import okhttp3.CookieJar;
import okhttp3.HttpUrl;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class GlobalCookieJar implements CookieJar {
private final Map<String, Cookie> cookieStore = new ConcurrentHashMap<>();

@Override
public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
for (Cookie cookie : cookies) {
cookieStore.put(cookie.name(), cookie);
}
}

@Override
public List<Cookie> loadForRequest(HttpUrl url) {
return new ArrayList<>(cookieStore.values());
}

public void add(Cookie cookie) {
cookieStore.put(cookie.name(), cookie);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
import com.databend.jdbc.cloud.DatabendPresignClient;
import com.databend.jdbc.cloud.DatabendPresignClientV1;
import com.databend.jdbc.exception.DatabendFailedToPingException;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.*;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand All @@ -39,13 +38,7 @@
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -58,13 +51,18 @@
import java.util.zip.GZIPOutputStream;

import static com.databend.client.ClientSettings.*;
import static com.databend.client.DatabendClientV1.MEDIA_TYPE_JSON;
import static com.databend.client.DatabendClientV1.USER_AGENT_VALUE;
import static com.google.common.base.Preconditions.checkState;
import static java.net.URI.create;
import static java.util.Collections.newSetFromMap;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;


public class DatabendConnection implements Connection, FileTransferAPI, Consumer<DatabendSession> {
private static final Logger logger = Logger.getLogger(DatabendConnection.class.getPackage().getName());
public static final String LOGOUT_PATH = "/v1/session/logout";
private static FileHandler FILE_HANDLER;

private final AtomicBoolean closed = new AtomicBoolean();
Expand Down Expand Up @@ -309,6 +307,7 @@ public void close()
for (Statement stmt : statements) {
stmt.close();
}
logout();
}

@Override
Expand Down Expand Up @@ -909,4 +908,67 @@ public void copyIntoTable(String database, String tableName, DatabendCopyParams
while (rs.next()) {
}
}

void logout() throws SQLException {
DatabendSession session = this.session.get();
if (session == null || !session.getNeedKeepAlive()) {
return;
}

int times = getMaxFailoverRetries() + 1;
List hosts = new LinkedList<String>();
String failReason = null;
String lastHost = null;

for (int i = 1; i <= times; i++) {
String candidateHost = this.driverUri.getUri("").toString();
// candidateHost = "http://localhost:8888";
hosts.add(candidateHost);
if (lastHost == candidateHost) {
break;
}
lastHost = candidateHost;
logger.log(Level.FINE, "retry " + i + " times to logout on " + candidateHost);

ClientSettings settings = this.makeClientSettings("", candidateHost).build();
HttpUrl url = HttpUrl.get(candidateHost).newBuilder().encodedPath(LOGOUT_PATH).build();
Request.Builder builder = new Request.Builder()
.url(url)
.header("User-Agent", USER_AGENT_VALUE);
if (settings.getAdditionalHeaders() != null) {
settings.getAdditionalHeaders().forEach(builder::addHeader);
}
if (session.getNeedSticky()) {
builder.addHeader(ClientSettings.X_DATABEND_ROUTE_HINT, uriRouteHint(candidateHost));
String lastNodeID = this.lastNodeID.get();
if (lastNodeID != null)
builder.addHeader(ClientSettings.X_DATABEND_STICKY_NODE, lastNodeID);
}
for (int j = 1; j <= 3; j++) {
Request request = builder.post(okhttp3.RequestBody.create(MEDIA_TYPE_JSON, "{}")).build();
try (Response response = httpClient.newCall(request).execute()) {
if (response.code() != 200) {
throw new SQLException("Error logout: code =" + response.code() + ", body = " + response.body());
}
return;
} catch (IOException e) {
System.out.println("e = " + e.getMessage());
if (e.getCause() instanceof ConnectException) {
if (failReason == null) {
failReason = e.getMessage();
}
try {
MILLISECONDS.sleep(j * 100);
} catch (InterruptedException e2) {
Thread.currentThread().interrupt();
return;
}
} else {
break;
}
}
}
}
throw new SQLException("Failover Retry Error executing query after retries on hosts " + hosts + ": " + failReason);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.databend.jdbc;

import com.databend.client.GlobalCookieJar;
import com.google.common.base.Splitter;
import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;
import okhttp3.Cookie;
import okhttp3.OkHttpClient;

import java.net.URI;
Expand Down Expand Up @@ -414,6 +416,10 @@ public Properties getProperties() {

public void setupClient(OkHttpClient.Builder builder) throws SQLException {
try {
GlobalCookieJar cookieJar = new GlobalCookieJar();
cookieJar.add(new Cookie.Builder().name("cookie_enabled").value("true").domain("not_used").build());
builder.cookieJar(cookieJar);

String password = PASSWORD.getValue(properties).orElse("");
if (!password.isEmpty()) {
builder.addInterceptor(basicAuthInterceptor(USER.getValue(properties).orElse(""), password));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.databend.jdbc;

import com.databend.client.GlobalCookieJar;
import okhttp3.Cookie;
import okhttp3.OkHttpClient;

import java.io.Closeable;
Expand Down Expand Up @@ -55,6 +57,9 @@ public Connection connect(String url, Properties info)

DatabendDriverUri uri = DatabendDriverUri.create(url, info);

GlobalCookieJar cookieJar = new GlobalCookieJar();
cookieJar.add(new Cookie.Builder().name("cookie_enabled").value("true").domain("not_used").build());

OkHttpClient.Builder builder = httpClient.newBuilder();
uri.setupClient(builder);
DatabendConnection connection = new DatabendConnection(uri, builder.build());
Expand Down
28 changes: 28 additions & 0 deletions databend-jdbc/src/test/java/com/databend/jdbc/TestTempTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.databend.jdbc;

import org.junit.Assert;
import org.testng.annotations.Test;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class TestTempTable {

@Test
public void testTempTable() throws SQLException {
try(Connection c1 = Utils.createConnection()) {
Statement statement= c1.createStatement();
statement.execute("create or replace temp table table1(i int)");
statement.execute("insert into table1 values (1), (2)");
statement.executeQuery("select * from table1");
ResultSet rs = statement.getResultSet();
Assert.assertEquals(true, rs.next());
Assert.assertEquals(1, rs.getInt(1));
Assert.assertEquals(true, rs.next());
Assert.assertEquals(2, rs.getInt(1));
Assert.assertEquals(false, rs.next());
}
}
}

0 comments on commit 044b5d1

Please sign in to comment.