Skip to content

Commit

Permalink
[Fix-16740][SeaTunnel-Task] fix can't submit resource center config f…
Browse files Browse the repository at this point in the history
…ile issue (#16741)
  • Loading branch information
liunaijie authored Oct 31, 2024
1 parent 5f319e5 commit 0719949
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
Expand Down Expand Up @@ -154,26 +155,21 @@ private String buildCommand() throws Exception {

protected List<String> buildOptions() throws Exception {
List<String> args = new ArrayList<>();
args.add(CONFIG_OPTIONS);
String scriptContent;
if (BooleanUtils.isTrue(seatunnelParameters.getUseCustom())) {
args.add(CONFIG_OPTIONS);
args.add(buildCustomConfigCommand());
scriptContent = buildCustomConfigContent();
} else {
seatunnelParameters.getResourceList().forEach(resourceInfo -> {
args.add(CONFIG_OPTIONS);
// TODO: Need further check for refactored resource center
// TODO Currently resourceName is `/xxx.sh`, it has more `/` and needs to be optimized
args.add(resourceInfo.getResourceName().replaceFirst(".*:", ""));
});
String resourceFileName = seatunnelParameters.getResourceList().get(0).getResourceName();
ResourceContext resourceContext = taskExecutionContext.getResourceContext();
scriptContent = FileUtils.readFileToString(
new File(resourceContext.getResourceItem(resourceFileName).getResourceAbsolutePathInLocal()),
StandardCharsets.UTF_8);
}
return args;
}

protected String buildCustomConfigCommand() throws Exception {
String config = buildCustomConfigContent();
String filePath = buildConfigFilePath();
createConfigFileIfNotExists(config, filePath);

return filePath;
createConfigFileIfNotExists(scriptContent, filePath);
args.add(filePath);
return args;
}

private String buildCustomConfigContent() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,91 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.task.seatunnel;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.junit.Test;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;

import org.apache.commons.io.FileUtils;

import java.util.Collections;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class SeatunnelTaskTest {
private static final String EXECUTE_PATH = "/home";
private static final String TASK_APPID = "9527";

private static final String EXECUTE_PATH = "/tmp";
private static final String RESOURCE_SCRIPT_PATH = "/tmp/demo.conf";

private MockedStatic<FileUtils> mockedStaticFileUtils;

@BeforeEach
public void setUp() {
mockedStaticFileUtils = Mockito.mockStatic(FileUtils.class);
}

@AfterEach
public void after() {
mockedStaticFileUtils.close();
}

@Test
public void formatDetector() throws Exception{
public void formatDetector() throws Exception {
String taskId = "1234";
SeatunnelParameters seatunnelParameters = new SeatunnelParameters();
seatunnelParameters.setUseCustom(true);
seatunnelParameters.setRawScript(RAW_SCRIPT);

TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setExecutePath(EXECUTE_PATH);
taskExecutionContext.setTaskAppId(TASK_APPID);
taskExecutionContext.setTaskAppId(taskId);
taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters));

SeatunnelTask seatunnelTask = new SeatunnelTask(taskExecutionContext);
seatunnelTask.setSeatunnelParameters(seatunnelParameters);
Assertions.assertEquals("/home/seatunnel_9527.conf", seatunnelTask.buildCustomConfigCommand());
String command1 = String.join(" ", seatunnelTask.buildOptions());
String expectedCommand1 = String.format("--config %s/seatunnel_%s.conf", EXECUTE_PATH, taskId);
Assertions.assertEquals(expectedCommand1, command1);

seatunnelParameters.setRawScript(RAW_SCRIPT_2);
seatunnelTask.setSeatunnelParameters(seatunnelParameters);
Assertions.assertEquals("/home/seatunnel_9527.json", seatunnelTask.buildCustomConfigCommand());
String command2 = String.join(" ", seatunnelTask.buildOptions());
String expectedCommand2 = String.format("--config %s/seatunnel_%s.json", EXECUTE_PATH, taskId);
Assertions.assertEquals(expectedCommand2, command2);
}

@Test
public void testReadConfigFromResourceCenter() throws Exception {
String taskId = "2345";
SeatunnelParameters seatunnelParameters = new SeatunnelParameters();
seatunnelParameters.setUseCustom(false);
ResourceInfo resourceInfo = new ResourceInfo();
resourceInfo.setResourceName(RESOURCE_SCRIPT_PATH);
seatunnelParameters.setResourceList(Collections.singletonList(resourceInfo));

TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setExecutePath(EXECUTE_PATH);
taskExecutionContext.setTaskAppId(taskId);
taskExecutionContext.setTaskParams(JSONUtils.toJsonString(seatunnelParameters));
ResourceContext resourceContext = new ResourceContext();
resourceContext.addResourceItem(new ResourceContext.ResourceItem(RESOURCE_SCRIPT_PATH, RESOURCE_SCRIPT_PATH));
taskExecutionContext.setResourceContext(resourceContext);

SeatunnelTask seatunnelTask = new SeatunnelTask(taskExecutionContext);
seatunnelTask.setSeatunnelParameters(seatunnelParameters);
String command = String.join(" ", seatunnelTask.buildOptions());
String expectedCommand = String.format("--config %s/seatunnel_%s.conf", EXECUTE_PATH, taskId);
Assertions.assertEquals(expectedCommand, command);
}

private static final String RAW_SCRIPT = "env {\n" +
" execution.parallelism = 2\n" +
" job.mode = \"BATCH\"\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,9 @@ export function formatParams(data: INodeData): {
if (data.taskType === 'SEATUNNEL') {
taskParams.startupScript = data.startupScript
taskParams.useCustom = data.useCustom
taskParams.rawScript = data.rawScript
if (!data.useCustom) {
taskParams.rawScript = ''
}
if (data.startupScript?.includes('flink')) {
taskParams.runMode = data.runMode
taskParams.others = data.others
Expand Down

0 comments on commit 0719949

Please sign in to comment.