Add compiled plan to flink plan #853

Merged · 5 commits · Oct 6, 2024
FlinkSqlGenerator.java
@@ -4,32 +4,50 @@
import com.datasqrl.calcite.SqrlFramework;
import com.datasqrl.calcite.convert.SqlNodeToString;
import com.datasqrl.calcite.convert.SqlToStringFactory;
import com.datasqrl.config.BuildPath;
import com.datasqrl.engine.stream.flink.plan.SqrlToFlinkSqlGenerator;
import com.datasqrl.engine.stream.flink.plan.SqrlToFlinkSqlGenerator.SqlResult;
import com.datasqrl.plan.global.PhysicalDAGPlan.StagePlan;
import com.datasqrl.plan.global.PhysicalDAGPlan.StreamStagePlan;
import com.google.inject.Inject;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.collections.ListUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.sql.parser.ddl.SqlSet;
import org.apache.flink.sql.parser.dml.SqlExecute;
import org.apache.flink.sql.parser.dml.SqlStatementSet;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.operations.StatementSetOperation;

/**
* Generates the Flink SQL plan for the stream stage and, best-effort, writes a Flink
* compiled plan (compiled-plan.json) to the build directory.
*/
@AllArgsConstructor(onConstructor_ = @Inject)
@Slf4j
public class FlinkSqlGenerator {

private final SqrlFramework framework;
private final BuildPath buildPath;

public Pair<List<String>, List<SqlNode>> run(StreamStagePlan physicalPlan,
public FlinkSqlGeneratorResult run(StreamStagePlan physicalPlan,
List<StagePlan> stagePlans) {
SqrlToFlinkSqlGenerator sqlPlanner = new SqrlToFlinkSqlGenerator(framework);
SqlResult result = sqlPlanner.plan(physicalPlan.getQueries(), stagePlans);
@@ -58,7 +76,66 @@ public Pair<List<String>, List<SqlNode>> run(StreamStagePlan physicalPlan,
plan.add(sql);
}
}
return Pair.of(plan, flinkSql);

try {
CompiledPlan compiledPlan = createCompiledPlan(result, physicalPlan);
Path path = buildPath.getBuildDir().resolve("compiled-plan.json");
compiledPlan.writeToFile(path.toAbsolutePath().toString(), true);
} catch (Exception e) {
log.warn("Could not prepare compiled plan", e);
}

return new FlinkSqlGeneratorResult(plan, flinkSql);
}

private CompiledPlan createCompiledPlan(SqlResult result, StreamStagePlan physicalPlan) {
List<SqlNode> stubSchema = ListUtils.union(result.getStubSchema(), result.getQueries());

URL[] urlArray = physicalPlan.getJars().toArray(new URL[0]);
ClassLoader udfClassLoader = new URLClassLoader(urlArray, getClass().getClassLoader());
Map<String, String> config = new HashMap<>();
config.put("pipeline.classpaths", physicalPlan.getJars().stream().map(URL::toString)
.collect(Collectors.joining(",")));
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.createLocalEnvironment(Configuration.fromMap(config));

EnvironmentSettings tEnvConfig = EnvironmentSettings.newInstance()
.withConfiguration(Configuration.fromMap(config))
.withClassLoader(udfClassLoader)
.build();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, tEnvConfig);
SqlNodeToString sqlNodeToString = SqlToStringFactory.get(Dialect.FLINK);

for (SqlNode sqlNode : stubSchema) {
String statement = sqlNodeToString.convert(() -> sqlNode).getSql() + ";";
try {
tEnv.executeSql(statement);
} catch (Exception e) {
log.error("Could not execute statement: {}", statement, e);
throw e;
}
}
SqlStatementSet sqlStatementSet = new SqlStatementSet(result.getInserts(), SqlParserPos.ZERO);
SqlExecute execute = new SqlExecute(sqlStatementSet, SqlParserPos.ZERO);

String insert = sqlNodeToString.convert(() -> execute).getSql() + ";";

TableEnvironmentImpl tEnv1 = (TableEnvironmentImpl) tEnv;

StatementSetOperation parse = (StatementSetOperation)tEnv1.getParser().parse(insert).get(0);

return tEnv1.compilePlan(parse.getOperations());
}
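
For context, the `insert` string handed to the parser above uses Flink's statement-set syntax; with assumed source and sink names it takes roughly this shape:

String insert =
    "EXECUTE STATEMENT SET\n"
        + "BEGIN\n"
        + "INSERT INTO ordercount SELECT customerid, COUNT(*) AS volume FROM orders GROUP BY customerid;\n"
        + "END;";

Compiling the parsed operations, rather than executing them, is what produces a serializable plan for the artifact below.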

@Value
public static class FlinkSqlGeneratorResult {
List<String> plan;
List<SqlNode> flinkSql;
}
}
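
The class above drives Flink's compiled-plan feature (available since Flink 1.15): stub DDL is executed against a local `TableEnvironment`, the statement set is compiled instead of executed, and the resulting JSON lands in the build directory as `compiled-plan.json`. A minimal, self-contained sketch of the same round trip against Flink's public API, with assumed table names and output path:

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;

public class CompiledPlanRoundTrip {

  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Self-contained stubs, mirroring the datagen/blackhole tables the generator registers.
    tEnv.executeSql("CREATE TABLE src (id BIGINT, name STRING) WITH ('connector' = 'datagen')");
    tEnv.executeSql("CREATE TABLE snk (id BIGINT, name STRING) WITH ('connector' = 'blackhole')");

    // Compile the INSERT into a serializable plan instead of running it.
    CompiledPlan plan = tEnv.compilePlanSql("INSERT INTO snk SELECT id, name FROM src");
    plan.writeToFile("/tmp/compiled-plan.json", true);

    // A later run can restore the exact physical plan from the JSON artifact and submit it.
    tEnv.loadPlan(PlanReference.fromFile("/tmp/compiled-plan.json")).execute();
  }
}

Pinning the physical plan this way keeps a later Flink upgrade or planner change from silently altering the job topology, which is the usual reason for producing the artifact at build time.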
@@ -3,15 +3,9 @@
*/
package com.datasqrl.engine;

import com.datasqrl.cmd.EngineKeys;
import com.datasqrl.config.EngineFactory.Type;
import com.datasqrl.engine.database.DatabasePhysicalPlan;
import com.datasqrl.engine.database.QueryTemplate;
import com.datasqrl.engine.database.relational.JDBCPhysicalPlan;
import com.datasqrl.engine.log.kafka.KafkaPhysicalPlan;
import com.datasqrl.engine.pipeline.ExecutionStage;
import com.datasqrl.engine.server.ServerPhysicalPlan;
import com.datasqrl.engine.stream.flink.plan.FlinkStreamPhysicalPlan;
import com.datasqrl.plan.queries.IdentifiedQuery;
import com.datasqrl.util.StreamUtil;
import java.util.List;
AbstractFlinkStreamEngine.java
@@ -6,6 +6,7 @@
import static com.datasqrl.engine.EngineFeature.STANDARD_STREAM;

import com.datasqrl.actions.FlinkSqlGenerator;
import com.datasqrl.actions.FlinkSqlGenerator.FlinkSqlGeneratorResult;
import com.datasqrl.calcite.SqrlFramework;
import com.datasqrl.config.PackageJson.EngineConfig;
import com.datasqrl.config.EngineFactory.Type;
@@ -25,8 +26,6 @@

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.lang3.tuple.Pair;

@Slf4j
public abstract class AbstractFlinkStreamEngine extends ExecutionEngine.Base implements
@@ -37,9 +36,12 @@ public abstract class AbstractFlinkStreamEngine extends ExecutionEngine.Base implements
@Getter
private final EngineConfig config;

public AbstractFlinkStreamEngine(EngineConfig config) {
private final FlinkSqlGenerator generator;

public AbstractFlinkStreamEngine(EngineConfig config, FlinkSqlGenerator generator) {
super(FlinkEngineFactory.ENGINE_NAME, Type.STREAMS, FLINK_CAPABILITIES);
this.config = config;
this.generator = generator;
}

// @Override
@@ -54,11 +56,10 @@ public FlinkStreamPhysicalPlan plan(StagePlan stagePlan, List<StageSink> inputs,
Preconditions.checkArgument(inputs.isEmpty());
Preconditions.checkArgument(stagePlan instanceof StreamStagePlan);
StreamStagePlan plan = (StreamStagePlan) stagePlan;
FlinkSqlGenerator generator = new FlinkSqlGenerator(framework);

FlinkSqlGeneratorResult flinkSql = generator.run(plan, stagePlans);

Pair<List<String>, List<SqlNode>> flinkSql = generator.run(plan, stagePlans);
return new FlinkStreamPhysicalPlan(flinkSql.getLeft(), flinkSql.getRight());
return new FlinkStreamPhysicalPlan(flinkSql.getPlan(), flinkSql.getFlinkSql());
}
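
`FlinkSqlGeneratorResult` is a Lombok `@Value` holder, so the `getPlan()` and `getFlinkSql()` accessors used here are generated. A sketch of what the annotation expands to, trimmed to the relevant members:

public static final class FlinkSqlGeneratorResult {
  private final List<String> plan;
  private final List<SqlNode> flinkSql;

  public FlinkSqlGeneratorResult(List<String> plan, List<SqlNode> flinkSql) {
    this.plan = plan;
    this.flinkSql = flinkSql;
  }

  public List<String> getPlan() { return plan; }
  public List<SqlNode> getFlinkSql() { return flinkSql; }
  // @Value also generates equals(), hashCode(), and toString()
}

The named accessors replace the positional `Pair#getLeft`/`#getRight` calls, which is the point of introducing the result type.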
LocalFlinkStreamEngineImpl.java
@@ -3,6 +3,7 @@
*/
package com.datasqrl.engine.stream.flink;

import com.datasqrl.actions.FlinkSqlGenerator;
import com.datasqrl.config.PackageJson;
import com.datasqrl.config.PackageJson.EmptyEngineConfig;
import com.google.inject.Inject;
@@ -12,8 +13,9 @@
public class LocalFlinkStreamEngineImpl extends AbstractFlinkStreamEngine {

@Inject
public LocalFlinkStreamEngineImpl(PackageJson json) {
public LocalFlinkStreamEngineImpl(PackageJson json, FlinkSqlGenerator generator) {
super(json.getEngines().getEngineConfig(FlinkEngineFactory.ENGINE_NAME)
.orElseGet(()->new EmptyEngineConfig(FlinkEngineFactory.ENGINE_NAME)));
.orElseGet(()->new EmptyEngineConfig(FlinkEngineFactory.ENGINE_NAME)),
generator);
}
}
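
The generator now arrives through Guice instead of being constructed inside `plan()`. A minimal, self-contained sketch of the constructor-injection pattern (class names are stand-ins, not the project's own):

import com.google.inject.Guice;
import com.google.inject.Inject;

public class InjectionSketch {

  static class Generator {
    @Inject
    Generator() {}
  }

  static class Engine {
    final Generator generator;

    @Inject
    Engine(Generator generator) {
      this.generator = generator; // Guice builds Generator first, then hands it over
    }
  }

  public static void main(String[] args) {
    // No explicit bindings needed: Guice's just-in-time bindings follow @Inject constructors.
    Engine engine = Guice.createInjector().getInstance(Engine.class);
    System.out.println(engine.generator != null); // prints: true
  }
}

In the real wiring, `FlinkSqlGenerator`'s `@AllArgsConstructor(onConstructor_ = @Inject)` plays the role of `Generator`'s annotated constructor, so Guice can satisfy `LocalFlinkStreamEngineImpl`'s new parameter automatically.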
FlinkStreamPhysicalPlan.java
@@ -5,6 +5,7 @@

import com.datasqrl.engine.stream.StreamPhysicalPlan;
import com.datasqrl.plan.global.PhysicalDAGPlan.StreamStagePlan;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -13,6 +14,7 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
import org.apache.flink.sql.parser.ddl.SqlTableOption;
import org.apache.flink.table.api.CompiledPlan;

@Getter
public class FlinkStreamPhysicalPlan implements StreamPhysicalPlan {
@@ -22,7 +24,8 @@ public class FlinkStreamPhysicalPlan implements StreamPhysicalPlan {
private final Set<String> connectors;
private final Set<String> formats;

public FlinkStreamPhysicalPlan( List<String> flinkSql,

public FlinkStreamPhysicalPlan(List<String> flinkSql,
List<SqlNode> sqlNodes) {
this.flinkSql = flinkSql;
this.connectors = extractConnectors(sqlNodes);
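
The `connectors` and `formats` sets are read off the `CREATE TABLE` nodes; the body of `extractConnectors` sits outside this hunk. A sketch of how such extraction works against Flink's SQL parser AST (an illustration, not necessarily the exact implementation):

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.calcite.sql.SqlNode;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
import org.apache.flink.sql.parser.ddl.SqlTableOption;

class ConnectorExtraction {

  static Set<String> extractConnectors(List<SqlNode> sqlNodes) {
    Set<String> connectors = new HashSet<>();
    for (SqlNode node : sqlNodes) {
      if (node instanceof SqlCreateTable) {
        // Each WITH ('key' = 'value') entry is a SqlTableOption in the property list.
        for (SqlNode option : ((SqlCreateTable) node).getPropertyList()) {
          SqlTableOption tableOption = (SqlTableOption) option;
          if ("connector".equals(tableOption.getKeyString())) {
            connectors.add(tableOption.getValueString());
          }
        }
      }
    }
    return connectors;
  }
}

The new `@JsonIgnore` import suggests some of these derived fields are kept out of the serialized plan.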
SqrlToFlinkSqlGenerator.java
@@ -47,6 +47,7 @@
import org.apache.calcite.tools.Program;
import org.apache.calcite.tools.Programs;
import org.apache.calcite.tools.RelBuilder;
import org.apache.commons.collections.ListUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.sql.parser.ddl.*;
import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
@@ -75,6 +76,8 @@ public SqlResult plan(List<? extends Query> stageQueries, List<StagePlan> stagePlans) {
checkPreconditions(stageQueries);
List<WriteQuery> writeQueries = applyFlinkCompatibilityRules(stageQueries);
Set<SqlCall> sinksAndSources = extractTableDescriptors(writeQueries);
List<SqlCall> stubSinks = new ArrayList<>();
List<SqlCall> stubSources = extractStubSources(writeQueries);

List<SqlCreateView> queries = new ArrayList<>();
List<RichSqlInsert> inserts = new ArrayList<>();
@@ -83,21 +86,23 @@
for (WriteQuery query : writeQueries) {
TableConfig tableConfig = getTableConfig(query.getSink());
if (query.getType().isState() && tableConfig.getConnectorConfig().getTableType().isStream()) {
log.warn("Attempting to write a stream to a state table. This may fail at runtime.");
// log.warn("Attempting to write a stream to a state table. This may fail at runtime.");
}
FlinkConnectorDataTypeMappingFactory mappingFactory = new FlinkConnectorDataTypeMappingFactory();
Optional<DataTypeMapper> connectorMapping = mappingFactory.getConnectorMapping(tableConfig);
RelNode relNode = applyDowncasting(framework.getQueryPlanner().getRelBuilder(),
query.getExpandedRelNode(), query.getSink(), downcastClassNames, connectorMapping);
Pair<List<SqlCreateView>, RichSqlInsert> result = process(query.getSink().getName(), relNode);
SqlCreateTable sqlCreateTable = registerSinkTable(query.getSink(), relNode, stagePlans);
SqlCreateTable stubSink = extractStubSinks(query.getSink(), relNode, stagePlans);
stubSinks.add(stubSink);
sinksAndSources.add(sqlCreateTable);
queries.addAll(result.getKey());
inserts.add(result.getValue());
}

List<SqlCreateFunction> functions = extractFunctions(writeQueries, downcastClassNames);
return new SqlResult(sinksAndSources, inserts, queries, functions);
return new SqlResult(sinksAndSources, ListUtils.union(stubSources, stubSinks), inserts, queries, functions);
}

private Pair<List<SqlCreateView>, RichSqlInsert> process(String name, RelNode relNode) {
@@ -223,6 +228,34 @@ private Set<SqlCall> extractTableDescriptors(List<WriteQuery> queries) {

return sources;
}

private List<SqlCall> extractStubSources(List<WriteQuery> queries) {
Map<String, ImportedRelationalTable> tables = uniqueSourceExtractor.extract(queries);

List<SqlCall> sources = new ArrayList<>();
for (Map.Entry<String, ImportedRelationalTable> entry : tables.entrySet()) {
String tableName = entry.getKey();
ImportedRelationalTable table = entry.getValue();
TableConfig tableConfig = table.getTableSource().getConfiguration();

SqlCreateTable sqlCreateTable = FlinkSqlNodeFactory.createTable(
tableName,
table.getRowType(),
tableConfig.getBase().getPartitionKey(),
tableConfig.getBase().getWatermarkMillis(),
tableConfig.getBase().getTimestampColumn().map(NamePath::parse)
.map(NamePath::getLast)
.map(Name::getDisplay),
Map.of(),
tableConfig.getPrimaryKeyConstraint(),
Map.of("connector", "datagen"),
e -> framework.getQueryPlanner().parseCall(e)
);

sources.add(sqlCreateTable);
}

return sources;
}

private SqlCreateTable registerSinkTable(WriteSink sink, RelNode relNode, List<StagePlan> stagePlans) {
String name;
@@ -285,6 +318,67 @@ private SqlCreateTable registerSinkTable(WriteSink sink, RelNode relNode, List<StagePlan> stagePlans) {
return sqlCreateTable;
}

private SqlCreateTable extractStubSinks(WriteSink sink, RelNode relNode, List<StagePlan> stagePlans) {
String name;
TableConfig tableConfig;

if (sink instanceof EngineSink) {
EngineSink engineSink = (EngineSink) sink;
TableConfig engineConfig = engineSink.getStage().getEngine().getSinkConfig(engineSink.getNameId());

StagePlan stagePlan = stagePlans.stream()
.filter(f -> f.getStage() == engineSink.getStage())
.findFirst()
.orElseThrow();

TableConfigBuilder configBuilder = engineConfig.toBuilder();

if (engineSink.getStage().supportsFeature(EngineFeature.PARTITIONING)) {
DatabaseStagePlan dbPlan = (DatabaseStagePlan) stagePlan;
String tableId = engineSink.getNameId();
Optional<IndexDefinition> optIndex = dbPlan.getIndexDefinitions().stream()
.filter(idx -> idx.getTableId().equals(tableId))
.filter(idx -> idx.getType().isPartitioned())
.findFirst();
optIndex.ifPresent(partitionIndex -> {
List<String> partitionColumns = partitionIndex.getColumnNames()
.subList(0, partitionIndex.getPartitionOffset());
if (!partitionColumns.isEmpty()) {
configBuilder.setPartitionKey(partitionColumns);
}
});
}

String[] pks = IntStream.of(engineSink.getPrimaryKeys())
.mapToObj(i -> relNode.getRowType().getFieldList().get(i).getName())
.toArray(String[]::new);
configBuilder.setPrimaryKey(pks);

tableConfig = configBuilder.build();
name = engineSink.getNameId();
} else if (sink instanceof ExternalSink) {
ExternalSink externalSink = (ExternalSink) sink;
tableConfig = externalSink.getTableSink().getConfiguration();
name = externalSink.getName();
} else {
throw new RuntimeException("Could not identify write sink type.");
}

SqlCreateTable sqlCreateTable = FlinkSqlNodeFactory.createTable(
name,
relNode.getRowType(),
tableConfig.getBase().getPartitionKey(),
-1,
Optional.empty(),
tableConfig.getMetadataConfig().toMap(),
tableConfig.getPrimaryKeyConstraint(),
Map.of("connector", "blackhole"),
e -> framework.getQueryPlanner().parseCall(e)
);

return sqlCreateTable;
}

private void checkPreconditions(List<? extends Query> queries) {
queries.forEach(query -> Preconditions.checkState(query instanceof WriteQuery,
"Unexpected query type when creating executable plan"));
@@ -314,8 +408,13 @@ private RelNode applyFlinkCompatibilityRules(RelNode relNode) {
public static class SqlResult {

private Set<SqlCall> sinksSources;
private List<SqlCall> stubSinksSources;
private List<RichSqlInsert> inserts;
private List<SqlCreateView> queries;
private List<SqlCreateFunction> functions;

public List<SqlNode> getStubSchema() {
return ListUtils.union(functions, stubSinksSources);
}
}
}
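
Taken together, `getStubSchema()` hands `createCompiledPlan` a schema-complete but dependency-free environment: imported sources keep their row type and watermark but are registered with the `datagen` connector, and sinks keep their schema and primary key but are registered with `blackhole`. A runnable sketch with assumed table and column names:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StubSchemaSketch {

  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Imported source: row type and watermark preserved, connector swapped to 'datagen'.
    tEnv.executeSql(
        "CREATE TABLE orders ("
            + "customerid BIGINT, "
            + "`time` TIMESTAMP_LTZ(3), "
            + "WATERMARK FOR `time` AS `time` - INTERVAL '1' SECOND"
            + ") WITH ('connector' = 'datagen')");

    // Engine sink: schema and primary key preserved, connector swapped to 'blackhole'.
    tEnv.executeSql(
        "CREATE TABLE ordercount ("
            + "customerid BIGINT, "
            + "volume BIGINT, "
            + "PRIMARY KEY (customerid) NOT ENFORCED"
            + ") WITH ('connector' = 'blackhole')");

    // With only these stubs registered, the plan compiles without any external
    // system being reachable.
    tEnv.compilePlanSql(
            "INSERT INTO ordercount "
                + "SELECT customerid, COUNT(*) FROM orders GROUP BY customerid")
        .writeToFile("/tmp/stub-compiled-plan.json", true);
  }
}

Because neither stub connector touches an external system, the compiled-plan artifact can be produced on any build machine.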