Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve table function based on StorageEngine provided function resolver #1354

Merged
merged 4 commits into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ dependencies {
test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
events "skipped", "failed"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignore log PASSED test cases.

exceptionFormat "full"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,9 @@ public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext contex
.collect(Collectors.toList());
TableFunctionImplementation tableFunctionImplementation
= (TableFunctionImplementation) repository.compile(context.getFunctionProperties(),
dataSourceSchemaIdentifierNameResolver.getDataSourceName(), functionName, arguments);
dataSourceService
.getDataSource(dataSourceSchemaIdentifierNameResolver.getDataSourceName())
.getStorageEngine().getFunctions(), functionName, arguments);
context.push();
TypeEnvironment curEnv = context.peek();
Table table = tableFunctionImplementation.applyArguments();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
public enum DataSourceType {
PROMETHEUS,
OPENSEARCH,
FILESYSTEM
JDBC
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FILESYSTEM supported has been removed in #1284.

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -32,6 +35,7 @@
import org.opensearch.sql.expression.system.SystemFunctions;
import org.opensearch.sql.expression.text.TextFunction;
import org.opensearch.sql.expression.window.WindowFunctions;
import org.opensearch.sql.storage.StorageEngine;

/**
* Builtin Function Repository.
Expand All @@ -40,22 +44,20 @@
*
*/
public class BuiltinFunctionRepository {
public static final String DEFAULT_NAMESPACE = "default";

private final Map<String, Map<FunctionName, FunctionResolver>> namespaceFunctionResolverMap;
private final Map<FunctionName, FunctionResolver> functionResolverMap;

/** The singleton instance. */
private static BuiltinFunctionRepository instance;

/**
* Construct a function repository with the given function registered. This is only used in test.
*
* @param namespaceFunctionResolverMap function supported
* @param functionResolverMap function supported
*/
@VisibleForTesting
BuiltinFunctionRepository(
Map<String, Map<FunctionName, FunctionResolver>> namespaceFunctionResolverMap) {
this.namespaceFunctionResolverMap = namespaceFunctionResolverMap;
BuiltinFunctionRepository(Map<FunctionName, FunctionResolver> functionResolverMap) {
this.functionResolverMap = functionResolverMap;
}

/**
Expand Down Expand Up @@ -86,106 +88,86 @@ public static synchronized BuiltinFunctionRepository getInstance() {
}

/**
* Register {@link DefaultFunctionResolver} to the Builtin Function Repository
* under default namespace.
* Register {@link DefaultFunctionResolver} to the Builtin Function Repository.
*
* @param resolver {@link DefaultFunctionResolver} to be registered
*/
public void register(FunctionResolver resolver) {
register(DEFAULT_NAMESPACE, resolver);
functionResolverMap.put(resolver.getFunctionName(), resolver);
}

/**
* Register {@link DefaultFunctionResolver} to the Builtin Function Repository with
* specified namespace.
*
* @param resolver {@link DefaultFunctionResolver} to be registered
*/
public void register(String namespace, FunctionResolver resolver) {
Map<FunctionName, FunctionResolver> functionResolverMap;
if (!namespaceFunctionResolverMap.containsKey(namespace)) {
functionResolverMap = new HashMap<>();
namespaceFunctionResolverMap.put(namespace, functionResolverMap);
}
namespaceFunctionResolverMap.get(namespace).put(resolver.getFunctionName(), resolver);
}

/**
* Compile FunctionExpression under default namespace.
* Compile FunctionExpression using core function resolver.
*
*/
public FunctionImplementation compile(FunctionProperties functionProperties,
FunctionName functionName, List<Expression> expressions) {
return compile(functionProperties, DEFAULT_NAMESPACE, functionName, expressions);
return compile(functionProperties, Collections.emptyList(), functionName, expressions);
}


/**
* Compile FunctionExpression within given namespace.
* Checks for default namespace first and then tries to compile from given namespace.
* Compile FunctionExpression within {@link StorageEngine} provided {@link FunctionResolver}.
*/
public FunctionImplementation compile(FunctionProperties functionProperties,
String namespace,
Collection<FunctionResolver> dataSourceFunctionResolver,
FunctionName functionName,
List<Expression> expressions) {
List<String> namespaceList = new ArrayList<>(List.of(DEFAULT_NAMESPACE));
if (!namespace.equals(DEFAULT_NAMESPACE)) {
namespaceList.add(namespace);
}
FunctionBuilder resolvedFunctionBuilder = resolve(
namespaceList, new FunctionSignature(functionName, expressions
.stream().map(Expression::type).collect(Collectors.toList())));
FunctionBuilder resolvedFunctionBuilder =
resolve(
dataSourceFunctionResolver,
new FunctionSignature(
functionName,
expressions.stream().map(Expression::type).collect(Collectors.toList())));
return resolvedFunctionBuilder.apply(functionProperties, expressions);
}

/**
* Resolve the {@link FunctionBuilder} in
* repository under a list of namespaces.
* Returns the First FunctionBuilder found.
* So list of namespaces is also the priority of namespaces.
* Resolve the {@link FunctionBuilder} in repository under a list of namespaces. Returns the First
* FunctionBuilder found. So list of namespaces is also the priority of namespaces.
*
* @param functionSignature {@link FunctionSignature} functionsignature.
* @return Original function builder if it's a cast function or all arguments have expected types
* or otherwise wrap its arguments by cast function as needed.
* or otherwise wrap its arguments by cast function as needed.
*/
public FunctionBuilder
resolve(List<String> namespaces,
FunctionSignature functionSignature) {
FunctionName functionName = functionSignature.getFunctionName();
FunctionBuilder result = null;
for (String namespace : namespaces) {
if (namespaceFunctionResolverMap.containsKey(namespace)
&& namespaceFunctionResolverMap.get(namespace).containsKey(functionName)) {
result = getFunctionBuilder(functionSignature, functionName,
namespaceFunctionResolverMap.get(namespace));
break;
}
}
if (result == null) {
throw new ExpressionEvaluationException(
String.format("unsupported function name: %s", functionName.getFunctionName()));
} else {
return result;
}
@VisibleForTesting
public FunctionBuilder resolve(
Collection<FunctionResolver> dataSourceFunctionResolver,
FunctionSignature functionSignature) {
Map<FunctionName, FunctionResolver> dataSourceFunctionMap = dataSourceFunctionResolver.stream()
.collect(Collectors.toMap(FunctionResolver::getFunctionName, t -> t));

// first, resolve in datasource provide function resolver.
// second, resolve in builtin function resolver.
return resolve(functionSignature, dataSourceFunctionMap)
.or(() -> resolve(functionSignature, functionResolverMap))
.orElseThrow(
() ->
new ExpressionEvaluationException(
String.format(
"unsupported function name: %s", functionSignature.getFunctionName())));
}

private FunctionBuilder getFunctionBuilder(
private Optional<FunctionBuilder> resolve(
FunctionSignature functionSignature,
FunctionName functionName,
Map<FunctionName, FunctionResolver> functionResolverMap) {
Pair<FunctionSignature, FunctionBuilder> resolvedSignature =
functionResolverMap.get(functionName).resolve(functionSignature);

List<ExprType> sourceTypes = functionSignature.getParamTypeList();
List<ExprType> targetTypes = resolvedSignature.getKey().getParamTypeList();
FunctionBuilder funcBuilder = resolvedSignature.getValue();
if (isCastFunction(functionName)
|| FunctionSignature.isVarArgFunction(targetTypes)
|| sourceTypes.equals(targetTypes)) {
return funcBuilder;
FunctionName functionName = functionSignature.getFunctionName();
if (functionResolverMap.containsKey(functionName)) {
Pair<FunctionSignature, FunctionBuilder> resolvedSignature =
functionResolverMap.get(functionName).resolve(functionSignature);

List<ExprType> sourceTypes = functionSignature.getParamTypeList();
List<ExprType> targetTypes = resolvedSignature.getKey().getParamTypeList();
FunctionBuilder funcBuilder = resolvedSignature.getValue();
if (isCastFunction(functionName)
|| FunctionSignature.isVarArgFunction(targetTypes)
|| sourceTypes.equals(targetTypes)) {
return Optional.of(funcBuilder);
}
return Optional.of(castArguments(sourceTypes, targetTypes, funcBuilder));
} else {
return Optional.empty();
}
return castArguments(sourceTypes,
targetTypes, funcBuilder);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@
package org.opensearch.sql.analysis;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.data.type.ExprCoreType.LONG;
import static org.opensearch.sql.data.type.ExprCoreType.STRING;

import com.google.common.collect.ImmutableSet;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.analysis.symbol.Namespace;
import org.opensearch.sql.analysis.symbol.Symbol;
import org.opensearch.sql.analysis.symbol.SymbolTable;
Expand Down Expand Up @@ -52,6 +56,39 @@ protected StorageEngine storageEngine() {
return (dataSourceSchemaName, tableName) -> table;
}

protected StorageEngine prometheusStorageEngine() {
return new StorageEngine() {
@Override
public Collection<FunctionResolver> getFunctions() {
return Collections.singletonList(
new FunctionResolver() {

@Override
public Pair<FunctionSignature, FunctionBuilder> resolve(
FunctionSignature unresolvedSignature) {
FunctionName functionName = FunctionName.of("query_range");
FunctionSignature functionSignature =
new FunctionSignature(functionName, List.of(STRING, LONG, LONG, LONG));
return Pair.of(
functionSignature,
(functionProperties, args) ->
new TestTableFunctionImplementation(functionName, args, table));
}

@Override
public FunctionName getFunctionName() {
return FunctionName.of("query_range");
}
});
}

@Override
public Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName) {
return table;
}
};
}

protected Table table() {
return Optional.ofNullable(table).orElseGet(() -> new Table() {
@Override
Expand Down Expand Up @@ -109,30 +146,11 @@ protected Environment<Expression, ExprType> typeEnv() {

protected DataSourceService dataSourceService = dataSourceService();

protected Analyzer analyzer = analyzer(expressionAnalyzer(), dataSourceService, table);
protected Analyzer analyzer = analyzer(expressionAnalyzer(), dataSourceService);

protected Analyzer analyzer(ExpressionAnalyzer expressionAnalyzer,
DataSourceService dataSourceService,
Table table) {
DataSourceService dataSourceService) {
BuiltinFunctionRepository functionRepository = BuiltinFunctionRepository.getInstance();
functionRepository.register("prometheus", new FunctionResolver() {

@Override
public Pair<FunctionSignature, FunctionBuilder> resolve(
FunctionSignature unresolvedSignature) {
FunctionName functionName = FunctionName.of("query_range");
FunctionSignature functionSignature =
new FunctionSignature(functionName, List.of(STRING, LONG, LONG, LONG));
return Pair.of(functionSignature,
(functionProperties, args) -> new TestTableFunctionImplementation(functionName, args,
table));
}

@Override
public FunctionName getFunctionName() {
return FunctionName.of("query_range");
}
});
return new Analyzer(expressionAnalyzer, dataSourceService, functionRepository);
}

Expand All @@ -158,19 +176,24 @@ protected LogicalPlan analyze(UnresolvedPlan unresolvedPlan) {

private class DefaultDataSourceService implements DataSourceService {

private StorageEngine storageEngine = storageEngine();
private final DataSource dataSource
= new DataSource("prometheus", DataSourceType.PROMETHEUS, storageEngine);
private final DataSource opensearchDataSource = new DataSource(DEFAULT_DATASOURCE_NAME,
DataSourceType.OPENSEARCH, storageEngine());
private final DataSource prometheusDataSource
= new DataSource("prometheus", DataSourceType.PROMETHEUS, prometheusStorageEngine());


@Override
public Set<DataSource> getDataSources() {
return ImmutableSet.of(dataSource);
return ImmutableSet.of(opensearchDataSource, prometheusDataSource);
}

@Override
public DataSource getDataSource(String dataSourceName) {
return dataSource;
if ("prometheus".equals(dataSourceName)) {
return prometheusDataSource;
} else {
return opensearchDataSource;
}
}

@Override
Expand Down
Loading