From 039327578f478ca84314fcf47752c777793abe24 Mon Sep 17 00:00:00 2001 From: Rico Chiu Date: Sun, 19 Mar 2023 23:41:49 -0700 Subject: [PATCH] Fix nullpointerException in distributed cmd Cherry-pick of existing commit. orig-pr: Alluxio/alluxio#16591 orig-commit: Alluxio/alluxio@8da4f5ac1d8367c9e9283fcbcdae74f0f17fd5f0 orig-commit-author: jja725 pr-link: Alluxio/alluxio#17120 change-id: cid-5bc8c07499eb952d1fda6f132a1e3f0d002a018e --- .../fs/command/AbstractDistributedJobCommand.java | 9 ++++----- .../alluxio/cli/fs/command/DistributedCpCommand.java | 8 ++++---- .../alluxio/cli/fs/command/DistributedLoadUtils.java | 12 ++++++------ 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/shell/src/main/java/alluxio/cli/fs/command/AbstractDistributedJobCommand.java b/shell/src/main/java/alluxio/cli/fs/command/AbstractDistributedJobCommand.java index 21c87f39912f..1148fca13831 100644 --- a/shell/src/main/java/alluxio/cli/fs/command/AbstractDistributedJobCommand.java +++ b/shell/src/main/java/alluxio/cli/fs/command/AbstractDistributedJobCommand.java @@ -16,6 +16,7 @@ import alluxio.cli.util.DistributedCommandUtil; import alluxio.client.file.FileSystemContext; import alluxio.client.job.JobMasterClient; +import alluxio.exception.runtime.AlluxioRuntimeException; import alluxio.job.CmdConfig; import alluxio.job.wire.Status; import alluxio.util.CommonUtils; @@ -83,14 +84,12 @@ protected void drain() { } } - protected Long submit(CmdConfig cmdConfig) { - Long jobControlId = null; + protected long submit(CmdConfig cmdConfig) { try { - jobControlId = mClient.submit(cmdConfig); + return mClient.submit(cmdConfig); } catch (IOException e) { - e.printStackTrace(); + throw AlluxioRuntimeException.from(e); } - return jobControlId; } /** diff --git a/shell/src/main/java/alluxio/cli/fs/command/DistributedCpCommand.java b/shell/src/main/java/alluxio/cli/fs/command/DistributedCpCommand.java index f2c3e5be12ab..76d8259605e7 100644 --- a/shell/src/main/java/alluxio/cli/fs/command/DistributedCpCommand.java +++ b/shell/src/main/java/alluxio/cli/fs/command/DistributedCpCommand.java @@ -138,15 +138,15 @@ public int run(CommandLine cl) throws AlluxioException, IOException { int batchSize = FileSystemShellUtils.getIntArg(cl, BATCH_SIZE_OPTION, defaultBatchSize); System.out.println("Please wait for command submission to finish.."); - Long jobControlId = distributedCp(srcPath, dstPath, overwrite, batchSize); + long jobControlId = distributedCp(srcPath, dstPath, overwrite, batchSize); if (!async) { System.out.format("Submitted successfully, jobControlId = %s%n" - + "Waiting for the command to finish ...%n", jobControlId.toString()); + + "Waiting for the command to finish ...%n", jobControlId); waitForCmd(jobControlId); postProcessing(jobControlId); } else { System.out.format("Submitted migrate job successfully, jobControlId = %s%n", - jobControlId.toString()); + jobControlId); } Set failures = getFailedFiles(); @@ -157,7 +157,7 @@ public int run(CommandLine cl) throws AlluxioException, IOException { return 0; } - private Long distributedCp(AlluxioURI srcPath, AlluxioURI dstPath, + private long distributedCp(AlluxioURI srcPath, AlluxioURI dstPath, boolean overwrite, int batchSize) { CmdConfig cmdConfig = new MigrateCliConfig(srcPath.getPath(), dstPath.getPath(), mWriteType, overwrite, batchSize); diff --git a/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadUtils.java b/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadUtils.java index 0caadfe0a458..2bc5cdb0a17d 100644 --- a/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadUtils.java +++ b/shell/src/main/java/alluxio/cli/fs/command/DistributedLoadUtils.java @@ -331,19 +331,19 @@ public static int distributedLoad(AbstractDistributedJobCommand command, Command System.out.println("Please wait for command submission to finish.."); - Long jobControlId; + long jobControlId; if (!cl.hasOption(INDEX_FILE.getLongOpt())) { AlluxioURI path = new AlluxioURI(pathStr); jobControlId = DistributedLoadUtils.runDistLoad(command, path, replication, batchSize, workerSet, excludedWorkerSet, localityIds, excludedLocalityIds, directCache); if (!async) { System.out.format("Submitted successfully, jobControlId = %s%n" - + "Waiting for the command to finish ...%n", jobControlId.toString()); + + "Waiting for the command to finish ...%n", jobControlId); command.waitForCmd(jobControlId); command.postProcessing(jobControlId); } else { System.out.format("Submitted distLoad job successfully, jobControlId = %s%n", - jobControlId.toString()); + jobControlId); } } else { try (BufferedReader reader = new BufferedReader(new FileReader(pathStr))) { @@ -353,12 +353,12 @@ public static int distributedLoad(AbstractDistributedJobCommand command, Command workerSet, excludedWorkerSet, localityIds, excludedLocalityIds, directCache); if (!async) { System.out.format("Submitted successfully, jobControlId = %s%n" - + "Waiting for the command to finish ...%n", jobControlId.toString()); + + "Waiting for the command to finish ...%n", jobControlId); command.waitForCmd(jobControlId); command.postProcessing(jobControlId); } else { System.out.format("Submitted distLoad job successfully, jobControlId = %s%n", - jobControlId.toString()); + jobControlId); } } } @@ -384,7 +384,7 @@ public static int distributedLoad(AbstractDistributedJobCommand command, Command * @param directCache use direct cache request or cache through read * @return job Control ID */ - public static Long runDistLoad(AbstractDistributedJobCommand command, AlluxioURI filePath, + public static long runDistLoad(AbstractDistributedJobCommand command, AlluxioURI filePath, int replication, int batchSize, Set workerSet, Set excludedWorkerSet, Set localityIds, Set excludedLocalityIds, boolean directCache) {