Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

Commit

Permalink
Improve waiting strategy for Integration tests
Browse files Browse the repository at this point in the history
Instead of relying on okhttp async implementation,
we use executor service to invoke recurring requests
 to the service till completion.

This is designed to decrease the CPU load even lower
while maintaining more readable and easier-to-maintain code.

Signed-off-by: Moti Asayag <masayag@redhat.com>
  • Loading branch information
masayag committed Jun 6, 2023
1 parent 02a1e12 commit 1ecfab2
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class EscalationFlowTest {
private static final String WORKFLOW_NAME = "workflowStartingCheckingAndEscalation";

@Test
public void runEscalationFlow() throws ApiException, InterruptedException {
public void runEscalationFlow() throws ApiException {
log.info("******** Running The Escalation workFlow ********");
TestComponents components = new WorkFlowTestBuilder().withDefaultProject().withWorkFlowDefinition(WORKFLOW_NAME)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class SimpleRollbackWorkFlowTest {
private static final String WORKFLOW_NAME = "simpleFailedWorkFlow" + WorkFlowConstants.INFRASTRUCTURE_WORKFLOW;

@Test
public void runRollbackWorkFlow() throws ApiException, InterruptedException {
public void runRollbackWorkFlow() throws ApiException {
log.info("******** Running The Simple WorkFlow ********");
TestComponents components = new WorkFlowTestBuilder().withDefaultProject()
.withWorkFlowDefinition(WORKFLOW_NAME, getWorkFlowDefinitionResponseConsumer()).build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package com.redhat.parodos.sdkutils;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class RetryExecutorService<T> implements AutoCloseable {

private final ExecutorService executor;

public RetryExecutorService() {
executor = Executors.newFixedThreadPool(1);
}

/**
* Submit a task to the executor service, retrying on failure until the task succeeds
* @param task The task to submit
* @return The result of the task
*/
public T submitWithRetry(Callable<T> task) {
// @formatter:off
return submitWithRetry(task, () -> {}, () -> {}, 10 * 60 * 1000, 5000);
// @formatter:on
}

/**
* Submit a task to the executor service, retrying on failure until the task
* @param task The task to submit
* @param onSuccess A callback to invoke when the task succeeds
* @param onFailure A callback to invoke when the task fails
* @param maxRetryTime The maximum time to retry the task for
* @param retryDelay The delay between retries
* @return The result of the task
*/
public T submitWithRetry(Callable<T> task, Runnable onSuccess, Runnable onFailure, long maxRetryTime,
long retryDelay) {
Future<T> future = executor.submit(() -> {
long startTime = System.currentTimeMillis();
long endTime = startTime + maxRetryTime;

while (System.currentTimeMillis() < endTime) {
try {
T result = task.call();
onSuccess.run();
return result; // Success, no need to retry
}
catch (Exception e) {
// Task failed, invoke onFailure callback
onFailure.run();

// Sleep for the retry delay
try {
// FIXME: This is a blocking call, we should use a non-blocking
// sleep
Thread.sleep(retryDelay);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return null; // Interrupted, exit the task
}
}
}

return null; // Retry limit reached
});

try {
return future.get();
}
catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

@Override
public void close() throws Exception {
executor.shutdown();
boolean awaited = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
if (!awaited) {
throw new RuntimeException("Failed to await termination of executor service");
}
}

}
203 changes: 39 additions & 164 deletions sdk-utils/src/main/java/com/redhat/parodos/sdkutils/SdkUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.Callable;
import java.util.stream.Stream;

import javax.annotation.Nullable;
Expand All @@ -16,17 +13,15 @@
import com.redhat.parodos.sdk.api.LoginApi;
import com.redhat.parodos.sdk.api.ProjectApi;
import com.redhat.parodos.sdk.api.WorkflowApi;
import com.redhat.parodos.sdk.invoker.ApiCallback;
import com.redhat.parodos.sdk.invoker.ApiClient;
import com.redhat.parodos.sdk.invoker.ApiException;
import com.redhat.parodos.sdk.invoker.ApiResponse;
import com.redhat.parodos.sdk.invoker.Configuration;
import com.redhat.parodos.sdk.model.ProjectRequestDTO;
import com.redhat.parodos.sdk.model.ProjectResponseDTO;
import com.redhat.parodos.sdk.model.WorkFlowStatusResponseDTO;
import com.redhat.parodos.sdk.model.WorkFlowStatusResponseDTO.StatusEnum;
import com.redhat.parodos.workflow.utils.CredUtils;
import lombok.Data;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import org.springframework.core.env.MissingRequiredPropertiesException;
Expand All @@ -46,8 +41,7 @@ private SdkUtils() {
* in environment variables.
* @return the ApiClient
*/
public static ApiClient getParodosAPiClient()
throws ApiException, MissingRequiredPropertiesException, InterruptedException {
public static ApiClient getParodosAPiClient() throws ApiException, MissingRequiredPropertiesException {
ApiClient apiClient = Configuration.getDefaultApiClient();
String serverIp = Optional.ofNullable(System.getenv("SERVER_IP")).orElse("localhost");
String serverPort = Optional.ofNullable(System.getenv("SERVER_PORT")).orElse("8080");
Expand All @@ -57,7 +51,7 @@ public static ApiClient getParodosAPiClient()
}

int port = Integer.parseInt(serverPort);
if (port <= 0 && port > 65535) {
if (port <= 0 || port > 65535) {
throw new IllegalArgumentException("serverPort must be > 0 && <= 65535");
}

Expand Down Expand Up @@ -102,152 +96,66 @@ private static String getCookieValue(List<String> cookieHeaders, String anObject
return token;
}

/**
* Executes a @see FuncExecutor. Waits at most 60 seconds for a successful result of
* an async API invocation.
* @param f the @see FuncExecutor
* @param <T> the type of the function executor
* @return @see AsyncResult
* @throws ApiException if the api invocation fails
* @throws InterruptedException If the async call reaches the waiting timeout
*/
public static <T> T waitAsyncResponse(FuncExecutor<T> f) throws ApiException, InterruptedException {
AsyncResult<T> asyncResult = new AsyncResult<>();
Lock lock = new ReentrantLock();
Condition response = lock.newCondition();
ApiCallback<T> apiCallback = new ApiCallback<T>() {

@Override
public void onFailure(ApiException e, int statusCode, Map<String, List<String>> responseHeaders) {
log.info("onFailure {}", e.getMessage());
try {
Thread.sleep(3000);
f.execute(this);
}
catch (ApiException | InterruptedException apie) {
asyncResult.setError(apie.getMessage());
signal();
}
}

@Override
public void onSuccess(T result, int statusCode, Map<String, List<String>> responseHeaders) {
if (f.check(result, statusCode)) {
try {
Thread.sleep(3000);
f.execute(this);
}
catch (ApiException | InterruptedException apie) {
asyncResult.setError(apie.getMessage());
signal();
}
}
else {
asyncResult.setStatusCode(statusCode);
asyncResult.setResult(result);
asyncResult.setError(null);
signal();
}
}

@Override
public void onUploadProgress(long bytesWritten, long contentLength, boolean done) {
}

@Override
public void onDownloadProgress(long bytesRead, long contentLength, boolean done) {
}

private void signal() {
lock.lock();
try {
response.signal();
}
finally {
lock.unlock();
}
}
};
f.execute(apiCallback);
lock.lock();
try {
// should be more than enough
response.await(60, TimeUnit.SECONDS);
if (asyncResult.getError() != null) {
throw new ApiException(
"An error occurred while executing waitAsyncResponse: " + asyncResult.getError());
}
}
finally {
lock.unlock();
}
return asyncResult.getResult();
}

/**
* Invokes @see
* com.redhat.parodos.sdk.api.ProjectAPI#getProjectsAsync(ApiCallback<List<ProjectResponseDTO>>)
* and retries for 60 seconds.
* @param projectApi the Project API
* @throws InterruptedException If the async call reaches the waiting timeout
* @throws ApiException If the API method invocation fails
*/
public static void waitProjectStart(ProjectApi projectApi) throws ApiException, InterruptedException {
waitAsyncResponse(new FuncExecutor<List<ProjectResponseDTO>>() {
@Override
public boolean check(List<ProjectResponseDTO> result, int statusCode) {
return statusCode != 200;
}

@Override
public void execute(@NonNull ApiCallback<List<ProjectResponseDTO>> callback) throws ApiException {
projectApi.getProjectsAsync(callback);
}
});
public static void waitProjectStart(ProjectApi projectApi) {
try (var executorService = new RetryExecutorService<Void>()) {
Callable<Void> task = () -> {
projectApi.getProjects();
return null;
};

executorService.submitWithRetry(task);
}
catch (Exception e) {
throw new RuntimeException("Project API is not up and running", e);
}
}

/**
* Invokes @see com.redhat.parodos.sdk.api.WorkflowApi#getStatusAsync(String,
* ApiCallback<WorkFlowStatusResponseDTO>) and retries for 60 seconds.
* @param workflowApi the WorkflowAPI
* @param workFlowExecutionId the workflow execution Id to monitor, as {String}
* @param workFlowExecutionId the workflow execution ID to monitor, as {String}
* @return the workflow status if it's equal to @see
* com.redhat.parodos.workflows.work.WorkStatus#COMPLETED
* @throws InterruptedException If the async call reaches the waiting timeout
* @throws ApiException If the API method invocation fails
*/
public static WorkFlowStatusResponseDTO waitWorkflowStatusAsync(WorkflowApi workflowApi, UUID workFlowExecutionId)
throws InterruptedException, ApiException {
return waitWorkflowStatusAsync(workflowApi, workFlowExecutionId,
WorkFlowStatusResponseDTO.StatusEnum.COMPLETED);
public static WorkFlowStatusResponseDTO waitWorkflowStatusAsync(WorkflowApi workflowApi, UUID workFlowExecutionId) {
return waitWorkflowStatusAsync(workflowApi, workFlowExecutionId, StatusEnum.COMPLETED);
}

/**
* Invokes @see com.redhat.parodos.sdk.api.WorkflowApi#getStatusAsync(String,
* ApiCallback<WorkFlowStatusResponseDTO>) and retries for 60 seconds.
* @param workflowApi the WorkflowAPI
* @param workFlowExecutionId the workflow execution Id to monitor, as {String}
* @param status the status to wait for
* @return the workflow status if it's equal to @see
* @param workFlowExecutionId the workflow execution ID to monitor, as {String}
* @param expectedStatus the expectedStatus to wait for
* @return the workflow expectedStatus if it's equal to @see
* com.redhat.parodos.workflows.work.WorkStatus#COMPLETED
* @throws InterruptedException If the async call reaches the waiting timeout
* @throws ApiException If the API method invocation fails
*/
public static WorkFlowStatusResponseDTO waitWorkflowStatusAsync(WorkflowApi workflowApi, UUID workFlowExecutionId,
WorkFlowStatusResponseDTO.StatusEnum status) throws InterruptedException, ApiException {

WorkFlowStatusResponseDTO workFlowStatusResponseDTO = waitAsyncResponse(new FuncExecutor<>() {
@Override
public boolean check(WorkFlowStatusResponseDTO result, int statusCode) {
return result.getStatus() != status;
}
StatusEnum expectedStatus) {
WorkFlowStatusResponseDTO result;

try (var executorService = new RetryExecutorService<WorkFlowStatusResponseDTO>()) {
Callable<WorkFlowStatusResponseDTO> task = () -> {
WorkFlowStatusResponseDTO status = workflowApi.getStatus(workFlowExecutionId);
if (status.getStatus() != expectedStatus) {
throw new ApiException("Workflow status is not " + expectedStatus);
}
return status;
};

@Override
public void execute(@NonNull ApiCallback<WorkFlowStatusResponseDTO> callback) throws ApiException {
workflowApi.getStatusAsync(workFlowExecutionId, callback);
}
});
return workFlowStatusResponseDTO;
result = executorService.submitWithRetry(task);
}
catch (Exception e) {
throw new RuntimeException("Workflow status is not " + expectedStatus, e);
}
return result;
}

/**
Expand All @@ -266,38 +174,6 @@ public static ProjectResponseDTO getProjectByNameAndDescription(List<ProjectResp
.findAny().orElse(null);
}

@Data
private static class AsyncResult<T> {

private String error;

T result;

int statusCode;

}

public interface FuncExecutor<T> {

/**
* Defines the @see com.redhat.parodos.sdk.invoker.ApiCallback to execute
* @param callback the
* @throws ApiException If the API callback invocation fails
*/
void execute(@NonNull ApiCallback<T> callback) throws ApiException;

/**
* Define when considering an ApiCallback result as successful.
* @param result the result to check
* @return {true} if it is necessary to continue monitoring the result, {false}
* when it's possible to stop the monitoring.
*/
default boolean check(T result, int statusCode) {
return true;
}

}

/**
* Checks if a project with {projectName} and {projectDescription} exists. Creates a
* new project, if it doesn't exist and asserts that it has been successfully created.
Expand All @@ -308,7 +184,6 @@ default boolean check(T result, int statusCode) {
* @param projectName the project name
* @param projectDescription the project description
* @return The ProjectApiResponse
* @throws InterruptedException If the async call reaches the waiting timeout
* @throws ApiException If the API methods invocations fail
*/
public static ProjectResponseDTO getProjectAsync(ApiClient apiClient, String projectName, String projectDescription)
Expand Down

0 comments on commit 1ecfab2

Please sign in to comment.