Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

Commit

Permalink
Add a workflow task that uses JDBC
Browse files Browse the repository at this point in the history
FLPATH-215 https://issues.redhat.com/browse/FLPATH-215

Signed-off-by: Yaron Dayagi <ydayagi@redhat.com>
  • Loading branch information
ydayagi committed May 31, 2023
1 parent d5eab82 commit 96a2170
Show file tree
Hide file tree
Showing 8 changed files with 398 additions and 0 deletions.
4 changes: 4 additions & 0 deletions prebuilt-tasks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
<artifactId>jira-rest-java-client-core</artifactId>
<version>${jira-rest-client.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.redhat.parodos.tasks.jdbc;

import java.util.List;
import java.util.Map;

interface JdbcService {

List<Map<String, Object>> query(String url, String statement);

void update(String url, String statement);

void execute(String url, String statement);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.redhat.parodos.tasks.jdbc;

import java.util.List;
import java.util.Map;

import org.springframework.jdbc.core.ColumnMapRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

class JdbcServiceImpl implements JdbcService {

@Override
public List<Map<String, Object>> query(String url, String statement) {
return createJdbcTemplate(url).query(statement, new ColumnMapRowMapper());
}

@Override
public void update(String url, String statement) {
createJdbcTemplate(url).update(statement);
}

@Override
public void execute(String url, String statement) {
createJdbcTemplate(url).execute(statement);
}

private JdbcTemplate createJdbcTemplate(String url) {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setUrl(url);

return new JdbcTemplate(dataSource);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.redhat.parodos.tasks.jdbc;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.redhat.parodos.workflow.exception.MissingParameterException;
import com.redhat.parodos.workflow.parameter.WorkParameter;
import com.redhat.parodos.workflow.parameter.WorkParameterType;
import com.redhat.parodos.workflow.task.BaseWorkFlowTask;
import com.redhat.parodos.workflow.task.enums.WorkFlowTaskOutput;
import com.redhat.parodos.workflows.work.DefaultWorkReport;
import com.redhat.parodos.workflows.work.WorkContext;
import com.redhat.parodos.workflows.work.WorkReport;
import com.redhat.parodos.workflows.work.WorkStatus;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JdbcWorkFlowTask extends BaseWorkFlowTask {

enum OperationType {

QUERY, UPDATE, EXECUTE

}

private JdbcService service;

public JdbcWorkFlowTask() {
this.service = new JdbcServiceImpl();
}

JdbcWorkFlowTask(String beanName, JdbcService service) {
this.setBeanName(beanName);
this.service = service;
}

@Override
public @NonNull List<WorkParameter> getWorkFlowTaskParameters() {
LinkedList<WorkParameter> params = new LinkedList<>();
params.add(WorkParameter.builder().key("url").type(WorkParameterType.TEXT).optional(false).description(
"JDBC URL. E.g. jdbc:postgresql://localhost:5432/service?user=service&password=service123&ssl=true&sslmode=verify-full")
.build());
params.add(WorkParameter.builder().key("operation").type(WorkParameterType.TEXT).optional(false).description(
"Type of operation this statement is performing. One of " + Arrays.toString(OperationType.values()))
.build());
params.add(WorkParameter.builder().key("statement").type(WorkParameterType.TEXT).optional(false)
.description("The database statement to execute. E.g. 'select * from table'").build());
params.add(WorkParameter.builder().key("result-ctx-key").type(WorkParameterType.TEXT).optional(true)
.description("In query operation the result is stored in WorkContext with the provided key").build());
return params;
}

@Override
public @NonNull List<WorkFlowTaskOutput> getWorkFlowTaskOutputs() {
return List.of(WorkFlowTaskOutput.OTHER);
}

@Override
public WorkReport execute(WorkContext workContext) {
String url = "";
try {
url = getRequiredParameterValue("url");
String operation = getRequiredParameterValue("operation");
String statement = getRequiredParameterValue("statement");
String resultCtxKey = getOptionalParameterValue("result-ctx-key", "");

OperationType operationType = OperationType.valueOf(operation.toUpperCase());

switch (operationType) {

case QUERY -> {
List<Map<String, Object>> result = this.service.query(url, statement);
if (!resultCtxKey.isEmpty()) {
workContext.put(resultCtxKey, new ObjectMapper().writeValueAsString(result));
}
}
case UPDATE -> {
this.service.update(url, statement);
}
case EXECUTE -> {
this.service.execute(url, statement);
}
}
}
catch (MissingParameterException | JsonProcessingException e) {
log.error("Jdbc task failed for URL " + url, e);
return new DefaultWorkReport(WorkStatus.FAILED, workContext, e);
}

return new DefaultWorkReport(WorkStatus.COMPLETED, workContext);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# JDBC WorkFlow task

## Motivation
Provide a task that can be used for CRUD operations via JDBC

## Input
- URL - a JDBC URL that has all required connection properties. E.g. jdbc:postgresql://localhost:5432/service?user=service&password=service123&ssl=true&sslmode=verify-full.
- Statement - the textual message/request/query that is passed to the JDBC driver. E.g. "select * from items".
- Operation type - operations are divided into 3 types. Execute is for statements such as 'drop table', 'create user'. Update is for statements such as insert, update, delete records (e.g. rows in table). Query is for retrieving records/items/info. E.g. SQL select.
- Context key for result - the context key where result of query operation is stored.

## Result
A failure to perform the operation will result in failed task execution. Only operation of type query returns a result. The result is stored with the context key that is passed as input. The result is a JSON representation of the result set.

## Configuration
The task uses the JDBC driver specified in the URL. E.g. mysql. In order to load the driver you need to add it as a maven dependency. E.g.
```
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.redhat.parodos.tasks.jdbc;

import com.redhat.parodos.workflow.context.WorkContextDelegate;
import com.redhat.parodos.workflow.exception.MissingParameterException;
import com.redhat.parodos.workflows.work.WorkContext;
import com.redhat.parodos.workflows.work.WorkReport;
import com.redhat.parodos.workflows.work.WorkStatus;
import org.junit.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class JdbcWorkFlowTaskTest {

private final JdbcService service = mock(JdbcService.class);

private final JdbcWorkFlowTask task = new JdbcWorkFlowTask("Test", service);

@Test
public void missingArgs() {
WorkContext ctx = createWorkContext(null);
task.preExecute(ctx);
WorkReport result = task.execute(ctx);
assertEquals(WorkStatus.FAILED, result.getStatus());
assertEquals(MissingParameterException.class, result.getError().getClass());
}

@Test
public void query() {
WorkContext ctx = createWorkContext(JdbcWorkFlowTask.OperationType.QUERY.name());
List<Map<String, Object>> resultSet = List.of(Map.of("name", "value"));
String resultJson = "[{\"name\":\"value\"}]";
doReturn(resultSet).when(service).query(any(), any());
task.preExecute(ctx);
WorkReport result = task.execute(ctx);
assertEquals(WorkStatus.COMPLETED, result.getStatus());
assertEquals(resultJson, ctx.get("thekey"));
}

@Test
public void update() {
WorkContext ctx = createWorkContext(JdbcWorkFlowTask.OperationType.UPDATE.name());
task.preExecute(ctx);
WorkReport result = task.execute(ctx);
assertEquals(WorkStatus.COMPLETED, result.getStatus());
verify(service, times(1)).update("theurl", "thestatement");
}

@Test
public void execute() {
WorkContext ctx = createWorkContext(JdbcWorkFlowTask.OperationType.EXECUTE.name());
task.preExecute(ctx);
WorkReport result = task.execute(ctx);
assertEquals(WorkStatus.COMPLETED, result.getStatus());
verify(service, times(1)).execute("theurl", "thestatement");
}

private WorkContext createWorkContext(String operation) {
WorkContext ctx = new WorkContext();
HashMap<String, String> map = new HashMap<>();
map.put("url", "theurl");
if (operation != null) {
map.put("operation", operation);
}
map.put("statement", "thestatement");
map.put("result-ctx-key", "thekey");

WorkContextDelegate.write(ctx, WorkContextDelegate.ProcessType.WORKFLOW_TASK_EXECUTION, task.getName(),
WorkContextDelegate.Resource.ARGUMENTS, map);
return ctx;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.redhat.parodos.examples;

import com.redhat.parodos.tasks.jdbc.JdbcWorkFlowTask;
import com.redhat.parodos.workflow.annotation.Infrastructure;
import com.redhat.parodos.workflows.workflow.SequentialFlow;
import com.redhat.parodos.workflows.workflow.WorkFlow;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Configuration
@Profile("jdbc")
public class JdbcWorkFlowConfiguration {

@Bean
JdbcWorkFlowTask jdbcTask() {
return new JdbcWorkFlowTask();
}

@Bean
@Infrastructure
WorkFlow jdbcWorkFlow(JdbcWorkFlowTask jdbcTask) {
return SequentialFlow.Builder.aNewSequentialFlow().named("jdbcWorkFlow").execute(jdbcTask).build();
}

}
Loading

0 comments on commit 96a2170

Please sign in to comment.