Skip to content

Commit

Permalink
chore: refactor build stageAttachment
Browse files Browse the repository at this point in the history
  • Loading branch information
hantmac committed Jan 17, 2024
1 parent ee3eeb3 commit 2861fb0
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public NullDisplay() {
private static class BinaryFormat
extends AbstractConnectionProperty<String> {
public BinaryFormat() {
super("binary_format", Optional.of("hex"), NOT_REQUIRED, ALLOWED, STRING_CONVERTER);
super("binary_format", Optional.of(""), NOT_REQUIRED, ALLOWED, STRING_CONVERTER);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private DatabendDriverUri(String url, Properties driverProperties)
this.presignedUrlDisabled = PRESIGNED_URL_DISABLED.getRequiredValue(properties);
this.copyPurge = COPY_PURGE.getValue(properties).orElse(true);
this.nullDisplay = NULL_DISPLAY.getValue(properties).orElse("\\N");
this.binaryFormat = BINARY_FORMAT.getValue(properties).orElse("hex");
this.binaryFormat = BINARY_FORMAT.getValue(properties).orElse("");
this.waitTimeSecs = WAIT_TIME_SECS.getRequiredValue(properties);
this.connectionTimeout = CONNECTION_TIMEOUT.getRequiredValue(properties);
this.queryTimeout = QUERY_TIMEOUT.getRequiredValue(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,16 +201,7 @@ private StageAttachment uploadBatches() throws SQLException {
String fileName = saved.getName();
c.uploadStream(null, stagePrefix, fis, fileName, saved.length(), false);
String stagePath = "@~/" + stagePrefix + fileName;
Map<String, String> fileFormatOptions = new HashMap<>();
fileFormatOptions.put("binary_format", String.valueOf(c.binaryFormat()));
Map<String, String> copyOptions = new HashMap<>();
copyOptions.put("PURGE", String.valueOf(c.copyPurge()));
copyOptions.put("NULL_DISPLAY", String.valueOf(c.nullDisplay()));
StageAttachment attachment = new StageAttachment.Builder()
.setLocation(stagePath)
.setCopyOptions(copyOptions)
.setFileFormatOptions(fileFormatOptions)
.build();
StageAttachment attachment = buildStateAttachment(c, stagePath);
return attachment;
} catch (Exception e) {
throw new SQLException(e);
Expand All @@ -225,6 +216,38 @@ private StageAttachment uploadBatches() throws SQLException {
}
}

/**
* This method is used to build a StageAttachment object which represents a stage in Databend.
* A stage in Databend is a temporary storage area where data files are stored before being loaded into the Databend database.
*
* @param connection The DatabendConnection object which contains the connection details to the Databend database.
* @param stagePath The path of the stage in the Databend database.
* @return A StageAttachment object which contains the details of the stage.
*/
public static StageAttachment buildStateAttachment(DatabendConnection connection, String stagePath) {
Map<String, String> fileFormatOptions = new HashMap<>();
if (!Objects.equals(connection.binaryFormat(), "")) {
fileFormatOptions.put("binary_format", String.valueOf(connection.binaryFormat()));
}
Map<String, String> copyOptions = new HashMap<>();
copyOptions.put("PURGE", String.valueOf(connection.copyPurge()));
copyOptions.put("NULL_DISPLAY", String.valueOf(connection.nullDisplay()));
StageAttachment attachment;
if (fileFormatOptions.size() != 0) {
attachment = new StageAttachment.Builder()
.setLocation(stagePath)
.setCopyOptions(copyOptions)
.setFileFormatOptions(fileFormatOptions)
.build();
} else {
attachment = new StageAttachment.Builder()
.setLocation(stagePath)
.setCopyOptions(copyOptions)
.build();
}
return attachment;
}

/**
* delete stage file on stage attachment
*
Expand Down Expand Up @@ -874,12 +897,12 @@ public void setBinaryStream(int i, InputStream inputStream)
}
buffer.flush();
byte[] bytes = buffer.toByteArray();
if (connection().binaryFormat().equalsIgnoreCase("hex")) {
String hexString = bytesToHex(bytes);
batchInsertUtils.ifPresent(insertUtils -> insertUtils.setPlaceHolderValue(i, hexString));
} else {
if (connection().binaryFormat().equalsIgnoreCase("base64")) {
String base64String = bytesToBase64(bytes);
batchInsertUtils.ifPresent(insertUtils -> insertUtils.setPlaceHolderValue(i, base64String));
} else {
String hexString = bytesToHex(bytes);
batchInsertUtils.ifPresent(insertUtils -> insertUtils.setPlaceHolderValue(i, hexString));
}
} catch (IOException e) {
throw new SQLException("Error reading InputStream", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.databend.jdbc;

import com.databend.client.StageAttachment;
import org.junit.jupiter.api.Assertions;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
Expand All @@ -20,6 +22,10 @@

import java.util.concurrent.ThreadLocalRandom;

import static com.google.common.base.Preconditions.checkState;
import static org.mockito.Mockito.when;
import static org.testng.AssertJUnit.assertEquals;

public class TestPrepareStatement {
private Connection createConnection()
throws SQLException {
Expand All @@ -42,12 +48,13 @@ public void setUp()
c.createStatement().execute("drop table if exists test_prepare_statement");
c.createStatement().execute("drop table if exists test_prepare_time");
c.createStatement().execute("drop table if exists objects_test1");
c.createStatement().execute("drop table if exists binary1");
c.createStatement().execute("create table test_prepare_statement (a int, b string)");
c.createStatement().execute("create table test_prepare_time(a DATE, b TIMESTAMP)");
// json data
c.createStatement().execute("CREATE TABLE IF NOT EXISTS objects_test1(id TINYINT, obj VARIANT, d TIMESTAMP, s String, arr ARRAY(INT64)) Engine = Fuse");
// Binary data
c.createStatement().execute("create table binary1 (a binary);");
c.createStatement().execute("create table IF NOT EXISTS binary1 (a binary);");
}

@Test(groups = "IT")
Expand Down Expand Up @@ -422,4 +429,16 @@ public void testSetBlobNotNull() throws SQLException {
Assertions.assertEquals(1, result.length);
}
}

@Test
public void shouldBuildStageAttachmentWithFileFormatOptions() throws SQLException {
Connection conn = createConnection();
Assertions.assertEquals("", conn.unwrap(DatabendConnection.class).binaryFormat());
StageAttachment stageAttachment = DatabendPreparedStatement.buildStateAttachment((DatabendConnection) conn, "stagePath");

Assertions.assertFalse(stageAttachment.getFileFormatOptions().containsKey("binary_format"));
Assertions.assertTrue(stageAttachment.getFileFormatOptions().containsKey("type"));
Assertions.assertEquals("true", stageAttachment.getCopyOptions().get("PURGE"));
Assertions.assertEquals("\\N", stageAttachment.getCopyOptions().get("NULL_DISPLAY"));
}
}

0 comments on commit 2861fb0

Please sign in to comment.