Skip to content

Commit

Permalink
Send clear session as routable remote request (#36805)
Browse files Browse the repository at this point in the history
This commit adds a RemoteClusterAwareRequest interface that allows a
request to specify which remote node it should be routed to. The remote
cluster aware client will attempt to route the request directly to this
node. Otherwise it will send it as a proxy action to eventually end up
on the requested node.

It implements the ccr clean_session action with this client.
  • Loading branch information
Tim-Brooks authored Dec 21, 2018
1 parent a9834cd commit d9b2ed6
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -45,14 +46,19 @@ final class RemoteClusterAwareClient extends AbstractClient {
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> {
Transport.Connection connection = remoteClusterService.getConnection(clusterAlias);
Transport.Connection connection;
if (request instanceof RemoteClusterAwareRequest) {
DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode();
connection = remoteClusterService.getConnection(preferredTargetNode, clusterAlias);
} else {
connection = remoteClusterService.getConnection(clusterAlias);
}
service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(listener, action.getResponseReader()));
},
listener::onFailure));
}


@Override
public void close() {
// do nothing
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport;

import org.elasticsearch.cluster.node.DiscoveryNode;

public interface RemoteClusterAwareRequest {

/**
* Returns the preferred discovery node for this request. The remote cluster client will attempt to send
* this request directly to this node. Otherwise, it will send the request as a proxy action that will
* be routed by the remote cluster to this node.
*
* @return preferred discovery node
*/
DiscoveryNode getPreferredTargetNode();

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,19 @@
package org.elasticsearch.xpack.ccr.action.repositories;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;

import java.io.IOException;
import java.util.List;

public class ClearCcrRestoreSessionAction extends Action<ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse> {

public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction();
Expand All @@ -36,86 +31,47 @@ private ClearCcrRestoreSessionAction() {

@Override
public ClearCcrRestoreSessionResponse newResponse() {
return new ClearCcrRestoreSessionResponse();
throw new UnsupportedOperationException();
}

public static class TransportDeleteCcrRestoreSessionAction extends TransportNodesAction<ClearCcrRestoreSessionRequest,
ClearCcrRestoreSessionResponse, ClearCcrRestoreSessionRequest.Request, Response> {
@Override
public Writeable.Reader<ClearCcrRestoreSessionResponse> getResponseReader() {
return ClearCcrRestoreSessionResponse::new;
}

public static class TransportDeleteCcrRestoreSessionAction
extends HandledTransportAction<ClearCcrRestoreSessionRequest, ClearCcrRestoreSessionResponse> {

private final CcrRestoreSourceService ccrRestoreService;
private final ThreadPool threadPool;

@Inject
public TransportDeleteCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters,
TransportService transportService, CcrRestoreSourceService ccrRestoreService) {
super(NAME, threadPool, clusterService, transportService, actionFilters, ClearCcrRestoreSessionRequest::new,
ClearCcrRestoreSessionRequest.Request::new, ThreadPool.Names.GENERIC, Response.class);
public TransportDeleteCcrRestoreSessionAction(ActionFilters actionFilters, TransportService transportService,
CcrRestoreSourceService ccrRestoreService) {
super(NAME, transportService, actionFilters, ClearCcrRestoreSessionRequest::new);
TransportActionProxy.registerProxyAction(transportService, NAME, ClearCcrRestoreSessionResponse::new);
this.ccrRestoreService = ccrRestoreService;
this.threadPool = transportService.getThreadPool();
}

@Override
protected ClearCcrRestoreSessionResponse newResponse(ClearCcrRestoreSessionRequest request, List<Response> responses,
List<FailedNodeException> failures) {
return new ClearCcrRestoreSessionResponse(clusterService.getClusterName(), responses, failures);
}

@Override
protected ClearCcrRestoreSessionRequest.Request newNodeRequest(String nodeId, ClearCcrRestoreSessionRequest request) {
return request.getRequest();
}

@Override
protected Response newNodeResponse() {
return new Response();
}

@Override
protected Response nodeOperation(ClearCcrRestoreSessionRequest.Request request) {
ccrRestoreService.closeSession(request.getSessionUUID());
return new Response(clusterService.localNode());
protected void doExecute(Task task, ClearCcrRestoreSessionRequest request,
ActionListener<ClearCcrRestoreSessionResponse> listener) {
// TODO: Currently blocking actions might occur in the session closed callbacks. This dispatch
// may be unnecessary when we remove these callbacks.
threadPool.generic().execute(() -> {
ccrRestoreService.closeSession(request.getSessionUUID());
listener.onResponse(new ClearCcrRestoreSessionResponse());
});
}
}

public static class Response extends BaseNodeResponse {

private Response() {
}

private Response(StreamInput in) throws IOException {
readFrom(in);
}

private Response(DiscoveryNode node) {
super(node);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
}

public static class ClearCcrRestoreSessionResponse extends BaseNodesResponse<Response> {
public static class ClearCcrRestoreSessionResponse extends ActionResponse {

ClearCcrRestoreSessionResponse() {
}

ClearCcrRestoreSessionResponse(ClusterName clusterName, List<Response> chunkResponses, List<FailedNodeException> failures) {
super(clusterName, chunkResponses, failures);
}

@Override
protected List<Response> readNodesFrom(StreamInput in) throws IOException {
return in.readList(Response::new);
}

@Override
protected void writeNodesTo(StreamOutput out, List<Response> nodes) throws IOException {
out.writeList(nodes);
ClearCcrRestoreSessionResponse(StreamInput in) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,68 +6,52 @@

package org.elasticsearch.xpack.ccr.action.repositories;

import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.transport.RemoteClusterAwareRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

public class ClearCcrRestoreSessionRequest extends BaseNodesRequest<ClearCcrRestoreSessionRequest> {
public class ClearCcrRestoreSessionRequest extends ActionRequest implements RemoteClusterAwareRequest {

private Request request;
private DiscoveryNode node;
private String sessionUUID;

ClearCcrRestoreSessionRequest() {
ClearCcrRestoreSessionRequest(StreamInput in) throws IOException {
super.readFrom(in);
sessionUUID = in.readString();
}

public ClearCcrRestoreSessionRequest(String nodeId, Request request) {
super(nodeId);
this.request = request;
public ClearCcrRestoreSessionRequest(String sessionUUID, DiscoveryNode node) {
this.sessionUUID = sessionUUID;
this.node = node;
}

@Override
public void readFrom(StreamInput streamInput) throws IOException {
super.readFrom(streamInput);
request = new Request();
request.readFrom(streamInput);
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput streamOutput) throws IOException {
super.writeTo(streamOutput);
request.writeTo(streamOutput);
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException();
}

public Request getRequest() {
return request;
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(sessionUUID);
}

public static class Request extends BaseNodeRequest {

private String sessionUUID;

Request() {
}

public Request(String nodeId, String sessionUUID) {
super(nodeId);
this.sessionUUID = sessionUUID;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
sessionUUID = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(sessionUUID);
}
String getSessionUUID() {
return sessionUUID;
}

public String getSessionUUID() {
return sessionUUID;
}
@Override
public DiscoveryNode getPreferredTargetNode() {
return node;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -70,7 +71,7 @@ protected PutCcrRestoreSessionResponse shardOperation(PutCcrRestoreSessionReques
throw new ShardNotFoundException(shardId);
}
ccrRestoreService.openSession(request.getSessionUUID(), indexShard);
return new PutCcrRestoreSessionResponse(indexShard.routingEntry().currentNodeId());
return new PutCcrRestoreSessionResponse(clusterService.localNode());
}

@Override
Expand All @@ -93,34 +94,34 @@ protected ShardsIterator shards(ClusterState state, InternalRequest request) {

public static class PutCcrRestoreSessionResponse extends ActionResponse {

private String nodeId;
private DiscoveryNode node;

PutCcrRestoreSessionResponse() {
}

PutCcrRestoreSessionResponse(String nodeId) {
this.nodeId = nodeId;
PutCcrRestoreSessionResponse(DiscoveryNode node) {
this.node = node;
}

PutCcrRestoreSessionResponse(StreamInput in) throws IOException {
super(in);
nodeId = in.readString();
node = new DiscoveryNode(in);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodeId = in.readString();
node = new DiscoveryNode(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(nodeId);
node.writeTo(out);
}

public String getNodeId() {
return nodeId;
public DiscoveryNode getNode() {
return node;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,9 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
String sessionUUID = UUIDs.randomBase64UUID();
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId, recoveryMetadata)).actionGet();
String nodeId = response.getNodeId();
DiscoveryNode node = response.getNode();
// TODO: Implement file restore
closeSession(remoteClient, nodeId, sessionUUID);
closeSession(remoteClient, node, sessionUUID);
maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings());
}

Expand All @@ -278,13 +278,9 @@ private void maybeUpdateMappings(Client localClient, Client remoteClient, Index
}
}

private void closeSession(Client remoteClient, String nodeId, String sessionUUID) {
ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(nodeId,
new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID));
private void closeSession(Client remoteClient, DiscoveryNode node, String sessionUUID) {
ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node);
ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response =
remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet();
if (response.hasFailures()) {
throw response.failures().get(0);
}
}
}

0 comments on commit d9b2ed6

Please sign in to comment.