From 4f6322884d42baca0988b3238057a22b774a67b9 Mon Sep 17 00:00:00 2001 From: Fil Karnicki Date: Mon, 21 Mar 2022 22:09:00 +0100 Subject: [PATCH] [FLINK-26570][statefun] Remote module configuration interpolation --- docs/content/docs/modules/overview.md | 33 +++++ .../golang/SmokeVerificationGolangE2E.java | 1 + .../test/resources/remote-module/module.yaml | 2 +- .../smoke/java/SmokeVerificationJavaE2E.java | 5 + .../src/test/resources/Dockerfile | 3 + .../test/resources/remote-module/module.yaml | 6 +- .../e2e/smoke/js/SmokeVerificationJsE2E.java | 1 + .../test/resources/remote-module/module.yaml | 2 +- .../flink/core/jsonmodule/RemoteModule.java | 124 +++++++++++++++-- .../core/jsonmodule/RemoteModuleTest.java | 125 +++++++++++++++--- .../moduleWithMissingPlaceholders.yaml | 18 +++ .../remote-module/moduleWithPlaceholders.yaml | 44 ++++++ .../extensions/ComponentJsonObject.java | 6 +- 13 files changed, 339 insertions(+), 31 deletions(-) create mode 100644 statefun-flink/statefun-flink-core/src/test/resources/remote-module/moduleWithMissingPlaceholders.yaml create mode 100644 statefun-flink/statefun-flink-core/src/test/resources/remote-module/moduleWithPlaceholders.yaml diff --git a/docs/content/docs/modules/overview.md b/docs/content/docs/modules/overview.md index d0174b91d..1306b19a1 100644 --- a/docs/content/docs/modules/overview.md +++ b/docs/content/docs/modules/overview.md @@ -61,3 +61,36 @@ spec: A module YAML file can contain multiple YAML documents, separated by `---`, each representing a component to be included in the application. Each component is defined by a kind typename string and a spec object containing the component's properties. + +# Configuration string interpolation +You can use `${placeholders}` inside `spec` elements. These will be replaced by entries from a configuration map, consisting of: +1. System properties +2. Environment variables +3. flink-conf.yaml entries with prefix 'statefun.module.global-config.' +4. Command line args + +where (4) override (3) which override (2) which override (1). + +Example: +```yaml +kind: io.statefun.endpoints.v2/http +spec: + functions: com.example/* + urlPathTemplate: ${FUNC_PROTOCOL}://${FUNC_DNS}/{function.name} +--- +kind: io.statefun.kafka.v1/ingress +spec: + id: com.example/my-ingress + address: ${KAFKA_ADDRESS}:${KAFKA_PORT} + consumerGroupId: my-consumer-group + topics: + - topic: ${KAFKA_INGRESS_TOPIC} + (...) + properties: + - ssl.truststore.location: ${SSL_TRUSTSTORE_LOCATION} + - ssl.truststore.password: ${SSL_TRUSTSTORE_PASSWORD} + (...) +``` +{{< hint info >}} +Please note that `{function.name}` is not a placeholder to be replaced by entries from the merged configuration. See [url template]({{< ref "docs/modules/http-endpoint" >}}) +{{< /hint >}} \ No newline at end of file diff --git a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/java/org/apache/flink/statefun/e2e/smoke/golang/SmokeVerificationGolangE2E.java b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/java/org/apache/flink/statefun/e2e/smoke/golang/SmokeVerificationGolangE2E.java index 311eb47b2..8acf4140d 100644 --- a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/java/org/apache/flink/statefun/e2e/smoke/golang/SmokeVerificationGolangE2E.java +++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/java/org/apache/flink/statefun/e2e/smoke/golang/SmokeVerificationGolangE2E.java @@ -48,6 +48,7 @@ public void runWith() throws Throwable { StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS) .withBuildContextFileFromClasspath("remote-module", "/remote-module/") .withBuildContextFileFromClasspath("ssl/", "ssl/") + .withModuleGlobalConfiguration("MAX_NUM_BATCH_REQUESTS", "10000") .dependsOn(remoteFunction); SmokeRunner.run(parameters, builder); diff --git a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/remote-module/module.yaml b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/remote-module/module.yaml index 687146b70..d2146a333 100644 --- a/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/remote-module/module.yaml +++ b/statefun-e2e-tests/statefun-smoke-e2e-golang/src/test/resources/remote-module/module.yaml @@ -17,4 +17,4 @@ kind: io.statefun.endpoints.v2/http spec: functions: statefun.smoke.e2e/command-interpreter-fn urlPathTemplate: https://remote-function-host - maxNumBatchRequests: 10000 + maxNumBatchRequests: ${MAX_NUM_BATCH_REQUESTS} diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java index 5437fa0cf..a523b832a 100644 --- a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java +++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java @@ -48,6 +48,11 @@ public void runWith() throws Throwable { StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS) .withBuildContextFileFromClasspath("remote-module", "/remote-module/") .withBuildContextFileFromClasspath("certs", "/certs/") + .withModuleGlobalConfiguration("TEST_COMMAND_INTERPRETER_FN", "command-interpreter-fn") + .withModuleGlobalConfiguration("TEST_SERVER_PROTOCOL", "https://") + .withModuleGlobalConfiguration("TEST_NUM_BATCH_REQUESTS", "10000") + .withModuleGlobalConfiguration("TEST_REMOTE_FUNCTION_PORT", "8000") + // TEST_REMOTE_FUNCTION_HOST placeholder value is taken from docker env variables .dependsOn(remoteFunction); SmokeRunner.run(parameters, builder); diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/Dockerfile index a629f7786..1946326b8 100644 --- a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/Dockerfile +++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/Dockerfile @@ -20,3 +20,6 @@ COPY statefun-smoke-e2e-driver.jar /opt/statefun/modules/statefun-smoke-e2e/ COPY remote-module/ /opt/statefun/modules/statefun-smoke-e2e/ COPY certs/ /opt/statefun/modules/statefun-smoke-e2e/certs/ COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml + +ENV TEST_REMOTE_FUNCTION_HOST=remote-function-host +ENV TEST_SERVER_PROTOCOL=WILL-BE-REPLACED-BY-GLOBAL-VARIABLE-FROM-CLI diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml index 6836ae58a..8fbf011ec 100644 --- a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml +++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/resources/remote-module/module.yaml @@ -15,9 +15,9 @@ kind: io.statefun.endpoints.v2/http spec: - functions: statefun.smoke.e2e/command-interpreter-fn - urlPathTemplate: https://remote-function-host:8000 - maxNumBatchRequests: 10000 + functions: statefun.smoke.e2e/${TEST_COMMAND_INTERPRETER_FN} + urlPathTemplate: ${TEST_SERVER_PROTOCOL}${TEST_REMOTE_FUNCTION_HOST}:${TEST_REMOTE_FUNCTION_PORT} + maxNumBatchRequests: ${TEST_NUM_BATCH_REQUESTS} transport: type: io.statefun.transports.v1/async trust_cacerts: file:/opt/statefun/modules/statefun-smoke-e2e/certs/a_ca.pem diff --git a/statefun-e2e-tests/statefun-smoke-e2e-js/src/test/java/org/apache/flink/statefun/e2e/smoke/js/SmokeVerificationJsE2E.java b/statefun-e2e-tests/statefun-smoke-e2e-js/src/test/java/org/apache/flink/statefun/e2e/smoke/js/SmokeVerificationJsE2E.java index 9ea704b85..ca4da2bc9 100644 --- a/statefun-e2e-tests/statefun-smoke-e2e-js/src/test/java/org/apache/flink/statefun/e2e/smoke/js/SmokeVerificationJsE2E.java +++ b/statefun-e2e-tests/statefun-smoke-e2e-js/src/test/java/org/apache/flink/statefun/e2e/smoke/js/SmokeVerificationJsE2E.java @@ -47,6 +47,7 @@ public void runWith() throws Throwable { StatefulFunctionsAppContainers.Builder builder = StatefulFunctionsAppContainers.builder("flink-statefun-cluster", NUM_WORKERS) .withBuildContextFileFromClasspath("remote-module", "/remote-module/") + .withModuleGlobalConfiguration("REMOTE_FUNCTION_HOST", "remote-function-host") .dependsOn(remoteFunction); SmokeRunner.run(parameters, builder); diff --git a/statefun-e2e-tests/statefun-smoke-e2e-js/src/test/resources/remote-module/module.yaml b/statefun-e2e-tests/statefun-smoke-e2e-js/src/test/resources/remote-module/module.yaml index ab747d865..11575d247 100644 --- a/statefun-e2e-tests/statefun-smoke-e2e-js/src/test/resources/remote-module/module.yaml +++ b/statefun-e2e-tests/statefun-smoke-e2e-js/src/test/resources/remote-module/module.yaml @@ -16,5 +16,5 @@ kind: io.statefun.endpoints.v2/http spec: functions: statefun.smoke.e2e/command-interpreter-fn - urlPathTemplate: http://remote-function-host:8000 + urlPathTemplate: http://${REMOTE_FUNCTION_HOST}:8000 maxNumBatchRequests: 10000 \ No newline at end of file diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteModule.java index 5aea97a87..b055d8078 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteModule.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteModule.java @@ -20,19 +20,27 @@ import static org.apache.flink.statefun.flink.core.spi.ExtensionResolverAccessor.getExtensionResolver; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.stream.StreamSupport; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.*; import org.apache.flink.statefun.extensions.ComponentBinder; import org.apache.flink.statefun.extensions.ComponentJsonObject; import org.apache.flink.statefun.flink.core.spi.ExtensionResolver; import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class RemoteModule implements StatefulFunctionModule { - + private static final Logger LOG = LoggerFactory.getLogger(RemoteModule.class); + private static final Pattern PLACEHOLDER_REGEX = Pattern.compile("\\$\\{(.*?)\\}"); private final List componentNodes; RemoteModule(List componentNodes) { @@ -41,8 +49,16 @@ public final class RemoteModule implements StatefulFunctionModule { @Override public void configure(Map globalConfiguration, Binder moduleBinder) { + Map systemPropsThenEnvVarsThenGlobalConfig = + ParameterTool.fromSystemProperties() + .mergeWith( + ParameterTool.fromMap(System.getenv()) + .mergeWith(ParameterTool.fromMap(globalConfiguration))) + .toMap(); parseComponentNodes(componentNodes) - .forEach(component -> bindComponent(component, moduleBinder)); + .forEach( + component -> + bindComponent(component, moduleBinder, systemPropsThenEnvVarsThenGlobalConfig)); } private static List parseComponentNodes( @@ -53,10 +69,102 @@ private static List parseComponentNodes( .collect(Collectors.toList()); } - private static void bindComponent(ComponentJsonObject component, Binder moduleBinder) { + private static void bindComponent( + ComponentJsonObject component, Binder moduleBinder, Map configuration) { + + JsonNode resolvedSpec = valueResolutionFunction(configuration).apply(component.specJsonNode()); + ComponentJsonObject resolvedComponent = new ComponentJsonObject(component.get(), resolvedSpec); + final ExtensionResolver extensionResolver = getExtensionResolver(moduleBinder); final ComponentBinder componentBinder = - extensionResolver.resolveExtension(component.binderTypename(), ComponentBinder.class); - componentBinder.bind(component, moduleBinder); + extensionResolver.resolveExtension( + resolvedComponent.binderTypename(), ComponentBinder.class); + componentBinder.bind(resolvedComponent, moduleBinder); + } + + private static Function valueResolutionFunction(Map config) { + return value -> { + if (value.isObject()) { + return resolveObject((ObjectNode) value, config); + } else if (value.isArray()) { + return resolveArray((ArrayNode) value, config); + } else if (value.isValueNode()) { + return resolveValueNode((ValueNode) value, config); + } + + LOG.warn( + "Unrecognised type (not in: object, array, value). Skipping ${placeholder} resolution for that node."); + return value; + }; + } + + private static Function, AbstractMap.SimpleEntry> + keyValueResolutionFunction(Map config) { + return fieldNameValuePair -> + new AbstractMap.SimpleEntry<>( + fieldNameValuePair.getKey(), + valueResolutionFunction(config).apply(fieldNameValuePair.getValue())); + } + + private static ValueNode resolveValueNode(ValueNode node, Map config) { + StringBuffer stringBuffer = new StringBuffer(); + Matcher placeholderMatcher = PLACEHOLDER_REGEX.matcher(node.asText()); + boolean placeholderReplaced = false; + + while (placeholderMatcher.find()) { + if (config.containsKey(placeholderMatcher.group(1))) { + placeholderMatcher.appendReplacement(stringBuffer, config.get(placeholderMatcher.group(1))); + placeholderReplaced = true; + } else { + throw new IllegalArgumentException( + String.format( + "Could not resolve placeholder '%s'. An entry for this key was not found in the configuration.", + node.asText())); + } + } + + if (placeholderReplaced) { + placeholderMatcher.appendTail(stringBuffer); + return new TextNode(stringBuffer.toString()); + } + + return node; + } + + private static ObjectNode resolveObject(ObjectNode node, Map config) { + return getFieldStream(node) + .map(keyValueResolutionFunction(config)) + .reduce( + new ObjectNode(JsonNodeFactory.instance), + (accumulatedObjectNode, resolvedFieldNameValueTuple) -> { + accumulatedObjectNode.put( + resolvedFieldNameValueTuple.getKey(), resolvedFieldNameValueTuple.getValue()); + return accumulatedObjectNode; + }, + (objectNode1, objectNode2) -> { + throw new NotImplementedException("This reduce is not used with parallel streams"); + }); + } + + private static ArrayNode resolveArray(ArrayNode node, Map config) { + return getElementStream(node) + .map(valueResolutionFunction(config)) + .reduce( + new ArrayNode(JsonNodeFactory.instance), + (accumulatedArrayNode, resolvedValue) -> { + accumulatedArrayNode.add(resolvedValue); + return accumulatedArrayNode; + }, + (arrayNode1, arrayNode2) -> { + throw new NotImplementedException("This reduce is not used with parallel streams"); + }); + } + + private static Stream> getFieldStream(ObjectNode node) { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(node.fields(), 0), false); + } + + private static Stream getElementStream(ArrayNode node) { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(node.elements(), 0), false); } } diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteModuleTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteModuleTest.java index fa3efa15e..54894cbca 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteModuleTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteModuleTest.java @@ -19,37 +19,35 @@ package org.apache.flink.statefun.flink.core.jsonmodule; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.hasKey; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.*; import java.net.URL; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.flink.statefun.extensions.ComponentBinder; import org.apache.flink.statefun.extensions.ComponentJsonObject; import org.apache.flink.statefun.extensions.ExtensionModule; import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse; import org.apache.flink.statefun.flink.core.message.MessageFactoryKey; import org.apache.flink.statefun.flink.core.message.MessageFactoryType; -import org.apache.flink.statefun.sdk.EgressType; -import org.apache.flink.statefun.sdk.FunctionType; -import org.apache.flink.statefun.sdk.IngressType; -import org.apache.flink.statefun.sdk.StatefulFunction; -import org.apache.flink.statefun.sdk.StatefulFunctionProvider; -import org.apache.flink.statefun.sdk.TypeName; -import org.apache.flink.statefun.sdk.io.EgressIdentifier; -import org.apache.flink.statefun.sdk.io.EgressSpec; -import org.apache.flink.statefun.sdk.io.IngressIdentifier; -import org.apache.flink.statefun.sdk.io.IngressSpec; +import org.apache.flink.statefun.sdk.*; +import org.apache.flink.statefun.sdk.io.*; import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; import org.junit.Test; public final class RemoteModuleTest { + private static final String TEST_CONFIG_KEY_1 = "key1"; + private static final String TEST_CONFIG_KEY_2 = "key2"; + private static final String TEST_CONFIG_VALUE_1 = "foo"; + private static final String TEST_CONFIG_VALUE_2 = "bar"; private final String modulePath = "remote-module/module.yaml"; + private final String moduleWithPlaceholdersPath = "remote-module/moduleWithPlaceholders.yaml"; + private final String moduleWithMissingPlaceholdersPath = + "remote-module/moduleWithMissingPlaceholders.yaml"; @Test public void exampleUsage() { @@ -63,13 +61,105 @@ public void testComponents() { StatefulFunctionModule module = fromPath(modulePath); StatefulFunctionsUniverse universe = emptyUniverse(); - setupUniverse(universe, module, new TestComponentBindersModule()); + setupUniverse(universe, module, new TestComponentBindersModule(), new HashMap<>()); assertThat(universe.functions(), hasKey(TestComponentBinder1.TEST_FUNCTION_TYPE)); assertThat(universe.ingress(), hasKey(TestComponentBinder2.TEST_INGRESS.id())); assertThat(universe.egress(), hasKey(TestComponentBinder3.TEST_EGRESS.id())); } + @Test + public void configuringComponentsShouldResolvePlaceholders() { + final AtomicInteger counter = new AtomicInteger(); + final Map configuration = new HashMap<>(); + configuration.put(TEST_CONFIG_KEY_1, TEST_CONFIG_VALUE_1); + configuration.put(TEST_CONFIG_KEY_2, TEST_CONFIG_VALUE_2); + + final StatefulFunctionModule module = fromPath(moduleWithPlaceholdersPath); + + setupUniverse( + new StatefulFunctionsUniverse( + MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null)), + module, + (globalConfigurations, binder) -> { + binder.bindExtension( + TypeName.parseFrom("com.foo.bar/test.component.1"), + (ComponentBinder) + (component, remoteModuleBinder) -> { + assertThat( + component.specJsonNode().get("static").textValue(), is("staticValue")); + assertThat( + component.specJsonNode().get("placeholder").textValue(), + is(TEST_CONFIG_VALUE_1)); + counter.incrementAndGet(); + }); + binder.bindExtension( + TypeName.parseFrom("com.foo.bar/test.component.2"), + (ComponentBinder) + (component, remoteModuleBinder) -> { + assertThat( + component.specJsonNode().get("front").textValue(), + is(String.format("%sbar", TEST_CONFIG_VALUE_1))); + assertThat( + component.specJsonNode().get("back").textValue(), + is(String.format("foo%s", TEST_CONFIG_VALUE_2))); + assertThat( + component.specJsonNode().get("two").textValue(), + is(String.format("%s%s", TEST_CONFIG_VALUE_1, TEST_CONFIG_VALUE_2))); + assertThat( + component.specJsonNode().get("mixed").textValue(), + is(String.format("a%sb%sc", TEST_CONFIG_VALUE_1, TEST_CONFIG_VALUE_2))); + + ArrayNode arrayNode = (ArrayNode) component.specJsonNode().get("array"); + assertThat(arrayNode.get(0).textValue(), is(TEST_CONFIG_VALUE_1)); + assertThat(arrayNode.get(1).textValue(), is("bar")); + assertThat(arrayNode.get(2).intValue(), is(1000)); + assertThat(arrayNode.get(3).booleanValue(), is(true)); + + ArrayNode arrayNodeWithObjects = + (ArrayNode) component.specJsonNode().get("arrayWithObjects"); + assertThat( + arrayNodeWithObjects.get(0).get("a").textValue(), is(TEST_CONFIG_VALUE_2)); + assertThat(arrayNodeWithObjects.get(1).get("a").textValue(), is("fizz")); + + ArrayNode arrayWithNestedObjects = + (ArrayNode) component.specJsonNode().get("arrayWithNestedObjects"); + assertThat( + arrayWithNestedObjects.get(0).get("a").get("b").textValue(), is("foo")); + assertThat( + arrayWithNestedObjects.get(0).get("a").get("c").textValue(), + is(TEST_CONFIG_VALUE_1)); + counter.incrementAndGet(); + }); + binder.bindExtension( + TypeName.parseFrom("com.foo.bar/test.component.3"), + (ComponentBinder) + (component, remoteModuleBinder) -> { + assertThat(component.specJsonNode().get("anInt").intValue(), is(1)); + assertThat(component.specJsonNode().get("aBool").booleanValue(), is(true)); + counter.incrementAndGet(); + }); + }, + configuration); + + assertThat(counter.get(), is(3)); // ensure all assertions were run + } + + @Test(expected = IllegalArgumentException.class) + public void configuringComponentsWithMissingPlaceholdersShouldFail() { + final StatefulFunctionModule module = fromPath(moduleWithMissingPlaceholdersPath); + + setupUniverse( + new StatefulFunctionsUniverse( + MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null)), + module, + (globalConfigurations, binder) -> + binder.bindExtension( + TypeName.parseFrom("com.foo.bar/test.component.1"), + (ComponentBinder) (component, remoteModuleBinder) -> {}), + new HashMap<>()); + } + private static StatefulFunctionModule fromPath(String path) { URL moduleUrl = RemoteModuleTest.class.getClassLoader().getResource(path); assertThat(moduleUrl, not(nullValue())); @@ -85,8 +175,9 @@ private static StatefulFunctionsUniverse emptyUniverse() { private static void setupUniverse( StatefulFunctionsUniverse universe, StatefulFunctionModule functionModule, - ExtensionModule extensionModule) { - final Map globalConfig = new HashMap<>(); + ExtensionModule extensionModule, + Map globalConfig) { + extensionModule.configure(globalConfig, universe); functionModule.configure(globalConfig, universe); } diff --git a/statefun-flink/statefun-flink-core/src/test/resources/remote-module/moduleWithMissingPlaceholders.yaml b/statefun-flink/statefun-flink-core/src/test/resources/remote-module/moduleWithMissingPlaceholders.yaml new file mode 100644 index 000000000..75c48f7df --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/test/resources/remote-module/moduleWithMissingPlaceholders.yaml @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +kind: com.foo.bar/test.component.1 +spec: + valueDoesNotExist: ${iDoNotExist} diff --git a/statefun-flink/statefun-flink-core/src/test/resources/remote-module/moduleWithPlaceholders.yaml b/statefun-flink/statefun-flink-core/src/test/resources/remote-module/moduleWithPlaceholders.yaml new file mode 100644 index 000000000..75420ef54 --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/test/resources/remote-module/moduleWithPlaceholders.yaml @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +kind: com.foo.bar/test.component.1 +spec: + static: staticValue + placeholder: ${key1} +--- +kind: com.foo.bar/test.component.2 +spec: + front: ${key1}bar + back: foo${key2} + two: ${key1}${key2} + mixed: a${key1}b${key2}c + array: + - ${key1} + - bar + - 1000 + - true + arrayWithObjects: + - a: ${key2} + - a: fizz + arrayWithNestedObjects: + - a: + b: foo + c: ${key1} +--- +kind: com.foo.bar/test.component.3 +spec: + anInt: 1 + aBool: true +--- \ No newline at end of file diff --git a/statefun-flink/statefun-flink-extensions/src/main/java/org/apache/flink/statefun/extensions/ComponentJsonObject.java b/statefun-flink/statefun-flink-extensions/src/main/java/org/apache/flink/statefun/extensions/ComponentJsonObject.java index fa5395707..bcea5d65d 100644 --- a/statefun-flink/statefun-flink-extensions/src/main/java/org/apache/flink/statefun/extensions/ComponentJsonObject.java +++ b/statefun-flink/statefun-flink-extensions/src/main/java/org/apache/flink/statefun/extensions/ComponentJsonObject.java @@ -56,13 +56,17 @@ public final class ComponentJsonObject { private final JsonNode specJsonNode; public ComponentJsonObject(JsonNode jsonNode) { + this(jsonNode, extractSpecJsonNode((ObjectNode) jsonNode)); + } + + public ComponentJsonObject(JsonNode jsonNode, JsonNode specJsonNode) { Objects.requireNonNull(jsonNode); checkIsObject(jsonNode); this.rawObjectNode = (ObjectNode) jsonNode; this.binderTypename = parseBinderTypename(rawObjectNode); - this.specJsonNode = extractSpecJsonNode(rawObjectNode); + this.specJsonNode = specJsonNode; } /**