Skip to content

Commit

Permalink
Update to support assertNoSnapshottedIndexCommit check by clearing copyState map before close.
Browse files Browse the repository at this point in the history

Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Feb 17, 2023
1 parent ab1159d commit bb0ca98
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.CopyState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
Expand All @@ -27,6 +29,7 @@
import org.opensearch.transport.TransportService;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class OngoingSegmentReplications implements Closeable {
private final RecoverySettings recoverySettings;
private final IndicesService indicesService;
private final Map<String, SegmentReplicationSourceHandler> allocationIdToHandlers;
private static final Logger logger = LogManager.getLogger(OngoingSegmentReplications.class);
private final Map<ShardId, CopyState> copyStateMap;
private static final Logger logger = LogManager.getLogger(OngoingSegmentReplications.class);

/**
* Constructor.
Expand Down Expand Up @@ -101,7 +101,7 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
*
* @param request {@link CheckpointInfoRequest}
* @param fileChunkWriter {@link FileChunkWriter} writer to handle sending files over the transport layer.
* @param copyState {@link GatedCloseable} containing {@link CopyState} that will be released upon copy completion.
* @param copyState {@link GatedCloseable} containing {@link CopyState} that will be released upon copy completion.
*/
void prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter, GatedCloseable<CopyState> copyState) {
final SegmentReplicationSourceHandler segmentReplicationSourceHandler = allocationIdToHandlers.putIfAbsent(
Expand All @@ -120,26 +120,46 @@ void prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileCh
}
}

/**
 * Builds a new {@link SegmentReplicationSourceHandler} for streaming segment files to a target node.
 *
 * @param node            target {@link DiscoveryNode} that will receive the files.
 * @param copyState       {@link GatedCloseable} wrapping the {@link CopyState} to replicate; released when copy completes.
 * @param allocationId    allocation id of the target shard.
 * @param fileChunkWriter writer used to send file chunks over the transport layer.
 * @return a handler configured from the current {@link RecoverySettings}.
 */
private SegmentReplicationSourceHandler createTargetHandler(
    DiscoveryNode node,
    GatedCloseable<CopyState> copyState,
    String allocationId,
    FileChunkWriter fileChunkWriter
) {
    // Chunk size is configured as a ByteSizeValue; the handler expects an int byte count.
    final int fileChunkSizeInBytes = Math.toIntExact(recoverySettings.getChunkSize().getBytes());
    final int maxConcurrentFileChunks = recoverySettings.getMaxConcurrentFileChunks();
    return new SegmentReplicationSourceHandler(node, fileChunkWriter, copyState, allocationId, fileChunkSizeInBytes, maxConcurrentFileChunks);
}

/**
* Cancel all Replication events for the given shard, intended to be called when a primary is shutting down.
*
* @param shard {@link IndexShard}
* @param reason {@link String} - Reason for the cancel
*/
synchronized void cancel(IndexShard shard, String reason) {
cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason);
// Cancels all ongoing replication events sourced from this shard and releases its
// cached CopyState, intended to be called when a primary shard is shutting down.
void cancel(IndexShard shard, String reason) {
// Cleanup only applies when this node's copy is the primary (replicas hold no source-side state).
if (shard.routingEntry().primary()) {
// NOTE(review): clearCopyStateForShard already removes the map entry and decRefs it,
// so the explicit remove/decRef below should be a no-op — confirm the duplication is intended.
clearCopyStateForShard(shard.shardId());
cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason);
final CopyState remove = copyStateMap.remove(shard.shardId());
if (remove != null) {
remove.decRef();
}
}
}


