Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

[FLPATH-470] Restart workflow - backend #444

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.redhat.parodos.flows;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

import com.redhat.parodos.flows.common.WorkFlowTestBuilder;
import com.redhat.parodos.flows.common.WorkFlowTestBuilder.TestComponents;
Expand Down Expand Up @@ -79,24 +81,52 @@ public void runComplexWorkFlow() throws ApiException, InterruptedException {
log.info("Onboarding workflow id {}", workFlowDefinitions.get(0).getId());
log.info("Onboarding workflow name {}", workFlowDefinitions.get(0).getName());

WorkRequestDTO work1 = new WorkRequestDTO();
work1.setWorkName("certWorkFlowTask");
work1.setArguments(Arrays.asList(new ArgumentRequestDTO().key("user-id").value("test-user-id"),
new ArgumentRequestDTO().key("api-server").value("api.com")));

WorkRequestDTO work2 = new WorkRequestDTO();
work2.setWorkName("adGroupWorkFlowTask");
work2.setArguments(Arrays.asList(new ArgumentRequestDTO().key("user-id").value("test-user-id"),
new ArgumentRequestDTO().key("api-server").value("api.com")));

WorkRequestDTO work3 = new WorkRequestDTO();
work3.setWorkName("dynatraceWorkFlowTask");
work3.setArguments(Arrays.asList(new ArgumentRequestDTO().key("user-id").value("test-user-id"),
new ArgumentRequestDTO().key("api-server").value("api.com")));
WorkRequestDTO adGroupsTask = new WorkRequestDTO();
adGroupsTask.setWorkName("adGroupsWorkFlowTask");
adGroupsTask.setArguments(Arrays.asList(new ArgumentRequestDTO().key("userId").value("test-user-id"),
new ArgumentRequestDTO().key("adGroups").value("adGroupsVALUE")));
adGroupsTask.setType(WorkRequestDTO.TypeEnum.TASK);

WorkRequestDTO namespaceWorkFlowTask = new WorkRequestDTO();
namespaceWorkFlowTask.setWorkName("namespaceWorkFlowTask");
namespaceWorkFlowTask.setArguments(Collections
.singletonList(new ArgumentRequestDTO().key("projectId").value(UUID.randomUUID().toString())));
namespaceWorkFlowTask.setType(WorkRequestDTO.TypeEnum.TASK);

WorkRequestDTO sslCertificationWorkFlowTask = new WorkRequestDTO();
sslCertificationWorkFlowTask.setWorkName("sslCertificationWorkFlowTask");
sslCertificationWorkFlowTask
.setArguments(Arrays.asList(new ArgumentRequestDTO().key("domainName").value("api.com"),
new ArgumentRequestDTO().key("ipAddress").value("127.0.0.1")));
sslCertificationWorkFlowTask.setType(WorkRequestDTO.TypeEnum.TASK);

// splunkMonitoringWorkFlowTask and adGroupsWorkFlowTask are task of
// subWorkFlowOne
WorkRequestDTO subWorkFlowOne = new WorkRequestDTO();
subWorkFlowOne.workName("subWorkFlowOne");
subWorkFlowOne.setType(WorkRequestDTO.TypeEnum.WORKFLOW);
subWorkFlowOne.addWorksItem(adGroupsTask);

// namespaceWorkFlowTask is task of subWorkFlowTwo and subWorkFlowOne is a
// sub-workflow
WorkRequestDTO subWorkFlowTwo = new WorkRequestDTO();
subWorkFlowTwo.workName("subWorkFlowTwo");
subWorkFlowTwo.setType(WorkRequestDTO.TypeEnum.WORKFLOW);
subWorkFlowTwo.addWorksItem(namespaceWorkFlowTask);
subWorkFlowTwo.addWorksItem(subWorkFlowOne);

// sslCertificationWorkFlowTask is task of subWorkFlowTwo and subWorkFlowTwo is a
// sub-workflow
WorkRequestDTO subWorkFlowThree = new WorkRequestDTO();
subWorkFlowThree.workName("subWorkFlowThree");
subWorkFlowThree.setType(WorkRequestDTO.TypeEnum.WORKFLOW);
subWorkFlowThree.addWorksItem(sslCertificationWorkFlowTask);
subWorkFlowThree.addWorksItem(subWorkFlowTwo);

workFlowRequestDTO.setProjectId(components.project().getId());
workFlowRequestDTO.setWorkFlowName(workFlowDefinitions.get(0).getName());
workFlowRequestDTO.setWorks(Arrays.asList(work1, work2, work3));
workFlowRequestDTO.setWorks(Arrays.asList(subWorkFlowOne, subWorkFlowTwo, subWorkFlowThree, adGroupsTask,
namespaceWorkFlowTask, sslCertificationWorkFlowTask));
workFlowResponseDTO = workflowApi.execute(workFlowRequestDTO);

assertNotNull("There is no valid WorkFlowExecutionId", workFlowResponseDTO.getWorkFlowExecutionId());
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import com.redhat.parodos.workflow.exception.MissingParameterException;
import com.redhat.parodos.workflow.task.log.WorkFlowTaskLogger;
Expand Down Expand Up @@ -116,6 +117,21 @@ public String getRequiredParameterValue(String parameterName) throws MissingPara
});
}

public boolean validateWorkflowParameters(WorkContext workContext) {
gabriel-farache marked this conversation as resolved.
Show resolved Hide resolved
AtomicBoolean valid = new AtomicBoolean(true);
getWorkFlowTaskParameters().stream().filter(param -> !param.isOptional()).forEach(param -> {
try {
getRequiredParameterValue(param.getKey());
}
catch (MissingParameterException e) {
valid.set(false);
log.warn("{} is missing from workContext for workflow task {}", param.getKey(), getName());
}
});

return valid.get();
}

