Skip to content

Commit

Permalink
Add faster scaling composite hash value encoding for remote path (ope…
Browse files Browse the repository at this point in the history
…nsearch-project#13251)

Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Apr 23, 2024
1 parent 042ba81 commit 090f982
Show file tree
Hide file tree
Showing 13 changed files with 742 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -229,7 +229,7 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
client(clusterManagerNode).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING.getKey(), PathType.FIXED))
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.FIXED))
.get();
createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, true));
Client client = client();
Expand Down Expand Up @@ -260,7 +260,7 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
client(clusterManagerNode).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX))
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX))
.get();

restoreSnapshotResponse = client.admin()
Expand All @@ -272,13 +272,13 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
.get();
assertEquals(RestStatus.ACCEPTED, restoreSnapshotResponse.status());
ensureGreen(restoredIndexName1version2);
validatePathType(restoredIndexName1version2, PathType.HASHED_PREFIX, PathHashAlgorithm.FNV_1A);
validatePathType(restoredIndexName1version2, PathType.HASHED_PREFIX, PathHashAlgorithm.FNV_1A_COMPOSITE_1);

// Create index with cluster setting cluster.remote_store.index.path.prefix.type as hashed_prefix.
// Create index with cluster setting cluster.remote_store.index.path.type as hashed_prefix.
indexSettings = getIndexSettings(1, 0).build();
createIndex(indexName2, indexSettings);
ensureGreen(indexName2);
validatePathType(indexName2, PathType.HASHED_PREFIX, PathHashAlgorithm.FNV_1A);
validatePathType(indexName2, PathType.HASHED_PREFIX, PathHashAlgorithm.FNV_1A_COMPOSITE_1);

// Validating that custom data has not changed for indexes which were created before the cluster setting got updated
validatePathType(indexName1, PathType.FIXED);
Expand All @@ -294,7 +294,7 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
client(clusterManagerNode).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING.getKey(), PathType.FIXED))
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.FIXED))
.get();

// Close index 2
Expand All @@ -309,7 +309,7 @@ public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
ensureGreen(indexName2);

// Validating that custom data has not changed for testindex2 which was created before the cluster setting got updated
validatePathType(indexName2, PathType.HASHED_PREFIX, PathHashAlgorithm.FNV_1A);
validatePathType(indexName2, PathType.HASHED_PREFIX, PathHashAlgorithm.FNV_1A_COMPOSITE_1);
}

