Skip to content

Commit

Permalink
Update Core to 3.13.3 and update ES7 changes (Netflix#174)
Browse files Browse the repository at this point in the history
* Update Community to 3.13.3 and update ES7 changes

* Spotless
  • Loading branch information
james-deee authored and SimonMisencik committed Feb 17, 2023
1 parent c2df66f commit b9e0234
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public Map<TaskModel.Status, ObservableQueue> getQueues(
}

private String getQueueGroup(final NATSStreamProperties properties) {
if (properties.getDefaultQueueGroup() == null || properties.getDefaultQueueGroup().isBlank()) {
if (properties.getDefaultQueueGroup() == null
|| properties.getDefaultQueueGroup().isBlank()) {
return "";
}
return ":" + properties.getDefaultQueueGroup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ public class JetStreamObservableQueue implements ObservableQueue {
private final String queueGroup;

public JetStreamObservableQueue(
JetStreamProperties properties, String queueType, String queueUri, Scheduler scheduler) {
JetStreamProperties properties,
String queueType,
String queueUri,
Scheduler scheduler) {
LOG.debug("JSM obs queue create, qtype={}, quri={}", queueType, queueUri);

this.queueUri = queueUri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public Map<TaskModel.Status, ObservableQueue> getQueues(
}

private String getQueueGroup(final JetStreamProperties properties) {
if (properties.getDefaultQueueGroup() == null || properties.getDefaultQueueGroup().isBlank()) {
if (properties.getDefaultQueueGroup() == null
|| properties.getDefaultQueueGroup().isBlank()) {
return "";
}
return ":" + properties.getDefaultQueueGroup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,94 @@ public void updateWorkflow(String workflowInstanceId, String[] keys, Object[] va
}
}

@Override
public void removeTask(String workflowId, String taskId) {
long startTime = Instant.now().toEpochMilli();

SearchResult<String> taskSearchResult =
searchTasks(
String.format("(taskId='%s') AND (workflowId='%s')", taskId, workflowId),
"*",
0,
1,
null);

if (taskSearchResult.getTotalHits() == 0) {
logger.error("Task: {} does not belong to workflow: {}", taskId, workflowId);
Monitors.error(className, "removeTask");
return;
}

DeleteRequest request = new DeleteRequest(taskIndexName, taskId);

try {
DeleteResponse response = elasticSearchClient.delete(request, RequestOptions.DEFAULT);

if (response.getResult() != DocWriteResponse.Result.DELETED) {
logger.error("Index removal failed - task not found by id: {}", workflowId);
Monitors.error(className, "removeTask");
return;
}
long endTime = Instant.now().toEpochMilli();
logger.debug(
"Time taken {} for removing task:{} of workflow: {}",
endTime - startTime,
taskId,
workflowId);
Monitors.recordESIndexTime("remove_task", "", endTime - startTime);
Monitors.recordWorkerQueueSize(
"indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size());
} catch (IOException e) {
logger.error(
"Failed to remove task {} of workflow: {} from index", taskId, workflowId, e);
Monitors.error(className, "removeTask");
}
}

@Override
public CompletableFuture<Void> asyncRemoveTask(String workflowId, String taskId) {
return CompletableFuture.runAsync(() -> removeTask(workflowId, taskId), executorService);
}

@Override
public void updateTask(String workflowId, String taskId, String[] keys, Object[] values) {
try {
if (keys.length != values.length) {
throw new IllegalArgumentException("Number of keys and values do not match");
}

long startTime = Instant.now().toEpochMilli();
UpdateRequest request = new UpdateRequest(taskIndexName, taskId);
Map<String, Object> source =
IntStream.range(0, keys.length)
.boxed()
.collect(Collectors.toMap(i -> keys[i], i -> values[i]));
request.doc(source);

logger.debug("Updating task: {} of workflow: {} with {}", taskId, workflowId, source);
elasticSearchClient.update(request, RequestOptions.DEFAULT);
long endTime = Instant.now().toEpochMilli();
logger.debug(
"Time taken {} for updating task: {} of workflow: {}",
endTime - startTime,
taskId,
workflowId);
Monitors.recordESIndexTime("update_task", "", endTime - startTime);
Monitors.recordWorkerQueueSize(
"indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size());
} catch (Exception e) {
logger.error("Failed to update task: {} of workflow: {}", taskId, workflowId, e);
Monitors.error(className, "update");
}
}

@Override
public CompletableFuture<Void> asyncUpdateTask(
String workflowId, String taskId, String[] keys, Object[] values) {
return CompletableFuture.runAsync(
() -> updateTask(workflowId, taskId, keys, values), executorService);
}

@Override
public CompletableFuture<Void> asyncUpdateWorkflow(
String workflowInstanceId, String[] keys, Object[] values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.collect.ImmutableMap;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class TestElasticSearchRestDAOV7 extends ElasticSearchRestDaoBaseTest {
Expand Down Expand Up @@ -190,6 +191,84 @@ public void shouldIndexTaskAsync() throws Exception {
assertEquals(taskSummary.getTaskId(), tasks.get(0));
}

@Test
public void shouldRemoveTask() {
WorkflowSummary workflowSummary =
TestUtils.loadWorkflowSnapshot(objectMapper, "workflow_summary");
indexDAO.indexWorkflow(workflowSummary);

// wait for workflow to be indexed
tryFindResults(() -> searchWorkflows(workflowSummary.getWorkflowId()), 1);

TaskSummary taskSummary =
TestUtils.loadTaskSnapshot(
objectMapper, "task_summary", workflowSummary.getWorkflowId());
indexDAO.indexTask(taskSummary);

// Wait for the task to be indexed
List<String> tasks = tryFindResults(() -> searchTasks(taskSummary), 1);

indexDAO.removeTask(workflowSummary.getWorkflowId(), taskSummary.getTaskId());

tasks = tryFindResults(() -> searchTasks(taskSummary), 0);

assertTrue("Task was not removed.", tasks.isEmpty());
}

@Test
public void shouldAsyncRemoveTask() throws Exception {
WorkflowSummary workflowSummary =
TestUtils.loadWorkflowSnapshot(objectMapper, "workflow_summary");
indexDAO.indexWorkflow(workflowSummary);

// wait for workflow to be indexed
tryFindResults(() -> searchWorkflows(workflowSummary.getWorkflowId()), 1);

TaskSummary taskSummary =
TestUtils.loadTaskSnapshot(
objectMapper, "task_summary", workflowSummary.getWorkflowId());
indexDAO.indexTask(taskSummary);

// Wait for the task to be indexed
List<String> tasks = tryFindResults(() -> searchTasks(taskSummary), 1);

indexDAO.asyncRemoveTask(workflowSummary.getWorkflowId(), taskSummary.getTaskId()).get();

tasks = tryFindResults(() -> searchTasks(taskSummary), 0);

assertTrue("Task was not removed.", tasks.isEmpty());
}

@Test
public void shouldNotRemoveTaskWhenNotAssociatedWithWorkflow() {
TaskSummary taskSummary = TestUtils.loadTaskSnapshot(objectMapper, "task_summary");
indexDAO.indexTask(taskSummary);

// Wait for the task to be indexed
List<String> tasks = tryFindResults(() -> searchTasks(taskSummary), 1);

indexDAO.removeTask("InvalidWorkflow", taskSummary.getTaskId());

tasks = tryFindResults(() -> searchTasks(taskSummary), 0);

assertFalse("Task was removed.", tasks.isEmpty());
}

@Test
public void shouldNotAsyncRemoveTaskWhenNotAssociatedWithWorkflow() throws Exception {
TaskSummary taskSummary = TestUtils.loadTaskSnapshot(objectMapper, "task_summary");
indexDAO.indexTask(taskSummary);

// Wait for the task to be indexed
List<String> tasks = tryFindResults(() -> searchTasks(taskSummary), 1);

indexDAO.asyncRemoveTask("InvalidWorkflow", taskSummary.getTaskId()).get();

tasks = tryFindResults(() -> searchTasks(taskSummary), 0);

assertFalse("Task was removed.", tasks.isEmpty());
}

@Test
public void shouldAddTaskExecutionLogs() {
List<TaskExecLog> logs = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ public static TaskSummary loadTaskSnapshot(ObjectMapper objectMapper, String res
}
}

public static TaskSummary loadTaskSnapshot(
ObjectMapper objectMapper, String resourceFileName, String workflowId) {
try {
String content = loadJsonResource(resourceFileName);
content = content.replace(WORKFLOW_INSTANCE_ID_PLACEHOLDER, workflowId);

return objectMapper.readValue(content, TaskSummary.class);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

public static String loadJsonResource(String resourceFileName) {
try {
return Resources.toString(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -57,7 +55,9 @@ public class PostgresExecutionDAO extends PostgresBaseDAO
public PostgresExecutionDAO(
RetryTemplate retryTemplate, ObjectMapper objectMapper, DataSource dataSource) {
super(retryTemplate, objectMapper, dataSource);
this.executor = Executors.newSingleThreadScheduledExecutor(runnable -> new Thread(THREAD_GROUP, runnable));
this.executor =
Executors.newSingleThreadScheduledExecutor(
runnable -> new Thread(THREAD_GROUP, runnable));
}

private static String dateStr(Long timeInMs) {
Expand Down Expand Up @@ -317,18 +317,19 @@ public boolean removeWorkflow(String workflowId) {
return removed;
}

/**
* Scheduled executor based implementation.
*/
/** Scheduled executor based implementation. */
@Override
public boolean removeWorkflowWithExpiry(String workflowId, int ttlSeconds) {
executor.schedule(() -> {
try {
removeWorkflow(workflowId);
} catch (Throwable e) {
logger.warn("Unable to remove workflow: {} with expiry", workflowId, e);
}
}, ttlSeconds, TimeUnit.SECONDS);
executor.schedule(
() -> {
try {
removeWorkflow(workflowId);
} catch (Throwable e) {
logger.warn("Unable to remove workflow: {} with expiry", workflowId, e);
}
},
ttlSeconds,
TimeUnit.SECONDS);

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
*/
package com.netflix.conductor.postgres.dao;

import com.google.common.collect.Iterables;
import java.util.List;

import org.flywaydb.core.Flyway;
Expand All @@ -32,6 +31,8 @@
import com.netflix.conductor.model.WorkflowModel;
import com.netflix.conductor.postgres.config.PostgresConfiguration;

import com.google.common.collect.Iterables;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import java.util.Map;
import java.util.stream.Collectors;

import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
import com.netflix.conductor.service.WorkflowService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -33,17 +31,16 @@
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.execution.StartWorkflowInput;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.model.WorkflowModel;
import com.netflix.conductor.service.ExecutionService;
import com.netflix.conductor.service.MetadataService;
import com.netflix.conductor.service.WorkflowService;

import com.fasterxml.jackson.databind.ObjectMapper;

Expand Down Expand Up @@ -195,7 +192,8 @@ public void testListenerOnCompletedWorkflow() throws IOException, InterruptedExc
@SuppressWarnings("BusyWait")
private void checkIfWorkflowIsCompleted(String id) throws InterruptedException {
int statusRetrieveAttempts = 0;
while (workflowExecutor.getExecutionStatus(id, false).getStatus() != Workflow.WorkflowStatus.COMPLETED) {
while (workflowExecutor.getExecutionStatus(id, false).getStatus()
!= Workflow.WorkflowStatus.COMPLETED) {
if (statusRetrieveAttempts > 5) {
break;
}
Expand Down

0 comments on commit b9e0234

Please sign in to comment.