Skip to content

Commit

Permalink
Fix nullpointerException in distributed cmd
Browse files Browse the repository at this point in the history
Cherry-pick of existing commit.
orig-pr: Alluxio#16591
orig-commit: Alluxio/alluxio@8da4f5a
orig-commit-author: jja725 <jja725@gmail.com>

pr-link: Alluxio#17120
change-id: cid-5bc8c07499eb952d1fda6f132a1e3f0d002a018e
  • Loading branch information
Xenorith authored Mar 20, 2023
1 parent cbd6314 commit 0393275
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import alluxio.cli.util.DistributedCommandUtil;
import alluxio.client.file.FileSystemContext;
import alluxio.client.job.JobMasterClient;
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.job.CmdConfig;
import alluxio.job.wire.Status;
import alluxio.util.CommonUtils;
Expand Down Expand Up @@ -83,14 +84,12 @@ protected void drain() {
}
}

protected Long submit(CmdConfig cmdConfig) {
Long jobControlId = null;
protected long submit(CmdConfig cmdConfig) {
try {
jobControlId = mClient.submit(cmdConfig);
return mClient.submit(cmdConfig);
} catch (IOException e) {
e.printStackTrace();
throw AlluxioRuntimeException.from(e);
}
return jobControlId;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,15 @@ public int run(CommandLine cl) throws AlluxioException, IOException {
int batchSize = FileSystemShellUtils.getIntArg(cl, BATCH_SIZE_OPTION, defaultBatchSize);
System.out.println("Please wait for command submission to finish..");

Long jobControlId = distributedCp(srcPath, dstPath, overwrite, batchSize);
long jobControlId = distributedCp(srcPath, dstPath, overwrite, batchSize);
if (!async) {
System.out.format("Submitted successfully, jobControlId = %s%n"
+ "Waiting for the command to finish ...%n", jobControlId.toString());
+ "Waiting for the command to finish ...%n", jobControlId);
waitForCmd(jobControlId);
postProcessing(jobControlId);
} else {
System.out.format("Submitted migrate job successfully, jobControlId = %s%n",
jobControlId.toString());
jobControlId);
}

Set<String> failures = getFailedFiles();
Expand All @@ -157,7 +157,7 @@ public int run(CommandLine cl) throws AlluxioException, IOException {
return 0;
}

private Long distributedCp(AlluxioURI srcPath, AlluxioURI dstPath,
private long distributedCp(AlluxioURI srcPath, AlluxioURI dstPath,
boolean overwrite, int batchSize) {
CmdConfig cmdConfig = new MigrateCliConfig(srcPath.getPath(),
dstPath.getPath(), mWriteType, overwrite, batchSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,19 +331,19 @@ public static int distributedLoad(AbstractDistributedJobCommand command, Command

System.out.println("Please wait for command submission to finish..");

Long jobControlId;
long jobControlId;
if (!cl.hasOption(INDEX_FILE.getLongOpt())) {
AlluxioURI path = new AlluxioURI(pathStr);
jobControlId = DistributedLoadUtils.runDistLoad(command, path, replication, batchSize,
workerSet, excludedWorkerSet, localityIds, excludedLocalityIds, directCache);
if (!async) {
System.out.format("Submitted successfully, jobControlId = %s%n"
+ "Waiting for the command to finish ...%n", jobControlId.toString());
+ "Waiting for the command to finish ...%n", jobControlId);
command.waitForCmd(jobControlId);
command.postProcessing(jobControlId);
} else {
System.out.format("Submitted distLoad job successfully, jobControlId = %s%n",
jobControlId.toString());
jobControlId);
}
} else {
try (BufferedReader reader = new BufferedReader(new FileReader(pathStr))) {
Expand All @@ -353,12 +353,12 @@ public static int distributedLoad(AbstractDistributedJobCommand command, Command
workerSet, excludedWorkerSet, localityIds, excludedLocalityIds, directCache);
if (!async) {
System.out.format("Submitted successfully, jobControlId = %s%n"
+ "Waiting for the command to finish ...%n", jobControlId.toString());
+ "Waiting for the command to finish ...%n", jobControlId);
command.waitForCmd(jobControlId);
command.postProcessing(jobControlId);
} else {
System.out.format("Submitted distLoad job successfully, jobControlId = %s%n",
jobControlId.toString());
jobControlId);
}
}
}
Expand All @@ -384,7 +384,7 @@ public static int distributedLoad(AbstractDistributedJobCommand command, Command
* @param directCache use direct cache request or cache through read
* @return job Control ID
*/
public static Long runDistLoad(AbstractDistributedJobCommand command, AlluxioURI filePath,
public static long runDistLoad(AbstractDistributedJobCommand command, AlluxioURI filePath,
int replication, int batchSize, Set<String> workerSet,
Set<String> excludedWorkerSet, Set<String> localityIds,
Set<String> excludedLocalityIds, boolean directCache) {
Expand Down

0 comments on commit 0393275

Please sign in to comment.