Skip to content

Commit

Permalink
Update Shallow Snapshot flows to support remote path type & hash algo
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Apr 3, 2024
1 parent fb5d036 commit 2ef85af
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
import org.opensearch.common.hash.FNV1a;
import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.unmodifiableMap;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;

Expand Down Expand Up @@ -78,7 +81,7 @@ public String getName() {
*/
@PublicApi(since = "2.14.0")
public enum PathType {
FIXED {
FIXED(0) {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
// Hash algorithm is not used in FIXED path type
Expand All @@ -94,7 +97,7 @@ boolean requiresHashAlgorithm() {
return false;
}
},
HASHED_PREFIX {
HASHED_PREFIX(1) {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
// TODO - We need to implement this, keeping the same path as Fixed for sake of multiple tests that can fail otherwise.
Expand All @@ -112,6 +115,33 @@ boolean requiresHashAlgorithm() {
}
};

private final int code;

PathType(int code) {
this.code = code;
}

public int getCode() {
return code;
}

private static final Map<Integer, PathType> CODE_TO_ENUM;
static {
PathType[] values = values();
Map<Integer, PathType> codeToStatus = new HashMap<>(values.length);
for (PathType value : values) {
codeToStatus.put(value.code, value);
}
CODE_TO_ENUM = unmodifiableMap(codeToStatus);
}

/**
* Turn a status code into a {@link PathType}.
*/
public static PathType fromCode(int code) {
return CODE_TO_ENUM.get(code);
}

/**
* This method generates the path for the given path input which constitutes multiple fields and characteristics
* of the data.
Expand Down Expand Up @@ -158,7 +188,7 @@ public static PathType parseString(String pathType) {
@PublicApi(since = "2.14.0")
public enum PathHashAlgorithm {

FNV_1A {
FNV_1A(0) {
@Override
long hash(PathInput pathInput) {
String input = pathInput.indexUUID() + pathInput.shardId() + pathInput.dataCategory().getName() + pathInput.dataType()
Expand All @@ -167,6 +197,33 @@ long hash(PathInput pathInput) {
}
};

private final int code;

PathHashAlgorithm(int code) {
this.code = code;
}

public int getCode() {
return code;
}

private static final Map<Integer, PathHashAlgorithm> CODE_TO_ENUM;
static {
PathHashAlgorithm[] values = values();
Map<Integer, PathHashAlgorithm> codeToStatus = new HashMap<>(values.length);
for (PathHashAlgorithm value : values) {
codeToStatus.put(value.code, value);
}
CODE_TO_ENUM = unmodifiableMap(codeToStatus);
}

/**
* Turn a status code into a {@link PathHashAlgorithm}.
*/
public static PathHashAlgorithm fromCode(int code) {
return CODE_TO_ENUM.get(code);
}

abstract long hash(PathInput pathInput);

public static PathHashAlgorithm parseString(String pathHashAlgorithm) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
Expand Down Expand Up @@ -412,8 +410,7 @@ void recoverFromSnapshotAndRemoteStore(
remoteStoreRepository,
indexUUID,
shardId,
new RemoteStorePathStrategy(PathType.FIXED)
// TODO - The path type needs to be obtained from RemoteStoreShardShallowCopySnapshot
shallowCopyShardMetadata.getRemoteStorePathStrategy()

Check warning on line 413 in server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java#L413

Added line #L413 was not covered by tests
);
sourceRemoteDirectory.initializeToSpecificCommit(
primaryTerm,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
* Remote Store based Shard snapshot metadata
Expand All @@ -41,8 +45,10 @@ public class RemoteStoreShardShallowCopySnapshot implements ToXContentFragment,
private final String repositoryBasePath;
private final String indexUUID;
private final List<String> fileNames;
private final PathType pathType;
private final PathHashAlgorithm pathHashAlgorithm;

static final String DEFAULT_VERSION = "1";
static final String DEFAULT_VERSION = "2";
static final String NAME = "name";
static final String VERSION = "version";
static final String INDEX_VERSION = "index_version";
Expand All @@ -61,6 +67,8 @@ public class RemoteStoreShardShallowCopySnapshot implements ToXContentFragment,

static final String TOTAL_FILE_COUNT = "number_of_files";
static final String TOTAL_SIZE = "total_size";
static final String PATH_TYPE = "path_type";
static final String PATH_HASH_ALGORITHM = "path_hash_algorithm";

private static final ParseField PARSE_NAME = new ParseField(NAME);
private static final ParseField PARSE_VERSION = new ParseField(VERSION);
Expand All @@ -75,6 +83,8 @@ public class RemoteStoreShardShallowCopySnapshot implements ToXContentFragment,
private static final ParseField PARSE_REMOTE_STORE_REPOSITORY = new ParseField(REMOTE_STORE_REPOSITORY);
private static final ParseField PARSE_REPOSITORY_BASE_PATH = new ParseField(REPOSITORY_BASE_PATH);
private static final ParseField PARSE_FILE_NAMES = new ParseField(FILE_NAMES);
private static final ParseField PARSE_PATH_TYPE = new ParseField(PATH_TYPE);
private static final ParseField PARSE_PATH_HASH_ALGORITHM = new ParseField(PATH_HASH_ALGORITHM);

/**
* Serializes shard snapshot metadata info into JSON
Expand All @@ -101,6 +111,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.value(fileName);
}
builder.endArray();
builder.field(PATH_TYPE, pathType.getCode());
builder.field(PATH_HASH_ALGORITHM, pathHashAlgorithm.getCode());
return builder;
}

Expand All @@ -116,31 +128,27 @@ public RemoteStoreShardShallowCopySnapshot(
String indexUUID,
String remoteStoreRepository,
String repositoryBasePath,
List<String> fileNames
List<String> fileNames,
PathType pathType,
PathHashAlgorithm pathHashAlgorithm
) {
this.version = DEFAULT_VERSION;
verifyParameters(
version,
this(
DEFAULT_VERSION,
snapshot,
indexVersion,
primaryTerm,
commitGeneration,
startTime,
time,
totalFileCount,
totalSize,
indexUUID,
remoteStoreRepository,
repositoryBasePath
repositoryBasePath,
fileNames,
pathType,
pathHashAlgorithm
);
this.snapshot = snapshot;
this.indexVersion = indexVersion;
this.primaryTerm = primaryTerm;
this.commitGeneration = commitGeneration;
this.startTime = startTime;
this.time = time;
this.totalFileCount = totalFileCount;
this.totalSize = totalSize;
this.indexUUID = indexUUID;
this.remoteStoreRepository = remoteStoreRepository;
this.repositoryBasePath = repositoryBasePath;
this.fileNames = fileNames;
}

private RemoteStoreShardShallowCopySnapshot(
Expand All @@ -156,7 +164,9 @@ private RemoteStoreShardShallowCopySnapshot(
String indexUUID,
String remoteStoreRepository,
String repositoryBasePath,
List<String> fileNames
List<String> fileNames,
PathType pathType,
PathHashAlgorithm pathHashAlgorithm
) {
verifyParameters(
version,
Expand All @@ -181,6 +191,8 @@ private RemoteStoreShardShallowCopySnapshot(
this.remoteStoreRepository = remoteStoreRepository;
this.repositoryBasePath = repositoryBasePath;
this.fileNames = fileNames;
this.pathType = pathType;
this.pathHashAlgorithm = pathHashAlgorithm;
}

/**
Expand All @@ -203,6 +215,8 @@ public static RemoteStoreShardShallowCopySnapshot fromXContent(XContentParser pa
long primaryTerm = -1;
long commitGeneration = -1;
List<String> fileNames = new ArrayList<>();
PathType pathType = null;
PathHashAlgorithm pathHashAlgorithm = null;

if (parser.currentToken() == null) { // fresh parser? move to the first token
parser.nextToken();
Expand Down Expand Up @@ -237,6 +251,10 @@ public static RemoteStoreShardShallowCopySnapshot fromXContent(XContentParser pa
remoteStoreRepository = parser.text();
} else if (PARSE_REPOSITORY_BASE_PATH.match(currentFieldName, parser.getDeprecationHandler())) {
repositoryBasePath = parser.text();
} else if (PARSE_PATH_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
pathType = PathType.fromCode(parser.intValue());
} else if (PARSE_PATH_HASH_ALGORITHM.match(currentFieldName, parser.getDeprecationHandler())) {
pathHashAlgorithm = PathHashAlgorithm.fromCode(parser.intValue());
} else {
throw new OpenSearchParseException("unknown parameter [{}]", currentFieldName);
}
Expand Down Expand Up @@ -266,7 +284,9 @@ public static RemoteStoreShardShallowCopySnapshot fromXContent(XContentParser pa
indexUUID,
remoteStoreRepository,
repositoryBasePath,
fileNames
fileNames,
pathType,
pathHashAlgorithm
);
}

Expand Down Expand Up @@ -433,7 +453,9 @@ public RemoteStoreShardShallowCopySnapshot asClone(String targetSnapshotName, lo
indexUUID,
remoteStoreRepository,
repositoryBasePath,
fileNames
fileNames,
pathType,
pathHashAlgorithm
);
}

Expand All @@ -449,4 +471,11 @@ public IndexShardSnapshotStatus getIndexShardSnapshotStatus() {
null
); // Not adding a real generation here as it doesn't matter to callers
}

public RemoteStorePathStrategy getRemoteStorePathStrategy() {
if (Objects.nonNull(pathType) && Objects.nonNull(pathHashAlgorithm)) {
return new RemoteStorePathStrategy(pathType, pathHashAlgorithm);

Check warning on line 477 in server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java#L477

Added line #L477 was not covered by tests
}
return new RemoteStorePathStrategy(PathType.FIXED);

Check warning on line 479 in server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java#L479

Added line #L479 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
Expand Down Expand Up @@ -672,8 +671,7 @@ public void cloneRemoteStoreIndexShardSnapshot(
remoteStoreRepository,
indexUUID,
String.valueOf(shardId.shardId()),
new RemoteStorePathStrategy(PathType.FIXED)
// TODO - The path type needs to be obtained from RemoteStoreShardShallowCopySnapshot
remStoreBasedShardMetadata.getRemoteStorePathStrategy()

Check warning on line 674 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L674

Added line #L674 was not covered by tests
);
remoteStoreMetadataLockManger.cloneLock(
FileLockInfo.getLockInfoBuilder().withAcquirerId(source.getUUID()).build(),
Expand Down Expand Up @@ -1154,8 +1152,7 @@ protected void releaseRemoteStoreLockAndCleanup(
remoteStoreRepoForIndex,
indexUUID,
shardId,
new RemoteStorePathStrategy(PathType.FIXED)
// TODO - The path type needs to be obtained from RemoteStoreShardShallowCopySnapshot
remoteStoreShardShallowCopySnapshot.getRemoteStorePathStrategy()

Check warning on line 1155 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L1155

Added line #L1155 was not covered by tests
);
remoteStoreMetadataLockManager.release(FileLockInfo.getLockInfoBuilder().withAcquirerId(shallowSnapshotUUID).build());
logger.debug("Successfully released lock for shard {} of index with uuid {}", shardId, indexUUID);
Expand All @@ -1178,8 +1175,7 @@ protected void releaseRemoteStoreLockAndCleanup(
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)),
ThreadPool.Names.REMOTE_PURGE,
new RemoteStorePathStrategy(PathType.FIXED)
// TODO - The path type needs to be obtained from RemoteStoreShardShallowCopySnapshot
remoteStoreShardShallowCopySnapshot.getRemoteStorePathStrategy()

Check warning on line 1178 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L1178

Added line #L1178 was not covered by tests
);
}
}
Expand Down Expand Up @@ -2694,6 +2690,7 @@ public void snapshotRemoteStoreIndexShard(
// now create and write the commit point
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try {
RemoteStorePathStrategy pathStrategy = store.indexSettings().getRemoteStorePathStrategy();
REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.write(
new RemoteStoreShardShallowCopySnapshot(
snapshotId.getName(),
Expand All @@ -2707,7 +2704,9 @@ public void snapshotRemoteStoreIndexShard(
store.indexSettings().getUUID(),
store.indexSettings().getRemoteStoreRepository(),
this.basePath().toString(),
fileNames
fileNames,
pathStrategy.getType(),
pathStrategy.getHashAlgorithm()
),
shardContainer,
snapshotId.getUUID(),
Expand Down
Loading

0 comments on commit 2ef85af

Please sign in to comment.