Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove AllocatedPersistentTask.getState() #30858

Merged
merged 1 commit into from
May 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;

Expand All @@ -38,18 +37,16 @@
* Represents a executor node operation that corresponds to a persistent task
*/
public class AllocatedPersistentTask extends CancellableTask {
private volatile String persistentTaskId;
private volatile long allocationId;

private final AtomicReference<State> state;
@Nullable
private volatile Exception failure;

private volatile String persistentTaskId;
private volatile long allocationId;
private volatile @Nullable Exception failure;
private volatile PersistentTasksService persistentTasksService;
private volatile Logger logger;
private volatile TaskManager taskManager;


public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask,
Map<String, String> headers) {
super(id, type, action, description, parentTask, headers);
Expand Down Expand Up @@ -101,24 +98,10 @@ public Exception getFailure() {
return failure;
}

boolean markAsCancelled() {
return state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.PENDING_CANCEL);
}

public State getState() {
return state.get();
}

public long getAllocationId() {
return allocationId;
}

public enum State {
STARTED, // the task is currently running
PENDING_CANCEL, // the task is cancelled on master, cancelling it locally
COMPLETED // the task is done running and trying to notify caller
}

/**
* Waits for this persistent task to have the desired state.
*/
Expand All @@ -128,6 +111,14 @@ public void waitForPersistentTaskStatus(Predicate<PersistentTasksCustomMetaData.
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, predicate, timeout, listener);
}

final boolean isCompleted() {
return state.get() == State.COMPLETED;
}

boolean markAsCancelled() {
return state.compareAndSet(State.STARTED, State.PENDING_CANCEL);
}

public void markAsCompleted() {
completeAndNotifyIfNeeded(null);
}
Expand All @@ -138,11 +129,10 @@ public void markAsFailed(Exception e) {
} else {
completeAndNotifyIfNeeded(e);
}

}

private void completeAndNotifyIfNeeded(@Nullable Exception failure) {
State prevState = state.getAndSet(AllocatedPersistentTask.State.COMPLETED);
final State prevState = state.getAndSet(State.COMPLETED);
if (prevState == State.COMPLETED) {
logger.warn("attempt to complete task [{}] with id [{}] in the [{}] state", getAction(), getPersistentTaskId(), prevState);
} else {
Expand Down Expand Up @@ -173,4 +163,10 @@ public void onFailure(Exception e) {
}
}
}

public enum State {
STARTED, // the task is currently running
PENDING_CANCEL, // the task is cancelled on master, cancelling it locally
COMPLETED // the task is done running and trying to notify caller
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void clusterChanged(ClusterChangedEvent event) {

for (Long id : notVisitedTasks) {
AllocatedPersistentTask task = runningTasks.get(id);
if (task.getState() == AllocatedPersistentTask.State.COMPLETED) {
if (task.isCompleted()) {
// Result was sent to the caller and the caller acknowledged acceptance of the result
logger.trace("Found completed persistent task [{}] with id [{}] and allocation id [{}] - removing",
task.getAction(), task.getPersistentTaskId(), task.getAllocationId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any;
Expand All @@ -73,7 +74,6 @@ public void setUp() throws Exception {
threadPool = new TestThreadPool(getClass().getName());
}


@Override
@After
public void tearDown() throws Exception {
Expand All @@ -95,7 +95,7 @@ private ClusterState createInitialClusterState(int nonLocalNodesCount, Settings
return state.build();
}

public void testStartTask() throws Exception {
public void testStartTask() {
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
Expand Down Expand Up @@ -131,8 +131,8 @@ public void testStartTask() throws Exception {

if (added == false) {
logger.info("No local node action was added");

}

MetaData.Builder metaData = MetaData.builder(state.metaData());
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build();
Expand All @@ -149,6 +149,7 @@ public void testStartTask() throws Exception {

// Make sure action wasn't called again
assertThat(executor.executions.size(), equalTo(1));
assertThat(executor.get(0).task.isCompleted(), is(false));

// Start another task on this node
state = newClusterState;
Expand All @@ -157,10 +158,15 @@ public void testStartTask() throws Exception {

// Make sure action was called this time
assertThat(executor.size(), equalTo(2));
assertThat(executor.get(1).task.isCompleted(), is(false));

// Finish both tasks
executor.get(0).task.markAsFailed(new RuntimeException());
executor.get(1).task.markAsCompleted();

assertThat(executor.get(0).task.isCompleted(), is(true));
assertThat(executor.get(1).task.isCompleted(), is(true));

String failedTaskId = executor.get(0).task.getPersistentTaskId();
String finishedTaskId = executor.get(1).task.getPersistentTaskId();
executor.clear();
Expand All @@ -186,7 +192,6 @@ public void testStartTask() throws Exception {
// Make sure action was only allocated on this node once
assertThat(executor.size(), equalTo(1));
}

}

public void testParamsStatusAndNodeTaskAreDelegated() throws Exception {
Expand Down Expand Up @@ -300,7 +305,6 @@ public void sendCompletionNotification(String taskId, long allocationId, Excepti

// Check the the task is now removed from task manager
assertThat(taskManager.getTasks().values(), empty());

}

private <Params extends PersistentTaskParams> ClusterState addTask(ClusterState state, String action, Params params,
Expand Down