Skip to content

Commit

Permalink
Add support to import from and export to the log engine (#747)
Browse files Browse the repository at this point in the history
* Adding feature "Import from log engine" and corresponding use-case test

* Creating log sink/table when exporting to a table that doesn't exist yet

* rearrange tests

* updating tests after rebase
  • Loading branch information
nandorsoma authored Aug 29, 2024
1 parent cb309bb commit 9253d96
Show file tree
Hide file tree
Showing 18 changed files with 476 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,24 +210,30 @@ public Void visit(SqrlImportDefinition node, Void context) {
}

NamePath path = nameUtil.toNamePath(node.getImportPath().names);
SqrlModule module = moduleLoader.getModule(path.popLast()).orElse(null);

if (module == null) {
throw addError(ErrorCode.GENERIC, node, "Could not find module [%s] at path: [%s]",
path, String.join("/", path.toStringList()));
}
if (path.getFirst().getDisplay().equals("log")) {
Log log = logManager.getLogs().get(path.getLast().getDisplay());
NamespaceObject namespaceObject = createTableResolver.create(log.getSource());
namespaceObject.apply(this, node.getAlias().map(SqlIdentifier::getSimple), framework, errorCollector);
} else {
SqrlModule module = moduleLoader.getModule(path.popLast()).orElse(null);

if (node.getImportPath().isStar()) {
if (module.getNamespaceObjects().isEmpty()) {
addWarn(ErrorLabel.GENERIC, node, "Module is empty: %s", path);
if (module == null) {
throw addError(ErrorCode.GENERIC, node, "Could not find module [%s] at path: [%s]",
path, String.join("/", path.toStringList()));
}

module.getNamespaceObjects().forEach(obj -> obj.apply(this, Optional.empty(), framework, errorCollector));
} else {
Optional<NamespaceObject> namespaceObject = module.getNamespaceObject(path.getLast());
namespaceObject.map(object -> object.apply(this, Optional.of(node.getAlias().map(a -> a.names.get(0))
.orElse(/*retain alias*/path.getLast().getDisplay())), framework, errorCollector))
.orElseThrow(() -> addError(ErrorCode.GENERIC, node, "Object [%s] not found in module: %s", path.getLast(), path));
if (node.getImportPath().isStar()) {
if (module.getNamespaceObjects().isEmpty()) {
addWarn(ErrorLabel.GENERIC, node, "Module is empty: %s", path);
}

module.getNamespaceObjects().forEach(obj -> obj.apply(this, Optional.empty(), framework, errorCollector));
} else {
Optional<NamespaceObject> namespaceObject = module.getNamespaceObject(path.getLast());
namespaceObject.map(object -> object.apply(this, Optional.of(node.getAlias().map(a -> a.names.get(0))
.orElse(/*retain alias*/path.getLast().getDisplay())), framework, errorCollector))
.orElseThrow(() -> addError(ErrorCode.GENERIC, node, "Object [%s] not found in module: %s", path.getLast(), path));
}
}

return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
>>>pipeline_explain.txt
=== Data
ID: data_2
Type: stream
Stage: streams
Primary Key: ID
Timestamp : TIMESTAMP
Schema:
- _uuid: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
- ID: BIGINT NOT NULL
- EPOCH_TIMESTAMP: BIGINT NOT NULL
- SOME_VALUE: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
- TIMESTAMP: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
- event_time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
Plan:
LogicalTableScan(table=[[data_1]], hints=[[[WatermarkHint inheritPath:[] options:[4]]]]) hints[WatermarkHint options:[4]]

=== Event.Event
ID: data_2_1
Type: export
Stage: streams
Inputs: data_2

=== ImportedEvent
ID: importedevent_2
Type: state
Stage: streams
Primary Key: _uuid
Timestamp : event_time
Schema:
- _uuid: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
- ID: BIGINT
- EPOCH_TIMESTAMP: BIGINT
- SOME_VALUE: VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
- TIMESTAMP: TIMESTAMP(6)
- event_time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
Plan:
LogicalTableScan(table=[[importedevent_1]], hints=[[[WatermarkHint inheritPath:[] options:[5]]]]) hints[WatermarkHint options:[5]]

=== print.LogImportedEvent
ID: importedevent_2_2
Type: export
Stage: streams
Inputs: importedevent_2

>>>flink.json
{
"flinkSql" : [
"CREATE TEMPORARY FUNCTION IF NOT EXISTS `epochmillitotimestamp` AS 'com.datasqrl.time.EpochMilliToTimestamp' LANGUAGE JAVA;",
"CREATE TEMPORARY TABLE `importedevent_1` (\n `_uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `ID` BIGINT,\n `EPOCH_TIMESTAMP` BIGINT,\n `SOME_VALUE` VARCHAR(2147483647) CHARACTER SET `UTF-16LE`,\n `TIMESTAMP` TIMESTAMP(6),\n `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n PRIMARY KEY (`_uuid`) NOT ENFORCED,\n WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.0' SECOND\n) WITH (\n 'hostname' = 'postgres_log',\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'postgres-cdc',\n 'port' = '5432',\n 'slot.name' = 'flink_slot',\n 'database-name' = 'datasqrl',\n 'schema-name' = 'public',\n 'decoding.plugin.name' = 'pgoutput',\n 'table-name' = 'Event',\n 'debezium.slot.drop_on_stop' = 'false',\n 'username' = '${JDBC_USERNAME}'\n);",
"CREATE TEMPORARY TABLE `data_1` (\n `_uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `ID` BIGINT NOT NULL,\n `EPOCH_TIMESTAMP` BIGINT NOT NULL,\n `SOME_VALUE` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `TIMESTAMP` AS EPOCHMILLITOTIMESTAMP(`EPOCH_TIMESTAMP`),\n `event_time` AS EPOCHMILLITOTIMESTAMP(`EPOCH_TIMESTAMP`),\n PRIMARY KEY (`ID`) NOT ENFORCED,\n WATERMARK FOR `TIMESTAMP` AS `TIMESTAMP` - INTERVAL '0.001' SECOND\n) WITH (\n 'fields.ID.end' = '9',\n 'number-of-rows' = '10',\n 'connector' = 'datagen',\n 'fields.EPOCH_TIMESTAMP.kind' = 'sequence',\n 'fields.EPOCH_TIMESTAMP.end' = '1719319565000',\n 'fields.EPOCH_TIMESTAMP.start' = '1719318565000',\n 'fields.SOME_VALUE.kind' = 'random',\n 'fields.ID.start' = '0',\n 'fields._uuid.kind' = 'random',\n 'fields.ID.kind' = 'sequence'\n);",
"CREATE TEMPORARY TABLE `data_2_1` (\n `_uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `ID` BIGINT NOT NULL,\n `EPOCH_TIMESTAMP` BIGINT NOT NULL,\n `SOME_VALUE` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `TIMESTAMP` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n PRIMARY KEY (`_uuid`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'Event',\n 'url' = '${POSTGRES_LOG_JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);",
"CREATE TEMPORARY TABLE `importedevent_2_2` (\n `_uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `ID` BIGINT,\n `EPOCH_TIMESTAMP` BIGINT,\n `SOME_VALUE` VARCHAR(2147483647) CHARACTER SET `UTF-16LE`,\n `TIMESTAMP` TIMESTAMP(6),\n `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL\n) WITH (\n 'connector' = 'print',\n 'print-identifier' = 'LogImportedEvent'\n);",
"CREATE VIEW `table$1`\nAS\nSELECT *\nFROM `data_1`;",
"CREATE VIEW `table$2`\nAS\nSELECT *\nFROM `importedevent_1`;",
"EXECUTE STATEMENT SET BEGIN\nINSERT INTO `data_2_1`\n(SELECT *\n FROM `table$1`)\n;\nINSERT INTO `importedevent_2_2`\n (SELECT *\n FROM `table$2`)\n ;\n END;"
],
"connectors" : [
"print",
"datagen",
"jdbc-sqrl",
"postgres-cdc"
],
"formats" : [ ]
}
>>>postgres_log.json
{
"ddl" : [
{
"name" : "\"Event\"",
"columns" : [
"\"_uuid\" TEXT NOT NULL",
"\"ID\" BIGINT ",
"\"EPOCH_TIMESTAMP\" BIGINT ",
"\"SOME_VALUE\" TEXT ",
"\"TIMESTAMP\" TIMESTAMP WITHOUT TIME ZONE ",
"\"event_time\" TIMESTAMP WITH TIME ZONE NOT NULL"
],
"primaryKeys" : [
"\"_uuid\""
],
"sql" : "CREATE TABLE IF NOT EXISTS \"Event\" (\"_uuid\" TEXT NOT NULL,\"ID\" BIGINT ,\"EPOCH_TIMESTAMP\" BIGINT ,\"SOME_VALUE\" TEXT ,\"TIMESTAMP\" TIMESTAMP WITHOUT TIME ZONE ,\"event_time\" TIMESTAMP WITH TIME ZONE NOT NULL , PRIMARY KEY (\"_uuid\"));"
},
{
"sql" : "CREATE OR REPLACE FUNCTION notify_on_Event_insert()\nRETURNS TRIGGER AS $$\nBEGIN\n PERFORM pg_notify('Event_notify', jsonb_build_object('_uuid', NEW._uuid)::text);\n RETURN NEW;\nEND;\n$$ LANGUAGE plpgsql;\n\nCREATE TRIGGER insert_notify_trigger\nAFTER INSERT ON \"Event\"\nFOR EACH ROW EXECUTE PROCEDURE notify_on_Event_insert();"
}
],
"queries" : [
{
"listen" : {
"sql" : "LISTEN Event_notify;"
},
"onNotify" : {
"sql" : "SELECT *\nFROM \"Event\"\nWHERE \"_uuid\" = $1"
},
"parameters" : [
"_uuid"
]
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
>>>pipeline_explain.txt
=== Data
ID: data_2
Type: stream
Stage: streams
Primary Key: ID
Timestamp : TIMESTAMP
Schema:
- _uuid: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
- ID: BIGINT NOT NULL
- EPOCH_TIMESTAMP: BIGINT NOT NULL
- SOME_VALUE: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
- TIMESTAMP: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
Plan:
LogicalTableScan(table=[[data_1]], hints=[[[WatermarkHint inheritPath:[] options:[4]]]]) hints[WatermarkHint options:[4]]

=== Event.Event
ID: data_2_1
Type: export
Stage: streams
Inputs: data_2

=== Event
ID: event_2
Type: state
Stage: streams
Primary Key: _uuid
Timestamp : event_time
Schema:
- _uuid: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
- ID: BIGINT NOT NULL
- EPOCH_TIMESTAMP: BIGINT NOT NULL
- SOME_VALUE: VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL
- TIMESTAMP: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
- event_time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL
Plan:
LogicalTableScan(table=[[event_1]], hints=[[[WatermarkHint inheritPath:[] options:[5]]]]) hints[WatermarkHint options:[5]]

=== print.LogEvent
ID: event_2_2
Type: export
Stage: streams
Inputs: event_2

>>>flink.json
{
"flinkSql" : [
"CREATE TEMPORARY FUNCTION IF NOT EXISTS `epochmillitotimestamp` AS 'com.datasqrl.time.EpochMilliToTimestamp' LANGUAGE JAVA;",
"CREATE TEMPORARY TABLE `event_1` (\n `_uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `ID` BIGINT NOT NULL,\n `EPOCH_TIMESTAMP` BIGINT NOT NULL,\n `SOME_VALUE` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `TIMESTAMP` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n PRIMARY KEY (`_uuid`) NOT ENFORCED,\n WATERMARK FOR `event_time` AS `event_time` - INTERVAL '0.0' SECOND\n) WITH (\n 'hostname' = 'postgres_log',\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'postgres-cdc',\n 'port' = '5432',\n 'slot.name' = 'flink_slot',\n 'database-name' = 'datasqrl',\n 'schema-name' = 'public',\n 'decoding.plugin.name' = 'pgoutput',\n 'table-name' = 'Event',\n 'debezium.slot.drop_on_stop' = 'false',\n 'username' = '${JDBC_USERNAME}'\n);",
"CREATE TEMPORARY TABLE `data_1` (\n `_uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `ID` BIGINT NOT NULL,\n `EPOCH_TIMESTAMP` BIGINT NOT NULL,\n `SOME_VALUE` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `TIMESTAMP` AS EPOCHMILLITOTIMESTAMP(`EPOCH_TIMESTAMP`),\n PRIMARY KEY (`ID`) NOT ENFORCED,\n WATERMARK FOR `TIMESTAMP` AS `TIMESTAMP` - INTERVAL '0.001' SECOND\n) WITH (\n 'fields.ID.end' = '9',\n 'number-of-rows' = '10',\n 'connector' = 'datagen',\n 'fields.EPOCH_TIMESTAMP.kind' = 'sequence',\n 'fields.EPOCH_TIMESTAMP.end' = '1719319565000',\n 'fields.EPOCH_TIMESTAMP.start' = '1719318565000',\n 'fields.SOME_VALUE.kind' = 'random',\n 'fields.ID.start' = '0',\n 'fields._uuid.kind' = 'random',\n 'fields.ID.kind' = 'sequence'\n);",
"CREATE TEMPORARY TABLE `data_2_1` (\n `_uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `ID` BIGINT NOT NULL,\n `EPOCH_TIMESTAMP` BIGINT NOT NULL,\n `SOME_VALUE` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `TIMESTAMP` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n PRIMARY KEY (`_uuid`) NOT ENFORCED\n) WITH (\n 'password' = '${JDBC_PASSWORD}',\n 'connector' = 'jdbc-sqrl',\n 'driver' = 'org.postgresql.Driver',\n 'table-name' = 'Event',\n 'url' = '${POSTGRES_LOG_JDBC_URL}',\n 'username' = '${JDBC_USERNAME}'\n);",
"CREATE TEMPORARY TABLE `event_2_2` (\n `_uuid` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `ID` BIGINT NOT NULL,\n `EPOCH_TIMESTAMP` BIGINT NOT NULL,\n `SOME_VALUE` VARCHAR(2147483647) CHARACTER SET `UTF-16LE` NOT NULL,\n `TIMESTAMP` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL,\n `event_time` TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL\n) WITH (\n 'connector' = 'print',\n 'print-identifier' = 'LogEvent'\n);",
"CREATE VIEW `table$1`\nAS\nSELECT *\nFROM `data_1`;",
"CREATE VIEW `table$2`\nAS\nSELECT *\nFROM `event_1`;",
"EXECUTE STATEMENT SET BEGIN\nINSERT INTO `data_2_1`\n(SELECT *\n FROM `table$1`)\n;\nINSERT INTO `event_2_2`\n (SELECT *\n FROM `table$2`)\n ;\n END;"
],
"connectors" : [
"print",
"datagen",
"jdbc-sqrl",
"postgres-cdc"
],
"formats" : [ ]
}
>>>postgres_log.json
{
"ddl" : [
{
"name" : "\"Event\"",
"columns" : [
"\"_uuid\" TEXT NOT NULL",
"\"ID\" BIGINT NOT NULL",
"\"EPOCH_TIMESTAMP\" BIGINT NOT NULL",
"\"SOME_VALUE\" TEXT NOT NULL",
"\"TIMESTAMP\" TIMESTAMP WITH TIME ZONE NOT NULL",
"\"event_time\" TIMESTAMP WITH TIME ZONE NOT NULL"
],
"primaryKeys" : [
"\"_uuid\""
],
"sql" : "CREATE TABLE IF NOT EXISTS \"Event\" (\"_uuid\" TEXT NOT NULL,\"ID\" BIGINT NOT NULL,\"EPOCH_TIMESTAMP\" BIGINT NOT NULL,\"SOME_VALUE\" TEXT NOT NULL,\"TIMESTAMP\" TIMESTAMP WITH TIME ZONE NOT NULL,\"event_time\" TIMESTAMP WITH TIME ZONE NOT NULL , PRIMARY KEY (\"_uuid\"));"
},
{
"sql" : "CREATE OR REPLACE FUNCTION notify_on_Event_insert()\nRETURNS TRIGGER AS $$\nBEGIN\n PERFORM pg_notify('Event_notify', jsonb_build_object('_uuid', NEW._uuid)::text);\n RETURN NEW;\nEND;\n$$ LANGUAGE plpgsql;\n\nCREATE TRIGGER insert_notify_trigger\nAFTER INSERT ON \"Event\"\nFOR EACH ROW EXECUTE PROCEDURE notify_on_Event_insert();"
}
],
"queries" : [
{
"listen" : {
"sql" : "LISTEN Event_notify;"
},
"onNotify" : {
"sql" : "SELECT *\nFROM \"Event\"\nWHERE \"_uuid\" = $1"
},
"parameters" : [
"_uuid"
]
}
]
}
Loading

0 comments on commit 9253d96

Please sign in to comment.