Skip to content

Commit

Permalink
Put subscriptions behind a feature flag (#801)
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Henneberger <git@danielhenneberger.com>
  • Loading branch information
henneberger authored Sep 9, 2024
1 parent ad3fb05 commit 98d4ecb
Show file tree
Hide file tree
Showing 24 changed files with 194 additions and 1,745 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public GraphQLSchema generate(ExecutionGoal goal) {
GraphQLSchema.Builder builder = GraphQLSchema.newSchema()
.query(queryType);
if (goal != ExecutionGoal.TEST) {
if (logManager.hasLogEngine()) {
if (logManager.hasLogEngine() && System.getenv().get("ENABLE_SUBSCRIPTIONS") != null) {
Optional<GraphQLObjectType.Builder> subscriptions = createSubscriptionTypes(schema);
subscriptions.map(builder::subscription);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@ LogicalProject(col1=[$4], time=[$2], productid=[$5], discount=[$8], _id=[$0])
LogicalTableScan(table=[[orders_2]])
LogicalTableFunctionScan(invocation=[Orders.entries($cor0.entries)], rowType=[RecordType(BIGINT productid, BIGINT quantity, DOUBLE unit_price, DOUBLE discount)], elementType=[class [Ljava.lang.Object;])

=== kafka.OrderEntry
ID: orderentry_1_1
Type: export
Stage: flink
Inputs: orderentry_1

=== Orders
ID: orders_2
Type: stream
Expand Down Expand Up @@ -61,33 +55,21 @@ LogicalProject(id=[$0], customerid=[$1], time=[$2], entries=[$3], col1=[/(+($0,
"CREATE TEMPORARY TABLE `orders_1` (\n `id` BIGINT NOT NULL,\n `customerid` BIGINT NOT NULL,\n `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `entries` ROW(`productid` BIGINT NOT NULL, `quantity` BIGINT NOT NULL, `unit_price` DOUBLE NOT NULL, `discount` DOUBLE) NOT NULL ARRAY NOT NULL,\n PRIMARY KEY (`id`, `time`) NOT ENFORCED,\n WATERMARK FOR `time` AS `time` - INTERVAL '0.001' SECOND\n) WITH (\n 'format' = 'json',\n 'path' = 'file:/mock',\n 'source.monitor-interval' = '10000',\n 'connector' = 'filesystem',\n 'source.path.regex-pattern' = ''\n);",
"CREATE TEMPORARY TABLE `orderentry_1` (\n `col1` BIGINT NOT NULL,\n `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `productid` BIGINT NOT NULL,\n `discount` DOUBLE,\n `_id` BIGINT NOT NULL,\n PRIMARY KEY (`_id`, `time`, `productid`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'orderentry_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);",
"CREATE TEMPORARY TABLE `orders_2` (\n `id` BIGINT NOT NULL,\n `customerid` BIGINT NOT NULL,\n `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `entries` RAW('com.datasqrl.json.FlinkJsonType', 'ADFjb20uZGF0YXNxcmwuanNvbi5GbGlua0pzb25UeXBlU2VyaWFsaXplclNuYXBzaG90AAAAAQApY29tLmRhdGFzcXJsLmpzb24uRmxpbmtKc29uVHlwZVNlcmlhbGl6ZXI='),\n `col1` BIGINT NOT NULL,\n PRIMARY KEY (`id`, `time`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'orders_2',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);",
"CREATE TEMPORARY TABLE `orderentry_1_1` (\n `col1` BIGINT NOT NULL,\n `time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `productid` BIGINT NOT NULL,\n `discount` DOUBLE,\n `_id` BIGINT NOT NULL\n) WITH (\n 'properties.bootstrap.servers' = '${PROPERTIES_BOOTSTRAP_SERVERS}',\n 'properties.auto.offset.reset' = 'earliest',\n 'connector' = 'kafka',\n 'format' = 'flexible-json',\n 'properties.group.id' = '${PROPERTIES_GROUP_ID}',\n 'topic' = 'orderentry_1',\n 'scan.startup.mode' = 'group-offsets'\n);",
"CREATE VIEW `table$1`\nAS\nSELECT `$cor0`.`col1`, `$cor0`.`time`, `t10`.`productid`, `t10`.`discount`, `$cor0`.`id` AS `_id`\nFROM (SELECT `id`, `customerid`, `time`, `entries`, (`id` + `customerid`) / 2 AS `col1`\n FROM `orders_1`) AS `$cor0`,\n UNNEST(`$cor0`.`entries`) AS `t10` (`productid`, `quantity`, `unit_price`, `discount`);",
"CREATE VIEW `table$2`\nAS\nSELECT `id`, `customerid`, `time`, TOJSON(`entries`) AS `entries`, (`id` + `customerid`) / 2 AS `col1`\nFROM `orders_1`;",
"CREATE VIEW `table$3`\nAS\nSELECT `$cor0`.`col1`, `$cor0`.`time`, `t10`.`productid`, `t10`.`discount`, `$cor0`.`id` AS `_id`\nFROM (SELECT `id`, `customerid`, `time`, `entries`, (`id` + `customerid`) / 2 AS `col1`\n FROM `orders_1`) AS `$cor0`,\n UNNEST(`$cor0`.`entries`) AS `t10` (`productid`, `quantity`, `unit_price`, `discount`);",
"EXECUTE STATEMENT SET BEGIN\nINSERT INTO `orderentry_1`\n(SELECT *\n FROM `table$1`)\n;\nINSERT INTO `orders_2`\n (SELECT *\n FROM `table$2`)\n ;\n INSERT INTO `orderentry_1_1`\n (SELECT *\n FROM `table$3`)\n ;\n END;"
"EXECUTE STATEMENT SET BEGIN\nINSERT INTO `orderentry_1`\n(SELECT *\n FROM `table$1`)\n;\nINSERT INTO `orders_2`\n (SELECT *\n FROM `table$2`)\n ;\n END;"
],
"connectors" : [
"jdbc-sqrl",
"kafka",
"filesystem"
],
"formats" : [
"flexible-json",
"json"
]
}
>>>kafka.json
{
"topics" : [
{
"name" : "orderentry_1",
"numPartitions" : 1,
"replicationFactor" : 1,
"replicasAssignments" : { },
"config" : { }
}
]
"topics" : [ ]
}
>>>postgres.json
{
Expand Down Expand Up @@ -419,18 +401,11 @@ LogicalProject(id=[$0], customerid=[$1], time=[$2], entries=[$3], col1=[/(+($0,
}
],
"mutations" : [ ],
"subscriptions" : [
{
"fieldName" : "OrderEntry",
"topic" : "orderentry_1",
"sinkConfig" : { },
"filters" : { }
}
],
"subscriptions" : [ ],
"schema" : {
"type" : "string",
"type" : "string",
"schema" : "\"An RFC-3339 compliant DateTime Scalar\"\nscalar DateTime\n\ntype OrderEntry {\n col1: Float!\n time: DateTime!\n productid: Float!\n discount: Float\n}\n\ntype OrderEntryResult {\n col1: Float!\n time: DateTime!\n productid: Float!\n discount: Float\n}\n\ntype Orders {\n id: Float!\n customerid: Float!\n time: DateTime!\n col1: Float!\n entries(limit: Int = 10, offset: Int = 0): [entries!]\n}\n\ntype Query {\n OrderEntry(time: DateTime, productid: Float, limit: Int = 10, offset: Int = 0): [OrderEntry!]\n Orders(id: Float, time: DateTime, limit: Int = 10, offset: Int = 0): [Orders!]\n}\n\ntype Subscription {\n OrderEntry: OrderEntryResult!\n}\n\ntype entries {\n productid: Float!\n quantity: Float!\n unit_price: Float!\n discount: Float\n}\n"
"schema" : "\"An RFC-3339 compliant DateTime Scalar\"\nscalar DateTime\n\ntype OrderEntry {\n col1: Float!\n time: DateTime!\n productid: Float!\n discount: Float\n}\n\ntype Orders {\n id: Float!\n customerid: Float!\n time: DateTime!\n col1: Float!\n entries(limit: Int = 10, offset: Int = 0): [entries!]\n}\n\ntype Query {\n OrderEntry(time: DateTime, productid: Float, limit: Int = 10, offset: Int = 0): [OrderEntry!]\n Orders(id: Float, time: DateTime, limit: Int = 10, offset: Int = 0): [Orders!]\n}\n\ntype entries {\n productid: Float!\n quantity: Float!\n unit_price: Float!\n discount: Float\n}\n"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@ Plan:
LogicalFilter(condition=[ARRAY_CONTAINS(ARRAY(CAST('one'):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL, CAST('two'):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL), $1)])
LogicalTableScan(table=[[product_2]])

=== kafka.ProductFilter1
ID: productfilter1_1_1
Type: export
Stage: flink
Inputs: productfilter1_1

=== ProductFilter2
ID: productfilter2_1
Type: stream
Expand All @@ -54,12 +48,6 @@ Plan:
LogicalFilter(condition=[ARRAY_CONTAINS(split('one, two', ','), $1)])
LogicalTableScan(table=[[product_2]])

=== kafka.ProductFilter2
ID: productfilter2_1_2
Type: export
Stage: flink
Inputs: productfilter2_1

>>>flink.json
{
"flinkSql" : [
Expand All @@ -68,43 +56,22 @@ Inputs: productfilter2_1
"CREATE TEMPORARY TABLE `product_2` (\n `productid` BIGINT NOT NULL,\n `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `description` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `category` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `_ingest_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n PRIMARY KEY (`productid`, `name`, `description`, `category`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'product_2',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);",
"CREATE TEMPORARY TABLE `productfilter1_1` (\n `productid` BIGINT NOT NULL,\n `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `description` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `category` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `_ingest_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n PRIMARY KEY (`productid`, `name`, `description`, `category`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'productfilter1_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);",
"CREATE TEMPORARY TABLE `productfilter2_1` (\n `productid` BIGINT NOT NULL,\n `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `description` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `category` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `_ingest_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n PRIMARY KEY (`productid`, `name`, `description`, `category`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'productfilter2_1',\n 'url' = '${JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);",
"CREATE TEMPORARY TABLE `productfilter1_1_1` (\n `productid` BIGINT NOT NULL,\n `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `description` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `category` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `_ingest_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL\n) WITH (\n 'properties.bootstrap.servers' = '${PROPERTIES_BOOTSTRAP_SERVERS}',\n 'properties.auto.offset.reset' = 'earliest',\n 'connector' = 'kafka',\n 'format' = 'flexible-json',\n 'properties.group.id' = '${PROPERTIES_GROUP_ID}',\n 'topic' = 'productfilter1_1',\n 'scan.startup.mode' = 'group-offsets'\n);",
"CREATE TEMPORARY TABLE `productfilter2_1_2` (\n `productid` BIGINT NOT NULL,\n `name` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `description` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `category` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `_ingest_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL\n) WITH (\n 'properties.bootstrap.servers' = '${PROPERTIES_BOOTSTRAP_SERVERS}',\n 'properties.auto.offset.reset' = 'earliest',\n 'connector' = 'kafka',\n 'format' = 'flexible-json',\n 'properties.group.id' = '${PROPERTIES_GROUP_ID}',\n 'topic' = 'productfilter2_1',\n 'scan.startup.mode' = 'group-offsets'\n);",
"CREATE VIEW `table$1`\nAS\nSELECT *\nFROM `product_1`;",
"CREATE VIEW `table$2`\nAS\nSELECT *\nFROM `product_1`\nWHERE ARRAY_CONTAINS(ARRAY[CAST('one' AS VARCHAR(65536) CHARACTER SET `UTF-16LE`), CAST('two' AS VARCHAR(65536) CHARACTER SET `UTF-16LE`)], `name`);",
"CREATE VIEW `table$3`\nAS\nSELECT *\nFROM `product_1`\nWHERE ARRAY_CONTAINS(SPLIT('one, two', ','), `name`);",
"CREATE VIEW `table$4`\nAS\nSELECT *\nFROM `product_1`\nWHERE ARRAY_CONTAINS(ARRAY[CAST('one' AS VARCHAR(65536) CHARACTER SET `UTF-16LE`), CAST('two' AS VARCHAR(65536) CHARACTER SET `UTF-16LE`)], `name`);",
"CREATE VIEW `table$5`\nAS\nSELECT *\nFROM `product_1`\nWHERE ARRAY_CONTAINS(SPLIT('one, two', ','), `name`);",
"EXECUTE STATEMENT SET BEGIN\nINSERT INTO `product_2`\n(SELECT *\n FROM `table$1`)\n;\nINSERT INTO `productfilter1_1`\n (SELECT *\n FROM `table$2`)\n ;\n INSERT INTO `productfilter2_1`\n (SELECT *\n FROM `table$3`)\n ;\n INSERT INTO `productfilter1_1_1`\n (SELECT *\n FROM `table$4`)\n ;\n INSERT INTO `productfilter2_1_2`\n (SELECT *\n FROM `table$5`)\n ;\n END;"
"EXECUTE STATEMENT SET BEGIN\nINSERT INTO `product_2`\n(SELECT *\n FROM `table$1`)\n;\nINSERT INTO `productfilter1_1`\n (SELECT *\n FROM `table$2`)\n ;\n INSERT INTO `productfilter2_1`\n (SELECT *\n FROM `table$3`)\n ;\n END;"
],
"connectors" : [
"jdbc-sqrl",
"kafka",
"filesystem"
],
"formats" : [
"flexible-json",
"json"
]
}
>>>kafka.json
{
"topics" : [
{
"name" : "productfilter1_1",
"numPartitions" : 1,
"replicationFactor" : 1,
"replicasAssignments" : { },
"config" : { }
},
{
"name" : "productfilter2_1",
"numPartitions" : 1,
"replicationFactor" : 1,
"replicasAssignments" : { },
"config" : { }
}
]
"topics" : [ ]
}
>>>postgres.json
{
Expand Down Expand Up @@ -2313,24 +2280,11 @@ Inputs: productfilter2_1
}
],
"mutations" : [ ],
"subscriptions" : [
{
"fieldName" : "ProductFilter1",
"topic" : "productfilter1_1",
"sinkConfig" : { },
"filters" : { }
},
{
"fieldName" : "ProductFilter2",
"topic" : "productfilter2_1",
"sinkConfig" : { },
"filters" : { }
}
],
"subscriptions" : [ ],
"schema" : {
"type" : "string",
"type" : "string",
"schema" : "\"An RFC-3339 compliant DateTime Scalar\"\nscalar DateTime\n\ntype Product {\n productid: Float!\n name: String!\n description: String!\n category: String!\n}\n\ntype ProductFilter1 {\n productid: Float!\n name: String!\n description: String!\n category: String!\n}\n\ntype ProductFilter1Result {\n productid: Float!\n name: String!\n description: String!\n category: String!\n}\n\ntype ProductFilter2 {\n productid: Float!\n name: String!\n description: String!\n category: String!\n}\n\ntype ProductFilter2Result {\n productid: Float!\n name: String!\n description: String!\n category: String!\n}\n\ntype Query {\n Product(productid: Float, name: String, description: String, category: String, limit: Int = 10, offset: Int = 0): [Product!]\n ProductFilter1(productid: Float, name: String, description: String, category: String, limit: Int = 10, offset: Int = 0): [ProductFilter1!]\n ProductFilter2(productid: Float, name: String, description: String, category: String, limit: Int = 10, offset: Int = 0): [ProductFilter2!]\n}\n\ntype Subscription {\n ProductFilter1: ProductFilter1Result!\n ProductFilter2: ProductFilter2Result!\n}\n"
"schema" : "\"An RFC-3339 compliant DateTime Scalar\"\nscalar DateTime\n\ntype Product {\n productid: Float!\n name: String!\n description: String!\n category: String!\n}\n\ntype ProductFilter1 {\n productid: Float!\n name: String!\n description: String!\n category: String!\n}\n\ntype ProductFilter2 {\n productid: Float!\n name: String!\n description: String!\n category: String!\n}\n\ntype Query {\n Product(productid: Float, name: String, description: String, category: String, limit: Int = 10, offset: Int = 0): [Product!]\n ProductFilter1(productid: Float, name: String, description: String, category: String, limit: Int = 10, offset: Int = 0): [ProductFilter1!]\n ProductFilter2(productid: Float, name: String, description: String, category: String, limit: Int = 10, offset: Int = 0): [ProductFilter2!]\n}\n"
}
}
}
Loading

0 comments on commit 98d4ecb

Please sign in to comment.