Skip to content

Commit

Permalink
Force flush on new elected primary after relocation.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Mar 13, 2023
1 parent a5711a1 commit c8e5154
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void beforeRefresh() throws IOException {

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh && shard.state() != IndexShardState.CLOSED && shard.getReplicationTracker().isPrimaryMode()) {
if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()) {
publisher.publish(shard);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,9 @@ public void updateShardState(
: "a primary relocation is completed by the cluster-managerr, but primary mode is not active " + currentRouting;

changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
if (indexSettings.isSegRepEnabled()) {
flush(new FlushRequest().waitIfOngoing(true).force(true));
}
} else if (currentRouting.primary()
&& currentRouting.relocating()
&& replicationTracker.isRelocated()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.ShardNotInPrimaryModeException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.common.ReplicationTimer;
Expand Down Expand Up @@ -119,7 +120,7 @@ final void publish(IndexShard indexShard) {
final ReplicationTimer timer = new ReplicationTimer();
timer.start();
transportService.sendChildRequest(
clusterService.localNode(),
indexShard.recoveryState().getTargetNode(),
transportPrimaryAction,
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
task,
Expand Down Expand Up @@ -156,17 +157,15 @@ public void handleException(TransportException e) {
logger.trace("[shardId {}] Failed to publish checkpoint, timing: {}", indexShard.shardId().getId(), timer.time());
task.setPhase("finished");
taskManager.unregister(task);
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
// node shutting down
return;
}
if (ExceptionsHelper.unwrap(
e,
NodeClosedException.class,
IndexNotFoundException.class,
AlreadyClosedException.class,
IndexShardClosedException.class
IndexShardClosedException.class,
ShardNotInPrimaryModeException.class
) != null) {
// the index was deleted or the shard is closed
// Node is shutting down or the index was deleted or the shard is closed
return;
}
logger.warn(
Expand Down

0 comments on commit c8e5154

Please sign in to comment.