diff --git a/src/main/java/fr/insee/trevas/lab/utils/Utils.java b/src/main/java/fr/insee/trevas/lab/utils/Utils.java index 69d199d..5ae2c86 100644 --- a/src/main/java/fr/insee/trevas/lab/utils/Utils.java +++ b/src/main/java/fr/insee/trevas/lab/utils/Utils.java @@ -41,7 +41,14 @@ public static ScriptEngine initEngineWithSpark(Bindings bindings, SparkSession s public static Bindings getBindings(Bindings input) { Bindings output = new SimpleBindings(); input.forEach((k, v) -> { - if (v instanceof PersistentDataset) output.put(k, v); + if (!k.startsWith("$")) { + if (v instanceof PersistentDataset) { + output.put(k + "$PersistentDataset", v); + } else { + output.put(k, v); + } + } + ; }); return output; } @@ -69,17 +76,26 @@ public static SparkConf loadSparkConfig(String stringPath) { public static Bindings getSparkBindings(Bindings input, Integer limit) { Bindings output = new SimpleBindings(); input.forEach((k, v) -> { - if (v instanceof PersistentDataset) { - fr.insee.vtl.model.Dataset ds = ((PersistentDataset) v).getDelegate(); - if (ds instanceof SparkDataset) { - Dataset sparkDs = ((SparkDataset) ds).getSparkDataset(); + if (!k.startsWith("$")) { + if ((v instanceof PersistentDataset) || (v instanceof SparkDataset)) { + String name = k; + SparkDataset spDs = null; + if (v instanceof PersistentDataset) { + fr.insee.vtl.model.Dataset ds = ((PersistentDataset) v).getDelegate(); + name = name + "$PersistentDataset"; + spDs = (SparkDataset) ds; + } + if (v instanceof SparkDataset) { + spDs = (SparkDataset) v; + } + Dataset sparkDs = (spDs).getSparkDataset(); if (limit != null) { SparkDataset sparkDataset = new SparkDataset(sparkDs.limit(limit)); InMemoryDataset im = new InMemoryDataset( sparkDataset.getDataPoints(), sparkDataset.getDataStructure()); - output.put(k, im); - } else output.put(k, new SparkDataset(sparkDs)); // useless + output.put(name, im); + } else output.put(name, new SparkDataset(sparkDs)); // useless } } }); @@ -90,22 +106,32 @@ public static void 
writeSparkDatasetsJDBC(Bindings bindings, Map queriesForBindingsToSave ) { queriesForBindingsToSave.forEach((name, values) -> { - SparkDataset dataset = (SparkDataset) bindings.get(name); - Dataset dsSpark = dataset.getSparkDataset(); - String jdbcPrefix = ""; - try { - jdbcPrefix = getJDBCPrefix(values.getDbtype()); - } catch (Exception e) { - e.printStackTrace(); + Object ds = bindings.get(name); + if (!(ds instanceof PersistentDataset)) { + try { + throw new Exception(name + " is not a Persistent dataset (assign it with \"<-\")"); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + fr.insee.vtl.model.Dataset dataset = ((PersistentDataset) bindings.get(name)).getDelegate(); + if (dataset instanceof SparkDataset) { + String jdbcPrefix = ""; + try { + Dataset dsSpark = ((SparkDataset) dataset).getSparkDataset(); + jdbcPrefix = getJDBCPrefix(values.getDbtype()); + dsSpark.write() + .mode(SaveMode.Overwrite) + .format("jdbc") + .option("url", jdbcPrefix + values.getUrl()) + .option("dbtable", values.getTable()) + .option("user", values.getUser()) + .option("password", values.getPassword()) + .save(); + } catch (Exception e) { + e.printStackTrace(); + } } - dsSpark.write() - .mode(SaveMode.Overwrite) - .format("jdbc") - .option("url", jdbcPrefix + values.getUrl()) - .option("dbtable", values.getTable()) - .option("user", values.getUser()) - .option("password", values.getPassword()) - .save(); }); } @@ -113,11 +139,21 @@ public static void writeSparkS3Datasets(Bindings bindings, Map { - SparkDataset dataset = (SparkDataset) bindings.get(name); - try { - writeSparkDataset(objectMapper, spark, values, dataset); - } catch (Exception e) { - e.printStackTrace(); + Object ds = bindings.get(name); + if (!(ds instanceof PersistentDataset)) { + try { + throw new Exception(name + " is not a Persistent dataset (assign it with \"<-\")"); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + fr.insee.vtl.model.Dataset dataset = ((PersistentDataset)
bindings.get(name)).getDelegate(); + if (dataset instanceof SparkDataset) { + try { + writeSparkDataset(objectMapper, spark, values, (SparkDataset) dataset); + } catch (Exception e) { + e.printStackTrace(); + } } }); }