diff --git a/.github/workflows/build-config.yml b/.github/workflows/build-config.yml
index 990dd7b33..b05a6e8b4 100644
--- a/.github/workflows/build-config.yml
+++ b/.github/workflows/build-config.yml
@@ -33,14 +33,14 @@ jobs:
       - name: Run Docker container and execute tests
         working-directory: ./sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/conference
         run: |
-          docker run -i -p 8888:8888 -p 8081:8081 --rm -v $PWD:/build sqrl-test test conference.sqrl conference.graphqls --snapshot snapshots-conference --tests tests-conference
+          docker run -i --rm -v $PWD:/build sqrl-test test conference.sqrl conference.graphqls --snapshot snapshots-conference --tests tests-conference
+        continue-on-error: false
+
+      - name: Test UDF
+        working-directory: ./sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/udf
+        run: |
+          docker run -i --rm -v $PWD:/build sqrl-test test myudf.sqrl --snapshot snapshots-myudf --tests tests-myudf
         continue-on-error: false
-#
-#      - name: Run Docker container and execute tests
-#        working-directory: ./sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/duckdb
-#        run: |
-#          docker run -i -p 8888:8888 -p 8081:8081 --rm -v $PWD:/build sqrl-test test duckdb.sqrl --snapshot snapshots-duckdb --tests tests-duckdb
-#        continue-on-error: false

       - name: Check Docker return code
         run: |
diff --git a/sqrl-testing/sqrl-integration-tests/src/test/java/com/datasqrl/FullUsecasesIT.java b/sqrl-testing/sqrl-integration-tests/src/test/java/com/datasqrl/FullUsecasesIT.java
index a3ed7c3ac..013ea5cc2 100644
--- a/sqrl-testing/sqrl-integration-tests/src/test/java/com/datasqrl/FullUsecasesIT.java
+++ b/sqrl-testing/sqrl-integration-tests/src/test/java/com/datasqrl/FullUsecasesIT.java
@@ -161,9 +161,9 @@ public void testUseCase(UseCaseTestParameter param) {
     Map<String, String> env = new HashMap<>();
     env.putAll(System.getenv());
-    env.put("EXECUTION_MODE", "local");
     env.putAll(containerHook.getEnv());
     env.put("DATA_PATH", rootDir.resolve("build/deploy/flink/data").toAbsolutePath().toString());
+    env.put("UDF_PATH", rootDir.resolve("build/deploy/flink/lib").toAbsolutePath().toString());

     //Run the test
     TestEnvContext context = TestEnvContext.builder()
@@ -229,7 +229,7 @@ public void testUseCase(UseCaseTestParameter param) {
   @MethodSource("useCaseProvider")
   @Disabled
   public void runTestNumber(UseCaseTestParameter param) {
-    int i = -1;
+    int i = 31;
     testNo++;
     System.out.println(testNo + ":" + param);
     if (i == testNo) {
diff --git a/sqrl-testing/sqrl-integration-tests/src/test/java/com/datasqrl/UdfIT.java b/sqrl-testing/sqrl-integration-tests/src/test/java/com/datasqrl/UdfIT.java
deleted file mode 100644
index 5138ed59a..000000000
--- a/sqrl-testing/sqrl-integration-tests/src/test/java/com/datasqrl/UdfIT.java
+++ /dev/null
@@ -1,195 +0,0 @@
-package com.datasqrl;
-
-import static org.junit.jupiter.api.Assertions.fail;
-
-import com.datasqrl.cmd.RootCommand;
-import com.datasqrl.cmd.StatusHook;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.http.HttpClient;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
-import java.nio.file.Path;
-import java.util.Map;
-import lombok.SneakyThrows;
-import org.apache.flink.table.api.TableResult;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.PostgreSQLContainer;
-import org.testcontainers.junit.jupiter.Container;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.redpanda.RedpandaContainer;
-import org.testcontainers.utility.DockerImageName;
-
-/**
- * Requires an externally running Flink cluster to submit to.
- */
-@Testcontainers
-@Disabled //todo add remote flink cluster
-public class UdfIT {
-
-  @Container
-  private PostgreSQLContainer<?> testDatabase =
-      new PostgreSQLContainer<>(DockerImageName.parse("ankane/pgvector:v0.5.0")
-          .asCompatibleSubstituteFor("postgres"))
-          .withDatabaseName("foo")
-          .withUsername("foo")
-          .withPassword("secret")
-          .withDatabaseName("datasqrl");
-
-  @Container
-  RedpandaContainer container =
-      new RedpandaContainer("docker.redpanda.com/redpandadata/redpanda:v23.1.2");
-
-  private static final String GRAPHQL_ENDPOINT = "http://localhost:8888/graphql";
-
-  //These tests don't use any external sources/sinks, only CREATE TABLE statements and exports to log.
-  @SneakyThrows
-  @Test
-  public void test() {
-    buildUdf();
-    Path path = Path.of(
-        "/Users/henneberger/sqrl/sqrl-testing/sqrl-integration-tests/src/test/resources/udf");
-
-    execute(path, StatusHook.NONE, "compile", "myudf.sqrl");
-    Map<String, String> env = Map.of(
-        "JDBC_URL", testDatabase.getJdbcUrl(),
-        "PGHOST", testDatabase.getHost(),
-        "PGUSER", testDatabase.getUsername(),
-        "JDBC_USERNAME", testDatabase.getUsername(),
-        "JDBC_PASSWORD", testDatabase.getPassword(),
-        "PGPORT", testDatabase.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT).toString(),
-        "PGPASSWORD", testDatabase.getPassword(),
-        "PGDATABASE", testDatabase.getDatabaseName(),
-        "UDF_JAR_DIR",
-        "/Users/henneberger/sqrl/sqrl-testing/sqrl-integration-tests/src/test/resources/udf/build/deploy/flink/lib",
-        "PROPERTIES_BOOTSTRAP_SERVERS", container.getBootstrapServers()
-    );
-    DatasqrlRun run = new DatasqrlRun(path.resolve("build").resolve("plan"), env);
-    TableResult run1 = run.run(false);
-
-    int count = 10;
-//    postGraphQLMutations(count);
-    getGraphqlQuery();
-    run.stop();
-  }
-
-  @SneakyThrows
-  private void postGraphQLMutations(int count) {
-    HttpClient client = HttpClient.newHttpClient();
-
-    for (int i = 1; i <= count; i++) {
-      String mutation = String.format(
-          "{\"query\":\"mutation MyTable($event: MyTableInput!) { MyTable(event: $event) { id } }\",\"variables\":{\"event\":{\"id\":%d}}}",
-          i);
-
-      HttpRequest request = HttpRequest.newBuilder()
-          .uri(URI.create(GRAPHQL_ENDPOINT))
-          .header("Content-Type", "application/json")
-          .POST(HttpRequest.BodyPublishers.ofString(mutation))
-          .build();
-
-      HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
-
-      if (response.statusCode() != 200) {
-        fail("Failed to post GraphQL mutation: " + response.body());
-      }
-
-      System.out.println("Posted GraphQL mutation with id " + i + ": " + response.body());
-    }
-  }
-
-  @SneakyThrows
-  private void getGraphqlQuery() {
-    HttpClient client = HttpClient.newHttpClient();
-    String query = "query {\n"
-        + "  MyTable {\n"
-        + "    val\n"
-        + "  }\n"
-        + "}";
-
-    long startTime = System.currentTimeMillis();
-    long timeout = 10000; // 10 seconds timeout
-    int expectedRecordCount = 10;
-    int recordCount = 0;
-
-    while (System.currentTimeMillis() - startTime < timeout) {
-      HttpRequest request = HttpRequest.newBuilder()
-          .uri(URI.create(GRAPHQL_ENDPOINT))
-          .header("Content-Type", "application/graphql")
-          .POST(HttpRequest.BodyPublishers.ofString(query))
-          .build();
-
-      HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
-
-      if (response.statusCode() != 200) {
-        fail("Failed to post GraphQL query: " + response.body());
-      }
-
-      // Parse the response body as JSON
-      ObjectMapper mapper = new ObjectMapper();
-      JsonNode jsonResponse = mapper.readTree(response.body());
-
-      // Navigate through the JSON to get the 'MyTable' data
-      JsonNode myTableData = jsonResponse.path("data").path("MyTable");
-
-      if (!myTableData.isArray()) {
-        continue;
-      }
-
-      // Check the number of records
-      recordCount = myTableData.size();
-      if (recordCount >= expectedRecordCount) {
-        System.out.println("Successfully retrieved " + recordCount + " records.");
-        break; // Exit loop once 10 records are found
-      }
-
-      // Wait for 1 second before the next request
-      Thread.sleep(1000);
-    }
-
-    if (recordCount < expectedRecordCount) {
-      fail("Failed to retrieve 10 records within the timeout period. Only got " + recordCount);
-    }
-  }
-
-  public void buildUdf() {
-    File projectDir = new File("/Users/henneberger/sqrl/sqrl-testing/sqrl-integration-tests/src/test/resources/udf/myjavafunction"); // Update with your project path
-    try {
-      // Create a process builder to run 'gradle shadowJar'
-      ProcessBuilder processBuilder = new ProcessBuilder("gradle", "shadowJar");
-
-      // Set the working directory to the project root
-      processBuilder.directory(projectDir);
-
-      // Start the process
-      Process process = processBuilder.start();
-
-      // Capture the output
-      BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
-      String line;
-      while ((line = reader.readLine()) != null) {
-        System.out.println(line);
-      }
-
-      // Wait for the process to finish
-      int exitCode = process.waitFor();
-      System.out.println("\nBuild finished with exit code: " + exitCode);
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
-
-  public static int execute(Path rootDir, StatusHook hook, String... args) {
-    RootCommand rootCommand = new RootCommand(rootDir, hook);
-    int exitCode = rootCommand.getCmd().execute(args) + (hook.isSuccess() ? 0 : 1);
-    if (exitCode != 0) {
-      fail();
-    }
-    return exitCode;
-  }
-
-}
diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/myudf--.txt b/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/myudf--.txt
new file mode 100644
index 000000000..d4ceeefaa
--- /dev/null
+++ b/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/com/datasqrl/UseCaseCompileTest/myudf--.txt
@@ -0,0 +1,125 @@
+>>>pipeline_explain.txt
+=== MyTable
+ID: mytable_1
+Type: state
+Stage: flink
+Primary Key: val
+Timestamp : -
+Schema:
+ - val: INTEGER NOT NULL
+ - myFnc: BIGINT
+Plan:
+LogicalProject(val=[$0], myFnc=[MyScalarFunction(CAST($0):BIGINT, CAST($0):BIGINT)])
+  LogicalValues(tuples=[[{ 1 }, { 2 }, { 3 }, { 4 }, { 5 }, { 6 }, { 7 }, { 8 }, { 9 }, { 10 }]])
+
+>>>flink.json
+{
+  "flinkSql" : [
+    "CREATE TEMPORARY FUNCTION IF NOT EXISTS `myscalarfunction` AS 'com.myudf.MyScalarFunction' LANGUAGE JAVA;",
+    "CREATE TEMPORARY TABLE `mytable_1` (\n `val` INTEGER NOT NULL,\n `myFnc` BIGINT,\n PRIMARY KEY (`val`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'mytable_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);",
+    "CREATE VIEW `table$1`\nAS\nSELECT `val`, MYSCALARFUNCTION(CAST(`val` AS BIGINT), CAST(`val` AS BIGINT)) AS `myFnc`\nFROM (VALUES (1),\n (2),\n (3),\n (4),\n (5),\n (6),\n (7),\n (8),\n (9),\n (10)) AS `t` (`val`);",
+    "EXECUTE STATEMENT SET BEGIN\nINSERT INTO `mytable_1`\n(SELECT *\n FROM `table$1`)\n;\nEND;"
+  ],
+  "connectors" : [
+    "jdbc-sqrl"
+  ],
+  "formats" : [ ]
+}
+>>>kafka.json
+{
+  "topics" : [ ]
+}
+>>>postgres.json
+{
+  "ddl" : [
+    {
+      "name" : "mytable_1",
+      "columns" : [
+        "\"val\" INTEGER NOT NULL",
+        "\"myFnc\" BIGINT "
+      ],
+      "primaryKeys" : [
+        "\"val\""
+      ],
+      "sql" : "CREATE TABLE IF NOT EXISTS mytable_1 (\"val\" INTEGER NOT NULL,\"myFnc\" BIGINT , PRIMARY KEY (\"val\"));"
+    }
+  ],
+  "views" : [
+    {
+      "name" : "MyTable",
+      "sql" : "CREATE OR REPLACE VIEW \"MyTable\"(\"val\", \"myFnc\") AS SELECT *\nFROM \"mytable_1\"\nORDER BY \"val\";"
+    }
+  ]
+}
+>>>vertx.json
+{
+  "model" : {
+    "coords" : [
+      {
+        "type" : "args",
+        "parentType" : "Query",
+        "fieldName" : "MyTable",
+        "matchs" : [
+          {
+            "arguments" : [
+              {
+                "type" : "variable",
+                "type" : "variable",
+                "path" : "val"
+              },
+              {
+                "type" : "variable",
+                "type" : "variable",
+                "path" : "limit"
+              },
+              {
+                "type" : "variable",
+                "type" : "variable",
+                "path" : "offset"
+              }
+            ],
+            "query" : {
+              "type" : "PagedJdbcQuery",
+              "type" : "PagedJdbcQuery",
+              "sql" : "SELECT *\nFROM \"mytable_1\"\nWHERE \"val\" = $1",
+              "parameters" : [
+                {
+                  "type" : "arg",
+                  "type" : "arg",
+                  "path" : "val"
+                }
+              ]
+            }
+          },
+          {
+            "arguments" : [
+              {
+                "type" : "variable",
+                "type" : "variable",
+                "path" : "limit"
+              },
+              {
+                "type" : "variable",
+                "type" : "variable",
+                "path" : "offset"
+              }
+            ],
+            "query" : {
+              "type" : "PagedJdbcQuery",
+              "type" : "PagedJdbcQuery",
+              "sql" : "SELECT *\nFROM \"mytable_1\"\nORDER BY \"val\"",
+              "parameters" : [ ]
+            }
+          }
+        ]
+      }
+    ],
+    "mutations" : [ ],
+    "subscriptions" : [ ],
+    "schema" : {
+      "type" : "string",
+      "type" : "string",
+      "schema" : "\"An RFC-3339 compliant DateTime Scalar\"\nscalar DateTime\n\ntype MyTable {\n val: Int!\n myFnc: Float\n}\n\ntype Query {\n MyTable(val: Int, limit: Int = 10, offset: Int = 0): [MyTable!]\n}\n"
    }
+  }
+}
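The MyScalarFunction.java source is only renamed by this PR, so its body does not appear in the diff. From the plan above (myFnc = MyScalarFunction(CAST(val AS BIGINT), CAST(val AS BIGINT))) and the snapshot values (myFnc = 2 x val), a Flink scalar UDF consistent with these snapshots would simply add its two arguments. A minimal sketch -- the package and class name come from the PR, the body is an assumption:

package com.myudf;

import org.apache.flink.table.functions.ScalarFunction;

// Loaded via the generated
// CREATE TEMPORARY FUNCTION ... AS 'com.myudf.MyScalarFunction' LANGUAGE JAVA.
// Called as MyScalarFunction(val, val), so eval(a, b) = a + b yields 2 * val,
// matching the snapshot rows above.
public class MyScalarFunction extends ScalarFunction {
  public Long eval(Long a, Long b) {
    return a + b;
  }
}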
diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/myudf/myudf.txt b/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/myudf/myudf.txt
new file mode 100644
index 000000000..101fa224a
--- /dev/null
+++ b/sqrl-testing/sqrl-integration-tests/src/test/resources/snapshots/myudf/myudf.txt
@@ -0,0 +1,2 @@
+>>>MyTable.graphql
+{"data":{"MyTable":[{"val":1,"myFnc":2.0},{"val":2,"myFnc":4.0},{"val":3,"myFnc":6.0},{"val":4,"myFnc":8.0},{"val":5,"myFnc":10.0},{"val":6,"myFnc":12.0},{"val":7,"myFnc":14.0},{"val":8,"myFnc":16.0},{"val":9,"myFnc":18.0},{"val":10,"myFnc":20.0}]}}
diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/udf/myjavafunction/build.gradle b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/udf/myjavafunction/build.gradle
similarity index 100%
rename from sqrl-testing/sqrl-integration-tests/src/test/resources/udf/myjavafunction/build.gradle
rename to sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/udf/myjavafunction/build.gradle
diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/udf/myjavafunction/build/libs/myjavafunction-0.1.0-SNAPSHOT-all.jar b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/udf/myjavafunction/build/libs/myjavafunction-0.1.0-SNAPSHOT-all.jar
new file mode 100644
index 000000000..a4c01763a
Binary files /dev/null and b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/udf/myjavafunction/build/libs/myjavafunction-0.1.0-SNAPSHOT-all.jar differ
diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/udf/myjavafunction/src/main/java/com/myudf/MyScalarFunction.java b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/udf/myjavafunction/src/main/java/com/myudf/MyScalarFunction.java
similarity index 100%
rename from sqrl-testing/sqrl-integration-tests/src/test/resources/udf/myjavafunction/src/main/java/com/myudf/MyScalarFunction.java
rename to sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/udf/myjavafunction/src/main/java/com/myudf/MyScalarFunction.java
diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/udf/myudf.sqrl b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/udf/myudf.sqrl
similarity index 100%
rename from sqrl-testing/sqrl-integration-tests/src/test/resources/udf/myudf.sqrl
rename to sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/udf/myudf.sqrl
diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/udf/snapshots-myudf/MyTable.snapshot b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/udf/snapshots-myudf/MyTable.snapshot
new file mode 100644
index 000000000..0d33da274
--- /dev/null
+++ b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/udf/snapshots-myudf/MyTable.snapshot
@@ -0,0 +1 @@
+{"data":{"MyTable":[{"val":1,"myFnc":2.0},{"val":2,"myFnc":4.0},{"val":3,"myFnc":6.0},{"val":4,"myFnc":8.0},{"val":5,"myFnc":10.0},{"val":6,"myFnc":12.0},{"val":7,"myFnc":14.0},{"val":8,"myFnc":16.0},{"val":9,"myFnc":18.0},{"val":10,"myFnc":20.0}]}}
\ No newline at end of file
diff --git a/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/udf/tests-myudf/MyTable.graphql b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/udf/tests-myudf/MyTable.graphql
new file mode 100644
index 000000000..5a03643cc
--- /dev/null
+++ b/sqrl-testing/sqrl-integration-tests/src/test/resources/usecases/udf/tests-myudf/MyTable.graphql
@@ -0,0 +1,6 @@
+query MyTable {
+  MyTable {
+    val
+    myFnc
+  }
+}
\ No newline at end of file
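The tests-myudf/snapshots-myudf pair added above is what the `sqrl-test test myudf.sqrl --snapshot snapshots-myudf --tests tests-myudf` workflow step consumes: each <name>.graphql under the tests directory is executed against the GraphQL endpoint and the response is compared to <name>.snapshot. A rough sketch of that pairing, assuming a plain body comparison (the endpoint and Content-Type mirror the deleted UdfIT; the actual runner internals are not shown in this PR):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class SnapshotPairingSketch {
  public static void main(String[] args) throws Exception {
    Path tests = Path.of("tests-myudf");
    Path snapshots = Path.of("snapshots-myudf");
    HttpClient client = HttpClient.newHttpClient();
    try (var queries = Files.list(tests)) {
      for (Path query : (Iterable<Path>) queries::iterator) {
        String name = query.getFileName().toString().replace(".graphql", "");
        // POST the stored query, as the deleted UdfIT did by hand.
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8888/graphql"))
            .header("Content-Type", "application/graphql")
            .POST(HttpRequest.BodyPublishers.ofString(Files.readString(query)))
            .build();
        String actual = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
        // Compare against the checked-in snapshot with the same base name.
        String expected = Files.readString(snapshots.resolve(name + ".snapshot"));
        if (!actual.equals(expected)) {
          throw new AssertionError("Snapshot mismatch for " + name);
        }
      }
    }
  }
}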
diff --git a/sqrl-tools/dockerrun.sh b/sqrl-tools/dockerrun.sh
index cc368ff78..7782c47b5 100755
--- a/sqrl-tools/dockerrun.sh
+++ b/sqrl-tools/dockerrun.sh
@@ -4,6 +4,7 @@ cd /build

 # Todo: there is a target flag we need to parse and set
 export DATA_PATH=/build/build/deploy/flink/data
+export UDF_PATH=/build/build/deploy/flink/lib

 echo 'Compiling...this takes about 10 seconds'
 java -jar /opt/sqrl/sqrl-cli.jar ${@}
diff --git a/sqrl-tools/sqrl-run/src/main/java/com/datasqrl/DatasqrlRun.java b/sqrl-tools/sqrl-run/src/main/java/com/datasqrl/DatasqrlRun.java
index 226c012bd..40b3d079c 100644
--- a/sqrl-tools/sqrl-run/src/main/java/com/datasqrl/DatasqrlRun.java
+++ b/sqrl-tools/sqrl-run/src/main/java/com/datasqrl/DatasqrlRun.java
@@ -20,6 +20,7 @@
 import io.vertx.core.json.JsonObject;
 import io.vertx.micrometer.MicrometerMetricsOptions;
 import java.net.URL;
+import java.net.URLClassLoader;
 import java.nio.file.Path;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -32,6 +33,7 @@
 import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.JobStatus;
@@ -149,6 +151,36 @@ public CompiledPlan compileFlink() {
     config.putIfAbsent("table.exec.resource.default-parallelism", "1");
     config.putIfAbsent("rest.address", "localhost");
     config.putIfAbsent("rest.port", "8081");
+    config.putIfAbsent("execution.target", "local"); //mini cluster
+    config.putIfAbsent("execution.attached", "true"); //mini cluster
+
+    String udfPath = getenv("UDF_PATH");
+    List<URL> jarUrls = new ArrayList<>();
+    if (udfPath != null) {
+      Path udfDir = Path.of(udfPath);
+      if (udfDir.toFile().exists() && udfDir.toFile().isDirectory()) {
+        // Iterate over all files in the directory and add JARs to the list
+        try (var stream = java.nio.file.Files.list(udfDir)) {
+          stream.filter(file -> file.toString().endsWith(".jar"))
+              .forEach(file -> {
+                try {
+                  jarUrls.add(file.toUri().toURL());
+                } catch (Exception e) {
+                  log.error("Error adding JAR to classpath: " + file, e);
+                }
+              });
+        }
+      } else {
+//        throw new RuntimeException("UDF_PATH is not a valid directory: " + udfPath);
+      }
+    }
+
+    // Add UDF JARs to classpath
+    URL[] urlArray = jarUrls.toArray(new URL[0]);
+    ClassLoader udfClassLoader = new URLClassLoader(urlArray, getClass().getClassLoader());
+
+    config.putIfAbsent("pipeline.classpaths", jarUrls.stream().map(URL::toString)
+        .collect(Collectors.joining(",")));

     //Exposed for tests
     if (env.get("FLINK_RESTART_STRATEGY") != null) {
@@ -159,17 +191,17 @@ public CompiledPlan compileFlink() {

     Configuration configuration = Configuration.fromMap(config);

     StreamExecutionEnvironment sEnv;
     try {
-      sEnv = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+      sEnv = new StreamExecutionEnvironment(configuration, udfClassLoader);
     } catch (Exception e) {
-      sEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
+      throw e;
     }

     EnvironmentSettings tEnvConfig = EnvironmentSettings.newInstance()
         .withConfiguration(configuration)
+        .withClassLoader(udfClassLoader)
         .build();

     StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, tEnvConfig);
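For context on the DatasqrlRun change: the UDF jars have to be visible in two places. EnvironmentSettings.withClassLoader(udfClassLoader) lets the planner resolve com.myudf.MyScalarFunction while compiling the generated CREATE TEMPORARY FUNCTION statement, and pipeline.classpaths hands the same jar URLs to the (mini-)cluster so task managers can load the function at runtime. A standalone sketch of that wiring, using the UDF_PATH layout from this PR (the jar scan mirrors the code above; everything else is illustrative):

import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

public class UdfClasspathSketch {
  public static void main(String[] args) throws Exception {
    // Scan UDF_PATH for jars, as DatasqrlRun does.
    Path udfDir = Path.of(System.getenv().getOrDefault("UDF_PATH", "build/deploy/flink/lib"));
    List<URL> jarUrls;
    try (var stream = Files.list(udfDir)) {
      jarUrls = stream.filter(p -> p.toString().endsWith(".jar"))
          .map(p -> {
            try {
              return p.toUri().toURL();
            } catch (Exception e) {
              throw new IllegalStateException("Bad jar path: " + p, e);
            }
          })
          .collect(Collectors.toList());
    }

    // Planning side: the planner loads the UDF class through this loader.
    ClassLoader udfClassLoader =
        new URLClassLoader(jarUrls.toArray(new URL[0]), UdfClasspathSketch.class.getClassLoader());
    System.out.println("Resolvable: " + Class.forName("com.myudf.MyScalarFunction", false, udfClassLoader));

    // Execution side: the comma-joined URL list that goes into pipeline.classpaths.
    System.out.println("pipeline.classpaths="
        + jarUrls.stream().map(URL::toString).collect(Collectors.joining(",")));
  }
}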