Skip to content

Commit

Permalink
Merge pull request #64 from InseeFrLab/0.5.0
Browse files Browse the repository at this point in the history
0.5.0
  • Loading branch information
NicoLaval authored Jan 24, 2024
2 parents ed68af3 + c707e23 commit 61768db
Showing 1 changed file with 63 additions and 27 deletions.
90 changes: 63 additions & 27 deletions src/main/java/fr/insee/trevas/lab/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,14 @@ public static ScriptEngine initEngineWithSpark(Bindings bindings, SparkSession s
/**
 * Copies the entries of {@code input} into a fresh {@link SimpleBindings}, leaving out
 * every entry whose key starts with {@code "$"}.
 *
 * <p>A value that is a {@code PersistentDataset} is stored under its key suffixed with
 * {@code "$PersistentDataset"}; every other value keeps its original key.
 *
 * @param input bindings to copy; it is only read, never modified
 * @return a new bindings object holding the filtered (and, where applicable, renamed) entries
 */
public static Bindings getBindings(Bindings input) {
    Bindings output = new SimpleBindings();
    input.forEach((k, v) -> {
        // Keys starting with "$" are excluded from the result.
        if (k.startsWith("$")) {
            return;
        }
        // Persistent datasets are exposed under a suffixed key, and only under that key:
        // also keeping them under the raw key would duplicate them and let "$"-prefixed
        // persistent entries slip past the filter above.
        String name = (v instanceof PersistentDataset) ? k + "$PersistentDataset" : k;
        output.put(name, v);
    });
    return output;
}
Expand Down Expand Up @@ -69,17 +76,26 @@ public static SparkConf loadSparkConfig(String stringPath) {
/**
 * Fills a fresh {@link SimpleBindings} with the dataset entries of {@code input}.
 *
 * <p>Entries whose key starts with {@code "$"} and values that are neither a
 * {@code PersistentDataset} nor a {@code SparkDataset} are left out. A
 * {@code PersistentDataset} is unwrapped through {@code getDelegate()} and stored under
 * its key suffixed with {@code "$PersistentDataset"}.
 *
 * <p>NOTE(review): this listing is a rendered diff hunk (-69,17 +76,26) whose +/- markers
 * were lost, so removed and added lines are interleaved, and the end of the method lies
 * outside the visible hunk. The comments below point out the lines that appear to be the
 * removed ones — confirm against the repository.
 *
 * @param input bindings to read
 * @param limit when non-null, each dataset is cut to this many rows and copied into an
 *              {@code InMemoryDataset}; when null the Spark dataset is re-wrapped as is
 */
public static Bindings getSparkBindings(Bindings input, Integer limit) {
Bindings output = new SimpleBindings();
input.forEach((k, v) -> {
// NOTE(review): the next four lines appear to be the removed (pre-change) start of the
// loop body; note that they guarded the delegate with `instanceof SparkDataset`.
if (v instanceof PersistentDataset) {
fr.insee.vtl.model.Dataset ds = ((PersistentDataset) v).getDelegate();
if (ds instanceof SparkDataset) {
Dataset<Row> sparkDs = ((SparkDataset) ds).getSparkDataset();
// Keys starting with "$" are ignored.
if (!k.startsWith("$")) {
if ((v instanceof PersistentDataset) || (v instanceof SparkDataset)) {
String name = k;
SparkDataset spDs = null;
if (v instanceof PersistentDataset) {
fr.insee.vtl.model.Dataset ds = ((PersistentDataset) v).getDelegate();
name = name + "$PersistentDataset";
// NOTE(review): unguarded cast — a delegate that is not a SparkDataset raises
// ClassCastException here; the removed lines above checked the type first.
spDs = (SparkDataset) ds;
}
if (v instanceof SparkDataset) {
spDs = (SparkDataset) v;
}
Dataset<Row> sparkDs = (spDs).getSparkDataset();
if (limit != null) {
// Cut to `limit` rows, then copy the data points and structure into an InMemoryDataset.
SparkDataset sparkDataset = new SparkDataset(sparkDs.limit(limit));
InMemoryDataset im = new InMemoryDataset(
sparkDataset.getDataPoints(),
sparkDataset.getDataStructure());
// NOTE(review): the next two lines (keyed by `k`) appear to be the removed versions of
// the two `output.put(name, ...)` lines that follow them.
output.put(k, im);
} else output.put(k, new SparkDataset(sparkDs)); // useless
output.put(name, im);
} else output.put(name, new SparkDataset(sparkDs)); // useless
}
}
});
Expand All @@ -90,34 +106,54 @@ public static void writeSparkDatasetsJDBC(Bindings bindings,
Map<String, QueriesForBindingsToSave> queriesForBindingsToSave
) {
// For every name in queriesForBindingsToSave, looks the binding up and writes it to the
// configured JDBC table with SaveMode.Overwrite.
//
// NOTE(review): this listing is a rendered diff hunk (-90,34 +106,54) whose +/- markers
// were lost, so removed and added lines are interleaved. The comments below point out the
// lines that appear to be the removed ones — confirm against the repository.
queriesForBindingsToSave.forEach((name, values) -> {
// NOTE(review): the next seven lines appear to be the removed (pre-change) version, which
// cast the binding straight to SparkDataset.
SparkDataset dataset = (SparkDataset) bindings.get(name);
Dataset<Row> dsSpark = dataset.getSparkDataset();
String jdbcPrefix = "";
try {
jdbcPrefix = getJDBCPrefix(values.getDbtype());
} catch (Exception e) {
e.printStackTrace();
// The binding must be a PersistentDataset, otherwise the call fails with a
// RuntimeException wrapping the Exception that carries the message.
Object ds = bindings.get(name);
if (!(ds instanceof PersistentDataset)) {
// NOTE(review): the Exception is thrown only to be caught and wrapped two lines below;
// throwing the unchecked exception directly would be equivalent and simpler.
try {
throw new Exception(name + " is not a Persistent datatset (affect it with \"<-\")");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// NOTE(review): second lookup of the same binding; `ds` already holds it.
fr.insee.vtl.model.Dataset dataset = ((PersistentDataset) bindings.get(name)).getDelegate();
// Only Spark-backed delegates are written; any other delegate type is skipped without notice.
if (dataset instanceof SparkDataset) {
String jdbcPrefix = "";
try {
Dataset<Row> dsSpark = ((SparkDataset) dataset).getSparkDataset();
jdbcPrefix = getJDBCPrefix(values.getDbtype());
// The JDBC URL is the prefix derived from the db type followed by the configured URL.
dsSpark.write()
.mode(SaveMode.Overwrite)
.format("jdbc")
.option("url", jdbcPrefix + values.getUrl())
.option("dbtable", values.getTable())
.option("user", values.getUser())
.option("password", values.getPassword())
.save();
} catch (Exception e) {
// A failure (prefix lookup or write) is only printed; the remaining entries are still processed.
e.printStackTrace();
}
}
// NOTE(review): the write chain below appears to be the removed (pre-change) copy of the
// one inside the try block above.
dsSpark.write()
.mode(SaveMode.Overwrite)
.format("jdbc")
.option("url", jdbcPrefix + values.getUrl())
.option("dbtable", values.getTable())
.option("user", values.getUser())
.option("password", values.getPassword())
.save();
});
}

public static void writeSparkS3Datasets(Bindings bindings, Map<String, S3ForBindings> s3toSave,
ObjectMapper objectMapper,
SparkSession spark) {
s3toSave.forEach((name, values) -> {
SparkDataset dataset = (SparkDataset) bindings.get(name);
try {
writeSparkDataset(objectMapper, spark, values, dataset);
} catch (Exception e) {
e.printStackTrace();
Object ds = bindings.get(name);
if (!(ds instanceof PersistentDataset)) {
try {
throw new Exception(name + " is not a Persistent datatset (affect it with \"<-\")");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
fr.insee.vtl.model.Dataset dataset = ((PersistentDataset) bindings.get(name)).getDelegate();
if (dataset instanceof SparkDataset) {
try {
writeSparkDataset(objectMapper, spark, values, (SparkDataset) dataset);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
Expand Down

0 comments on commit 61768db

Please sign in to comment.