Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

Improve waiting strategy for Integration tests #399

Merged
merged 1 commit into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class EscalationFlowTest {
private static final String WORKFLOW_NAME = "workflowStartingCheckingAndEscalation";

@Test
public void runEscalationFlow() throws ApiException, InterruptedException {
public void runEscalationFlow() throws ApiException {
log.info("******** Running The Escalation workFlow ********");
TestComponents components = new WorkFlowTestBuilder().withDefaultProject().withWorkFlowDefinition(WORKFLOW_NAME)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class SimpleRollbackWorkFlowTest {
private static final String WORKFLOW_NAME = "simpleFailedWorkFlow" + WorkFlowConstants.INFRASTRUCTURE_WORKFLOW;

@Test
public void runRollbackWorkFlow() throws ApiException, InterruptedException {
public void runRollbackWorkFlow() throws ApiException {
log.info("******** Running The Simple WorkFlow ********");
TestComponents components = new WorkFlowTestBuilder().withDefaultProject()
.withWorkFlowDefinition(WORKFLOW_NAME, getWorkFlowDefinitionResponseConsumer()).build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package com.redhat.parodos.sdkutils;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class RetryExecutorService<T> implements AutoCloseable {

private final ExecutorService executor;

public RetryExecutorService() {
executor = Executors.newFixedThreadPool(1);
}

/**
* Submit a task to the executor service, retrying on failure until the task succeeds
* @param task The task to submit
* @return The result of the task
*/
public T submitWithRetry(Callable<T> task) {
// @formatter:off
return submitWithRetry(task, () -> {}, () -> {}, 10 * 60 * 1000, 5000);
Copy link
Collaborator Author

@masayag masayag Jun 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gciavarrini what should be the timeout? was it used to be 1 minute in the previous implementation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the bash implementation there were 100 retries.
I chose 1 minute because it seemed like a reasonable time interval and I didn't receive any feedback on this.
TBH, I'm not sure which is the best timeout value :)

// @formatter:on
}

/**
* Submit a task to the executor service, retrying on failure until the task
* @param task The task to submit
* @param onSuccess A callback to invoke when the task succeeds
* @param onFailure A callback to invoke when the task fails
* @param maxRetryTime The maximum time to retry the task for
* @param retryDelay The delay between retries
* @return The result of the task
*/
public T submitWithRetry(Callable<T> task, Runnable onSuccess, Runnable onFailure, long maxRetryTime,
long retryDelay) {
Future<T> future = executor.submit(() -> {
long startTime = System.currentTimeMillis();
long endTime = startTime + maxRetryTime;

while (System.currentTimeMillis() < endTime) {
try {
T result = task.call();
onSuccess.run();
return result; // Success, no need to retry
}
catch (Exception e) {
// Task failed, invoke onFailure callback
onFailure.run();

// Sleep for the retry delay
try {
// FIXME: This is a blocking call, we should use a non-blocking
// sleep
Thread.sleep(retryDelay);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return null; // Interrupted, exit the task
}
}
}

return null; // Retry limit reached
});

try {
return future.get();
}
catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

@Override
public void close() throws Exception {
executor.shutdown();
boolean awaited = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
if (!awaited) {
throw new RuntimeException("Failed to await termination of executor service");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.Callable;
import java.util.stream.Stream;

import javax.annotation.Nullable;
Expand All @@ -16,17 +13,15 @@
import com.redhat.parodos.sdk.api.LoginApi;
import com.redhat.parodos.sdk.api.ProjectApi;
import com.redhat.parodos.sdk.api.WorkflowApi;
import com.redhat.parodos.sdk.invoker.ApiCallback;
import com.redhat.parodos.sdk.invoker.ApiClient;
import com.redhat.parodos.sdk.invoker.ApiException;
import com.redhat.parodos.sdk.invoker.ApiResponse;
import com.redhat.parodos.sdk.invoker.Configuration;
import com.redhat.parodos.sdk.model.ProjectRequestDTO;
import com.redhat.parodos.sdk.model.ProjectResponseDTO;
import com.redhat.parodos.sdk.model.WorkFlowStatusResponseDTO;
import com.redhat.parodos.sdk.model.WorkFlowStatusResponseDTO.StatusEnum;
import com.redhat.parodos.workflow.utils.CredUtils;
import lombok.Data;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import org.springframework.core.env.MissingRequiredPropertiesException;
Expand All @@ -46,8 +41,7 @@ private WorkFlowServiceUtils() {
* in environment variables.
* @return the ApiClient
*/
public static ApiClient getParodosAPiClient()
throws ApiException, MissingRequiredPropertiesException, InterruptedException {
public static ApiClient getParodosAPiClient() throws ApiException, MissingRequiredPropertiesException {
ApiClient apiClient = Configuration.getDefaultApiClient();
String serverIp = Optional.ofNullable(System.getenv("WORKFLOW_SERVICE_HOST")).orElse("localhost");
String serverPort = Optional.ofNullable(System.getenv("SERVER_PORT")).orElse("8080");
Expand Down Expand Up @@ -102,152 +96,66 @@ private static String getCookieValue(List<String> cookieHeaders, String anObject
return token;
}

/**
* Executes a @see FuncExecutor. Waits at most 60 seconds for a successful result of
* an async API invocation.
* @param f the @see FuncExecutor
* @param <T> the type of the function executor
* @return @see AsyncResult
* @throws ApiException if the api invocation fails
* @throws InterruptedException If the async call reaches the waiting timeout
*/
public static <T> T waitAsyncResponse(FuncExecutor<T> f) throws ApiException, InterruptedException {
AsyncResult<T> asyncResult = new AsyncResult<>();
Lock lock = new ReentrantLock();
Condition response = lock.newCondition();
ApiCallback<T> apiCallback = new ApiCallback<T>() {

@Override
public void onFailure(ApiException e, int statusCode, Map<String, List<String>> responseHeaders) {
log.info("onFailure {}", e.getMessage());
try {
Thread.sleep(3000);
f.execute(this);
}
catch (ApiException | InterruptedException apie) {
asyncResult.setError(apie.getMessage());
signal();
}
}

@Override
public void onSuccess(T result, int statusCode, Map<String, List<String>> responseHeaders) {
if (f.check(result, statusCode)) {
try {
Thread.sleep(3000);
f.execute(this);
}
catch (ApiException | InterruptedException apie) {
asyncResult.setError(apie.getMessage());
signal();
}
}
else {
asyncResult.setStatusCode(statusCode);
asyncResult.setResult(result);
asyncResult.setError(null);
signal();
}
}

@Override
public void onUploadProgress(long bytesWritten, long contentLength, boolean done) {
}

@Override
public void onDownloadProgress(long bytesRead, long contentLength, boolean done) {
}

private void signal() {
lock.lock();
try {
response.signal();
}
finally {
lock.unlock();
}
}
};
f.execute(apiCallback);
lock.lock();
try {
// should be more than enough
response.await(60, TimeUnit.SECONDS);
if (asyncResult.getError() != null) {
throw new ApiException(
"An error occurred while executing waitAsyncResponse: " + asyncResult.getError());
}
}
finally {
lock.unlock();
}
return asyncResult.getResult();
}

/**
* Invokes @see
* com.redhat.parodos.sdk.api.ProjectAPI#getProjectsAsync(ApiCallback<List<ProjectResponseDTO>>)
* and retries for 60 seconds.
* @param projectApi the Project API
* @throws InterruptedException If the async call reaches the waiting timeout
* @throws ApiException If the API method invocation fails
*/
public static void waitProjectStart(ProjectApi projectApi) throws ApiException, InterruptedException {
waitAsyncResponse(new FuncExecutor<List<ProjectResponseDTO>>() {
@Override
public boolean check(List<ProjectResponseDTO> result, int statusCode) {
return statusCode != 200;
}

@Override
public void execute(@NonNull ApiCallback<List<ProjectResponseDTO>> callback) throws ApiException {
projectApi.getProjectsAsync(callback);
}
});
public static void waitProjectStart(ProjectApi projectApi) {
try (var executorService = new RetryExecutorService<Void>()) {
Callable<Void> task = () -> {
projectApi.getProjects();
return null;
};

executorService.submitWithRetry(task);
}
catch (Exception e) {
throw new RuntimeException("Project API is not up and running", e);
}
}

/**
* Invokes @see com.redhat.parodos.sdk.api.WorkflowApi#getStatusAsync(String,
* ApiCallback<WorkFlowStatusResponseDTO>) and retries for 60 seconds.
* @param workflowApi the WorkflowAPI
* @param workFlowExecutionId the workflow execution Id to monitor, as {String}
* @param workFlowExecutionId the workflow execution ID to monitor, as {String}
* @return the workflow status if it's equal to @see
* com.redhat.parodos.workflows.work.WorkStatus#COMPLETED
* @throws InterruptedException If the async call reaches the waiting timeout
* @throws ApiException If the API method invocation fails
*/
public static WorkFlowStatusResponseDTO waitWorkflowStatusAsync(WorkflowApi workflowApi, UUID workFlowExecutionId)
throws InterruptedException, ApiException {
return waitWorkflowStatusAsync(workflowApi, workFlowExecutionId,
WorkFlowStatusResponseDTO.StatusEnum.COMPLETED);
public static WorkFlowStatusResponseDTO waitWorkflowStatusAsync(WorkflowApi workflowApi, UUID workFlowExecutionId) {
return waitWorkflowStatusAsync(workflowApi, workFlowExecutionId, StatusEnum.COMPLETED);
}

/**
* Invokes @see com.redhat.parodos.sdk.api.WorkflowApi#getStatusAsync(String,
* ApiCallback<WorkFlowStatusResponseDTO>) and retries for 60 seconds.
* @param workflowApi the WorkflowAPI
* @param workFlowExecutionId the workflow execution Id to monitor, as {String}
* @param status the status to wait for
* @return the workflow status if it's equal to @see
* @param workFlowExecutionId the workflow execution ID to monitor, as {String}
* @param expectedStatus the expectedStatus to wait for
* @return the workflow expectedStatus if it's equal to @see
* com.redhat.parodos.workflows.work.WorkStatus#COMPLETED
* @throws InterruptedException If the async call reaches the waiting timeout
* @throws ApiException If the API method invocation fails
*/
public static WorkFlowStatusResponseDTO waitWorkflowStatusAsync(WorkflowApi workflowApi, UUID workFlowExecutionId,
WorkFlowStatusResponseDTO.StatusEnum status) throws InterruptedException, ApiException {

WorkFlowStatusResponseDTO workFlowStatusResponseDTO = waitAsyncResponse(new FuncExecutor<>() {
@Override
public boolean check(WorkFlowStatusResponseDTO result, int statusCode) {
return result.getStatus() != status;
}
StatusEnum expectedStatus) {
WorkFlowStatusResponseDTO result;

try (var executorService = new RetryExecutorService<WorkFlowStatusResponseDTO>()) {
Callable<WorkFlowStatusResponseDTO> task = () -> {
WorkFlowStatusResponseDTO status = workflowApi.getStatus(workFlowExecutionId);
if (status.getStatus() != expectedStatus) {
throw new ApiException("Workflow status is not " + expectedStatus);
}
return status;
};

@Override
public void execute(@NonNull ApiCallback<WorkFlowStatusResponseDTO> callback) throws ApiException {
workflowApi.getStatusAsync(workFlowExecutionId, callback);
}
});
return workFlowStatusResponseDTO;
result = executorService.submitWithRetry(task);
}
catch (Exception e) {
throw new RuntimeException("Workflow status is not " + expectedStatus, e);
}
return result;
}

/**
Expand All @@ -266,38 +174,6 @@ public static ProjectResponseDTO getProjectByNameAndDescription(List<ProjectResp
.findAny().orElse(null);
}

@Data
private static class AsyncResult<T> {

private String error;

T result;

int statusCode;

}

public interface FuncExecutor<T> {

/**
* Defines the @see com.redhat.parodos.sdk.invoker.ApiCallback to execute
* @param callback the
* @throws ApiException If the API callback invocation fails
*/
void execute(@NonNull ApiCallback<T> callback) throws ApiException;

/**
* Define when considering an ApiCallback result as successful.
* @param result the result to check
* @return {true} if it is necessary to continue monitoring the result, {false}
* when it's possible to stop the monitoring.
*/
default boolean check(T result, int statusCode) {
return true;
}

}

/**
* Checks if a project with {projectName} and {projectDescription} exists. Creates a
* new project, if it doesn't exist and asserts that it has been successfully created.
Expand All @@ -308,7 +184,6 @@ default boolean check(T result, int statusCode) {
* @param projectName the project name
* @param projectDescription the project description
* @return The ProjectApiResponse
* @throws InterruptedException If the async call reaches the waiting timeout
* @throws ApiException If the API methods invocations fail
*/
public static ProjectResponseDTO getProjectAsync(ApiClient apiClient, String projectName, String projectDescription)
Expand Down