Skip to content

Commit

Permalink
SNOW-1757822: Allow JDBC to handle ZSTD decompression (#1932)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-dheyman-1 authored Oct 28, 2024
1 parent 3d3f401 commit 83e5849
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 37 deletions.
4 changes: 2 additions & 2 deletions FIPS/scripts/check_content.sh
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#!/bin/bash -e

# scripts used to check if all dependency is shaded into snowflake internal path
# scripts used to check if all dependencies are shaded into snowflake internal path

set -o pipefail

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"

if jar tvf $DIR/../target/snowflake-jdbc-fips.jar | awk '{print $8}' | grep -v -E "^(net|com)/snowflake" | grep -v -E "(com|net)/\$" | grep -v -E "^META-INF" | grep -v -E "^mozilla" | grep -v -E "^com/sun/jna" | grep -v com/sun/ | grep -v mime.types; then
if jar tvf $DIR/../target/snowflake-jdbc-fips.jar | awk '{print $8}' | grep -v -E "^(net|com)/snowflake" | grep -v -E "(com|net)/\$" | grep -v -E "^META-INF" | grep -v -E "^mozilla" | grep -v -E "^com/sun/jna" | grep -v com/sun/ | grep -v mime.types | grep -v -E "^com/github/" | grep -v -E "^aix/" | grep -v -E "^darwin/" | grep -v -E "^freebsd/" | grep -v -E "^linux/" | grep -v -E "^win/"; then
echo "[ERROR] JDBC jar includes class not under the snowflake namespace"
exit 1
fi
6 changes: 3 additions & 3 deletions ci/scripts/check_content.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ set -o pipefail

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"

if jar tvf $DIR/../../target/snowflake-jdbc${package_modifier}.jar | awk '{print $8}' | grep -v -E "^(net|com)/snowflake" | grep -v -E "(com|net)/\$" | grep -v -E "^META-INF" | grep -v -E "^mozilla" | grep -v -E "^com/sun/jna" | grep -v com/sun/ | grep -v mime.types; then
if jar tvf $DIR/../../target/snowflake-jdbc${package_modifier}.jar | awk '{print $8}' | grep -v -E "^(net|com)/snowflake" | grep -v -E "(com|net)/\$" | grep -v -E "^META-INF" | grep -v -E "^mozilla" | grep -v -E "^com/sun/jna" | grep -v com/sun/ | grep -v mime.types | grep -v -E "^com/github/" | grep -v -E "^aix/" | grep -v -E "^darwin/" | grep -v -E "^freebsd/" | grep -v -E "^linux/" | grep -v -E "^win/"; then
echo "[ERROR] JDBC jar includes class not under the snowflake namespace"
exit 1
fi

if jar tvf $DIR/../../target/snowflake-jdbc${package_modifier}.jar | awk '{print $8}' | grep -E "^META-INF/versions/.*.class" | grep -v -E "^META-INF/versions/.*/(net|com)/snowflake"; then
echo "[ERROR] JDBC jar includes multi release classes not under the snowflake namespace"
if jar tvf $DIR/../../target/snowflake-jdbc${package_modifier}.jar | awk '{print $8}' | grep -E "^META-INF/versions/.*.class" | grep -v -E "^META-INF/versions/.*/(net|com)/snowflake" | grep -v -E "^META-INF/versions/.*/com/github" | grep -v -E "^aix/" | grep -v -E "^darwin/" | grep -v -E "^freebsd/" | grep -v -E "^linux/" | grep -v -E "^win/"; then
echo "[ERROR] JDBC jar includes multi-release classes not under the snowflake namespace"
exit 1
fi
5 changes: 0 additions & 5 deletions linkage-checker-exclusion-rules.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@
<Source><Package name="org.apache.commons.compress.compressors"/></Source>
<Reason>Optional</Reason>
</LinkageError>
<LinkageError>
<Target><Package name="com.github.luben.zstd"/></Target>
<Source><Package name="org.apache.commons.compress.compressors"/></Source>
<Reason>Optional</Reason>
</LinkageError>
<LinkageError>
<Target><Package name="com.google.appengine.api.urlfetch"/></Target>
<Source><Package name="com.google.api.client.extensions.appengine"/></Source>
Expand Down
10 changes: 10 additions & 0 deletions parent-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<apache.commons.text.version>1.10.0</apache.commons.text.version>
<apache.httpclient.version>4.5.14</apache.httpclient.version>
<apache.httpcore.version>4.4.16</apache.httpcore.version>
<zstd-jni.version>1.5.6-5</zstd-jni.version>
<arrow.version>17.0.0</arrow.version>
<asm.version>9.3</asm.version>
<avro.version>1.8.1</avro.version>
Expand Down Expand Up @@ -327,6 +328,11 @@
<artifactId>httpcore</artifactId>
<version>${apache.httpcore.version}</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>${zstd-jni.version}</version>
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-core</artifactId>
Expand Down Expand Up @@ -650,6 +656,10 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package net.snowflake.client.jdbc;

import static net.snowflake.client.core.Constants.MB;
import static net.snowflake.common.core.FileCompressionType.GZIP;
import static net.snowflake.common.core.FileCompressionType.ZSTD;

import com.github.luben.zstd.ZstdInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import net.snowflake.common.core.SqlState;
import org.apache.http.Header;

class CompressedStreamFactory {

  private static final int STREAM_BUFFER_SIZE = MB;

  /**
   * Wraps a raw response stream in the decompressing stream that matches its Content-Encoding
   * header.
   *
   * <p>The header value is compared case-insensitively against the GZIP and ZSTD compression type
   * names. When no header is supplied, the choice is delegated to {@link
   * DefaultResultStreamProvider#detectGzipAndGetStream(InputStream)}.
   *
   * @param is the raw response body stream
   * @param encoding the Content-Encoding header of the response, or {@code null} if absent
   * @return a stream over the decoded content
   * @throws IOException if the decompressing stream cannot be created
   * @throws SnowflakeSQLException if the header names an encoding other than gzip or zstd
   */
  public InputStream createBasedOnEncodingHeader(InputStream is, Header encoding)
      throws IOException, SnowflakeSQLException {
    // No header at all: fall back to inspecting the stream itself.
    if (encoding == null) {
      return DefaultResultStreamProvider.detectGzipAndGetStream(is);
    }

    final String encodingValue = encoding.getValue();
    if (GZIP.name().equalsIgnoreCase(encodingValue)) {
      // Explicit buffer size for the GZIP decoder.
      return new GZIPInputStream(is, STREAM_BUFFER_SIZE);
    }
    if (ZSTD.name().equalsIgnoreCase(encodingValue)) {
      return new ZstdInputStream(is);
    }

    // Any other declared encoding is unsupported.
    throw new SnowflakeSQLException(
        SqlState.INTERNAL_ERROR,
        ErrorCode.INTERNAL_ERROR.getMessageCode(),
        "Exception: unexpected compression got " + encodingValue);
  }
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package net.snowflake.client.jdbc;

import static net.snowflake.client.core.Constants.MB;

import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
Expand Down Expand Up @@ -34,7 +32,11 @@ public class DefaultResultStreamProvider implements ResultStreamProvider {
// SSE-C algorithm value
private static final String SSE_C_AES = "AES256";

private static final int STREAM_BUFFER_SIZE = MB;
private CompressedStreamFactory compressedStreamFactory;

public DefaultResultStreamProvider() {
this.compressedStreamFactory = new CompressedStreamFactory();
}

@Override
public InputStream getInputStream(ChunkDownloadContext context) throws Exception {
Expand Down Expand Up @@ -71,9 +73,11 @@ public InputStream getInputStream(ChunkDownloadContext context) throws Exception

InputStream inputStream;
final HttpEntity entity = response.getEntity();
Header encoding = response.getFirstHeader("Content-Encoding");
try {
// read the chunk data
inputStream = detectContentEncodingAndGetInputStream(response, entity.getContent());
// create stream based on compression type
inputStream =
compressedStreamFactory.createBasedOnEncodingHeader(entity.getContent(), encoding);
} catch (Exception ex) {
logger.error("Failed to decompress data: {}", response);

Expand Down Expand Up @@ -144,28 +148,6 @@ else if (context.getQrmk() != null) {
return response;
}

private InputStream detectContentEncodingAndGetInputStream(HttpResponse response, InputStream is)
throws IOException, SnowflakeSQLException {
InputStream inputStream = is; // Determine the format of the response, if it is not
// either plain text or gzip, raise an error.
Header encoding = response.getFirstHeader("Content-Encoding");
if (encoding != null) {
if ("gzip".equalsIgnoreCase(encoding.getValue())) {
/* specify buffer size for GZIPInputStream */
inputStream = new GZIPInputStream(is, STREAM_BUFFER_SIZE);
} else {
throw new SnowflakeSQLException(
SqlState.INTERNAL_ERROR,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
"Exception: unexpected compression got " + encoding.getValue());
}
} else {
inputStream = detectGzipAndGetStream(is);
}

return inputStream;
}

public static InputStream detectGzipAndGetStream(InputStream is) throws IOException {
PushbackInputStream pb = new PushbackInputStream(is, 2);
byte[] signature = new byte[2];
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package net.snowflake.client.jdbc;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.junit.Test;

public class CompressedStreamFactoryTest {

  private final CompressedStreamFactory factory = new CompressedStreamFactory();

  /** A "gzip" Content-Encoding header must produce a GZIPInputStream that decodes the payload. */
  @Test
  public void testDetectContentEncodingAndGetInputStream_Gzip() throws Exception {
    // Payload to compress and later compare against
    String originalData = "Some data in GZIP";

    // Header announcing gzip-encoded content
    Header encodingHeader = new BasicHeader("Content-Encoding", "gzip");

    // Compress the payload in memory; leaving the try block closes the GZIP stream, which
    // flushes and finishes the compression before the bytes are read back
    ByteArrayOutputStream compressedBytes = new ByteArrayOutputStream();
    try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(compressedBytes)) {
      gzipOutputStream.write(originalData.getBytes(StandardCharsets.UTF_8));
    }
    byte[] gzipData = compressedBytes.toByteArray();

    // Input stream over the compressed bytes
    InputStream gzipStream = new ByteArrayInputStream(gzipData);

    // Invoke the factory method directly
    InputStream resultStream = factory.createBasedOnEncodingHeader(gzipStream, encodingHeader);

    // The stream type must match the encoding and the decoded text must equal the payload
    assertTrue(resultStream instanceof GZIPInputStream);
    assertEquals(originalData, IOUtils.toString(resultStream, StandardCharsets.UTF_8));
  }

  /** A "zstd" Content-Encoding header must produce a ZstdInputStream that decodes the payload. */
  @Test
  public void testDetectContentEncodingAndGetInputStream_Zstd() throws Exception {
    // Payload to compress and later compare against
    String originalData = "Some data in ZSTD";

    // Header announcing zstd-encoded content
    Header encodingHeader = new BasicHeader("Content-Encoding", "zstd");

    // Compress the payload in memory; leaving the try block closes the ZSTD stream, which
    // flushes and finishes the compression before the bytes are read back
    ByteArrayOutputStream compressedBytes = new ByteArrayOutputStream();
    try (ZstdOutputStream zstdOutputStream = new ZstdOutputStream(compressedBytes)) {
      zstdOutputStream.write(originalData.getBytes(StandardCharsets.UTF_8));
    }
    byte[] zstdData = compressedBytes.toByteArray();

    // Input stream over the compressed bytes
    InputStream zstdStream = new ByteArrayInputStream(zstdData);

    // Invoke the factory method directly
    InputStream resultStream = factory.createBasedOnEncodingHeader(zstdStream, encodingHeader);

    // The stream type must match the encoding and the decoded text must equal the payload
    assertTrue(resultStream instanceof ZstdInputStream);
    assertEquals(originalData, IOUtils.toString(resultStream, StandardCharsets.UTF_8));
  }
}
6 changes: 6 additions & 0 deletions thin_public_pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<slf4j.version>2.0.13</slf4j.version>
<threeten.version>1.6.9</threeten.version>
<zstd-jni.version>1.5.6-5</zstd-jni.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -267,6 +268,11 @@
<artifactId>jsoup</artifactId>
<version>${jsoup.version}</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>${zstd-jni.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down

0 comments on commit 83e5849

Please sign in to comment.