Skip to content

Commit

Permalink
[Improvement-16746][seatunnel] pass user defined task parameter to se…
Browse files Browse the repository at this point in the history
…atunnel (#16756)
  • Loading branch information
liunaijie authored Nov 5, 2024
1 parent d19655f commit 8df912a
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 7 deletions.
11 changes: 5 additions & 6 deletions docs/docs/en/guide/task/seatunnel.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,16 @@ Click [here](https://seatunnel.apache.org/) for more information about `Apache S
- SEATUNNEL_ENGINE
- Deployment mode: specify the deployment mode, `cluster` `local`

> Click [here](https://seatunnel.apache.org/docs/2.3.3/command/usage) for more information on the usage of

`Apache SeaTunnel command`
> Click [here](https://seatunnel.apache.org/docs/2.3.3/command/usage) for more information on the usage of Apache SeaTunnel command`
- Custom Configuration: Supports custom configuration or select configuration file from Resource Center

> Click [here](https://seatunnel.apache.org/docs/2.3.3/concept/config) for more information about `Apache
>
>> SeaTunnel config` file
> Click [here](https://seatunnel.apache.org/docs/2.3.3/concept/config) for more information about `Apache SeaTunnel config` file
- Script: Customize configuration information on the task node, including four parts: `env` `source` `transform` `sink`
- Custom Parameters/Global Parameters: When custom parameters/global parameters are defined, the parameters will be passed to the SeaTunnel task, and the parameter value can be dynamically replaced during task execution by referencing the parameter with `${}` in the SeaTunnel task.

> Click [here](https://seatunnel.apache.org/docs/concept/config/#config-variable-substitution) for more information on `Apache SeaTunnel variable substitution`
## Task Example

Expand Down
5 changes: 4 additions & 1 deletion docs/docs/zh/guide/task/seatunnel.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@
- SEATUNNEL_ENGINE
- 部署方式:指定部署模式,`cluster` `local`

> 点击 [这里](https://seatunnel.apache.org/docs/2.3.3/command/usage) 获取更多关于`Apache SeaTunnel command` 使用的信息
> 点击 [这里](https://seatunnel.apache.org/docs/2.3.3/command/usage) 获取更多关于`Apache SeaTunnel command` 使用的信息
- 自定义配置:支持自定义配置或从资源中心选择配置文件

> 点击 [这里](https://seatunnel.apache.org/docs/2.3.3/concept/config) 获取更多关于`Apache SeaTunnel config` 文件介绍
- 脚本:在任务节点那自定义配置信息,包括四部分:`env` `source` `transform` `sink`
- 自定义参数/全局参数: 当定义了自定义参数/全局参数时, 会将该参数传递给SeaTunnel任务, 可以在SeaTunnel任务中通过`${}`引用该参数, 从而在任务运行时动态替换参数值.

> 点击 [这里](https://seatunnel.apache.org/docs/concept/config/#config-variable-substitution) 获取更多关于`Apache SeaTunnel 变量替换` 使用的信息
## 任务样例

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
Expand All @@ -45,6 +46,7 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -169,9 +171,35 @@ protected List<String> buildOptions() throws Exception {
String filePath = buildConfigFilePath();
createConfigFileIfNotExists(scriptContent, filePath);
args.add(filePath);
args.addAll(generateTaskParameters());
return args;
}

private List<String> generateTaskParameters() {
Map<String, String> variables = new HashMap<>();
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
List<Property> propertyList = JSONUtils.toList(taskExecutionContext.getGlobalParams(), Property.class);
if (propertyList != null && !propertyList.isEmpty()) {
for (Property property : propertyList) {
variables.put(property.getProp(), paramsMap.get(property.getProp()).getValue());
}
}
List<Property> localParams = this.seatunnelParameters.getLocalParams();
if (localParams != null && !localParams.isEmpty()) {
for (Property property : localParams) {
if (property.getDirect().equals(Direct.IN)) {
variables.put(property.getProp(), paramsMap.get(property.getProp()).getValue());
}
}
}
List<String> parameters = new ArrayList<>();
variables.forEach((k, v) -> {
parameters.add("-i");
parameters.add(String.format("%s='%s'", k, v));
});
return parameters;
}

private String buildCustomConfigContent() {
log.info("raw custom config content : {}", seatunnelParameters.getRawScript());
String script = seatunnelParameters.getRawScript().replaceAll("\\r\\n", System.lineSeparator());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;

import org.apache.commons.io.FileUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -99,6 +104,35 @@ public void testReadConfigFromResourceCenter() throws Exception {
Assertions.assertEquals(expectedCommand, command);
}

@Test
public void testParameterPass() throws Exception {
String taskId = "3456";
SeatunnelParameters seatunnelParameters = new SeatunnelParameters();
seatunnelParameters.setUseCustom(false);
ResourceInfo resourceInfo = new ResourceInfo();
resourceInfo.setResourceName(RESOURCE_SCRIPT_PATH);
List<Property> localParam = new ArrayList<>();
Property property = new Property("key1", Direct.IN, DataType.VARCHAR, "value1");
localParam.add(property);
seatunnelParameters.setLocalParams(localParam);
seatunnelParameters.setResourceList(Collections.singletonList(resourceInfo));

TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setExecutePath(EXECUTE_PATH);
taskExecutionContext.setTaskAppId(taskId);
taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters));
ResourceContext resourceContext = new ResourceContext();
resourceContext.addResourceItem(new ResourceContext.ResourceItem(RESOURCE_SCRIPT_PATH, RESOURCE_SCRIPT_PATH));
taskExecutionContext.setResourceContext(resourceContext);
taskExecutionContext.setPrepareParamsMap(Collections.singletonMap("key1", property));

SeatunnelTask seatunnelTask = new SeatunnelTask(taskExecutionContext);
seatunnelTask.setSeatunnelParameters(seatunnelParameters);
String command = String.join(" ", seatunnelTask.buildOptions());
String expectedCommand = String.format("--config %s/seatunnel_%s.conf -i key1='value1'", EXECUTE_PATH, taskId);
Assertions.assertEquals(expectedCommand, command);
}

private static final String RAW_SCRIPT = "env {\n" +
" execution.parallelism = 2\n" +
" job.mode = \"BATCH\"\n" +
Expand Down

0 comments on commit 8df912a

Please sign in to comment.