-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Remote Cluster State] Remote state interfaces (#13785)
* Remote Writable Entity interfaces Signed-off-by: Sooraj Sinha <soosinha@amazon.com> (cherry picked from commit 156eca3) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
- Loading branch information
1 parent
5b19454
commit 87f5b78
Showing
8 changed files
with
340 additions
and
0 deletions.
There are no files selected for viewing
91 changes: 91 additions & 0 deletions
91
server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableBlobEntity.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.remote; | ||
|
||
import org.opensearch.common.blobstore.BlobPath; | ||
import org.opensearch.core.compress.Compressor; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; | ||
|
||
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER; | ||
|
||
/** | ||
* An extension of {@link RemoteWriteableEntity} class which caters to the use case of writing to and reading from a blob storage | ||
* | ||
* @param <T> The class type which can be uploaded to or downloaded from a blob storage. | ||
*/ | ||
public abstract class AbstractRemoteWritableBlobEntity<T> implements RemoteWriteableEntity<T> { | ||
|
||
protected String blobFileName; | ||
|
||
protected String blobName; | ||
private final String clusterUUID; | ||
private final Compressor compressor; | ||
private final NamedXContentRegistry namedXContentRegistry; | ||
private String[] pathTokens; | ||
|
||
public AbstractRemoteWritableBlobEntity( | ||
final String clusterUUID, | ||
final Compressor compressor, | ||
final NamedXContentRegistry namedXContentRegistry | ||
) { | ||
this.clusterUUID = clusterUUID; | ||
this.compressor = compressor; | ||
this.namedXContentRegistry = namedXContentRegistry; | ||
} | ||
|
||
public abstract BlobPathParameters getBlobPathParameters(); | ||
|
||
public String getFullBlobName() { | ||
return blobName; | ||
} | ||
|
||
public String getBlobFileName() { | ||
if (blobFileName == null) { | ||
String[] pathTokens = getBlobPathTokens(); | ||
if (pathTokens == null || pathTokens.length < 1) { | ||
return null; | ||
} | ||
blobFileName = pathTokens[pathTokens.length - 1]; | ||
} | ||
return blobFileName; | ||
} | ||
|
||
public String[] getBlobPathTokens() { | ||
if (pathTokens != null) { | ||
return pathTokens; | ||
} | ||
if (blobName == null) { | ||
return null; | ||
} | ||
pathTokens = blobName.split(PATH_DELIMITER); | ||
return pathTokens; | ||
} | ||
|
||
public abstract String generateBlobFileName(); | ||
|
||
public String clusterUUID() { | ||
return clusterUUID; | ||
} | ||
|
||
public abstract UploadedMetadata getUploadedMetadata(); | ||
|
||
public void setFullBlobName(BlobPath blobPath) { | ||
this.blobName = blobPath.buildAsString() + blobFileName; | ||
} | ||
|
||
public NamedXContentRegistry getNamedXContentRegistry() { | ||
return namedXContentRegistry; | ||
} | ||
|
||
protected Compressor getCompressor() { | ||
return compressor; | ||
} | ||
|
||
} |
34 changes: 34 additions & 0 deletions
34
server/src/main/java/org/opensearch/common/remote/BlobPathParameters.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.remote; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* Parameters which can be used to construct a blob path | ||
* | ||
*/ | ||
public class BlobPathParameters { | ||
|
||
private final List<String> pathTokens; | ||
private final String filePrefix; | ||
|
||
public BlobPathParameters(final List<String> pathTokens, final String filePrefix) { | ||
this.pathTokens = pathTokens; | ||
this.filePrefix = filePrefix; | ||
} | ||
|
||
public List<String> getPathTokens() { | ||
return pathTokens; | ||
} | ||
|
||
public String getFilePrefix() { | ||
return filePrefix; | ||
} | ||
} |
28 changes: 28 additions & 0 deletions
28
server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.remote; | ||
|
||
import org.opensearch.core.action.ActionListener; | ||
|
||
import java.io.IOException; | ||
|
||
/** | ||
* An interface to read/write an object from/to a remote storage. This interface is agnostic of the remote storage type. | ||
* | ||
* @param <T> The object type which can be uploaded to or downloaded from remote storage. | ||
* @param <U> The wrapper entity which provides methods for serializing/deserializing entity T. | ||
*/ | ||
public interface RemoteWritableEntityStore<T, U extends RemoteWriteableEntity<T>> { | ||
|
||
public void writeAsync(U entity, ActionListener<Void> listener); | ||
|
||
public T read(U entity) throws IOException; | ||
|
||
public void readAsync(U entity, ActionListener<T> listener); | ||
} |
34 changes: 34 additions & 0 deletions
34
server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntity.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.remote; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
|
||
/** | ||
* An interface to which provides defines the serialization/deserialization methods for objects to be uploaded to or downloaded from remote store. | ||
* This interface is agnostic of the remote storage type. | ||
* | ||
* @param <T> The object type which can be uploaded to or downloaded from remote storage. | ||
*/ | ||
public interface RemoteWriteableEntity<T> { | ||
/** | ||
* @return An InputStream created by serializing the entity T | ||
* @throws IOException Exception encountered while serialization | ||
*/ | ||
public InputStream serialize() throws IOException; | ||
|
||
/** | ||
* @param inputStream The InputStream which is used to read the serialized entity | ||
* @return The entity T after deserialization | ||
* @throws IOException Exception encountered while deserialization | ||
*/ | ||
public T deserialize(InputStream inputStream) throws IOException; | ||
|
||
} |
11 changes: 11 additions & 0 deletions
11
server/src/main/java/org/opensearch/common/remote/package-info.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
/** | ||
* Common remote store package | ||
*/ | ||
package org.opensearch.common.remote; |
23 changes: 23 additions & 0 deletions
23
server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.gateway.remote; | ||
|
||
import java.nio.charset.StandardCharsets; | ||
import java.util.Base64; | ||
|
||
/** | ||
* Utility class for Remote Cluster State | ||
*/ | ||
public class RemoteClusterStateUtils { | ||
public static final String PATH_DELIMITER = "/"; | ||
|
||
public static String encodeString(String content) { | ||
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8)); | ||
} | ||
} |
107 changes: 107 additions & 0 deletions
107
server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.gateway.remote.model; | ||
|
||
import org.opensearch.common.blobstore.BlobPath; | ||
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; | ||
import org.opensearch.common.remote.RemoteWritableEntityStore; | ||
import org.opensearch.common.remote.RemoteWriteableEntity; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.gateway.remote.RemoteClusterStateUtils; | ||
import org.opensearch.index.translog.transfer.BlobStoreTransferService; | ||
import org.opensearch.repositories.blobstore.BlobStoreRepository; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.concurrent.ExecutorService; | ||
|
||
/** | ||
* Abstract class for a blob type storage | ||
* | ||
* @param <T> The entity which can be uploaded to / downloaded from blob store | ||
* @param <U> The concrete class implementing {@link RemoteWriteableEntity} which is used as a wrapper for T entity. | ||
*/ | ||
public class RemoteClusterStateBlobStore<T, U extends AbstractRemoteWritableBlobEntity<T>> implements RemoteWritableEntityStore<T, U> { | ||
|
||
private final BlobStoreTransferService transferService; | ||
private final BlobStoreRepository blobStoreRepository; | ||
private final String clusterName; | ||
private final ExecutorService executorService; | ||
|
||
public RemoteClusterStateBlobStore( | ||
final BlobStoreTransferService blobStoreTransferService, | ||
final BlobStoreRepository blobStoreRepository, | ||
final String clusterName, | ||
final ThreadPool threadPool, | ||
final String executor | ||
) { | ||
this.transferService = blobStoreTransferService; | ||
this.blobStoreRepository = blobStoreRepository; | ||
this.clusterName = clusterName; | ||
this.executorService = threadPool.executor(executor); | ||
} | ||
|
||
@Override | ||
public void writeAsync(final U entity, final ActionListener<Void> listener) { | ||
try { | ||
try (InputStream inputStream = entity.serialize()) { | ||
BlobPath blobPath = getBlobPathForUpload(entity); | ||
entity.setFullBlobName(blobPath); | ||
// TODO uncomment below logic after merging PR https://github.com/opensearch-project/OpenSearch/pull/13836 | ||
// transferService.uploadBlob(inputStream, getBlobPathForUpload(entity), entity.getBlobFileName(), WritePriority.URGENT, | ||
// listener); | ||
} | ||
} catch (Exception e) { | ||
listener.onFailure(e); | ||
} | ||
} | ||
|
||
public T read(final U entity) throws IOException { | ||
// TODO Add timing logs and tracing | ||
assert entity.getFullBlobName() != null; | ||
return entity.deserialize(transferService.downloadBlob(getBlobPathForDownload(entity), entity.getBlobFileName())); | ||
} | ||
|
||
@Override | ||
public void readAsync(final U entity, final ActionListener<T> listener) { | ||
executorService.execute(() -> { | ||
try { | ||
listener.onResponse(read(entity)); | ||
} catch (Exception e) { | ||
listener.onFailure(e); | ||
} | ||
}); | ||
} | ||
|
||
private BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity<T> obj) { | ||
BlobPath blobPath = blobStoreRepository.basePath() | ||
.add(RemoteClusterStateUtils.encodeString(clusterName)) | ||
.add("cluster-state") | ||
.add(obj.clusterUUID()); | ||
for (String token : obj.getBlobPathParameters().getPathTokens()) { | ||
blobPath = blobPath.add(token); | ||
} | ||
return blobPath; | ||
} | ||
|
||
private BlobPath getBlobPathForDownload(final AbstractRemoteWritableBlobEntity<T> obj) { | ||
String[] pathTokens = obj.getBlobPathTokens(); | ||
BlobPath blobPath = new BlobPath(); | ||
if (pathTokens == null || pathTokens.length < 1) { | ||
return blobPath; | ||
} | ||
// Iterate till second last path token to get the blob folder | ||
for (int i = 0; i < pathTokens.length - 1; i++) { | ||
blobPath = blobPath.add(pathTokens[i]); | ||
} | ||
return blobPath; | ||
} | ||
|
||
} |
12 changes: 12 additions & 0 deletions
12
server/src/main/java/org/opensearch/gateway/remote/model/package-info.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
/** | ||
* Package containing models for remote cluster state | ||
*/ | ||
package org.opensearch.gateway.remote.model; |