Skip to content

Commit

Permalink
feat: support binary and binary_format options
Browse files Browse the repository at this point in the history
  • Loading branch information
hantmac committed Jan 16, 2024
1 parent 3a7e951 commit f59ab1c
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public enum DatabendDataType {
TUPLE(Types.OTHER, DatabendTypes.TUPLE, false, 0, false, "Tuple"),
VARIANT(Types.OTHER, DatabendTypes.VARIANT, false, 0, false, "Variant", "Json"),

BINARY(Types.BINARY, DatabendTypes.BINARY, false, 0, false, "Binary"),

NULL(Types.NULL, DatabendTypes.NULL, false, 0, false, "NULL"),
;

Expand Down Expand Up @@ -115,6 +117,8 @@ public static DatabendDataType getByTypeName(String typeName) {
return MAP;
} else if (startsWithIgnoreCase(typeName, DatabendTypes.TUPLE)) {
return TUPLE;
} else if (startsWithIgnoreCase(typeName, DatabendTypes.BINARY)) {
return BINARY;
}
return NULL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@

package com.databend.client.data;

public final class DatabendTypes
{
public final class DatabendTypes {
public static final String NULL = "null";
public static final String NULLABLE = "nullable";
public static final String BOOLEAN = "boolean";
Expand All @@ -40,6 +39,7 @@ public final class DatabendTypes
public static final String MAP = "map";
public static final String BITMAP = "bitmap";
public static final String VARIANT = "variant";
public static final String BINARY = "binary";
public static final String VARIANT_ARRAY = "variantarray";
public static final String VARIANT_OBJECT = "variantobject";
public static final String INTERVAL = "interval";
Expand Down
9 changes: 5 additions & 4 deletions databend-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-simple</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public final class ConnectionProperties {
public static final ConnectionProperty<Boolean> PRESIGNED_URL_DISABLED = new PresignedUrlDisabled();
public static final ConnectionProperty<Boolean> COPY_PURGE = new CopyPurge();
public static final ConnectionProperty<String> NULL_DISPLAY = new NullDisplay();
public static final ConnectionProperty<String> BINARY_FORMAT = new BinaryFormat();
public static final ConnectionProperty<Integer> WAIT_TIME_SECS = new WaitTimeSecs();

public static final ConnectionProperty<Integer> MAX_ROWS_IN_BUFFER = new MaxRowsInBuffer();
Expand Down Expand Up @@ -110,6 +111,13 @@ public NullDisplay() {
}
}

private static class BinaryFormat
extends AbstractConnectionProperty<String> {
public BinaryFormat() {
super("binary_format", Optional.of("hex"), NOT_REQUIRED, ALLOWED, STRING_CONVERTER);
}
}

private static class QueryTimeout
extends AbstractConnectionProperty<Integer> {
public QueryTimeout() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,10 @@ public String nullDisplay() {
return this.driverUri.nullDisplay();
}

public String binaryFormat() {
return this.driverUri.binaryFormat();
}

public PaginationOptions getPaginationOptions() {
PaginationOptions.Builder builder = PaginationOptions.builder();
builder.setWaitTimeSecs(this.driverUri.getWaitTimeSecs());
Expand Down Expand Up @@ -570,8 +574,7 @@ public void uploadStream(String stageName, String destPrefix, InputStream inputS
// For datax batch insert test, do not throw exception
throw new SQLException(e);
} catch (IOException e) {
System.out.println(e.getMessage());
throw new SQLException("failed to upload input stream", e);
logger.warning("failed to upload input stream" + e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public final class DatabendDriverUri {
private final boolean useSecureConnection;
private final boolean copyPurge;
private final String nullDisplay;
private final String binaryFormat;
private final String database;
private final boolean presignedUrlDisabled;
private final Integer connectionTimeout;
Expand All @@ -64,6 +65,7 @@ private DatabendDriverUri(String url, Properties driverProperties)
this.presignedUrlDisabled = PRESIGNED_URL_DISABLED.getRequiredValue(properties);
this.copyPurge = COPY_PURGE.getValue(properties).orElse(true);
this.nullDisplay = NULL_DISPLAY.getValue(properties).orElse("\\N");
this.binaryFormat = BINARY_FORMAT.getValue(properties).orElse("hex");
this.waitTimeSecs = WAIT_TIME_SECS.getRequiredValue(properties);
this.connectionTimeout = CONNECTION_TIMEOUT.getRequiredValue(properties);
this.queryTimeout = QUERY_TIMEOUT.getRequiredValue(properties);
Expand Down Expand Up @@ -247,6 +249,10 @@ public String nullDisplay() {
return nullDisplay;
}

public String binaryFormat() {
return binaryFormat;
}

public Integer getConnectionTimeout() {
return connectionTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.Reader;
import java.io.*;
import java.math.BigDecimal;
import java.net.URL;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -204,10 +201,15 @@ private StageAttachment uploadBatches() throws SQLException {
String fileName = saved.getName();
c.uploadStream(null, stagePrefix, fis, fileName, saved.length(), false);
String stagePath = "@~/" + stagePrefix + fileName;
Map<String, String> fileFormatOptions = new HashMap<>();
fileFormatOptions.put("BINARY_FORMAT", String.valueOf(c.binaryFormat()));
Map<String, String> copyOptions = new HashMap<>();
copyOptions.put("PURGE", String.valueOf(c.copyPurge()));
copyOptions.put("NULL_DISPLAY", String.valueOf(c.nullDisplay()));
StageAttachment attachment = new StageAttachment.Builder().setLocation(stagePath).setCopyOptions(copyOptions)
StageAttachment attachment = new StageAttachment.Builder()
.setLocation(stagePath)
.setCopyOptions(copyOptions)
.setFileFormatOptions(fileFormatOptions)
.build();
return attachment;
} catch (Exception e) {
Expand Down Expand Up @@ -714,9 +716,13 @@ public void setRef(int i, Ref ref)
}

@Override
public void setBlob(int i, Blob blob)
public void setBlob(int i, Blob x)
throws SQLException {
throw new SQLFeatureNotSupportedException("PreparedStatement", "setBlob");
if (x != null) {
setBinaryStream(i, x.getBinaryStream());
} else {
setNull(i, Types.BLOB);
}
}

@Override
Expand Down Expand Up @@ -854,7 +860,38 @@ public void setAsciiStream(int i, InputStream inputStream)
@Override
public void setBinaryStream(int i, InputStream inputStream)
throws SQLException {
throw new SQLFeatureNotSupportedException("PreparedStatement", "setBinaryStream");
checkOpen();
try {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int nRead;
byte[] data = new byte[1024];
while ((nRead = inputStream.read(data, 0, data.length)) != -1) {
buffer.write(data, 0, nRead);
}
buffer.flush();
byte[] bytes = buffer.toByteArray();
if (connection().binaryFormat().equalsIgnoreCase("hex")) {
String hexString = bytesToHex(bytes);
batchInsertUtils.ifPresent(insertUtils -> insertUtils.setPlaceHolderValue(i, hexString));
} else {
String base64String = bytesToBase64(bytes);
batchInsertUtils.ifPresent(insertUtils -> insertUtils.setPlaceHolderValue(i, base64String));
}
} catch (IOException e) {
throw new SQLException("Error reading InputStream", e);
}
}

private static String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02x", b));
}
return sb.toString();
}

private static String bytesToBase64(byte[] bytes) {
return Base64.getEncoder().encodeToString(bytes);
}

@Override
Expand All @@ -878,7 +915,7 @@ public void setClob(int i, Reader reader)
@Override
public void setBlob(int i, InputStream inputStream)
throws SQLException {
throw new SQLFeatureNotSupportedException("PreparedStatement", "setBlob");
setBinaryStream(i, inputStream);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
Expand All @@ -15,6 +17,7 @@
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

import java.util.concurrent.ThreadLocalRandom;

public class TestPrepareStatement {
Expand Down Expand Up @@ -43,6 +46,8 @@ public void setUp()
c.createStatement().execute("create table test_prepare_time(a DATE, b TIMESTAMP)");
// json data
c.createStatement().execute("CREATE TABLE IF NOT EXISTS objects_test1(id TINYINT, obj VARIANT, d TIMESTAMP, s String, arr ARRAY(INT64)) Engine = Fuse");
// Binary data
c.createStatement().execute("create table binary1 (a binary);");
}

@Test(groups = "IT")
Expand Down Expand Up @@ -401,4 +406,20 @@ public void testAllPreparedStatement() throws SQLException {
System.out.println(r3.getString(2));
}
}

@Test
public void testSetBlobNotNull() throws SQLException {
String sql = "insert into binary1 values (?)";
Connection conn = createConnection();
// Create a Blob
String blobData = "blob data";
InputStream blobInputStream = new ByteArrayInputStream(blobData.getBytes());
try (PreparedStatement statement = conn.prepareStatement(sql)) {
statement.setBlob(1, blobInputStream);
statement.addBatch();
int[] result = statement.executeBatch();
System.out.println(result);
// Assertions.assertEquals(1, result.length);
}
}
}
5 changes: 4 additions & 1 deletion docs/Connection.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ The above URL can be used as follows
String url="jdbc:databend://databend:secret@0.0.0.0:8000/hello_databend"
Connection conn=DriverManager.getConnection(url);
```
If you are using [Databend Cloud](https://app.databend.com/), you can get a warehouse DSN according to [this doc](https://databend.rs/cloud/using-databend-cloud/warehouses#connecting).

If you are using [Databend Cloud](https://app.databend.com/), you can get a warehouse DSN according
to [this doc](https://databend.rs/cloud/using-databend-cloud/warehouses#connecting).
Then the above URL within warehouse DSN can be used as follows:

```java
Expand Down Expand Up @@ -86,3 +88,4 @@ String url="jdbc:databend://databend:secret@0.0.0.0:8000/hello_databend";
| connection_timeout | okhttp connection_timeout param | 0 | jdbc:databend://0.0.0.0:8000/default?connection_timeout=100000 |
| query_timeout | time that you wait a SQL execution | 90 | jdbc:databend://0.0.0.0:8000/default?query_timeout=120 |
| null_display | null value display | \N | jdbc:databend://0.0.0.0:8000/hello_databend?null_display=null |
| binary_format | binary format, support hex and base64 | hex | jdbc:databend://0.0.0.0:8000/default?binary_format=hex |

0 comments on commit f59ab1c

Please sign in to comment.