From 4b6c280396be6f12484ec98adadeee752527904e Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 24 Feb 2021 12:58:54 +0100 Subject: [PATCH] [Transform] stop transform regardless of transform nodes (#69419) allow stop transform to stop a transform task if its waiting for assignment(e.g. if the cluster lacks a transform node) fixes #69260 --- .../action/TransformNodeAssignments.java | 68 +++++++++++++++++++ .../transform/action/TransformNodes.java | 20 ++++-- .../TransportGetTransformStatsAction.java | 4 +- .../action/TransportStopTransformAction.java | 47 ++++++++++++- .../transform/action/TransformNodesTests.java | 51 ++++++++++---- 5 files changed, 171 insertions(+), 19 deletions(-) create mode 100644 x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodeAssignments.java diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodeAssignments.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodeAssignments.java new file mode 100644 index 0000000000000..f396a30bd0a54 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodeAssignments.java @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.action; + +import java.util.Collections; +import java.util.Set; + +/** + * Record of transform tasks and their current persistent task state. + * + * This class is aimed to be used by start/stop and stats action. + */ +public final class TransformNodeAssignments { + + // set of nodes where requested transforms are executed on + private final Set executorNodes; + // set of transforms that are currently assigned to a node + private final Set assigned; + // set of transforms that currently wait for node assignment + private final Set waitingForAssignment; + // set of transforms that have neither a task nor wait for assignment, so considered stopped + private final Set stopped; + + TransformNodeAssignments( + final Set executorNodes, + final Set assigned, + final Set waitingForAssignment, + final Set stopped + ) { + this.executorNodes = Collections.unmodifiableSet(executorNodes); + this.assigned = Collections.unmodifiableSet(assigned); + this.waitingForAssignment = Collections.unmodifiableSet(waitingForAssignment); + this.stopped = Collections.unmodifiableSet(stopped); + } + + /* + * Get nodes where (requested) transforms are executed. + */ + public Set getExecutorNodes() { + return executorNodes; + } + + /* + * Get transforms which have tasks currently assigned to a node + */ + public Set getAssigned() { + return assigned; + } + + /* + * Get transforms which are currently waiting to be assigned to a node + */ + public Set getWaitingForAssignment() { + return waitingForAssignment; + } + + /* + * Get transforms which have no tasks, which means they are stopped + */ + public Set getStopped() { + return stopped; + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java index 668ecf36e7736..8a6d6f8f23111 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java @@ -15,6 +15,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; public final class TransformNodes { @@ -27,9 +28,11 @@ private TransformNodes() {} * @param clusterState State * @return The executor nodes */ - public static String[] transformTaskNodes(List transformIds, ClusterState clusterState) { + public static TransformNodeAssignments transformTaskNodes(List transformIds, ClusterState clusterState) { Set executorNodes = new HashSet<>(); + Set assigned = new HashSet<>(); + Set waitingForAssignment = new HashSet<>(); PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState); @@ -38,14 +41,23 @@ public static String[] transformTaskNodes(List transformIds, ClusterStat Collection> tasks = tasksMetadata.findTasks( TransformField.TASK_NAME, - t -> transformIdsSet.contains(t.getId()) && t.isAssigned() + t -> transformIdsSet.contains(t.getId()) ); for (PersistentTasksCustomMetadata.PersistentTask task : tasks) { - executorNodes.add(task.getExecutorNode()); + if (task.isAssigned()) { + executorNodes.add(task.getExecutorNode()); + assigned.add(task.getId()); + } else { + waitingForAssignment.add(task.getId()); + } } } - return executorNodes.toArray(new String[0]); + Set stopped = transformIds.stream() + .filter(id -> (assigned.contains(id) || waitingForAssignment.contains(id)) == false) + .collect(Collectors.toSet()); + + return new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, stopped); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java index 1d3705b945e61..92977d1646c9d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java @@ -138,7 +138,9 @@ protected void doExecute(Task task, Request request, ActionListener fi ActionListener.wrap(hitsAndIds -> { request.setExpandedIds(hitsAndIds.v2()); final ClusterState state = clusterService.state(); - request.setNodes(TransformNodes.transformTaskNodes(hitsAndIds.v2(), state)); + TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(hitsAndIds.v2(), state); + // TODO: if empty the request is send to all nodes(benign but superfluous) + request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0])); super.doExecute(task, request, ActionListener.wrap(response -> { PersistentTasksCustomMetadata tasksInProgress = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); if (tasksInProgress != null) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java index 973643ac4256f..ccb9b9b8fc86d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -158,6 +159,7 @@ static Tuple, Set> findTasksWithoutConfig(ClusterState state protected void doExecute(Task task, Request request, ActionListener listener) { final ClusterState state = clusterService.state(); final DiscoveryNodes nodes = state.nodes(); + if (nodes.isLocalNodeElectedMaster() == false) { // Delegates stop transform to elected master node so it becomes the coordinating node. if (nodes.getMasterNode() == null) { @@ -185,8 +187,21 @@ protected void doExecute(Task task, Request request, ActionListener li ActionListener.wrap(hitsAndIds -> { validateTaskState(state, hitsAndIds.v2(), request.isForce()); request.setExpandedIds(new HashSet<>(hitsAndIds.v2())); - request.setNodes(TransformNodes.transformTaskNodes(hitsAndIds.v2(), state)); - super.doExecute(task, request, finalListener); + final TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(hitsAndIds.v2(), state); + + final ActionListener doExecuteListener; + if (transformNodeAssignments.getWaitingForAssignment().size() > 0) { + doExecuteListener = cancelTransformTasksWithNoAssignment(finalListener, transformNodeAssignments); + } else { + doExecuteListener = finalListener; + } + + if (transformNodeAssignments.getExecutorNodes().size() > 0) { + request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0])); + super.doExecute(task, request, doExecuteListener); + } else { + doExecuteListener.onResponse(new Response(true)); + } }, e -> { if (e instanceof ResourceNotFoundException) { Tuple, Set> runningTasksAndNodes = findTasksWithoutConfig(state, request.getId()); @@ -194,6 +209,7 @@ protected void doExecute(Task task, Request request, ActionListener li listener.onFailure(e); // found transforms without a config } else if (request.isForce()) { + // TODO: handle tasks waiting for assignment request.setExpandedIds(runningTasksAndNodes.v1()); request.setNodes(runningTasksAndNodes.v2().toArray(new String[0])); super.doExecute(task, request, finalListener); @@ -458,4 +474,31 @@ private void waitForTransformStopped( listener.onFailure(e); })); } + + private ActionListener cancelTransformTasksWithNoAssignment( + final ActionListener finalListener, + final TransformNodeAssignments transformNodeAssignments + ) { + final ActionListener doExecuteListener = ActionListener.wrap(response -> { + GroupedActionListener> groupedListener = new GroupedActionListener<>( + ActionListener.wrap(r -> { finalListener.onResponse(response); }, finalListener::onFailure), + transformNodeAssignments.getWaitingForAssignment().size() + ); + + for (String unassignedTaskId : transformNodeAssignments.getWaitingForAssignment()) { + persistentTasksService.sendRemoveRequest(unassignedTaskId, groupedListener); + } + + }, e -> { + GroupedActionListener> groupedListener = new GroupedActionListener<>( + ActionListener.wrap(r -> { finalListener.onFailure(e); }, finalListener::onFailure), + transformNodeAssignments.getWaitingForAssignment().size() + ); + + for (String unassignedTaskId : transformNodeAssignments.getWaitingForAssignment()) { + persistentTasksService.sendRemoveRequest(unassignedTaskId, groupedListener); + } + }); + return doExecuteListener; + } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java index 1854fe742ddec..0e05839ea8517 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java @@ -21,8 +21,6 @@ import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; -import java.util.Set; public class TransformNodesTests extends ESTestCase { @@ -32,6 +30,7 @@ public void testTransformNodes() { String transformIdFailed = "df-id-failed"; String transformIdBaz = "df-id-baz"; String transformIdOther = "df-id-other"; + String transformIdStopped = "df-id-stopped"; PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder(); tasksBuilder.addTask( @@ -91,21 +90,49 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) { .build(); // don't ask for transformIdOther - String[] nodes = TransformNodes.transformTaskNodes( - Arrays.asList(transformIdFoo, transformIdBar, transformIdFailed, transformIdBaz), + TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes( + Arrays.asList(transformIdFoo, transformIdBar, transformIdFailed, transformIdBaz, transformIdStopped), cs ); - assertEquals(2, nodes.length); - Set nodesSet = new HashSet<>(Arrays.asList(nodes)); - assertTrue(nodesSet.contains("node-1")); - assertTrue(nodesSet.contains("node-2")); - assertFalse(nodesSet.contains(null)); - assertFalse(nodesSet.contains("node-3")); + assertEquals(2, transformNodeAssignments.getExecutorNodes().size()); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-1")); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-2")); + assertFalse(transformNodeAssignments.getExecutorNodes().contains(null)); + assertFalse(transformNodeAssignments.getExecutorNodes().contains("node-3")); + assertEquals(1, transformNodeAssignments.getWaitingForAssignment().size()); + assertTrue(transformNodeAssignments.getWaitingForAssignment().contains(transformIdFailed)); + assertEquals(3, transformNodeAssignments.getAssigned().size()); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdFoo)); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdBar)); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdBaz)); + assertFalse(transformNodeAssignments.getAssigned().contains(transformIdFailed)); + assertEquals(1, transformNodeAssignments.getStopped().size()); + assertTrue(transformNodeAssignments.getStopped().contains(transformIdStopped)); + + transformNodeAssignments = TransformNodes.transformTaskNodes( + Arrays.asList(transformIdFoo, transformIdFailed), + cs + ); + + assertEquals(1, transformNodeAssignments.getExecutorNodes().size()); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-1")); + assertEquals(1, transformNodeAssignments.getWaitingForAssignment().size()); + assertTrue(transformNodeAssignments.getWaitingForAssignment().contains(transformIdFailed)); + assertEquals(1, transformNodeAssignments.getAssigned().size()); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdFoo)); + assertFalse(transformNodeAssignments.getAssigned().contains(transformIdFailed)); + assertEquals(0, transformNodeAssignments.getStopped().size()); } public void testTransformNodes_NoTasks() { ClusterState emptyState = ClusterState.builder(new ClusterName("_name")).build(); - String[] nodes = TransformNodes.transformTaskNodes(Collections.singletonList("df-id"), emptyState); - assertEquals(0, nodes.length); + TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes( + Collections.singletonList("df-id"), + emptyState + ); + + assertEquals(0, transformNodeAssignments.getExecutorNodes().size()); + assertEquals(1, transformNodeAssignments.getStopped().size()); + assertTrue(transformNodeAssignments.getStopped().contains("df-id")); } }