Skip to content

Commit

Permalink
Force replication for remote translog enabled indices
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Nov 21, 2022
1 parent 1531a94 commit 09bd677
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ public TransportShardBulkAction(
EXECUTOR_NAME_FUNCTION,
false,
indexingPressureService,
systemIndices
systemIndices,
true
);
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class ReplicationOperation<
private final TimeValue initialRetryBackoffBound;
private final TimeValue retryTimeout;
private final long primaryTerm;
private final boolean forceReplicationIfRemoteTranslogEnabled;

// exposed for tests
private final ActionListener<PrimaryResultT> resultListener;
Expand All @@ -119,6 +120,22 @@ public ReplicationOperation(
long primaryTerm,
TimeValue initialRetryBackoffBound,
TimeValue retryTimeout
) {
this(request, primary, listener, replicas, logger, threadPool, opType, primaryTerm, initialRetryBackoffBound, retryTimeout, false);
}

public ReplicationOperation(
Request request,
Primary<Request, ReplicaRequest, PrimaryResultT> primary,
ActionListener<PrimaryResultT> listener,
Replicas<ReplicaRequest> replicas,
Logger logger,
ThreadPool threadPool,
String opType,
long primaryTerm,
TimeValue initialRetryBackoffBound,
TimeValue retryTimeout,
boolean forceReplicationIfRemoteTranslogEnabled
) {
this.replicasProxy = replicas;
this.primary = primary;
Expand All @@ -130,6 +147,7 @@ public ReplicationOperation(
this.primaryTerm = primaryTerm;
this.initialRetryBackoffBound = initialRetryBackoffBound;
this.retryTimeout = retryTimeout;
this.forceReplicationIfRemoteTranslogEnabled = forceReplicationIfRemoteTranslogEnabled;
}

public void execute() throws Exception {
Expand Down Expand Up @@ -230,7 +248,8 @@ private void performOnReplicas(
for (final ReplicationAwareShardRouting shardRouting : replicationGroup.getReplicationTargets()) {
ShardRouting shard = shardRouting.getShardRouting();
// TODO - Add condition of underlying action being replicated regardless i.e. shard bulk and publish checkpoint action
if (!shard.isSameAllocation(primaryRouting) && shardRouting.isReplicated()) {
if (!shard.isSameAllocation(primaryRouting)
&& (shardRouting.isReplicated() || (shardRouting.isRemoteTranslogEnabled() && forceReplicationIfRemoteTranslogEnabled))) {
performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, pendingReplicationActions);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public abstract class TransportReplicationAction<
private final boolean syncGlobalCheckpointAfterOperation;
private volatile TimeValue initialRetryBackoffBound;
private volatile TimeValue retryTimeout;
private final boolean forceReplicationIfRemoteTranslogEnabled;

protected TransportReplicationAction(
Settings settings,
Expand Down Expand Up @@ -180,6 +181,38 @@ protected TransportReplicationAction(
);
}

protected TransportReplicationAction(
Settings settings,
String actionName,
TransportService transportService,
ClusterService clusterService,
IndicesService indicesService,
ThreadPool threadPool,
ShardStateAction shardStateAction,
ActionFilters actionFilters,
Writeable.Reader<Request> requestReader,
Writeable.Reader<ReplicaRequest> replicaRequestReader,
String executor,
boolean forceReplicationIfRemoteTranslogEnabled
) {
this(
settings,
actionName,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
requestReader,
replicaRequestReader,
executor,
false,
false,
forceReplicationIfRemoteTranslogEnabled
);
}

protected TransportReplicationAction(
Settings settings,
String actionName,
Expand All @@ -194,6 +227,40 @@ protected TransportReplicationAction(
String executor,
boolean syncGlobalCheckpointAfterOperation,
boolean forceExecutionOnPrimary
) {
this(
settings,
actionName,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
requestReader,
replicaRequestReader,
executor,
syncGlobalCheckpointAfterOperation,
forceExecutionOnPrimary,
false
);
}

protected TransportReplicationAction(
Settings settings,
String actionName,
TransportService transportService,
ClusterService clusterService,
IndicesService indicesService,
ThreadPool threadPool,
ShardStateAction shardStateAction,
ActionFilters actionFilters,
Writeable.Reader<Request> requestReader,
Writeable.Reader<ReplicaRequest> replicaRequestReader,
String executor,
boolean syncGlobalCheckpointAfterOperation,
boolean forceExecutionOnPrimary,
boolean forceReplicationIfRemoteTranslogEnabled
) {
super(actionName, actionFilters, transportService.getTaskManager());
this.threadPool = threadPool;
Expand All @@ -209,6 +276,7 @@ protected TransportReplicationAction(
this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings);
this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings);
this.forceExecutionOnPrimary = forceExecutionOnPrimary;
this.forceReplicationIfRemoteTranslogEnabled = forceReplicationIfRemoteTranslogEnabled;

transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);

Expand Down Expand Up @@ -533,7 +601,8 @@ public void handleException(TransportException exp) {
actionName,
primaryRequest.getPrimaryTerm(),
initialRetryBackoffBound,
retryTimeout
retryTimeout,
forceReplicationIfRemoteTranslogEnabled
).execute();
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,42 @@ protected TransportWriteAction(
boolean forceExecutionOnPrimary,
IndexingPressureService indexingPressureService,
SystemIndices systemIndices
) {
this(
settings,
actionName,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
request,
replicaRequest,
executorFunction,
forceExecutionOnPrimary,
indexingPressureService,
systemIndices,
false
);
}

protected TransportWriteAction(
Settings settings,
String actionName,
TransportService transportService,
ClusterService clusterService,
IndicesService indicesService,
ThreadPool threadPool,
ShardStateAction shardStateAction,
ActionFilters actionFilters,
Writeable.Reader<Request> request,
Writeable.Reader<ReplicaRequest> replicaRequest,
Function<IndexShard, String> executorFunction,
boolean forceExecutionOnPrimary,
IndexingPressureService indexingPressureService,
SystemIndices systemIndices,
boolean forceReplicationIfRemoteTranslogEnabled
) {
// We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the
// ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class.
Expand All @@ -114,7 +150,8 @@ protected TransportWriteAction(
replicaRequest,
ThreadPool.Names.SAME,
true,
forceExecutionOnPrimary
forceExecutionOnPrimary,
forceReplicationIfRemoteTranslogEnabled
);
this.executorFunction = executorFunction;
this.indexingPressureService = indexingPressureService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ public synchronized IndexShard createShard(
retentionLeaseSyncer,
circuitBreakerService,
// TODO Replace with remote translog factory in the follow up PR
this.indexSettings.isRemoteTranslogStoreEnabled() ? null : new InternalTranslogFactory(),
new InternalTranslogFactory(),
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
);
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,8 @@ public boolean isRemoteStoreEnabled() {
* Returns if remote translog store is enabled for this index.
*/
public boolean isRemoteTranslogStoreEnabled() {
return isRemoteTranslogStoreEnabled;
// return isRemoteTranslogStoreEnabled;
return isSegRepEnabled();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public PublishCheckpointAction(
actionFilters,
PublishCheckpointRequest::new,
PublishCheckpointRequest::new,
ThreadPool.Names.REFRESH
ThreadPool.Names.REFRESH,
true
);
this.replicationService = targetService;
}
Expand Down

0 comments on commit 09bd677

Please sign in to comment.