private void validatePathType(String index, PathType pathType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,8 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,

IndicesService.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING,
IndicesService.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,

Expand All @@ -730,9 +731,7 @@ public void apply(Settings value, Settings current, Settings previous) {
IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT,
IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT,
RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,
IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.index.remote;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.hash.FNV1a;
Expand All @@ -23,19 +24,23 @@
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.index.remote.RemoteStoreUtils.longToCompositeBase64AndBinaryEncoding;
import static org.opensearch.index.remote.RemoteStoreUtils.longToUrlBase64;

/**
* This class contains the different enums related to remote store like data categories and types, path types
* and hashing algorithm.
*
* @opensearch.api
*/
@ExperimentalApi
public class RemoteStoreEnums {

/**
* Categories of the data in Remote store.
*/
@PublicApi(since = "2.14.0")
@ExperimentalApi
public enum DataCategory {
SEGMENTS("segments", Set.of(DataType.values())),
TRANSLOG("translog", Set.of(DATA, METADATA));
Expand All @@ -61,6 +66,7 @@ public String getName() {
* Types of data in remote store.
*/
@PublicApi(since = "2.14.0")
@ExperimentalApi
public enum DataType {
DATA("data"),
METADATA("metadata"),
Expand All @@ -82,6 +88,7 @@ public String getName() {
* For more information, see <a href="https://github.com/opensearch-project/OpenSearch/issues/12567">Github issue #12567</a>.
*/
@PublicApi(since = "2.14.0")
@ExperimentalApi
public enum PathType {
FIXED(0) {
@Override
Expand Down Expand Up @@ -214,15 +221,29 @@ public static PathType parseString(String pathType) {
* Type of hashes supported for path types that have hashing.
*/
@PublicApi(since = "2.14.0")
@ExperimentalApi
public enum PathHashAlgorithm {

FNV_1A(0) {
FNV_1A_BASE64(0) {
@Override
String hash(PathInput pathInput) {
String input = pathInput.indexUUID() + pathInput.shardId() + pathInput.dataCategory().getName() + pathInput.dataType()
.getName();
long hash = FNV1a.hash64(input);
return RemoteStoreUtils.longToUrlBase64(hash);
return longToUrlBase64(hash);
}
},
/**
* This hash algorithm will generate a hash value which will use 1st 6 bits to create bas64 character and next 14
* bits to create binary string.
*/
FNV_1A_COMPOSITE_1(1) {
@Override
String hash(PathInput pathInput) {
String input = pathInput.indexUUID() + pathInput.shardId() + pathInput.dataCategory().getName() + pathInput.dataType()
.getName();
long hash = FNV1a.hash64(input);
return longToCompositeBase64AndBinaryEncoding(hash, 20);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ public class RemoteStorePathStrategyResolver {

private volatile PathType type;

private volatile PathHashAlgorithm hashAlgorithm;

private final Supplier<Version> minNodeVersionSupplier;

public RemoteStorePathStrategyResolver(ClusterSettings clusterSettings, Supplier<Version> minNodeVersionSupplier) {
this.minNodeVersionSupplier = minNodeVersionSupplier;
type = clusterSettings.get(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING);
clusterSettings.addSettingsUpdateConsumer(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING, this::setType);
type = clusterSettings.get(IndicesService.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING);
hashAlgorithm = clusterSettings.get(IndicesService.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING);
clusterSettings.addSettingsUpdateConsumer(IndicesService.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING, this::setType);
clusterSettings.addSettingsUpdateConsumer(IndicesService.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, this::setHashAlgorithm);
}

public RemoteStorePathStrategy get() {
Expand All @@ -39,11 +43,15 @@ public RemoteStorePathStrategy get() {
// Min node version check ensures that we are enabling the new prefix type only when all the nodes understand it.
pathType = Version.CURRENT.compareTo(minNodeVersionSupplier.get()) <= 0 ? type : PathType.FIXED;
// If the path type is fixed, hash algorithm is not applicable.
pathHashAlgorithm = pathType == PathType.FIXED ? null : PathHashAlgorithm.FNV_1A;
pathHashAlgorithm = pathType == PathType.FIXED ? null : hashAlgorithm;
return new RemoteStorePathStrategy(pathType, pathHashAlgorithm);
}

private void setType(PathType type) {
this.type = type;
}

private void setHashAlgorithm(PathHashAlgorithm hashAlgorithm) {
this.hashAlgorithm = hashAlgorithm;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

Expand All @@ -26,10 +27,16 @@
public class RemoteStoreUtils {
public static final int LONG_MAX_LENGTH = String.valueOf(Long.MAX_VALUE).length();

/**
* URL safe base 64 character set. This must not be changed as this is used in deriving the base64 equivalent of binary.
*/
static final char[] URL_BASE64_CHARSET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_".toCharArray();

/**
* This method subtracts given numbers from Long.MAX_VALUE and returns a string representation of the result.
* The resultant string is guaranteed to be of the same length that of Long.MAX_VALUE. If shorter, we add left padding
* of 0s to the string.
*
* @param num number to get the inverted long string for
* @return String value of Long.MAX_VALUE - num
*/
Expand All @@ -46,6 +53,7 @@ public static String invertLong(long num) {

/**
* This method converts the given string into long and subtracts it from Long.MAX_VALUE
*
* @param str long in string format to be inverted
* @return long value of the invert result
*/
Expand All @@ -59,6 +67,7 @@ public static long invertLong(String str) {

/**
* Extracts the segment name from the provided segment file name
*
* @param filename Segment file name to parse
* @return Name of the segment that the segment file belongs to
*/
Expand All @@ -79,10 +88,9 @@ public static String getSegmentName(String filename) {
}

/**
*
* @param mdFiles List of segment/translog metadata files
* @param fn Function to extract PrimaryTerm_Generation and Node Id from metadata file name .
* fn returns null if node id is not part of the file name
* @param fn Function to extract PrimaryTerm_Generation and Node Id from metadata file name .
* fn returns null if node id is not part of the file name
*/
public static void verifyNoMultipleWriters(List<String> mdFiles, Function<String, Tuple<String, String>> fn) {
Map<String, String> nodesByPrimaryTermAndGen = new HashMap<>();
Expand Down Expand Up @@ -116,4 +124,26 @@ static String longToUrlBase64(long value) {
String base64Str = Base64.getUrlEncoder().encodeToString(hashBytes);
return base64Str.substring(0, base64Str.length() - 1);
}

static long urlBase64ToLong(String base64Str) {
byte[] hashBytes = Base64.getUrlDecoder().decode(base64Str);
return ByteBuffer.wrap(hashBytes).getLong();
}

/**
* Converts an input hash which occupies 64 bits of memory into a composite encoded string. The string will have 2 parts -
* 1. Base 64 string and 2. Binary String. We will use the first 6 bits for creating the base 64 string.
* For the second part, the rest of the bits (of length {@code len}-6) will be used as is in string form.
*/
static String longToCompositeBase64AndBinaryEncoding(long value, int len) {
if (len < 7 || len > 64) {
throw new IllegalArgumentException("In longToCompositeBase64AndBinaryEncoding, len must be between 7 and 64 (both inclusive)");
}
String binaryEncoding = String.format(Locale.ROOT, "%64s", Long.toBinaryString(value)).replace(' ', '0');
String base64Part = binaryEncoding.substring(0, 6);
String binaryPart = binaryEncoding.substring(6, len);
int base64DecimalValue = Integer.valueOf(base64Part, 2);
assert base64DecimalValue >= 0 && base64DecimalValue < 64;
return URL_BASE64_CHARSET[base64DecimalValue] + binaryPart;
}
}
23 changes: 20 additions & 3 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.service.CacheService;
Expand Down Expand Up @@ -126,6 +127,7 @@
import org.opensearch.index.query.QueryRewriteContext;
import org.opensearch.index.recovery.RecoveryStats;
import org.opensearch.index.refresh.RefreshStats;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.search.stats.SearchStats;
Expand Down Expand Up @@ -309,17 +311,32 @@ public class IndicesService extends AbstractLifecycleComponent
);

/**
* This setting is used to set the remote store blob store path prefix strategy. This setting is effective only for
* This setting is used to set the remote store blob store path type strategy. This setting is effective only for
* remote store enabled cluster.
*/
public static final Setting<PathType> CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING = new Setting<>(
"cluster.remote_store.index.path.prefix.type",
@ExperimentalApi
public static final Setting<PathType> CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING = new Setting<>(
"cluster.remote_store.index.path.type",
PathType.FIXED.toString(),
PathType::parseString,
Property.NodeScope,
Property.Dynamic
);

/**
* This setting is used to set the remote store blob store path hash algorithm strategy. This setting is effective only for
* remote store enabled cluster. This setting will come to effect if the {@link #CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING}
* is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}.
*/
@ExperimentalApi
public static final Setting<PathHashAlgorithm> CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING = new Setting<>(
"cluster.remote_store.index.path.hash_algorithm",
PathHashAlgorithm.FNV_1A_COMPOSITE_1.toString(),
PathHashAlgorithm::parseString,
Property.NodeScope,
Property.Dynamic
);

/**
* The node's settings.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1720,7 +1720,7 @@ public void testRemoteCustomData() {
validateRemoteCustomData(
indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY),
PathHashAlgorithm.NAME,
PathHashAlgorithm.FNV_1A.name()
PathHashAlgorithm.FNV_1A_COMPOSITE_1.name()
);
}

Expand All @@ -1729,7 +1729,7 @@ private IndexMetadata testRemoteCustomData(boolean remoteStoreEnabled, PathType
if (remoteStoreEnabled) {
settingsBuilder.put(NODE_ATTRIBUTES.getKey() + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "test");
}
settingsBuilder.put(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING.getKey(), pathType.toString());
settingsBuilder.put(IndicesService.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), pathType.toString());
Settings settings = settingsBuilder.build();

ClusterService clusterService = mock(ClusterService.class);
Expand Down
Loading

0 comments on commit 090f982

Please sign in to comment.