/**
* Gets an optional parameter. Returns the defaultValue if not found
* @param parameterName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ public T submitWithRetry(Callable<T> task) {
// @formatter:on
}

public T submitWithRetry(Callable<T> task, long maxRetryTime) {
// @formatter:off
return submitWithRetry(task, () -> {}, () -> {}, maxRetryTime, RETRY_DELAY);
// @formatter:on
}

/**
* Submit a task to the executor service, retrying on failure until the task
* @param task The task to submit
Expand All @@ -58,8 +64,13 @@ public T submitWithRetry(Callable<T> task, Runnable onSuccess, Runnable onFailur
onSuccess.run();
future.complete(result); // Success, complete the future with the result
}
catch (WorkFlowServiceUtils.InProgressStatusException e) {
return;
}
catch (Exception e) {
onFailure.run();
future.completeExceptionally(e);
gabriel-farache marked this conversation as resolved.
Show resolved Hide resolved
return;
}
}, 0, retryDelay, TimeUnit.MILLISECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,22 @@ public static WorkFlowStatusResponseDTO waitWorkflowStatusAsync(WorkflowApi work
try (var executorService = new RetryExecutorService<WorkFlowStatusResponseDTO>()) {
Callable<WorkFlowStatusResponseDTO> task = () -> {
WorkFlowStatusResponseDTO status = workflowApi.getStatus(workFlowExecutionId);
if (status.getStatus() == StatusEnum.IN_PROGRESS) {
throw new InProgressStatusException(
"Workflow status is still in progress, not yet in state: %s".formatted(expectedStatus));
}
if (status.getStatus() != expectedStatus) {
throw new ApiException("Workflow status is not " + expectedStatus);
throw new WrongStatusException(
"Workflow is finished and its status is %s while was expecting it to be %s"
.formatted(status.getStatus(), expectedStatus));
}
return status;
};

result = executorService.submitWithRetry(task);
}
catch (Exception e) {
throw new RuntimeException("Workflow status is not " + expectedStatus, e);
throw new RuntimeException("Workflow status is not %s".formatted(expectedStatus), e);
}
return result;
}
Expand Down Expand Up @@ -236,4 +242,20 @@ public static ProjectResponseDTO getProjectAsync(ApiClient apiClient, String pro
return testProject;
}

public static class InProgressStatusException extends Exception {

public InProgressStatusException(String message) {
super(message);
}

}

public static class WrongStatusException extends Exception {

public WrongStatusException(String message) {
super(message);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ public class AdGroupsWorkFlowTask extends BaseInfrastructureWorkFlowTask {
@Override
public WorkReport execute(WorkContext workContext) {
log.info("AdGroupsWorkFlowTask");
if (!validateWorkflowParameters(workContext)) {
log.warn("Missing keys in context for workflowExecution {}, context: {}", getMainExecutionId(),
workContext);
return new DefaultWorkReport(WorkStatus.FAILED, workContext);
}
return new DefaultWorkReport(WorkStatus.COMPLETED, workContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ public class NamespaceWorkFlowTask extends BaseInfrastructureWorkFlowTask {
@Override
public WorkReport execute(WorkContext workContext) {
log.info("NamespaceWorkFlowTask");
if (!validateWorkflowParameters(workContext)) {
log.warn("Missing keys in context for workflowExecution {}, context: {}", getMainExecutionId(),
workContext);
return new DefaultWorkReport(WorkStatus.FAILED, workContext);
}
return new DefaultWorkReport(WorkStatus.COMPLETED, workContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ public class SslCertificationWorkFlowTask extends BaseInfrastructureWorkFlowTask
@Override
public WorkReport execute(WorkContext workContext) {
log.info("SslCertificationWorkFlowTask");
if (!validateWorkflowParameters(workContext)) {
log.warn("Missing keys in context for workflowExecution {}, context: {}", getMainExecutionId(),
workContext);
return new DefaultWorkReport(WorkStatus.FAILED, workContext);
}
return new DefaultWorkReport(WorkStatus.COMPLETED, workContext);
}

Expand Down
1 change: 1 addition & 0 deletions workflow-service-sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ Class | Method | HTTP request | Description
*WorkflowApi* | [**getStatus**](docs/WorkflowApi.md#getStatus) | **GET** /api/v1/workflows/{workFlowExecutionId}/status | Returns a workflow status
*WorkflowApi* | [**getStatusByProjectId**](docs/WorkflowApi.md#getStatusByProjectId) | **GET** /api/v1/workflows | Returns workflows by project id
*WorkflowApi* | [**getWorkflowParameters**](docs/WorkflowApi.md#getWorkflowParameters) | **GET** /api/v1/workflows/{workFlowExecutionId}/context | Returns workflow context parameters
*WorkflowApi* | [**restartWorkFlow**](docs/WorkflowApi.md#restartWorkFlow) | **POST** /api/v1/workflows/{workFlowExecutionId}/restart | Restart a workflow execution with same parameters
*WorkflowApi* | [**updateWorkFlowCheckerTaskStatus**](docs/WorkflowApi.md#updateWorkFlowCheckerTaskStatus) | **POST** /api/v1/workflows/{workFlowExecutionId}/checkers/{workFlowCheckerTaskName} | Updates a workflow checker task status
*WorkflowDefinitionApi* | [**getWorkFlowDefinitionById**](docs/WorkflowDefinitionApi.md#getWorkFlowDefinitionById) | **GET** /api/v1/workflowdefinitions/{id} | Returns information about a workflow definition by id
*WorkflowDefinitionApi* | [**getWorkFlowDefinitions**](docs/WorkflowDefinitionApi.md#getWorkFlowDefinitions) | **GET** /api/v1/workflowdefinitions | Returns a list of workflow definition
Expand Down
Loading