Skip to content

Commit

Permalink
GH-828 Add support for configuring additional routers
Browse files Browse the repository at this point in the history
Resolves #828
  • Loading branch information
olegz committed Mar 24, 2022
1 parent b02fe24 commit dc5128b
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 19 deletions.
56 changes: 54 additions & 2 deletions docs/src/main/asciidoc/spring-cloud-function.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public class RoutingFunction implements Function<Object, Object> {
The routing instructions could be communicated in several ways. We support providing instructions via Message headers, System
properties as well as pluggable strategy. So let's look at some of the details

*MessageRoutingCallback*
==== MessageRoutingCallback

The `MessageRoutingCallback` is a strategy to assist with determining the name of the route-to function definition.

Expand Down Expand Up @@ -231,7 +231,7 @@ conflict resolutions in the event multiple mechanisms are used at the same time,
3. Application Properties (Any function)


*Function Filtering*
==== Function Filtering
Filtering is the type of routing where there are only two paths - 'go' or 'discard'. In terms of functions it mean
you only want to invoke a certain function if some condition returns 'true', otherwise you want to discard input.
However, when it comes to discarding input there are many interpretation of what it could mean in the context of your application.
Expand Down Expand Up @@ -261,6 +261,58 @@ due to the nature of the reactive functions which are invoked only once to pass
is handled by the reactor, hence we can not access and/or rely on the routing instructions communicated via individual
values (e.g., Message).

==== Multiple Routers

By default the framework will always have a single routing function configured as described in previous sections. However, there are times when you may need more then one routing function.
In that case you can create your own instance of the `RoutingFunction` bean in addition to the existing one as long as you give it a name other than `functionRouter`.

You can pass `spring.cloud.function.routing-expression` or `spring.cloud.function.definition` to RoutinFunction as key/value pairs in the map.

Here is a simple example

----
@Configuration
protected static class MultipleRouterConfiguration {
@Bean
RoutingFunction mySpecialRouter(FunctionCatalog functionCatalog, BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
Map<String, String> propertiesMap = new HashMap<>();
propertiesMap.put(FunctionProperties.PREFIX + ".routing-expression", "'reverse'");
return new RoutingFunction(functionCatalog, propertiesMap, new BeanFactoryResolver(beanFactory), routingCallback);
}
@Bean
public Function<String, String> reverse() {
return v -> new StringBuilder(v).reverse().toString();
}
@Bean
public Function<String, String> uppercase() {
return String::toUpperCase;
}
}
----

and a test that demonstrates how it works

`
----
@Test
public void testMultipleRouters() {
System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "'uppercase'");
FunctionCatalog functionCatalog = this.configureCatalog(MultipleRouterConfiguration.class);
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
Message<String> message = MessageBuilder.withPayload("hello").build();
assertThat(function.apply(message)).isEqualTo("HELLO");
function = functionCatalog.lookup("mySpecialRouter");
assertThat(function).isNotNull();
message = MessageBuilder.withPayload("hello").build();
assertThat(function.apply(message)).isEqualTo("olleh");
}
----

=== Input/Output Enrichment

There are often times when you need to modify or refine an incoming or outgoing Message and to keep your code clean of non-functional concerns. You don’t want to do it inside of your business logic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.cloud.function.context.config;

import java.util.Map;
import java.util.function.Function;

import org.apache.commons.logging.Log;
Expand All @@ -34,12 +35,13 @@
import org.springframework.expression.BeanResolver;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.DataBindingPropertyAccessor;
import org.springframework.expression.spel.support.SimpleEvaluationContext;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;


/**
* An implementation of Function which acts as a gateway/router by actually
* delegating incoming invocation to a function specified .. .
Expand All @@ -60,6 +62,9 @@ public class RoutingFunction implements Function<Object, Object> {

private final StandardEvaluationContext evalContext = new StandardEvaluationContext();

private final SimpleEvaluationContext headerEvalContext = SimpleEvaluationContext
.forPropertyAccessors(DataBindingPropertyAccessor.forReadOnlyAccess()).build();

private final SpelExpressionParser spelParser = new SpelExpressionParser();

private final FunctionCatalog functionCatalog;
Expand All @@ -72,6 +77,18 @@ public RoutingFunction(FunctionCatalog functionCatalog, FunctionProperties funct
this(functionCatalog, functionProperties, null, null);
}

public RoutingFunction(FunctionCatalog functionCatalog, Map<String, String> propertiesMap,
BeanResolver beanResolver, MessageRoutingCallback routingCallback) {
this(functionCatalog, extractIntoFunctionProperties(propertiesMap), beanResolver, routingCallback);
}

private static FunctionProperties extractIntoFunctionProperties(Map<String, String> propertiesMap) {
FunctionProperties functionProperties = new FunctionProperties();
functionProperties.setDefinition(propertiesMap.get(FunctionProperties.FUNCTION_DEFINITION));
functionProperties.setRoutingExpression(propertiesMap.get(FunctionProperties.PREFIX + ".routing-expression"));
return functionProperties;
}

public RoutingFunction(FunctionCatalog functionCatalog, FunctionProperties functionProperties,
BeanResolver beanResolver, MessageRoutingCallback routingCallback) {
this.functionCatalog = functionCatalog;
Expand Down Expand Up @@ -124,7 +141,7 @@ private Object route(Object input, boolean originalInputIsPublisher) {
}
}
else if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.routing-expression"))) {
function = this.functionFromExpression((String) message.getHeaders().get("spring.cloud.function.routing-expression"), message);
function = this.functionFromExpression((String) message.getHeaders().get("spring.cloud.function.routing-expression"), message, true);
if (function.isInputTypePublisher()) {
this.assertOriginalInputIsNotPublisher(originalInputIsPublisher);
}
Expand Down Expand Up @@ -193,12 +210,16 @@ private FunctionInvocationWrapper functionFromDefinition(String definition) {
}

private FunctionInvocationWrapper functionFromExpression(String routingExpression, Object input) {
return functionFromExpression(routingExpression, input, false);
}

private FunctionInvocationWrapper functionFromExpression(String routingExpression, Object input, boolean isViaHeader) {
Expression expression = spelParser.parseExpression(routingExpression);
if (input instanceof Message) {
input = MessageUtils.toCaseInsensitiveHeadersStructure((Message<?>) input);
}

String functionName = expression.getValue(this.evalContext, input, String.class);
String functionName = isViaHeader ? expression.getValue(this.headerEvalContext, input, String.class) : expression.getValue(this.evalContext, input, String.class);
Assert.hasText(functionName, "Failed to resolve function name based on routing expression '" + functionProperties.getRoutingExpression() + "'");
FunctionInvocationWrapper function = functionCatalog.lookup(functionName);
Assert.notNull(function, "Failed to lookup function to route to based on the expression '"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.cloud.function.context.config;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import org.junit.jupiter.api.AfterEach;
Expand All @@ -24,17 +26,22 @@
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.expression.BeanFactoryResolver;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;

/**
*
Expand All @@ -52,13 +59,17 @@ public void before() {
context.close();
}

private FunctionCatalog configureCatalog() {
context = new SpringApplicationBuilder(RoutingFunctionConfiguration.class).run(
private FunctionCatalog configureCatalog(Class<?> configurationClass) {
context = new SpringApplicationBuilder(configurationClass).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.cloud.function.routing.enabled=true");
return context.getBean(FunctionCatalog.class);
}

private FunctionCatalog configureCatalog() {
return configureCatalog(RoutingFunctionConfiguration.class);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testInvocationWithMessageAndHeader() {
Expand Down Expand Up @@ -91,10 +102,7 @@ public void testRoutingReactiveInputWithReactiveFunctionAndDefinitionMessageHead
.setHeader(FunctionProperties.PREFIX + ".definition", "echoFlux").build();
Flux resultFlux = (Flux) function.apply(Flux.just(message));

StepVerifier
.create(resultFlux)
.expectError()
.verify();
StepVerifier.create(resultFlux).expectError().verify();
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand All @@ -106,10 +114,27 @@ public void testRoutingReactiveInputWithReactiveFunctionAndExpressionMessageHead
Message<String> message = MessageBuilder.withPayload("hello")
.setHeader(FunctionProperties.PREFIX + ".routing-expression", "'echoFlux'").build();
Flux resultFlux = (Flux) function.apply(Flux.just(message));
StepVerifier
.create(resultFlux)
.expectError()
.verify();
StepVerifier.create(resultFlux).expectError().verify();
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void failWithHeaderProvidedExpressionAccessingRuntime() {
FunctionCatalog functionCatalog = this.configureCatalog();
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
Message<String> message = MessageBuilder.withPayload("hello")
.setHeader(FunctionProperties.PREFIX + ".routing-expression",
"T(java.lang.Runtime).getRuntime().exec(\"open -a calculator.app\")")
.build();
try {
function.apply(message);
fail();
}
catch (Exception e) {
assertThat(e.getMessage()).isEqualTo("EL1005E: Type cannot be found 'java.lang.Runtime'");
}

}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down Expand Up @@ -151,7 +176,8 @@ public void testInvocationWithMessageAndRoutingExpressionCaseInsensitive() {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testInvocationWithRoutingBeanExpression() {
System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "@reverse.apply(#root.getHeaders().get('func'))");
System.setProperty(FunctionProperties.PREFIX + ".routing-expression",
"@reverse.apply(#root.getHeaders().get('func'))");
FunctionCatalog functionCatalog = this.configureCatalog();
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
Expand All @@ -170,16 +196,17 @@ public void testOtherExpectedFailures() {
Assertions.fail();
}
catch (Exception e) {
//ignore
// ignore
}

// non existing function
try {
function.apply(MessageBuilder.withPayload("hello").setHeader(FunctionProperties.PREFIX + ".definition", "blah").build());
function.apply(MessageBuilder.withPayload("hello")
.setHeader(FunctionProperties.PREFIX + ".definition", "blah").build());
Assertions.fail();
}
catch (Exception e) {
//ignore
// ignore
}
}

Expand All @@ -197,6 +224,22 @@ public void testInvocationWithMessageComposed() {
assertThat(function.apply(message)).isEqualTo("OLLEH");
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testMultipleRouters() {
System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "'uppercase'");
FunctionCatalog functionCatalog = this.configureCatalog(MultipleRouterConfiguration.class);
Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
assertThat(function).isNotNull();
Message<String> message = MessageBuilder.withPayload("hello").build();
assertThat(function.apply(message)).isEqualTo("HELLO");

function = functionCatalog.lookup("mySpecialRouter");
assertThat(function).isNotNull();
message = MessageBuilder.withPayload("hello").build();
assertThat(function.apply(message)).isEqualTo("olleh");
}

@EnableAutoConfiguration
@Configuration
protected static class RoutingFunctionConfiguration {
Expand All @@ -216,4 +259,26 @@ public Function<Flux<String>, Flux<String>> echoFlux() {
return f -> f;
}
}

@EnableAutoConfiguration
@Configuration
protected static class MultipleRouterConfiguration {

@Bean
RoutingFunction mySpecialRouter(FunctionCatalog functionCatalog, BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
Map<String, String> propertiesMap = new HashMap<>();
propertiesMap.put(FunctionProperties.PREFIX + ".routing-expression", "'reverse'");
return new RoutingFunction(functionCatalog, propertiesMap, new BeanFactoryResolver(beanFactory), routingCallback);
}

@Bean
public Function<String, String> reverse() {
return v -> new StringBuilder(v).reverse().toString();
}

@Bean
public Function<String, String> uppercase() {
return String::toUpperCase;
}
}
}

0 comments on commit dc5128b

Please sign in to comment.