Skip to content

Commit

Permalink
Integrate remote translog download on failover (opensearch-project#5699)
Browse files Browse the repository at this point in the history
* Integrate remote translog download on failover

Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 authored and gbbafna committed Jan 9, 2023
1 parent ff2d4f1 commit 654dd56
Show file tree
Hide file tree
Showing 15 changed files with 226 additions and 105 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

# CHANGELOG

All notable changes to this project are documented in this file.
Expand All @@ -20,7 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Experimental support for extended backward compatiblity in searchable snapshots ([#5429](https://github.com/opensearch-project/OpenSearch/pull/5429))
- Add support for refresh level durability ([#5253](https://github.com/opensearch-project/OpenSearch/pull/5253))
- Added Request level Durability using Remote Translog functionality ([#5757](https://github.com/opensearch-project/OpenSearch/pull/5757))

### Dependencies
- Bump bcpg-fips from 1.0.5.1 to 1.0.7.1 ([#5148](https://github.com/opensearch-project/OpenSearch/pull/5148))
Expand All @@ -42,7 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283))
- Pre conditions check before updating weighted routing metadata ([#4955](https://github.com/opensearch-project/OpenSearch/pull/4955))
- Support remote translog transfer for request level durability([#4480](https://github.com/opensearch-project/OpenSearch/pull/4480))
- Integrate remote segment store in the failover flow ([#5579](https://github.com/opensearch-project/OpenSearch/pull/5579))

### Deprecated
- Refactor fuzziness interface on query builders ([#5433](https://github.com/opensearch-project/OpenSearch/pull/5433))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ public static final IndexShard newIndexShard(
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs,
new InternalTranslogFactory(),
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null
);
Expand Down
6 changes: 4 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.Version;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.TriFunction;
Expand Down Expand Up @@ -71,6 +72,7 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand Down Expand Up @@ -501,7 +503,7 @@ public IndexService newIndexService(
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -557,7 +559,7 @@ public IndexService newIndexService(
expressionResolver,
valuesSourceRegistry,
recoveryStateFactory,
repositoriesServiceSupplier
translogFactorySupplier
);
success = true;
return indexService;
Expand Down
20 changes: 5 additions & 15 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.breaker.CircuitBreakerService;
Expand All @@ -99,7 +97,6 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -117,6 +114,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -176,7 +174,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final IndexNameExpressionResolver expressionResolver;
private final Supplier<Sort> indexSortSupplier;
private final ValuesSourceRegistry valuesSourceRegistry;
private final Supplier<RepositoriesService> repositoriesServiceSupplier;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -209,7 +207,7 @@ public IndexService(
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -281,7 +279,7 @@ public IndexService(
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
this.repositoriesServiceSupplier = repositoriesServiceSupplier;
this.translogFactorySupplier = translogFactorySupplier;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -524,14 +522,6 @@ public synchronized IndexShard createShard(
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
}

TranslogFactory translogFactory = this.indexSettings.isRemoteTranslogStoreEnabled() && routing.primary()
? new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
this.indexSettings.getRemoteStoreTranslogRepository()
)
: new InternalTranslogFactory();

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
store = new Store(
shardId,
Expand Down Expand Up @@ -562,7 +552,7 @@ public synchronized IndexShard createShard(
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
translogFactory,
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -321,7 +322,7 @@ Runnable getGlobalCheckpointSyncer() {
private volatile boolean useRetentionLeasesInPeerRecovery;

private final Store remoteStore;
private final TranslogFactory translogFactory;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;

public IndexShard(
final ShardRouting shardRouting,
Expand All @@ -344,7 +345,7 @@ public IndexShard(
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService,
final TranslogFactory translogFactory,
final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore
) throws IOException {
Expand Down Expand Up @@ -431,7 +432,7 @@ public boolean shouldCache(Query query) {
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.checkpointPublisher = checkpointPublisher;
this.remoteStore = remoteStore;
this.translogFactory = translogFactory;
this.translogFactorySupplier = translogFactorySupplier;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3371,7 +3372,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
indexSettings.isSegRepEnabled() && shardRouting.primary() == false,
translogFactory
translogFactorySupplier.apply(indexSettings, shardRouting)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ public static TranslogTransferManager buildTranslogTransferManager(
return new TranslogTransferManager(
new BlobStoreTransferService(blobStoreRepository.blobStore(), executorService),
blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())),
fileTransferTracker,
fileTransferTracker::exclusionFilter
fileTransferTracker
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.index.translog.transfer.listener.FileTransferListener;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
Expand All @@ -32,8 +33,16 @@ public FileTransferTracker(ShardId shardId) {

@Override
public void onSuccess(TransferFileSnapshot fileSnapshot) {
TransferState targetState = TransferState.SUCCESS;
fileTransferTracker.compute(fileSnapshot.getName(), (k, v) -> {
add(fileSnapshot.getName(), TransferState.SUCCESS);
}

void add(String file, boolean success) {
TransferState targetState = success ? TransferState.SUCCESS : TransferState.FAILED;
add(file, targetState);
}

private void add(String file, TransferState targetState) {
fileTransferTracker.compute(file, (k, v) -> {
if (v == null || v.validateNextState(targetState)) {
return targetState;
}
Expand All @@ -43,13 +52,7 @@ public void onSuccess(TransferFileSnapshot fileSnapshot) {

@Override
public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) {
TransferState targetState = TransferState.FAILED;
fileTransferTracker.compute(fileSnapshot.getName(), (k, v) -> {
if (v == null || v.validateNextState(targetState)) {
return targetState;
}
throw new IllegalStateException("Unexpected transfer state " + v + "while setting target to" + targetState);
});
add(fileSnapshot.getName(), TransferState.FAILED);
}

public Set<TransferFileSnapshot> exclusionFilter(Set<TransferFileSnapshot> original) {
Expand Down Expand Up @@ -80,7 +83,7 @@ public boolean validateNextState(TransferState target) {
case FAILED:
return true;
case SUCCESS:
return Set.of(SUCCESS).contains(target);
return Objects.equals(SUCCESS, target);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.transfer.listener.FileTransferListener;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;

import java.io.IOException;
Expand All @@ -33,7 +32,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
Expand All @@ -48,9 +46,8 @@ public class TranslogTransferManager {

private final TransferService transferService;
private final BlobPath remoteBaseTransferPath;
private final BlobPath remoteMetadaTransferPath;
private final FileTransferListener fileTransferListener;
private final UnaryOperator<Set<TransferFileSnapshot>> exclusionFilter;
private final BlobPath remoteMetadataTransferPath;
private final FileTransferTracker fileTransferTracker;

private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000;

Expand All @@ -61,31 +58,29 @@ public class TranslogTransferManager {
public TranslogTransferManager(
TransferService transferService,
BlobPath remoteBaseTransferPath,
FileTransferListener fileTransferListener,
UnaryOperator<Set<TransferFileSnapshot>> exclusionFilter
FileTransferTracker fileTransferTracker
) {
this.transferService = transferService;
this.remoteBaseTransferPath = remoteBaseTransferPath;
this.remoteMetadaTransferPath = remoteBaseTransferPath.add(METADATA_DIR);
this.fileTransferListener = fileTransferListener;
this.exclusionFilter = exclusionFilter;
this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR);
this.fileTransferTracker = fileTransferTracker;
}

public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTransferListener translogTransferListener)
throws IOException {
List<Exception> exceptionList = new ArrayList<>(transferSnapshot.getTranslogTransferMetadata().getCount());
Set<TransferFileSnapshot> toUpload = new HashSet<>(transferSnapshot.getTranslogTransferMetadata().getCount());
try {
toUpload.addAll(exclusionFilter.apply(transferSnapshot.getTranslogFileSnapshots()));
toUpload.addAll(exclusionFilter.apply(transferSnapshot.getCheckpointFileSnapshots()));
toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots()));
toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots())));
if (toUpload.isEmpty()) {
logger.trace("Nothing to upload for transfer");
translogTransferListener.onUploadComplete(transferSnapshot);
return true;
}
final CountDownLatch latch = new CountDownLatch(toUpload.size());
LatchedActionListener<TransferFileSnapshot> latchedActionListener = new LatchedActionListener<>(
ActionListener.wrap(fileTransferListener::onSuccess, ex -> {
ActionListener.wrap(fileTransferTracker::onSuccess, ex -> {
assert ex instanceof FileTransferException;
logger.error(
() -> new ParameterizedMessage(
Expand All @@ -95,7 +90,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
ex
);
FileTransferException e = (FileTransferException) ex;
fileTransferListener.onFailure(e.getFileSnapshot(), ex);
fileTransferTracker.onFailure(e.getFileSnapshot(), ex);
exceptionList.add(ex);
}),
latch
Expand All @@ -119,7 +114,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
throw ex;
}
if (exceptionList.isEmpty()) {
transferService.uploadBlob(prepareMetadata(transferSnapshot), remoteMetadaTransferPath);
transferService.uploadBlob(prepareMetadata(transferSnapshot), remoteMetadataTransferPath);
translogTransferListener.onUploadComplete(transferSnapshot);
return true;
} else {
Expand Down Expand Up @@ -160,14 +155,16 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th
try (InputStream inputStream = transferService.downloadBlob(remoteBaseTransferPath.add(primaryTerm), fileName)) {
Files.copy(inputStream, filePath);
}
// Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync
fileTransferTracker.add(fileName, true);
}

public TranslogTransferMetadata readMetadata() throws IOException {
return transferService.listAll(remoteMetadaTransferPath)
return transferService.listAll(remoteMetadataTransferPath)
.stream()
.max(TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR)
.map(filename -> {
try (InputStream inputStream = transferService.downloadBlob(remoteMetadaTransferPath, filename);) {
try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename);) {
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
return new TranslogTransferMetadata(indexInput);
} catch (IOException e) {
Expand All @@ -191,12 +188,10 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot)
);
TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata();
translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap));
TransferFileSnapshot fileSnapshot = new TransferFileSnapshot(
return new TransferFileSnapshot(
translogTransferMetadata.getFileName(),
translogTransferMetadata.createMetadataBytes(),
translogTransferMetadata.getPrimaryTerm()
);

return fileSnapshot;
}
}
Loading

0 comments on commit 654dd56

Please sign in to comment.