Skip to content

Commit

Permalink
Add ZSTD compression for snapshotting (#2996)
Browse files Browse the repository at this point in the history
Changes:

- Added ZSTD compressor for snapshotting
- 2 JSON repository settings:
  - readonly
  - compression
were moved into the BlobStoreRepository class
and removed from other repos classes where they
were used.

Signed-off-by: Andrey Pleskach <ples@aiven.io>
  • Loading branch information
willyborankin authored Jun 1, 2023
1 parent e3740f7 commit 4df347c
Show file tree
Hide file tree
Showing 30 changed files with 729 additions and 459 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `com.netflix.nebula:nebula-publishing-plugin` from 19.2.0 to 20.3.0
- Bump `com.diffplug.spotless` from 6.17.0 to 6.18.0
- Bump `io.opencensus:opencensus-api` from 0.18.0 to 0.31.1 ([#7291](https://github.com/opensearch-project/OpenSearch/pull/7291))
- Add `com.github.luben:zstd-jni` version 1.5.5-3 ([#2996](https://github.com/opensearch-project/OpenSearch/pull/2996))

### Changed
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
Expand All @@ -50,6 +51,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283))
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
- Reduce memory copy in zstd compression ([#7681](https://github.com/opensearch-project/OpenSearch/pull/7681))
- Add ZSTD compression for snapshotting ([#2996](https://github.com/opensearch-project/OpenSearch/pull/2996))

### Deprecated

Expand Down
3 changes: 3 additions & 0 deletions buildSrc/version.properties
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,6 @@ bytebuddy = 1.14.3

# benchmark dependencies
jmh = 1.35

# compression
zstd = 1.5.5-3
3 changes: 0 additions & 3 deletions distribution/tools/plugin-cli/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ thirdPartyAudit.ignoreViolations(
)

thirdPartyAudit.ignoreMissingClasses(
'com.github.luben.zstd.BufferPool',
'com.github.luben.zstd.ZstdInputStream',
'com.github.luben.zstd.ZstdOutputStream',
'org.brotli.dec.BrotliInputStream',
'org.objectweb.asm.AnnotationVisitor',
'org.objectweb.asm.Attribute',
Expand Down
1 change: 0 additions & 1 deletion modules/transport-netty4/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ thirdPartyAudit {
'org.slf4j.LoggerFactory',
'org.slf4j.spi.LocationAwareLogger',

'com.github.luben.zstd.Zstd',
'com.google.protobuf.nano.CodedOutputByteBufferNano',
'com.google.protobuf.nano.MessageNano',
'com.jcraft.jzlib.Deflater',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ public static final class Repository {
MAX_CHUNK_SIZE,
Property.NodeScope
);
public static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false, Property.NodeScope);
public static final Setting<Boolean> READONLY_SETTING = Setting.boolSetting("readonly", false, Property.NodeScope);
}

private final BlobPath basePath;
Expand All @@ -118,7 +116,7 @@ public AzureRepository(
) {
super(
metadata,
Repository.COMPRESS_SETTING.get(metadata.settings()),
COMPRESS_SETTING.get(metadata.settings()),
namedXContentRegistry,
clusterService,
recoverySettings,
Expand All @@ -142,8 +140,8 @@ public AzureRepository(
// If the user explicitly did not define a readonly value, we set it by ourselves depending on the location mode setting.
// For secondary_only setting, the repository should be read only
final LocationMode locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
if (Repository.READONLY_SETTING.exists(metadata.settings())) {
this.readonly = Repository.READONLY_SETTING.get(metadata.settings());
if (READONLY_SETTING.exists(metadata.settings())) {
this.readonly = READONLY_SETTING.get(metadata.settings());
} else {
this.readonly = locationMode == LocationMode.SECONDARY_ONLY;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.util.function.Function;

import static org.opensearch.common.settings.Setting.Property;
import static org.opensearch.common.settings.Setting.boolSetting;
import static org.opensearch.common.settings.Setting.byteSizeSetting;
import static org.opensearch.common.settings.Setting.simpleString;

Expand All @@ -70,7 +69,6 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {

static final Setting<String> BUCKET = simpleString("bucket", Property.NodeScope, Property.Dynamic);
static final Setting<String> BASE_PATH = simpleString("base_path", Property.NodeScope, Property.Dynamic);
static final Setting<Boolean> COMPRESS = boolSetting("compress", false, Property.NodeScope, Property.Dynamic);
static final Setting<ByteSizeValue> CHUNK_SIZE = byteSizeSetting(
"chunk_size",
MAX_CHUNK_SIZE,
Expand All @@ -94,7 +92,14 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
final ClusterService clusterService,
final RecoverySettings recoverySettings
) {
super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
super(
metadata,
getSetting(COMPRESS_SETTING, metadata),
namedXContentRegistry,
clusterService,
recoverySettings,
buildLocation(metadata)
);
this.storageService = storageService;

String basePath = BASE_PATH.get(metadata.settings());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public HdfsRepository(
final ClusterService clusterService,
final RecoverySettings recoverySettings
) {
super(metadata, metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, clusterService, recoverySettings);
super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService, recoverySettings);

this.environment = environment;
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,6 @@ class S3Repository extends MeteredBlobStoreRepository {
new ByteSizeValue(5, ByteSizeUnit.TB)
);

/**
* When set to true metadata files are stored in compressed format. This setting doesn’t affect index
* files that are already compressed by default. Defaults to false.
*/
static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false);

/**
* Sets the S3 storage class type for the backup files. Values may be standard, reduced_redundancy,
* standard_ia, onezone_ia and intelligent_tiering. Defaults to standard.
Expand Down
1 change: 0 additions & 1 deletion plugins/transport-nio/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ thirdPartyAudit {
'org.slf4j.LoggerFactory',
'org.slf4j.spi.LocationAwareLogger',

'com.github.luben.zstd.Zstd',
'com.google.protobuf.nano.CodedOutputByteBufferNano',
'com.google.protobuf.nano.MessageNano',
'com.jcraft.jzlib.Deflater',
Expand Down
2 changes: 1 addition & 1 deletion sandbox/plugins/custom-codecs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ opensearchplugin {
}

dependencies {
api "com.github.luben:zstd-jni:1.5.5-1"
api "com.github.luben:zstd-jni:${versions.zstd}"
}

yamlRestTest.enabled = false;
Expand Down

This file was deleted.

3 changes: 3 additions & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ dependencies {
api "com.google.protobuf:protobuf-java:${versions.protobuf}"
api "jakarta.annotation:jakarta.annotation-api:${versions.jakarta_annotation}"

//zstd
api "com.github.luben:zstd-jni:${versions.zstd}"

testImplementation(project(":test:framework")) {
// tests use the locally compiled version of server
exclude group: 'org.opensearch', module: 'server'
Expand Down
1 change: 1 addition & 0 deletions server/licenses/zstd-jni-1.5.5-3.jar.sha1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
488dd9b15c9e8cf87d857f65f5cd6359c2853381
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@
*/
public class CompressorFactory {

public static final Compressor COMPRESSOR = new DeflateCompressor();
public static final Compressor DEFLATE_COMPRESSOR = new DeflateCompressor();

@Deprecated
public static final Compressor COMPRESSOR = DEFLATE_COMPRESSOR;

public static final Compressor ZSTD_COMPRESSOR = new ZstdCompressor();

public static final Compressor NONE_COMPRESSOR = new NoneCompressor();

public static boolean isCompressed(BytesReference bytes) {
return compressor(bytes) != null;
Expand All @@ -61,6 +68,9 @@ public static Compressor compressor(BytesReference bytes) {
// as a xcontent, we have a problem
assert XContentHelper.xContentType(bytes) == null;
return COMPRESSOR;
} else if (ZSTD_COMPRESSOR.isCompressed(bytes)) {
assert XContentHelper.xContentType(bytes) == null;
return ZSTD_COMPRESSOR;
}

XContentType contentType = XContentHelper.xContentType(bytes);
Expand All @@ -81,7 +91,6 @@ private static boolean isAncient(BytesReference bytes) {

/**
* Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(BytesReference)}.
* @throws NullPointerException a NullPointerException will be thrown when bytes is null
*/
public static BytesReference uncompressIfNeeded(BytesReference bytes) throws IOException {
Compressor compressor = compressor(Objects.requireNonNull(bytes, "the BytesReference must not be null"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.compress;

/**
* Supported compression types
*
* @opensearch.internal
*/
public enum CompressorType {

DEFLATE {
@Override
public Compressor compressor() {
return CompressorFactory.DEFLATE_COMPRESSOR;
}
},

ZSTD {
@Override
public Compressor compressor() {
return CompressorFactory.ZSTD_COMPRESSOR;
}
},

NONE {
@Override
public Compressor compressor() {
return CompressorFactory.NONE_COMPRESSOR;
}
};

public abstract Compressor compressor();
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,9 @@ public InputStream threadLocalInputStream(InputStream in) throws IOException {
* @return decompressing stream
*/
public static InputStream inputStream(InputStream in, boolean threadLocal) throws IOException {
final byte[] headerBytes = new byte[HEADER.length];
int len = 0;
while (len < headerBytes.length) {
final int read = in.read(headerBytes, len, headerBytes.length - len);
if (read == -1) {
break;
}
len += read;
}
if (len != HEADER.length || Arrays.equals(headerBytes, HEADER) == false) {
final byte[] header = in.readNBytes(HEADER.length);

if (Arrays.equals(header, HEADER) == false) {
throw new IllegalArgumentException("Input stream is not compressed with DEFLATE!");
}

Expand Down Expand Up @@ -252,9 +245,11 @@ public BytesReference uncompress(BytesReference bytesReference) throws IOExcepti
} finally {
inflater.reset();
}
final BytesReference res = buffer.copyBytes();
buffer.reset();
return res;
try {
return buffer.copyBytes();
} finally {
buffer.reset();
}
}

// Reusable Deflater reference. Note: This is a separate instance from the one used for the compressing stream wrapper because we
Expand All @@ -271,8 +266,10 @@ public BytesReference compress(BytesReference bytesReference) throws IOException
} finally {
deflater.reset();
}
final BytesReference res = buffer.copyBytes();
buffer.reset();
return res;
try {
return buffer.copyBytes();
} finally {
buffer.reset();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.compress;

import org.opensearch.common.bytes.BytesReference;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* {@link Compressor} no compressor implementation.
*
* @opensearch.internal
*/
public class NoneCompressor implements Compressor {
@Override
public boolean isCompressed(BytesReference bytes) {
return false;
}

@Override
public int headerLength() {
return 0;
}

@Override
public InputStream threadLocalInputStream(InputStream in) throws IOException {
return in;
}

@Override
public OutputStream threadLocalOutputStream(OutputStream out) throws IOException {
return out;
}

@Override
public BytesReference uncompress(BytesReference bytesReference) throws IOException {
return bytesReference;
}

@Override
public BytesReference compress(BytesReference bytesReference) throws IOException {
return bytesReference;
}

}
Loading

0 comments on commit 4df347c

Please sign in to comment.