Skip to content

Commit

Permalink
[Segment Replication] Add cancellation support in RemoteStoreReplicationSource
Browse files Browse the repository at this point in the history

Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 committed Aug 10, 2023
1 parent de2b6b7 commit 39624e0
Show file tree
Hide file tree
Showing 14 changed files with 498 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Version;
import org.opensearch.core.action.ActionListener;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
Expand Down Expand Up @@ -42,11 +43,14 @@ public class RemoteStoreReplicationSource implements SegmentReplicationSource {
private final IndexShard indexShard;
private final RemoteSegmentStoreDirectory remoteDirectory;

/** Cancellation handle supplied by the caller; checked via {@code checkForCancel()} while fetching metadata and segment files. */
private final CancellableThreads cancellableThreads;

/**
 * Creates a replication source that reads segments from the shard's remote store.
 *
 * @param indexShard         shard whose remote store directory is used as the source
 * @param cancellableThreads cancellation handle consulted before remote-store operations
 */
public RemoteStoreReplicationSource(IndexShard indexShard, CancellableThreads cancellableThreads) {
    this.indexShard = indexShard;
    // The remote store directory is wrapped in two FilterDirectory layers; unwrap both to reach the
    // underlying RemoteSegmentStoreDirectory.
    FilterDirectory remoteStoreDirectory = (FilterDirectory) indexShard.remoteStore().directory();
    FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
    this.remoteDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory.getDelegate();
    this.cancellableThreads = cancellableThreads;
}

@Override
Expand All @@ -59,6 +63,7 @@ public void getCheckpointMetadata(
// TODO: Need to figure out a way to pass this information for segment metadata via remote store.
final Version version = indexShard.getSegmentInfosSnapshot().get().getCommitLuceneVersion();
try {
cancellableThreads.checkForCancel();
RemoteSegmentMetadata mdFile = remoteDirectory.init();
// During initial recovery flow, the remote store might not have metadata as primary hasn't uploaded anything yet.
if (mdFile == null && indexShard.state().equals(IndexShardState.STARTED) == false) {
Expand Down Expand Up @@ -101,7 +106,7 @@ public void getSegmentFiles(
return;
}
logger.trace("Downloading segments files from remote store {}", filesToFetch);

cancellableThreads.checkForCancel();
RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.readLatestMetadataFile();
List<StoreFileMetadata> downloadedSegments = new ArrayList<>();
Collection<String> directoryFiles = List.of(indexShard.store().directory().listAll());
Expand All @@ -111,6 +116,7 @@ public void getSegmentFiles(
indexShard.remoteStore().incRef();
final Directory storeDirectory = indexShard.store().directory();
for (StoreFileMetadata fileMetadata : filesToFetch) {
cancellableThreads.checkForCancel();
String file = fileMetadata.name();
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.recovery.RecoverySettings;
Expand All @@ -37,9 +38,9 @@ public SegmentReplicationSourceFactory(
this.clusterService = clusterService;
}

public SegmentReplicationSource get(IndexShard shard) {
public SegmentReplicationSource get(IndexShard shard, CancellableThreads cancellableThreads) {
if (shard.indexSettings().isSegRepWithRemoteEnabled()) {
return new RemoteStoreReplicationSource(shard);
return new RemoteStoreReplicationSource(shard, cancellableThreads);
} else {
return new PrimaryShardReplicationSource(
shard.recoveryState().getTargetNode(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ public SegmentReplicationTarget(
IndexShard indexShard,
ReplicationCheckpoint checkpoint,
SegmentReplicationSource source,
ReplicationListener listener
ReplicationListener listener,
CancellableThreads cancellableThreads
) {
super("replication_target", indexShard, new ReplicationLuceneIndex(), listener);
super("replication_target", indexShard, new ReplicationLuceneIndex(), listener, cancellableThreads);
this.checkpoint = checkpoint;
this.source = source;
this.state = new SegmentReplicationState(
Expand Down Expand Up @@ -95,7 +96,7 @@ public SegmentReplicationState state() {
}

/**
 * Builds a new target for another copy attempt, reusing this target's shard, checkpoint, source,
 * listener and cancellation handle.
 *
 * @return a new {@link SegmentReplicationTarget} configured like this one
 */
public SegmentReplicationTarget retryCopy() {
    return new SegmentReplicationTarget(indexShard, checkpoint, source, listener, cancellableThreads);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.shard.IndexEventListener;
Expand Down Expand Up @@ -410,18 +411,15 @@ public SegmentReplicationTarget startReplication(
final ReplicationCheckpoint checkpoint,
final SegmentReplicationListener listener
) {
final SegmentReplicationTarget target = new SegmentReplicationTarget(
indexShard,
checkpoint,
sourceFactory.get(indexShard),
listener
);
final CancellableThreads cancellableThreads = new CancellableThreads();
final SegmentReplicationSource source = sourceFactory.get(indexShard, cancellableThreads);
final SegmentReplicationTarget target = new SegmentReplicationTarget(indexShard, checkpoint, source, listener, cancellableThreads);
startReplication(target);
return target;
}

// pkg-private for integration tests
void startReplication(final SegmentReplicationTarget target) {
// Used and meant only for tests
public void startReplication(final SegmentReplicationTarget target) {
final long replicationId;
try {
replicationId = onGoingReplications.startSafe(target, recoverySettings.activityTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,25 @@ public CancellableThreads cancellableThreads() {
/**
 * Hands a replication failure to the listener; the concrete behavior is defined by subclasses.
 *
 * @param e                the replication failure to report
 * @param sendShardFailure NOTE(review): presumably whether the shard itself should also be failed;
 *                         confirm against the implementations
 */
public abstract void notifyListener(ReplicationFailedException e, boolean sendShardFailure);

/**
 * Creates a target that uses its own, newly created {@link CancellableThreads}.
 *
 * @param name       name passed to the superclass constructor
 * @param indexShard shard this target replicates into
 * @param stateIndex index state tracked for this replication
 * @param listener   listener associated with this target
 */
public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIndex stateIndex, ReplicationListener listener) {
    this(name, indexShard, stateIndex, listener, new CancellableThreads());
}

/**
 * Creates a target that uses the supplied {@link CancellableThreads}, so that a caller holding the
 * same instance can observe or trigger cancellation of this target.
 *
 * @param name               name passed to the superclass constructor
 * @param indexShard         shard this target replicates into
 * @param stateIndex         index state tracked for this replication
 * @param listener           listener associated with this target
 * @param cancellableThreads cancellation handle to use for this target
 */
public ReplicationTarget(
    String name,
    IndexShard indexShard,
    ReplicationLuceneIndex stateIndex,
    ReplicationListener listener,
    CancellableThreads cancellableThreads
) {
    super(name);
    this.logger = Loggers.getLogger(getClass(), indexShard.shardId());
    this.listener = listener;
    this.id = ID_GENERATOR.incrementAndGet();
    this.stateIndex = stateIndex;
    this.indexShard = indexShard;
    this.store = indexShard.store();
    // Keep the injected handle; it must not be replaced with a fresh instance or the caller's
    // cancellation would never reach this target.
    this.cancellableThreads = cancellableThreads;
    // make sure the store is not released until we are done.
    store.incRef();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Version;
import org.junit.Assert;
import org.junit.Before;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.engine.DocIdSeqNoAndSource;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.replication.TestRemoteStoreReplicationSource;
import org.opensearch.index.replication.TestReplicationSource;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.indices.replication.CheckpointInfoResponse;
import org.opensearch.indices.replication.GetSegmentFilesResponse;
import org.opensearch.indices.replication.SegmentReplicationSource;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber;

public class RemoteIndexShardTests extends SegmentReplicationIndexShardTests {
Expand Down Expand Up @@ -66,6 +90,134 @@ public void testNRTReplicaWithRemoteStorePromotedAsPrimaryCommitCommit() throws
testNRTReplicaWithRemoteStorePromotedAsPrimary(true, true);
}

/**
 * Closes the replica shard from inside {@code getCheckpointMetadata} and then hands the scenario to
 * {@code startReplicationAndAssertCancellation} for verification. The segment-file phase must never
 * be reached.
 */
public void testCloseShardWhileGettingCheckpoint() throws Exception {
    final String mapping = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": {} }";
    try (ReplicationGroup group = createGroup(1, getIndexSettings(), mapping, new NRTReplicationEngineFactory(), createTempDir())) {
        group.startAll();
        final IndexShard primaryShard = group.getPrimary();
        final IndexShard replicaShard = group.getReplicas().get(0);

        primaryShard.refresh("Test");

        final SegmentReplicationTargetService replicationService = newTargetService();
        final CancellableThreads cancellation = new CancellableThreads();

        // Custom source that triggers the shard-close hook at a chosen point in the replication lifecycle.
        final SegmentReplicationSource closingSource = new TestRemoteStoreReplicationSource(cancellation, replicaShard) {
            @Override
            public void getSegmentFiles(
                long replicationId,
                ReplicationCheckpoint checkpoint,
                List<StoreFileMetadata> filesToFetch,
                IndexShard indexShard,
                ActionListener<GetSegmentFilesResponse> listener
            ) {
                Assert.fail("Unreachable");
            }

            @Override
            public void getCheckpointMetadata(
                long replicationId,
                ReplicationCheckpoint checkpoint,
                ActionListener<CheckpointInfoResponse> listener
            ) {
                // Simulate the shard closing while metadata is being fetched.
                replicationService.beforeIndexShardClosed(replicaShard.shardId, replicaShard, Settings.EMPTY);
                try {
                    cancellation.checkForCancel();
                } catch (Exception cancelled) {
                    // Surface the cancellation through the listener.
                    listener.onFailure(cancelled);
                }
            }
        };

        startReplicationAndAssertCancellation(replicaShard, primaryShard, replicationService, closingSource, cancellation);
        group.removeReplica(replicaShard);
        closeShards(replicaShard);
    }
}

/**
 * Closes the replica shard after a single segment file has been copied from the remote store and
 * then hands the scenario to {@code startReplicationAndAssertCancellation} for verification.
 * <p>
 * I/O failures inside the source callbacks are reported through the supplied listener rather than
 * thrown, matching how the sibling cancellation test reports failures.
 */
public void testBeforeIndexShardClosedWhileCopyingFiles() throws Exception {
    try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
        shards.startAll();
        IndexShard primary = shards.getPrimary();
        final IndexShard replica = shards.getReplicas().get(0);

        shards.indexDocs(10);
        primary.refresh("Test");

        final SegmentReplicationTargetService targetService = newTargetService();
        final CancellableThreads cancellableThreads = new CancellableThreads();
        SegmentReplicationSource source = new TestRemoteStoreReplicationSource(cancellableThreads, replica) {
            @Override
            public void getCheckpointMetadata(
                long replicationId,
                ReplicationCheckpoint checkpoint,
                ActionListener<CheckpointInfoResponse> listener
            ) {
                logger.info("--> getCheckpointMetadata");
                try {
                    RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = this.getRemoteDirectory();
                    RemoteSegmentMetadata mdFile = remoteSegmentStoreDirectory.init();
                    final Version version = replica.getSegmentInfosSnapshot().get().getCommitLuceneVersion();
                    // Translate the remote metadata entries into StoreFileMetadata, keeping the same keys.
                    Map<String, StoreFileMetadata> metadataMap = mdFile.getMetadata()
                        .entrySet()
                        .stream()
                        .collect(
                            Collectors.toMap(
                                Map.Entry::getKey,
                                e -> new StoreFileMetadata(
                                    e.getValue().getOriginalFilename(),
                                    e.getValue().getLength(),
                                    Store.digestToString(Long.valueOf(e.getValue().getChecksum())),
                                    version,
                                    null
                                )
                            )
                        );
                    listener.onResponse(
                        new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes())
                    );
                } catch (IOException e) {
                    // Complete the listener instead of throwing out of the callback.
                    listener.onFailure(e);
                }
            }

            @Override
            public void getSegmentFiles(
                long replicationId,
                ReplicationCheckpoint checkpoint,
                List<StoreFileMetadata> filesToFetch,
                IndexShard indexShard,
                ActionListener<GetSegmentFilesResponse> listener
            ) {
                try {
                    logger.info("--> getSegmentFiles {}", filesToFetch);
                    RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = this.getRemoteDirectory();
                    // The returned metadata is not needed here; the call is kept because the original test
                    // made it before copying (NOTE(review): presumably it prepares the directory - confirm).
                    remoteSegmentStoreDirectory.init();
                    Collection<String> directoryFiles = List.of(indexShard.store().directory().listAll());
                    final Directory storeDirectory = indexShard.store().directory();
                    // Download only a single file so the shard close happens mid-copy.
                    if (filesToFetch.isEmpty() == false) {
                        String file = filesToFetch.get(0).name();
                        assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
                        storeDirectory.copyFrom(remoteSegmentStoreDirectory, file, file, IOContext.DEFAULT);
                    }
                    // shard is closing while we are copying files.
                    targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY);
                    // This should throw an exception because the replication has been cancelled.
                    cancellableThreads.checkForCancel();
                } catch (IOException e) {
                    // Complete the listener instead of throwing out of the callback.
                    listener.onFailure(e);
                }
            }
        };
        startReplicationAndAssertCancellation(replica, primary, targetService, source, cancellableThreads);
        shards.removeReplica(replica);
        closeShards(replica);
    }
}

public void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlushFirst, boolean performFlushSecond) throws Exception {
try (
ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), createTempDir())
Expand Down
Loading

0 comments on commit 39624e0

Please sign in to comment.