/**
* Cancel all Replication events for the given allocation ID, intended to be called when a primary is shutting down.
*
* @param allocationId {@link String} - Allocation ID.
* @param reason {@link String} - Reason for the cancel
*/
synchronized void cancel(String allocationId, String reason) {
void cancel(String allocationId, String reason) {
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId);
if (handler != null) {
handler.cancel(reason);
Expand All @@ -164,22 +184,6 @@ int size() {
return allocationIdToHandlers.size();
}

/**
 * Builds a new {@link SegmentReplicationSourceHandler} for streaming segment files to a target node.
 * Chunk size and max concurrent file chunks are read from the current {@link RecoverySettings}.
 *
 * @param node            target {@link DiscoveryNode} that will receive the files.
 * @param copyState       {@link GatedCloseable} wrapping the {@link CopyState} to replicate.
 * @param allocationId    allocation id of the target shard.
 * @param fileChunkWriter writer used to send file chunks over the transport layer.
 */
private SegmentReplicationSourceHandler createTargetHandler(
DiscoveryNode node,
GatedCloseable<CopyState> copyState,
String allocationId,
FileChunkWriter fileChunkWriter
) {
return new SegmentReplicationSourceHandler(
node,
fileChunkWriter,
copyState,
allocationId,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
recoverySettings.getMaxConcurrentFileChunks()
);
}

/**
* Remove handlers from allocationIdToHandlers map based on a filter predicate.
* This will also decref the handler's CopyState reference.
Expand All @@ -195,39 +199,34 @@ private void cancelHandlers(Predicate<? super SegmentReplicationSourceHandler> p
}
}

public synchronized void setCopyState(IndexShard indexShard) {
// We can only compute CopyState for shards that have started.
if (indexShard.state() == IndexShardState.STARTED) {
final CopyState state;
/**
 * Build and store a new {@link CopyState} for the given {@link IndexShard}.
 *
 * @param indexShard - Primary shard.
 */
public void setCopyState(IndexShard indexShard) {
// Only compute CopyState for shards that have started and are in primary mode.
if (indexShard.state() == IndexShardState.STARTED && indexShard.verifyPrimaryMode()) {
try {
// NOTE(review): the next two assignments appear to be interleaved old/new diff lines
// (duplicate construction of CopyState) — only one should survive in the real source.
state = new CopyState(indexShard);
final CopyState state = new CopyState(indexShard);
// Replace any previously cached CopyState for this shard, releasing its reference.
final CopyState oldCopyState = copyStateMap.remove(indexShard.shardId());
if (oldCopyState != null) {
oldCopyState.decRef();
}
copyStateMap.put(indexShard.shardId(), state);
} catch (IOException e) {
// CopyState construction reads Lucene metadata and may throw IOException.
throw new UncheckedIOException(e);
}
// NOTE(review): the remove/decRef/put block below duplicates the logic above and looks
// like residue of the pre-change version of this method — verify against the repository.
final CopyState oldCopyState = copyStateMap.remove(indexShard.shardId());
if (oldCopyState != null) {
oldCopyState.decRef();
}
// build the CopyState object and cache it before returning
copyStateMap.put(indexShard.shardId(), state);
}
}

// Releases every cached CopyState reference on shutdown.
// NOTE(review): entries are decRef'd but not removed from copyStateMap — presumably
// acceptable because the instance is being discarded; confirm no reuse after close.
@Override
public synchronized void close() throws IOException {
for (CopyState value : copyStateMap.values()) {
value.decRef();
}
}

/**
 * Removes the cached {@link CopyState} for the given shard id, if present,
 * and releases the reference held by this cache.
 *
 * @param shardId {@link ShardId} whose cached CopyState should be released.
 */
public synchronized void clearCopyStateForShard(ShardId shardId) {
    final CopyState cached = copyStateMap.remove(shardId);
    if (cached == null) {
        return;
    }
    cached.decRef();
}

public synchronized GatedCloseable<CopyState> getLatestCopyState(ShardId shardId) {
/**
* Get the latest {@link CopyState} for the given shardId. This method returns an incref'd CopyState wrapped
* in a {@link GatedCloseable}, when released the copyState is decRef'd.
*
* @param shardId {@link ShardId}
* @return {@link GatedCloseable} Closeable containing the CopyState.
*/
public GatedCloseable<CopyState> getCopyState(ShardId shardId) {
final CopyState copyState = copyStateMap.get(shardId);
if (copyState != null) {
copyState.incRef();
Expand All @@ -238,7 +237,18 @@ public synchronized GatedCloseable<CopyState> getLatestCopyState(ShardId shardId
throw new IndexShardNotStartedException(shardId, indexShard.state());
}

// Visible for testing only: exposes the raw ShardId -> CopyState cache without copying,
// so callers (tests) can inspect cached entries and their ref counts.
Map<ShardId, CopyState> getCopyStateMap() {
return copyStateMap;
}

/**
 * Safety net on shutdown: releases any {@link CopyState} reference that was not
 * already cleaned up, so no snapshotted index commit is leaked after close.
 */
@Override
public void close() throws IOException {
    copyStateMap.values().forEach(state -> {
        // Only decRef live entries; already-released state must not go negative.
        if (state.refCount() > 0) {
            state.decRef();
        }
    });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan
final ReplicationTimer timer = new ReplicationTimer();
assert ongoingSegmentReplications != null;
timer.start();
final GatedCloseable<CopyState> copyStateGatedCloseable = ongoingSegmentReplications.getLatestCopyState(
final GatedCloseable<CopyState> copyStateGatedCloseable = ongoingSegmentReplications.getCopyState(
request.getCheckpoint().getShardId()
);
final CopyState copyState = copyStateGatedCloseable.get();
Expand All @@ -131,11 +131,8 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan
} else {
if (request.getCheckpoint().isAheadOf(copyState.getCheckpoint()) || copyState.getMetadataMap().isEmpty()) {
// if there are no files to send, or the replica is already at this checkpoint, send the infos but do not hold
// snapshotted
// infos.
// During recovery of an empty cluster it is possible we have no files to send but the primary has flushed to set
// userData,
// in this case we still want to send over infos.
// snapshotted infos. During recovery of an empty cluster it is possible we have no files to send but the
// primary has flushed to set userData, in this case we still want to send over infos.
channel.sendResponse(
new CheckpointInfoResponse(copyState.getCheckpoint(), Collections.emptyMap(), copyState.getInfosBytes())
);
Expand Down Expand Up @@ -221,25 +218,13 @@ protected void doClose() throws IOException {
// Index event hook: when a shard on this node closes, cancel any replication events
// it was sourcing so handlers and CopyState references are released.
@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
// indexShard may be null if the shard never fully initialized.
if (indexShard != null) {
ongoingSegmentReplications.cancel(indexShard, "shard is closed");
}
}

/**
 * Cancels any replications on this node to a replica that has been promoted as primary.
 */
@Override
public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
// Replica promoted to primary: cancel replications targeting its allocation id.
if (indexShard != null && oldRouting.primary() == false && newRouting.primary()) {
ongoingSegmentReplications.cancel(indexShard.routingEntry().allocationId().getId(), "Relocating primary shard.");
}
// Primary demoted to replica: drop its cached CopyState.
if (indexShard != null && oldRouting.primary() && newRouting.primary() == false) {
ongoingSegmentReplications.clearCopyStateForShard(indexShard.shardId());
// NOTE(review): the nested block below ("shard is closed") looks like an interleaved
// diff line from beforeIndexShardClosed's new version — verify against the repository.
if (indexShard.routingEntry().primary()) {
ongoingSegmentReplications.cancel(indexShard, "shard is closed");
}
}
}

// Called after a primary refresh: rebuild and cache the shard's CopyState so
// subsequent replica checkpoint-info requests serve the latest segment infos.
public void onPrimaryRefresh(IndexShard indexShard) {
ongoingSegmentReplications.setCopyState(indexShard);
}

}
Loading

0 comments on commit bb0ca98

Please sign in to comment.