[FLINK-26570][statefun] Remote module configuration interpolation #309

Open · wants to merge 1 commit into base: master
33 changes: 33 additions & 0 deletions docs/content/docs/modules/overview.md
@@ -61,3 +61,36 @@ spec:

A module YAML file can contain multiple YAML documents, separated by `---`, each representing a component to be included in the application.
Each component is defined by a kind typename string and a spec object containing the component's properties.

# Configuration string interpolation
You can use `${placeholder}` syntax inside `spec` elements. Placeholders are replaced by entries from a merged configuration map, built from:
1. System properties
2. Environment variables
3. `flink-conf.yaml` entries with the prefix `statefun.module.global-config.`
4. Command-line arguments
Comment on lines +67 to +70

Contributor:

I think it is more common to give env variables precedence over flink-conf.yaml values.

Contributor (author):

I agree in principle. That said, because globalConfiguration is already a combination of args and flink-conf.yaml entries with the statefun.module.global-config. prefix, there's no easy way to put env variables between them without affecting other parts of the system.

I made a start in my fork, and the number of changes is pretty high for what we gain. Please have a look and let me know if I should keep working on the change you mentioned in this comment:

FilKarnicki@02cd6a9


where (4) overrides (3), which overrides (2), which overrides (1).
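The override order can be sketched with plain `java.util` maps (class and method names here are hypothetical; the actual implementation merges Flink `ParameterTool` instances):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: later layers override earlier ones, mirroring the
// documented precedence (system properties < env vars < flink-conf < args).
public class ConfigPrecedenceSketch {

  @SafeVarargs
  public static Map<String, String> merge(Map<String, String>... layers) {
    Map<String, String> merged = new HashMap<>();
    for (Map<String, String> layer : layers) {
      merged.putAll(layer); // each layer wins over the previous ones
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> sysProps = Map.of("FUNC_DNS", "from-sys-props");
    Map<String, String> envVars = Map.of("FUNC_DNS", "from-env");
    Map<String, String> cliArgs = Map.of("FUNC_DNS", "functions.example.com");
    // CLI args are merged last, so they win:
    System.out.println(merge(sysProps, envVars, cliArgs).get("FUNC_DNS"));
  }
}
```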

Example:
```yaml
kind: io.statefun.endpoints.v2/http
spec:
  functions: com.example/*
  urlPathTemplate: ${FUNC_PROTOCOL}://${FUNC_DNS}/{function.name}
---
kind: io.statefun.kafka.v1/ingress
spec:
  id: com.example/my-ingress
  address: ${KAFKA_ADDRESS}:${KAFKA_PORT}
  consumerGroupId: my-consumer-group
  topics:
    - topic: ${KAFKA_INGRESS_TOPIC}
      (...)
  properties:
    - ssl.truststore.location: ${SSL_TRUSTSTORE_LOCATION}
    - ssl.truststore.password: ${SSL_TRUSTSTORE_PASSWORD}
  (...)
```
Comment on lines +76 to +92

Contributor:

I really like the flexibility the templating mechanism gives to the user. The thing I am asking myself is whether it gives too much power, so that users can shoot themselves in the foot.

The danger I see is that we add a level of indirection that makes it harder for a user to reason about what the effective yaml will look like. The underlying problem the issue wants to solve is passing in information that is only available to the process that runs SF but not to the user (e.g. secrets). I am wondering whether there is an alternative that achieves the same with a bit less power (e.g. allowing substitution only for selected fields, which is also confusing). I don't have a perfect answer here; I mainly wanted to hear your and @igalshilman's opinion.

Maybe one idea could be to log the effective/resolved yaml somewhere so that the user sees what is actually run, if this does not happen already.

Contributor (author):

I like the idea of logging the effective yaml somewhere. We should probably make it opt-in, since we don't want to automatically log secrets. I'll hold off on coding this until we hear from @igalshilman.
{{< hint info >}}
Please note that `{function.name}` is not a placeholder to be replaced by entries from the merged configuration; see [url template]({{< ref "docs/modules/http-endpoint" >}}).
{{< /hint >}}
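The distinction can be checked against the same regular expression the resolver uses, `\$\{(.*?)\}`: `{function.name}` lacks the leading `$` and is left untouched. A small sketch (class name hypothetical):

```java
import java.util.regex.Pattern;

// Sketch of the placeholder pattern: ${NAME} is a placeholder,
// {function.name} (no leading '$') is not.
public class PlaceholderCheck {

  static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(.*?)\\}");

  public static boolean containsPlaceholder(String s) {
    return PLACEHOLDER.matcher(s).find();
  }

  public static void main(String[] args) {
    System.out.println(containsPlaceholder("${FUNC_PROTOCOL}://${FUNC_DNS}/{function.name}")); // true
    System.out.println(containsPlaceholder("http://host/{function.name}")); // false
  }
}
```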
@@ -48,6 +48,7 @@ public void runWith() throws Throwable {
StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS)
.withBuildContextFileFromClasspath("remote-module", "/remote-module/")
.withBuildContextFileFromClasspath("ssl/", "ssl/")
.withModuleGlobalConfiguration("MAX_NUM_BATCH_REQUESTS", "10000")
.dependsOn(remoteFunction);

SmokeRunner.run(parameters, builder);
@@ -17,4 +17,4 @@ kind: io.statefun.endpoints.v2/http
spec:
functions: statefun.smoke.e2e/command-interpreter-fn
urlPathTemplate: https://remote-function-host
-  maxNumBatchRequests: 10000
+  maxNumBatchRequests: ${MAX_NUM_BATCH_REQUESTS}
@@ -48,6 +48,11 @@ public void runWith() throws Throwable {
StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS)
.withBuildContextFileFromClasspath("remote-module", "/remote-module/")
.withBuildContextFileFromClasspath("certs", "/certs/")
.withModuleGlobalConfiguration("TEST_COMMAND_INTERPRETER_FN", "command-interpreter-fn")
.withModuleGlobalConfiguration("TEST_SERVER_PROTOCOL", "https://")
.withModuleGlobalConfiguration("TEST_NUM_BATCH_REQUESTS", "10000")
.withModuleGlobalConfiguration("TEST_REMOTE_FUNCTION_PORT", "8000")
// TEST_REMOTE_FUNCTION_HOST placeholder value is taken from docker env variables
.dependsOn(remoteFunction);

SmokeRunner.run(parameters, builder);
@@ -20,3 +20,6 @@ COPY statefun-smoke-e2e-driver.jar /opt/statefun/modules/statefun-smoke-e2e/
COPY remote-module/ /opt/statefun/modules/statefun-smoke-e2e/
COPY certs/ /opt/statefun/modules/statefun-smoke-e2e/certs/
COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml

ENV TEST_REMOTE_FUNCTION_HOST=remote-function-host
ENV TEST_SERVER_PROTOCOL=WILL-BE-REPLACED-BY-GLOBAL-VARIABLE-FROM-CLI
@@ -15,9 +15,9 @@

kind: io.statefun.endpoints.v2/http
spec:
-  functions: statefun.smoke.e2e/command-interpreter-fn
-  urlPathTemplate: https://remote-function-host:8000
-  maxNumBatchRequests: 10000
+  functions: statefun.smoke.e2e/${TEST_COMMAND_INTERPRETER_FN}
+  urlPathTemplate: ${TEST_SERVER_PROTOCOL}${TEST_REMOTE_FUNCTION_HOST}:${TEST_REMOTE_FUNCTION_PORT}
+  maxNumBatchRequests: ${TEST_NUM_BATCH_REQUESTS}
transport:
type: io.statefun.transports.v1/async
trust_cacerts: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_ca.pem
@@ -47,6 +47,7 @@ public void runWith() throws Throwable {
StatefulFunctionsAppContainers.Builder builder =
StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS)
.withBuildContextFileFromClasspath("remote-module", "/remote-module/")
.withModuleGlobalConfiguration("REMOTE_FUNCTION_HOST", "remote-function-host")
.dependsOn(remoteFunction);

SmokeRunner.run(parameters, builder);
@@ -16,5 +16,5 @@
kind: io.statefun.endpoints.v2/http
spec:
functions: statefun.smoke.e2e/command-interpreter-fn
-  urlPathTemplate: http://remote-function-host:8000
+  urlPathTemplate: http://${REMOTE_FUNCTION_HOST}:8000
maxNumBatchRequests: 10000
@@ -20,19 +20,27 @@

import static org.apache.flink.statefun.flink.core.spi.ExtensionResolverAccessor.getExtensionResolver;

-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*;
import org.apache.flink.statefun.extensions.ComponentBinder;
import org.apache.flink.statefun.extensions.ComponentJsonObject;
import org.apache.flink.statefun.flink.core.spi.ExtensionResolver;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RemoteModule implements StatefulFunctionModule {

private static final Logger LOG = LoggerFactory.getLogger(RemoteModule.class);
private static final Pattern PLACEHOLDER_REGEX = Pattern.compile("\\$\\{(.*?)\\}");
private final List<JsonNode> componentNodes;

RemoteModule(List<JsonNode> componentNodes) {
@@ -41,8 +49,16 @@ public final class RemoteModule implements StatefulFunctionModule {

@Override
public void configure(Map<String, String> globalConfiguration, Binder moduleBinder) {
Map<String, String> systemPropsThenEnvVarsThenGlobalConfig =
ParameterTool.fromSystemProperties()
.mergeWith(
ParameterTool.fromMap(System.getenv())
.mergeWith(ParameterTool.fromMap(globalConfiguration)))
.toMap();
parseComponentNodes(componentNodes)
-        .forEach(component -> bindComponent(component, moduleBinder));
+        .forEach(
+            component ->
+                bindComponent(component, moduleBinder, systemPropsThenEnvVarsThenGlobalConfig));
}

private static List<ComponentJsonObject> parseComponentNodes(
@@ -53,10 +69,102 @@ private static List<ComponentJsonObject> parseComponentNodes(
.collect(Collectors.toList());
}

-  private static void bindComponent(ComponentJsonObject component, Binder moduleBinder) {
+  private static void bindComponent(
+      ComponentJsonObject component, Binder moduleBinder, Map<String, String> configuration) {

JsonNode resolvedSpec = valueResolutionFunction(configuration).apply(component.specJsonNode());
ComponentJsonObject resolvedComponent = new ComponentJsonObject(component.get(), resolvedSpec);

final ExtensionResolver extensionResolver = getExtensionResolver(moduleBinder);
final ComponentBinder componentBinder =
-        extensionResolver.resolveExtension(component.binderTypename(), ComponentBinder.class);
-    componentBinder.bind(component, moduleBinder);
+        extensionResolver.resolveExtension(
+            resolvedComponent.binderTypename(), ComponentBinder.class);
+    componentBinder.bind(resolvedComponent, moduleBinder);
}

private static Function<JsonNode, JsonNode> valueResolutionFunction(Map<String, String> config) {
return value -> {
if (value.isObject()) {
return resolveObject((ObjectNode) value, config);
} else if (value.isArray()) {
return resolveArray((ArrayNode) value, config);
} else if (value.isValueNode()) {
return resolveValueNode((ValueNode) value, config);
}

LOG.warn(
"Unrecognised type (not in: object, array, value). Skipping ${placeholder} resolution for that node.");
return value;
};
}

private static Function<Map.Entry<String, JsonNode>, AbstractMap.SimpleEntry<String, JsonNode>>
keyValueResolutionFunction(Map<String, String> config) {
return fieldNameValuePair ->
new AbstractMap.SimpleEntry<>(
fieldNameValuePair.getKey(),
valueResolutionFunction(config).apply(fieldNameValuePair.getValue()));
}

private static ValueNode resolveValueNode(ValueNode node, Map<String, String> config) {
StringBuffer stringBuffer = new StringBuffer();
Matcher placeholderMatcher = PLACEHOLDER_REGEX.matcher(node.asText());
boolean placeholderReplaced = false;

    while (placeholderMatcher.find()) {
      String key = placeholderMatcher.group(1);
      if (config.containsKey(key)) {
        // quoteReplacement keeps '$' or '\' in configured values from being
        // misread as group references by appendReplacement
        placeholderMatcher.appendReplacement(
            stringBuffer, Matcher.quoteReplacement(config.get(key)));
        placeholderReplaced = true;
      } else {
        throw new IllegalArgumentException(
            String.format(
                "Could not resolve placeholder '${%s}'. An entry for this key was not found in the configuration.",
                key));
      }
    }

if (placeholderReplaced) {
placeholderMatcher.appendTail(stringBuffer);
return new TextNode(stringBuffer.toString());
}

return node;
}

private static ObjectNode resolveObject(ObjectNode node, Map<String, String> config) {
return getFieldStream(node)
.map(keyValueResolutionFunction(config))
.reduce(
new ObjectNode(JsonNodeFactory.instance),
(accumulatedObjectNode, resolvedFieldNameValueTuple) -> {
accumulatedObjectNode.put(
resolvedFieldNameValueTuple.getKey(), resolvedFieldNameValueTuple.getValue());
return accumulatedObjectNode;
},
(objectNode1, objectNode2) -> {
throw new NotImplementedException("This reduce is not used with parallel streams");
});
}

private static ArrayNode resolveArray(ArrayNode node, Map<String, String> config) {
return getElementStream(node)
.map(valueResolutionFunction(config))
.reduce(
new ArrayNode(JsonNodeFactory.instance),
(accumulatedArrayNode, resolvedValue) -> {
accumulatedArrayNode.add(resolvedValue);
return accumulatedArrayNode;
},
(arrayNode1, arrayNode2) -> {
throw new NotImplementedException("This reduce is not used with parallel streams");
});
}

private static Stream<Map.Entry<String, JsonNode>> getFieldStream(ObjectNode node) {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(node.fields(), 0), false);
}

private static Stream<JsonNode> getElementStream(ArrayNode node) {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(node.elements(), 0), false);
}
}
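The string-level behaviour of the value-node resolution can be exercised in isolation. A standalone sketch (plain strings instead of Jackson value nodes; `Matcher.quoteReplacement` is an addition here, guarding against `$` or `\` in configured values, and the class name is hypothetical):

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone sketch of the placeholder-resolution loop over a single string value.
public class ValueResolutionSketch {

  private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(.*?)\\}");

  public static String resolve(String text, Map<String, String> config) {
    StringBuffer out = new StringBuffer();
    Matcher m = PLACEHOLDER.matcher(text);
    while (m.find()) {
      String key = m.group(1);
      if (!config.containsKey(key)) {
        throw new IllegalArgumentException("Could not resolve placeholder '${" + key + "}'");
      }
      // quoteReplacement keeps '$'/'\' in values from being treated as group references
      m.appendReplacement(out, Matcher.quoteReplacement(config.get(key)));
    }
    m.appendTail(out); // text with no placeholders passes through unchanged
    return out.toString();
  }

  public static void main(String[] args) {
    Map<String, String> config = Map.of("KAFKA_ADDRESS", "kafka", "KAFKA_PORT", "9092");
    System.out.println(resolve("${KAFKA_ADDRESS}:${KAFKA_PORT}", config)); // kafka:9092
  }
}
```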