Skip to content

Commit

Permalink
[Transform] stop transform regardless of transform nodes (#69419)
Browse files Browse the repository at this point in the history
allow stop transform to stop a transform task if its waiting for assignment(e.g. if
the cluster lacks a transform node)

fixes #69260
  • Loading branch information
Hendrik Muhs authored Feb 24, 2021
1 parent a64f4db commit 4b6c280
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.transform.action;

import java.util.Collections;
import java.util.Set;

/**
* Record of transform tasks and their current persistent task state.
*
* This class is aimed to be used by start/stop and stats action.
*/
public final class TransformNodeAssignments {

// set of nodes where requested transforms are executed on
private final Set<String> executorNodes;
// set of transforms that are currently assigned to a node
private final Set<String> assigned;
// set of transforms that currently wait for node assignment
private final Set<String> waitingForAssignment;
// set of transforms that have neither a task nor wait for assignment, so considered stopped
private final Set<String> stopped;

TransformNodeAssignments(
final Set<String> executorNodes,
final Set<String> assigned,
final Set<String> waitingForAssignment,
final Set<String> stopped
) {
this.executorNodes = Collections.unmodifiableSet(executorNodes);
this.assigned = Collections.unmodifiableSet(assigned);
this.waitingForAssignment = Collections.unmodifiableSet(waitingForAssignment);
this.stopped = Collections.unmodifiableSet(stopped);
}

/*
* Get nodes where (requested) transforms are executed.
*/
public Set<String> getExecutorNodes() {
return executorNodes;
}

/*
* Get transforms which have tasks currently assigned to a node
*/
public Set<String> getAssigned() {
return assigned;
}

/*
* Get transforms which are currently waiting to be assigned to a node
*/
public Set<String> getWaitingForAssignment() {
return waitingForAssignment;
}

/*
* Get transforms which have no tasks, which means they are stopped
*/
public Set<String> getStopped() {
return stopped;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class TransformNodes {

Expand All @@ -27,9 +28,11 @@ private TransformNodes() {}
* @param clusterState State
* @return The executor nodes
*/
public static String[] transformTaskNodes(List<String> transformIds, ClusterState clusterState) {
public static TransformNodeAssignments transformTaskNodes(List<String> transformIds, ClusterState clusterState) {

Set<String> executorNodes = new HashSet<>();
Set<String> assigned = new HashSet<>();
Set<String> waitingForAssignment = new HashSet<>();

PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState);

Expand All @@ -38,14 +41,23 @@ public static String[] transformTaskNodes(List<String> transformIds, ClusterStat

Collection<PersistentTasksCustomMetadata.PersistentTask<?>> tasks = tasksMetadata.findTasks(
TransformField.TASK_NAME,
t -> transformIdsSet.contains(t.getId()) && t.isAssigned()
t -> transformIdsSet.contains(t.getId())
);

for (PersistentTasksCustomMetadata.PersistentTask<?> task : tasks) {
executorNodes.add(task.getExecutorNode());
if (task.isAssigned()) {
executorNodes.add(task.getExecutorNode());
assigned.add(task.getId());
} else {
waitingForAssignment.add(task.getId());
}
}
}

return executorNodes.toArray(new String[0]);
Set<String> stopped = transformIds.stream()
.filter(id -> (assigned.contains(id) || waitingForAssignment.contains(id)) == false)
.collect(Collectors.toSet());

return new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, stopped);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ protected void doExecute(Task task, Request request, ActionListener<Response> fi
ActionListener.wrap(hitsAndIds -> {
request.setExpandedIds(hitsAndIds.v2());
final ClusterState state = clusterService.state();
request.setNodes(TransformNodes.transformTaskNodes(hitsAndIds.v2(), state));
TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(hitsAndIds.v2(), state);
// TODO: if empty the request is send to all nodes(benign but superfluous)
request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0]));
super.doExecute(task, request, ActionListener.wrap(response -> {
PersistentTasksCustomMetadata tasksInProgress = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
if (tasksInProgress != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -158,6 +159,7 @@ static Tuple<Set<String>, Set<String>> findTasksWithoutConfig(ClusterState state
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final ClusterState state = clusterService.state();
final DiscoveryNodes nodes = state.nodes();

if (nodes.isLocalNodeElectedMaster() == false) {
// Delegates stop transform to elected master node so it becomes the coordinating node.
if (nodes.getMasterNode() == null) {
Expand Down Expand Up @@ -185,15 +187,29 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
ActionListener.wrap(hitsAndIds -> {
validateTaskState(state, hitsAndIds.v2(), request.isForce());
request.setExpandedIds(new HashSet<>(hitsAndIds.v2()));
request.setNodes(TransformNodes.transformTaskNodes(hitsAndIds.v2(), state));
super.doExecute(task, request, finalListener);
final TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(hitsAndIds.v2(), state);

final ActionListener<Response> doExecuteListener;
if (transformNodeAssignments.getWaitingForAssignment().size() > 0) {
doExecuteListener = cancelTransformTasksWithNoAssignment(finalListener, transformNodeAssignments);
} else {
doExecuteListener = finalListener;
}

if (transformNodeAssignments.getExecutorNodes().size() > 0) {
request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0]));
super.doExecute(task, request, doExecuteListener);
} else {
doExecuteListener.onResponse(new Response(true));
}
}, e -> {
if (e instanceof ResourceNotFoundException) {
Tuple<Set<String>, Set<String>> runningTasksAndNodes = findTasksWithoutConfig(state, request.getId());
if (runningTasksAndNodes.v1().isEmpty()) {
listener.onFailure(e);
// found transforms without a config
} else if (request.isForce()) {
// TODO: handle tasks waiting for assignment
request.setExpandedIds(runningTasksAndNodes.v1());
request.setNodes(runningTasksAndNodes.v2().toArray(new String[0]));
super.doExecute(task, request, finalListener);
Expand Down Expand Up @@ -458,4 +474,31 @@ private void waitForTransformStopped(
listener.onFailure(e);
}));
}

private ActionListener<Response> cancelTransformTasksWithNoAssignment(
final ActionListener<Response> finalListener,
final TransformNodeAssignments transformNodeAssignments
) {
final ActionListener<Response> doExecuteListener = ActionListener.wrap(response -> {
GroupedActionListener<PersistentTask<?>> groupedListener = new GroupedActionListener<>(
ActionListener.wrap(r -> { finalListener.onResponse(response); }, finalListener::onFailure),
transformNodeAssignments.getWaitingForAssignment().size()
);

for (String unassignedTaskId : transformNodeAssignments.getWaitingForAssignment()) {
persistentTasksService.sendRemoveRequest(unassignedTaskId, groupedListener);
}

}, e -> {
GroupedActionListener<PersistentTask<?>> groupedListener = new GroupedActionListener<>(
ActionListener.wrap(r -> { finalListener.onFailure(e); }, finalListener::onFailure),
transformNodeAssignments.getWaitingForAssignment().size()
);

for (String unassignedTaskId : transformNodeAssignments.getWaitingForAssignment()) {
persistentTasksService.sendRemoveRequest(unassignedTaskId, groupedListener);
}
});
return doExecuteListener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class TransformNodesTests extends ESTestCase {

Expand All @@ -32,6 +30,7 @@ public void testTransformNodes() {
String transformIdFailed = "df-id-failed";
String transformIdBaz = "df-id-baz";
String transformIdOther = "df-id-other";
String transformIdStopped = "df-id-stopped";

PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
tasksBuilder.addTask(
Expand Down Expand Up @@ -91,21 +90,49 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) {
.build();

// don't ask for transformIdOther
String[] nodes = TransformNodes.transformTaskNodes(
Arrays.asList(transformIdFoo, transformIdBar, transformIdFailed, transformIdBaz),
TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(
Arrays.asList(transformIdFoo, transformIdBar, transformIdFailed, transformIdBaz, transformIdStopped),
cs
);
assertEquals(2, nodes.length);
Set<String> nodesSet = new HashSet<>(Arrays.asList(nodes));
assertTrue(nodesSet.contains("node-1"));
assertTrue(nodesSet.contains("node-2"));
assertFalse(nodesSet.contains(null));
assertFalse(nodesSet.contains("node-3"));
assertEquals(2, transformNodeAssignments.getExecutorNodes().size());
assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-1"));
assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-2"));
assertFalse(transformNodeAssignments.getExecutorNodes().contains(null));
assertFalse(transformNodeAssignments.getExecutorNodes().contains("node-3"));
assertEquals(1, transformNodeAssignments.getWaitingForAssignment().size());
assertTrue(transformNodeAssignments.getWaitingForAssignment().contains(transformIdFailed));
assertEquals(3, transformNodeAssignments.getAssigned().size());
assertTrue(transformNodeAssignments.getAssigned().contains(transformIdFoo));
assertTrue(transformNodeAssignments.getAssigned().contains(transformIdBar));
assertTrue(transformNodeAssignments.getAssigned().contains(transformIdBaz));
assertFalse(transformNodeAssignments.getAssigned().contains(transformIdFailed));
assertEquals(1, transformNodeAssignments.getStopped().size());
assertTrue(transformNodeAssignments.getStopped().contains(transformIdStopped));

transformNodeAssignments = TransformNodes.transformTaskNodes(
Arrays.asList(transformIdFoo, transformIdFailed),
cs
);

assertEquals(1, transformNodeAssignments.getExecutorNodes().size());
assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-1"));
assertEquals(1, transformNodeAssignments.getWaitingForAssignment().size());
assertTrue(transformNodeAssignments.getWaitingForAssignment().contains(transformIdFailed));
assertEquals(1, transformNodeAssignments.getAssigned().size());
assertTrue(transformNodeAssignments.getAssigned().contains(transformIdFoo));
assertFalse(transformNodeAssignments.getAssigned().contains(transformIdFailed));
assertEquals(0, transformNodeAssignments.getStopped().size());
}

public void testTransformNodes_NoTasks() {
ClusterState emptyState = ClusterState.builder(new ClusterName("_name")).build();
String[] nodes = TransformNodes.transformTaskNodes(Collections.singletonList("df-id"), emptyState);
assertEquals(0, nodes.length);
TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(
Collections.singletonList("df-id"),
emptyState
);

assertEquals(0, transformNodeAssignments.getExecutorNodes().size());
assertEquals(1, transformNodeAssignments.getStopped().size());
assertTrue(transformNodeAssignments.getStopped().contains("df-id"));
}
}

0 comments on commit 4b6c280

Please sign in to comment.