Skip to content

Commit

Permalink
Add retryable action for refres-mapping and shard action which dont u…
Browse files Browse the repository at this point in the history
…se TransportClusterManagerNodeAction

Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
  • Loading branch information
dhwanilpatel committed Sep 1, 2022
1 parent 45760f5 commit 4bf4e8b
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,24 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.IndicesRequest;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.RetryableAction;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MetadataMappingService;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.MasterTaskThrottlingException;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.EmptyTransportResponseHandler;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportResponse;
Expand All @@ -66,11 +72,17 @@ public class NodeMappingRefreshAction {

private final TransportService transportService;
private final MetadataMappingService metadataMappingService;
private final ThreadPool threadPool;

@Inject
public NodeMappingRefreshAction(TransportService transportService, MetadataMappingService metadataMappingService) {
public NodeMappingRefreshAction(
TransportService transportService,
MetadataMappingService metadataMappingService,
ThreadPool threadPool
) {
this.transportService = transportService;
this.metadataMappingService = metadataMappingService;
this.threadPool = threadPool;
transportService.registerRequestHandler(
ACTION_NAME,
ThreadPool.Names.SAME,
Expand All @@ -80,11 +92,70 @@ public NodeMappingRefreshAction(TransportService transportService, MetadataMappi
}

public void nodeMappingRefresh(final DiscoveryNode clusterManagerNode, final NodeMappingRefreshRequest request) {
new NodeMappingRefreshClusterManagerAction(clusterManagerNode, request).run();
}

private void sendNodeMappingRefreshToClusterManager(
final DiscoveryNode clusterManagerNode,
final NodeMappingRefreshRequest request,
ActionListener listener
) {
if (clusterManagerNode == null) {
logger.warn("can't send mapping refresh for [{}], no cluster-manager known.", request.index());
return;
}
transportService.sendRequest(clusterManagerNode, ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
transportService.sendRequest(clusterManagerNode, ACTION_NAME, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
});
}

/**
* RetryableAction for performing retires for cluster manager throttling.
*/
private class NodeMappingRefreshClusterManagerAction extends RetryableAction {

private final DiscoveryNode clusterManagerNode;
private final NodeMappingRefreshRequest request;
private static final int BASE_DELAY_MILLIS = 10;
private static final int MAX_DELAY_MILLIS = 10;

private NodeMappingRefreshClusterManagerAction(DiscoveryNode clusterManagerNode, NodeMappingRefreshRequest request) {
super(
logger,
threadPool,
TimeValue.timeValueMillis(BASE_DELAY_MILLIS),
TimeValue.timeValueMillis(Integer.MAX_VALUE), // Shard tasks are internal and don't have timeout
new ActionListener() {
@Override
public void onResponse(Object o) {}

@Override
public void onFailure(Exception e) {
logger.warn("Mapping refresh for [{}] failed due to [{}]", request.index, e.getMessage());
}
},
BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS),
ThreadPool.Names.SAME
);
this.clusterManagerNode = clusterManagerNode;
this.request = request;
}

@Override
public void tryAction(ActionListener listener) {
sendNodeMappingRefreshToClusterManager(clusterManagerNode, request, listener);
}

@Override
public boolean shouldRetry(Exception e) {
if (e instanceof TransportException) {
return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException;
}
return e instanceof MasterTaskThrottlingException;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.opensearch.OpenSearchException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.support.RetryableAction;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
Expand All @@ -55,6 +57,7 @@
import org.opensearch.cluster.routing.allocation.FailedShard;
import org.opensearch.cluster.routing.allocation.StaleShard;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.cluster.service.MasterTaskThrottlingException;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
Expand Down Expand Up @@ -180,6 +183,15 @@ private void sendShardAction(
final ClusterState currentState,
final TransportRequest request,
final ActionListener<Void> listener
) {
new ShardStateClusterManagerAction(currentState, actionName, request, listener).run();
}

private void sendShardActionToClusterManager(
final String actionName,
final ClusterState currentState,
final TransportRequest request,
final ActionListener<Void> listener
) {
ClusterStateObserver observer = new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext());
DiscoveryNode clusterManagerNode = currentState.nodes().getMasterNode();
Expand Down Expand Up @@ -222,6 +234,52 @@ public void handleException(TransportException exp) {
}
}

/**
* RetryableAction for performing retires for cluster manager throttling.
*/
private class ShardStateClusterManagerAction extends RetryableAction {
private final String actionName;
private final ActionListener listener;
private final ClusterState currentState;
private final TransportRequest request;
private static final int BASE_DELAY_MILLIS = 10;
private static final int MAX_DELAY_MILLIS = 5000;

private ShardStateClusterManagerAction(
ClusterState currentState,
String actionName,
TransportRequest request,
ActionListener listener
) {
super(
logger,
threadPool,
TimeValue.timeValueMillis(BASE_DELAY_MILLIS),
TimeValue.timeValueMillis(Integer.MAX_VALUE), // Shard tasks are internal and don't have timeout
listener,
BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS),
ThreadPool.Names.SAME
);
this.actionName = actionName;
this.listener = listener;
this.currentState = currentState;
this.request = request;
}

@Override
public void tryAction(ActionListener listener) {
sendShardActionToClusterManager(actionName, currentState, request, listener);
}

@Override
public boolean shouldRetry(Exception e) {
if (e instanceof TransportException) {
return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException;
}
return e instanceof MasterTaskThrottlingException;
}
}

private static Class[] CLUSTER_MANAGER_CHANNEL_EXCEPTIONS = new Class[] {
NotMasterException.class,
ConnectTransportException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1846,7 +1846,7 @@ public void onFailure(final Exception e) {
threadPool,
new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService),
shardStateAction,
new NodeMappingRefreshAction(transportService, metadataMappingService),
new NodeMappingRefreshAction(transportService, metadataMappingService, threadPool),
repositoriesService,
mock(SearchService.class),
new PeerRecoverySourceService(transportService, indicesService, recoverySettings),
Expand Down

0 comments on commit 4bf4e8b

Please sign in to comment.