From e620fa67931d1bd0d39c06f4513cb4408712f15e Mon Sep 17 00:00:00 2001 From: hantmac Date: Sun, 18 Feb 2024 15:23:54 +0800 Subject: [PATCH 1/3] feat: support compress data when uploadStream --- .../com/databend/jdbc/DatabendConnection.java | 49 +++++++++++++++++-- 1 file changed, 44 insertions(+), 5 deletions(-) diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java index 21a853fd..9ff0ea63 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java @@ -11,6 +11,8 @@ import okhttp3.OkHttpClient; import okhttp3.Request; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -38,6 +40,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.zip.GZIPOutputStream; import static com.databend.client.ClientSettings.*; import static com.databend.client.DatabendClientV1.*; @@ -605,12 +608,34 @@ private Map setAdditionalHeaders() { } + /** + * Method to put data from a stream at a stage location. The data will be uploaded as one file. No + * splitting is done in this method. + * + *

Stream size must match the total size of data in the input stream unless compressData + * parameter is set to true. + * + *

caller is responsible for passing the correct size for the data in the stream and releasing + * the inputStream after the method is called. + * + *

Note this method is deprecated since streamSize is not required now. Keep the function + * signature for backward compatibility + * + * @param stageName stage name: e.g. ~ or table name or stage name + * @param destPrefix path prefix under which the data should be uploaded on the stage + * @param inputStream input stream from which the data will be uploaded + * @param destFileName destination file name to use + * @param fileSize data size in the stream + * @throws SQLException failed to put data from a stream at stage + */ @Override public void uploadStream(String stageName, String destPrefix, InputStream inputStream, String destFileName, long fileSize, boolean compressData) throws SQLException { - // TODO(zhihanz) handle compress data - // remove / in the end of stage name - // remove / in the beginning of destPrefix and end of destPrefix + // TODO(hantmac) handle compress data + /* + remove / in the end of stage name + remove / in the beginning of destPrefix and end of destPrefix + */ String s; if (stageName == null) { s = "~"; @@ -620,16 +645,30 @@ public void uploadStream(String stageName, String destPrefix, InputStream inputS String p = destPrefix.replaceAll("^/", "").replaceAll("/$", ""); String dest = p + "/" + destFileName; try { + InputStream dataStream = inputStream; + if (compressData) { + // Wrap the input stream with a GZIPOutputStream for compression + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) { + byte[] buffer = new byte[1024]; + int len; + while ((len = inputStream.read(buffer)) != -1) { + gzipOutputStream.write(buffer, 0, len); + } + } + dataStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray()); + fileSize = byteArrayOutputStream.size(); // Update the file size to the compressed size + } if (this.driverUri.presignedUrlDisabled()) { DatabendPresignClient cli = new DatabendPresignClientV1(httpClient, this.httpUri.toString()); - cli.presignUpload(null, inputStream, s, p + "/", destFileName, fileSize, true); + cli.presignUpload(null, dataStream, s, p + "/", destFileName, fileSize, true); } else { logger.log(Level.FINE, "presign to @" + s + "/" + dest); PresignContext ctx = PresignContext.getPresignContext(this, PresignContext.PresignMethod.UPLOAD, s, dest); Headers h = ctx.getHeaders(); String presignUrl = ctx.getUrl(); DatabendPresignClient cli = new DatabendPresignClientV1(new OkHttpClient(), this.httpUri.toString()); - cli.presignUpload(null, inputStream, h, presignUrl, fileSize, true); + cli.presignUpload(null, dataStream, h, presignUrl, fileSize, true); } } catch (JsonProcessingException e) { System.out.println(e.getMessage()); From 6cc9b6c4ca88443f6502a56a77251ab9aee9eeab Mon Sep 17 00:00:00 2001 From: hantmac Date: Sun, 18 Feb 2024 15:30:12 +0800 Subject: [PATCH 2/3] fix --- .../src/test/java/com/databend/jdbc/TestFileTransfer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/databend-jdbc/src/test/java/com/databend/jdbc/TestFileTransfer.java b/databend-jdbc/src/test/java/com/databend/jdbc/TestFileTransfer.java index cf713ff1..726f34c6 100644 --- a/databend-jdbc/src/test/java/com/databend/jdbc/TestFileTransfer.java +++ b/databend-jdbc/src/test/java/com/databend/jdbc/TestFileTransfer.java @@ -114,7 +114,7 @@ public void testFileTransfer() String stageName = "test_stage"; DatabendConnection databendConnection = connection.unwrap(DatabendConnection.class); PresignContext.createStageIfNotExists(databendConnection, stageName); - databendConnection.uploadStream(stageName, "jdbc/test/", fileInputStream, "test.csv", f.length(), false); + databendConnection.uploadStream(stageName, "jdbc/test/", fileInputStream, "test.csv", f.length(), true); downloaded = databendConnection.downloadStream(stageName, "jdbc/test/test.csv", false); byte[] arr = streamToByteArray(downloaded); Assert.assertEquals(arr.length, f.length()); @@ -137,7 +137,7 @@ public void testFileTransferThroughAPI() { String stageName = "test_stage"; DatabendConnection databendConnection = connection.unwrap(DatabendConnection.class); PresignContext.createStageIfNotExists(databendConnection, stageName); - databendConnection.uploadStream(stageName, "jdbc/test/", fileInputStream, "test.csv", f.length(), false); + databendConnection.uploadStream(stageName, "jdbc/test/", fileInputStream, "test.csv", f.length(), true); InputStream downloaded = databendConnection.downloadStream(stageName, "jdbc/test/test.csv", false); byte[] arr = streamToByteArray(downloaded); Assert.assertEquals(arr.length, f.length()); @@ -157,7 +157,7 @@ public void testCopyInto() { String stageName = "test_stage"; DatabendConnection databendConnection = connection.unwrap(DatabendConnection.class); PresignContext.createStageIfNotExists(databendConnection, stageName); - databendConnection.uploadStream(stageName, "jdbc/c2/", fileInputStream, "complex.csv", f.length(), false); + databendConnection.uploadStream(stageName, "jdbc/c2/", fileInputStream, "complex.csv", f.length(), true); fileInputStream.close(); DatabendStage s = DatabendStage.builder().stageName(stageName).path("jdbc/c2/").build(); DatabendCopyParams p = DatabendCopyParams.builder().setPattern("complex.csv").setDatabaseTableName("copy_into").setDatabendStage(s).build(); From ffc0810a309598806a4d870a2eceb36af30e9fb7 Mon Sep 17 00:00:00 2001 From: hantmac Date: Sun, 18 Feb 2024 15:35:02 +0800 Subject: [PATCH 3/3] fix --- .../src/main/java/com/databend/jdbc/DatabendConnection.java | 1 - 1 file changed, 1 deletion(-) diff --git a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java index 9ff0ea63..052e8271 100644 --- a/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java +++ b/databend-jdbc/src/main/java/com/databend/jdbc/DatabendConnection.java @@ -631,7 +631,6 @@ private Map setAdditionalHeaders() { @Override public void uploadStream(String stageName, String destPrefix, InputStream inputStream, String destFileName, long fileSize, boolean compressData) throws SQLException { - // TODO(hantmac) handle compress data /* remove / in the end of stage name remove / in the beginning of destPrefix and end of destPrefix