Skip to content

Commit

Permalink
Merge pull request #155 from datafuselabs/feat/compress-uploadstream
Browse files Browse the repository at this point in the history
feat: support compress data when uploadStream
  • Loading branch information
hantmac authored Feb 18, 2024
2 parents 9feb8fa + ffc0810 commit d14a048
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import okhttp3.OkHttpClient;
import okhttp3.Request;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
Expand Down Expand Up @@ -38,6 +40,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;

import static com.databend.client.ClientSettings.*;
import static com.databend.client.DatabendClientV1.*;
Expand Down Expand Up @@ -605,12 +608,33 @@ private Map<String, String> setAdditionalHeaders() {
}


/**
 * Method to put data from a stream at a stage location. The data will be uploaded as one file. No
 * splitting is done in this method.
 *
 * <p>Stream size must match the total size of data in the input stream unless the compressData
 * parameter is set to true.
 *
 * <p>The caller is responsible for passing the correct size for the data in the stream and for
 * releasing the inputStream after the method is called.
 *
 * <p>Note: this method is deprecated since streamSize is no longer required. The function
 * signature is kept for backward compatibility.
 *
 * @param stageName stage name: e.g. ~ or table name or stage name
 * @param destPrefix path prefix under which the data should be uploaded on the stage
 * @param inputStream input stream from which the data will be uploaded
 * @param destFileName destination file name to use
 * @param fileSize data size in the stream; when compressData is true this value is replaced by
 *     the compressed size before upload
 * @param compressData whether to gzip-compress the stream contents before uploading
 * @throws SQLException failed to put data from a stream at stage
 */
@Override
public void uploadStream(String stageName, String destPrefix, InputStream inputStream, String destFileName, long fileSize, boolean compressData)
throws SQLException {
// TODO(zhihanz) handle compress data
// remove / in the end of stage name
// remove / in the beginning of destPrefix and end of destPrefix
/*
remove / in the end of stage name
remove / in the beginning of destPrefix and end of destPrefix
*/
String s;
if (stageName == null) {
s = "~";
Expand All @@ -620,16 +644,30 @@ public void uploadStream(String stageName, String destPrefix, InputStream inputS
String p = destPrefix.replaceAll("^/", "").replaceAll("/$", "");
String dest = p + "/" + destFileName;
try {
InputStream dataStream = inputStream;
if (compressData) {
// Compress the input stream into an in-memory buffer using GZIP
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
byte[] buffer = new byte[1024];
int len;
while ((len = inputStream.read(buffer)) != -1) {
gzipOutputStream.write(buffer, 0, len);
}
}
dataStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
fileSize = byteArrayOutputStream.size(); // Update the file size to the compressed size
}
if (this.driverUri.presignedUrlDisabled()) {
DatabendPresignClient cli = new DatabendPresignClientV1(httpClient, this.httpUri.toString());
cli.presignUpload(null, inputStream, s, p + "/", destFileName, fileSize, true);
cli.presignUpload(null, dataStream, s, p + "/", destFileName, fileSize, true);
} else {
logger.log(Level.FINE, "presign to @" + s + "/" + dest);
PresignContext ctx = PresignContext.getPresignContext(this, PresignContext.PresignMethod.UPLOAD, s, dest);
Headers h = ctx.getHeaders();
String presignUrl = ctx.getUrl();
DatabendPresignClient cli = new DatabendPresignClientV1(new OkHttpClient(), this.httpUri.toString());
cli.presignUpload(null, inputStream, h, presignUrl, fileSize, true);
cli.presignUpload(null, dataStream, h, presignUrl, fileSize, true);
}
} catch (JsonProcessingException e) {
System.out.println(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void testFileTransfer()
String stageName = "test_stage";
DatabendConnection databendConnection = connection.unwrap(DatabendConnection.class);
PresignContext.createStageIfNotExists(databendConnection, stageName);
databendConnection.uploadStream(stageName, "jdbc/test/", fileInputStream, "test.csv", f.length(), false);
databendConnection.uploadStream(stageName, "jdbc/test/", fileInputStream, "test.csv", f.length(), true);
downloaded = databendConnection.downloadStream(stageName, "jdbc/test/test.csv", false);
byte[] arr = streamToByteArray(downloaded);
Assert.assertEquals(arr.length, f.length());
Expand All @@ -137,7 +137,7 @@ public void testFileTransferThroughAPI() {
String stageName = "test_stage";
DatabendConnection databendConnection = connection.unwrap(DatabendConnection.class);
PresignContext.createStageIfNotExists(databendConnection, stageName);
databendConnection.uploadStream(stageName, "jdbc/test/", fileInputStream, "test.csv", f.length(), false);
databendConnection.uploadStream(stageName, "jdbc/test/", fileInputStream, "test.csv", f.length(), true);
InputStream downloaded = databendConnection.downloadStream(stageName, "jdbc/test/test.csv", false);
byte[] arr = streamToByteArray(downloaded);
Assert.assertEquals(arr.length, f.length());
Expand All @@ -157,7 +157,7 @@ public void testCopyInto() {
String stageName = "test_stage";
DatabendConnection databendConnection = connection.unwrap(DatabendConnection.class);
PresignContext.createStageIfNotExists(databendConnection, stageName);
databendConnection.uploadStream(stageName, "jdbc/c2/", fileInputStream, "complex.csv", f.length(), false);
databendConnection.uploadStream(stageName, "jdbc/c2/", fileInputStream, "complex.csv", f.length(), true);
fileInputStream.close();
DatabendStage s = DatabendStage.builder().stageName(stageName).path("jdbc/c2/").build();
DatabendCopyParams p = DatabendCopyParams.builder().setPattern("complex.csv").setDatabaseTableName("copy_into").setDatabendStage(s).build();
Expand Down

0 comments on commit d14a048

Please sign in to comment.