
Commit 1a87834

validate that minargs is correct

Jonathan Vexler committed Sep 30, 2024
1 parent 4e98278

Showing 12 changed files with 34 additions and 31 deletions.
@@ -86,8 +86,7 @@ public String triggerArchival(
     String sparkPropertiesPath =
         Utils.getDefaultPropertiesFile(convertJavaPropertiesToScalaMap(System.getProperties()));
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    String cmd = SparkCommand.ARCHIVE.toString();
-    sparkLauncher.addAppArgs(cmd, master, sparkMemory, Integer.toString(minCommits), Integer.toString(maxCommits),
+    SparkMain.addAppArgs(sparkLauncher, SparkCommand.ARCHIVE, master, sparkMemory, Integer.toString(minCommits), Integer.toString(maxCommits),
         Integer.toString(retained), Boolean.toString(enableMetadata), HoodieCLI.basePath);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);

@@ -93,9 +93,7 @@ public String bootstrap(
 
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
 
-    String cmd = SparkCommand.BOOTSTRAP.toString();
-
-    sparkLauncher.addAppArgs(cmd, master, sparkMemory, tableName, tableType, targetPath, srcPath, rowKeyField,
+    SparkMain.addAppArgs(sparkLauncher, SparkCommand.BOOTSTRAP, master, sparkMemory, tableName, tableType, targetPath, srcPath, rowKeyField,
         partitionPathField, String.valueOf(parallelism), schemaProviderClass, bootstrapIndexClass, selectorClass,
         keyGeneratorClass, fullBootstrapInputProvider, payloadClass, String.valueOf(enableHiveSync), propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);

@@ -146,8 +146,7 @@ public String runClean(
         Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
 
-    String cmd = SparkMain.SparkCommand.CLEAN.toString();
-    sparkLauncher.addAppArgs(cmd, master, sparkMemory, HoodieCLI.basePath, propsFilePath);
+    SparkMain.addAppArgs(sparkLauncher, SparkMain.SparkCommand.CLEAN, master, sparkMemory, HoodieCLI.basePath, propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);

@@ -62,7 +62,7 @@ public String scheduleClustering(
     // First get a clustering instant time and pass it to spark launcher for scheduling clustering
     String clusteringInstantTime = client.createNewInstantTime();
 
-    sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_SCHEDULE.toString(), master, sparkMemory,
+    SparkMain.addAppArgs(sparkLauncher, SparkCommand.CLUSTERING_SCHEDULE, master, sparkMemory,
         HoodieCLI.basePath, client.getTableConfig().getTableName(), clusteringInstantTime, propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();
@@ -101,7 +101,7 @@ public String runClustering(
     String sparkPropertiesPath =
         Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_RUN.toString(), master, sparkMemory,
+    SparkMain.addAppArgs(sparkLauncher, SparkCommand.CLUSTERING_RUN, master, sparkMemory,
         HoodieCLI.basePath, client.getTableConfig().getTableName(), clusteringInstantTime,
         parallelism, retry, propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
@@ -138,7 +138,7 @@ public String runClustering(
     String sparkPropertiesPath =
         Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_SCHEDULE_AND_EXECUTE.toString(), master, sparkMemory,
+    SparkMain.addAppArgs(sparkLauncher, SparkCommand.CLUSTERING_SCHEDULE_AND_EXECUTE, master, sparkMemory,
         HoodieCLI.basePath, client.getTableConfig().getTableName(), parallelism, retry, propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();

@@ -207,8 +207,7 @@ public String scheduleCompact(
     String sparkPropertiesPath =
         Utils.getDefaultPropertiesFile(convertJavaPropertiesToScalaMap(System.getProperties()));
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    String cmd = SparkCommand.COMPACT_SCHEDULE.toString();
-    sparkLauncher.addAppArgs(cmd, master, sparkMemory, HoodieCLI.basePath,
+    SparkMain.addAppArgs(sparkLauncher, SparkCommand.COMPACT_SCHEDULE, master, sparkMemory, HoodieCLI.basePath,
         client.getTableConfig().getTableName(), compactionInstantTime, propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();
@@ -256,7 +255,7 @@ public String compact(
     String sparkPropertiesPath =
         Utils.getDefaultPropertiesFile(convertJavaPropertiesToScalaMap(System.getProperties()));
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), master, sparkMemory, HoodieCLI.basePath,
+    SparkMain.addAppArgs(sparkLauncher, SparkCommand.COMPACT_RUN, master, sparkMemory, HoodieCLI.basePath,
         client.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath,
         retry, propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
@@ -291,7 +290,7 @@ public String compact(
     String sparkPropertiesPath =
         Utils.getDefaultPropertiesFile(convertJavaPropertiesToScalaMap(System.getProperties()));
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE_AND_EXECUTE.toString(), master, sparkMemory, HoodieCLI.basePath,
+    SparkMain.addAppArgs(sparkLauncher, SparkCommand.COMPACT_SCHEDULE_AND_EXECUTE, master, sparkMemory, HoodieCLI.basePath,
         client.getTableConfig().getTableName(), parallelism, schemaFilePath,
         retry, propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
@@ -469,7 +468,7 @@ public String validateCompaction(
     String sparkPropertiesPath = Utils
         .getDefaultPropertiesFile(convertJavaPropertiesToScalaMap(System.getProperties()));
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkCommand.COMPACT_VALIDATE.toString(), master, sparkMemory, HoodieCLI.basePath,
+    SparkMain.addAppArgs(sparkLauncher, SparkCommand.COMPACT_VALIDATE, master, sparkMemory, HoodieCLI.basePath,
         compactionInstant, outputPathStr, parallelism);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
@@ -533,7 +532,7 @@ public String unscheduleCompaction(
     String sparkPropertiesPath = Utils
         .getDefaultPropertiesFile(convertJavaPropertiesToScalaMap(System.getProperties()));
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(), master, sparkMemory, HoodieCLI.basePath,
+    SparkMain.addAppArgs(sparkLauncher, SparkCommand.COMPACT_UNSCHEDULE_PLAN, master, sparkMemory, HoodieCLI.basePath,
         compactionInstant, outputPathStr, parallelism, Boolean.valueOf(skipV).toString(),
         Boolean.valueOf(dryRun).toString());
     Process process = sparkLauncher.launch();
@@ -577,7 +576,7 @@ public String unscheduleCompactFile(
     String sparkPropertiesPath = Utils
         .getDefaultPropertiesFile(convertJavaPropertiesToScalaMap(System.getProperties()));
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), master, sparkMemory, HoodieCLI.basePath,
+    SparkMain.addAppArgs(sparkLauncher, SparkCommand.COMPACT_UNSCHEDULE_FILE, master, sparkMemory, HoodieCLI.basePath,
         fileId, partitionPath, outputPathStr, "1", Boolean.valueOf(skipV).toString(),
         Boolean.valueOf(dryRun).toString());
     Process process = sparkLauncher.launch();
@@ -622,7 +621,7 @@ public String repairCompaction(
     String sparkPropertiesPath = Utils
         .getDefaultPropertiesFile(convertJavaPropertiesToScalaMap(System.getProperties()));
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkCommand.COMPACT_REPAIR.toString(), master, sparkMemory, HoodieCLI.basePath,
+    SparkMain.addAppArgs(sparkLauncher, SparkCommand.COMPACT_REPAIR, master, sparkMemory, HoodieCLI.basePath,
         compactionInstant, outputPathStr, parallelism, Boolean.valueOf(dryRun).toString());
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);

@@ -74,12 +74,12 @@ public String convert(
 
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
 
-    String cmd = SparkCommand.IMPORT.toString();
+    SparkCommand cmd = SparkCommand.IMPORT;
     if (useUpsert) {
-      cmd = SparkCommand.UPSERT.toString();
+      cmd = SparkCommand.UPSERT;
     }
 
-    sparkLauncher.addAppArgs(cmd, master, sparkMemory, srcPath, targetPath, tableName, tableType, rowKeyField,
+    SparkMain.addAppArgs(sparkLauncher, cmd, master, sparkMemory, srcPath, targetPath, tableName, tableType, rowKeyField,
         partitionPathField, parallelism, schemaFilePath, retry, propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();

@@ -43,7 +43,7 @@ public String deleteMarker(
         help = "Spark executor memory") final String sparkMemory)
       throws Exception {
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkMain.SparkCommand.DELETE_MARKER.toString(), master, sparkMemory, instantTime,
+    SparkMain.addAppArgs(sparkLauncher, SparkMain.SparkCommand.DELETE_MARKER, master, sparkMemory, instantTime,
         HoodieCLI.basePath);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);

@@ -93,7 +93,7 @@ public String deduplicate(
     }
 
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), master, sparkMemory,
+    SparkMain.addAppArgs(sparkLauncher, SparkMain.SparkCommand.DEDUPLICATE, master, sparkMemory,
         duplicatedPartitionPath, repairedOutputPath, HoodieCLI.basePath, String.valueOf(dryRun), dedupeType);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
@@ -301,7 +301,7 @@ public String repairDeprecatePartition(
     }
 
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkMain.SparkCommand.REPAIR_DEPRECATED_PARTITION.toString(), master, sparkMemory,
+    SparkMain.addAppArgs(sparkLauncher, SparkMain.SparkCommand.REPAIR_DEPRECATED_PARTITION, master, sparkMemory,
         HoodieCLI.basePath);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
@@ -329,7 +329,7 @@ public String renamePartition(
     }
 
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkMain.SparkCommand.RENAME_PARTITION.toString(), master, sparkMemory,
+    SparkMain.addAppArgs(sparkLauncher, SparkMain.SparkCommand.RENAME_PARTITION, master, sparkMemory,
         HoodieCLI.basePath, oldPartition, newPartition);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);

@@ -140,7 +140,7 @@ public String rollbackCommit(
     }
 
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK.toString(), master, sparkMemory, instantTime,
+    SparkMain.addAppArgs(sparkLauncher, SparkMain.SparkCommand.ROLLBACK, master, sparkMemory, instantTime,
         HoodieCLI.basePath, rollbackUsingMarkers);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);

@@ -77,7 +77,7 @@ public String savepoint(
     }
 
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkMain.SparkCommand.SAVEPOINT.toString(), master, sparkMemory, commitTime,
+    SparkMain.addAppArgs(sparkLauncher, SparkMain.SparkCommand.SAVEPOINT, master, sparkMemory, commitTime,
         user, comments, HoodieCLI.basePath);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
@@ -114,7 +114,7 @@ public String rollbackToSavepoint(
     }
 
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK_TO_SAVEPOINT.toString(), master, sparkMemory,
+    SparkMain.addAppArgs(sparkLauncher, SparkMain.SparkCommand.ROLLBACK_TO_SAVEPOINT, master, sparkMemory,
         instantTime, HoodieCLI.basePath, lazyFailedWritesCleanPolicy);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
@@ -148,7 +148,7 @@ public String deleteSavepoint(
     }
 
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkMain.SparkCommand.DELETE_SAVEPOINT.toString(), master, sparkMemory, instantTime,
+    SparkMain.addAppArgs(sparkLauncher, SparkMain.SparkCommand.DELETE_SAVEPOINT, master, sparkMemory, instantTime,
         HoodieCLI.basePath);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);

@@ -63,6 +63,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.launcher.SparkLauncher;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
@@ -133,6 +134,13 @@ String getPropsFilePath(String[] args) {
     }
   }
 
+  public static void addAppArgs(SparkLauncher sparkLauncher, SparkMain.SparkCommand cmd, String... args) {
+    // cmd is passed as the first app arg, which is why callers supply minArgsCount - 1 args here
+    ValidationUtils.checkArgument(args.length == cmd.minArgsCount - 1, "App args do not match minArgsCount");
+    sparkLauncher.addAppArgs(cmd.toString());
+    sparkLauncher.addAppArgs(args);
+  }
+
   public static void main(String[] args) {
     ValidationUtils.checkArgument(args.length >= 4);
     final String commandString = args[0];
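
The helper above relies on each SparkCommand value knowing its own minimum argument count. That enum change is not shown in this diff; the following is a minimal sketch of the idea, assuming SparkCommand carries the minArgsCount field the validation reads. The enum constants and counts below are illustrative, not the actual definitions.

  // Illustrative sketch, not part of this commit: a SparkCommand enum whose
  // minArgsCount includes the command token itself, so addAppArgs accepts
  // exactly minArgsCount - 1 app args after the command name.
  enum SparkCommand {
    CLEAN(5), ROLLBACK(6), SAVEPOINT(7); // counts here are examples only

    final int minArgsCount;

    SparkCommand(int minArgsCount) {
      this.minArgsCount = minArgsCount;
    }
  }
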
@@ -48,7 +48,7 @@ public String upgradeHoodieTable(
 
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
     String toVersionName = getHoodieTableVersionName(toVersion, true);
-    sparkLauncher.addAppArgs(SparkCommand.UPGRADE.toString(), master, sparkMemory, HoodieCLI.basePath, toVersionName);
+    SparkMain.addAppArgs(sparkLauncher, SparkCommand.UPGRADE, master, sparkMemory, HoodieCLI.basePath, toVersionName);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
     int exitCode = process.waitFor();
@@ -71,7 +71,7 @@ public String downgradeHoodieTable(
 
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
     String toVersionName = getHoodieTableVersionName(toVersion, false);
-    sparkLauncher.addAppArgs(SparkCommand.DOWNGRADE.toString(), master, sparkMemory, HoodieCLI.basePath, toVersionName);
+    SparkMain.addAppArgs(sparkLauncher, SparkCommand.DOWNGRADE, master, sparkMemory, HoodieCLI.basePath, toVersionName);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
     int exitCode = process.waitFor();
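
Net effect for callers: an argument list of the wrong length now fails when the launcher is being configured rather than inside the launched Spark job. A hedged usage sketch, assuming CLEAN takes four app args after the command token (minArgsCount = 5, as the CLEAN call sites in this diff suggest) and that ValidationUtils.checkArgument throws IllegalArgumentException on a failed check:

  // Hypothetical caller, for illustration only.
  SparkLauncher launcher = SparkUtil.initLauncher(sparkPropertiesPath);
  SparkMain.addAppArgs(launcher, SparkMain.SparkCommand.CLEAN,
      master, sparkMemory, HoodieCLI.basePath); // propsFilePath missing:
  // one arg short of minArgsCount - 1, so this throws before launch()
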
