Skip to content

Commit

Permalink
fix: update subs callback to SpringBoot 3.3 (#404)
Browse files Browse the repository at this point in the history
  • Loading branch information
dariuszkuc authored Jun 6, 2024
1 parent 2b17eaf commit 028dd64
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 390 deletions.
6 changes: 3 additions & 3 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ version = 3.0-SNAPSHOT

# dependencies
annotationsVersion = 24.1.0
graphQLJavaVersion = 22.0
graphQLJavaVersion = 22.1
mockWebServerVersion = 4.12.0
protobufVersion = 4.27.0
slf4jVersion = 2.0.13
springBootVersion = 3.3.0-RC1
springGraphQLVersion = 1.3.0-RC1
springBootVersion = 3.3.0
springGraphQLVersion = 1.3.0
reactorVersion = 3.6.6

# test dependencies
Expand Down
58 changes: 26 additions & 32 deletions spring-subscription-callback/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,10 @@ implementation("com.apollographql.federation:federation-spring-subscription-call

## Usage

In order to enable HTTP subscription callback protocol support you need to configure `CallbackGraphQlHttpHandler` bean in your
application context.
In order to enable HTTP subscription callback protocol support you need to configure `SubscriptionCallbackHandler` and
`CallbackWebGraphQLInterceptor` beans in your application context.

This library provides support for both WebMVC and WebFlux applications so make sure to instantiate correct flavor of protocol.

* `com.apollographql.subscription.webmvc.CallbackGraphQlHttpHandler` for WebMVC applications
* `com.apollographql.subscription.webflux.CallbackGraphQlHttpHandler` for WebFlux applications
`CallbackWebGraphQLInterceptor` works with both WebMVC and WebFlux applications.

Given a subscription

Expand All @@ -88,32 +85,30 @@ We can enable subscription HTTP callback support using following configuration
public class GraphQLConfiguration {

@Bean
public GraphQlHttpHandler graphQlHttpHandler(WebGraphQlHandler webGraphQlHandler) {
return new CallbackGraphQlHttpHandler(webGraphQlHandler);
public SubscriptionCallbackHandler callbackHandler(ExecutionGraphQlService graphQlService) {
return new SubscriptionCallbackHandler(graphQlService);
}

// This interceptor defaults to Ordered#LOWEST_PRECEDENCE order as it should run last in chain
// to allow users to still apply other interceptors that handle common stuff (e.g. extracting
// auth headers, etc).
// You can override this behavior by specifying custom order.
@Bean
public CallbackWebGraphQLInterceptor callbackGraphQlInterceptor(
SubscriptionCallbackHandler callbackHandler) {
return new CallbackWebGraphQLInterceptor(callbackHandler);
}

// regular federation transforms
// see https://docs.spring.io/spring-graphql/reference/federation.html
@Bean
public GraphQlSourceBuilderCustomizer customizer(FederationSchemaFactory factory) {
return builder -> builder.schemaFactory(factory::createGraphQLSchema);
}

// regular federation transform
@Bean
public GraphQlSourceBuilderCustomizer federationTransform() {
DataFetcher<?> entityDataFetcher =
env -> {
List<Map<String, Object>> representations = env.getArgument(_Entity.argumentName);
return representations.stream()
.map(
representation -> {
// TODO implement entity data fetcher logic here
return null;
})
.collect(Collectors.toList());
};

return builder ->
builder.schemaFactory(
(registry, wiring) ->
Federation.transform(registry, wiring)
.fetchEntities(entityDataFetcher)
.resolveEntityType(new ClassNameTypeResolver())
.build());
FederationSchemaFactory federationSchemaFactory() {
return new FederationSchemaFactory();
}
}
```
Expand All @@ -124,9 +119,8 @@ your provided scheduler.

```java
@Bean
public GraphQlHttpHandler graphQlHttpHandler(WebGraphQlHandler webGraphQlHandler) {
public SubscriptionCallbackHandler callbackHandler(ExecutionGraphQlService graphQlService) {
Scheduler customScheduler = <provide your custom scheduler>;
SubscriptionCallbackHandler subscriptionHandler = new SubscriptionCallbackHandler(webGraphQlHandler, customScheduler);
return new CallbackGraphQlHttpHandler(webGraphQlHandler, subscriptionHandler);
return new SubscriptionCallbackHandler(graphQlService, customScheduler);
}
```
7 changes: 0 additions & 7 deletions spring-subscription-callback/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,6 @@ plugins {
id("com.apollographql.federation.java-conventions")
}

repositories {
mavenCentral()
maven {
url = uri("https://repo.spring.io/milestone")
}
}

val annotationsVersion: String by project
val graphQLJavaVersion: String by project
val mockWebServerVersion: String by project
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.apollographql.subscription;

import static com.apollographql.subscription.callback.SubscriptionCallback.parseSubscriptionCallbackExtension;
import static com.apollographql.subscription.callback.SubscriptionCallbackHandler.SUBSCRIPTION_PROTOCOL_HEADER;
import static com.apollographql.subscription.callback.SubscriptionCallbackHandler.SUBSCRIPTION_PROTOCOL_HEADER_VALUE;

import com.apollographql.subscription.callback.SubscriptionCallbackHandler;
import graphql.ExecutionResult;
import graphql.GraphQLError;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.Ordered;
import org.springframework.graphql.server.WebGraphQlInterceptor;
import org.springframework.graphql.server.WebGraphQlRequest;
import org.springframework.graphql.server.WebGraphQlResponse;
import org.springframework.graphql.server.WebSocketGraphQlRequest;
import org.springframework.graphql.support.DefaultExecutionGraphQlResponse;
import org.springframework.http.HttpStatus;
import org.springframework.web.server.ResponseStatusException;
import reactor.core.publisher.Mono;

/**
* Interceptor that provides support for Apollo Subscription Callback Protocol. This interceptor
* defaults to {@link Ordered#LOWEST_PRECEDENCE} order as it should run last in chain to allow users
* to still apply other interceptors that handle common stuff (e.g. extracting auth headers, etc).
* You can override this behavior by specifying custom order.
*
* @see <a
* href="https://www.apollographql.com/docs/router/executing-operations/subscription-callback-protocol">Subscription
* Callback Protocol</a>
*/
public class CallbackWebGraphQLInterceptor implements WebGraphQlInterceptor, Ordered {

private static final Log logger = LogFactory.getLog(CallbackWebGraphQLInterceptor.class);

private final SubscriptionCallbackHandler subscriptionCallbackHandler;
private final int order;

public CallbackWebGraphQLInterceptor(SubscriptionCallbackHandler subscriptionCallbackHandler) {
this(subscriptionCallbackHandler, LOWEST_PRECEDENCE);
}

public CallbackWebGraphQLInterceptor(
SubscriptionCallbackHandler subscriptionCallbackHandler, int order) {
this.subscriptionCallbackHandler = subscriptionCallbackHandler;
this.order = order;
}

@Override
public Mono<WebGraphQlResponse> intercept(WebGraphQlRequest request, Chain chain) {
// in order to correctly handle parsing of ANY requests (i.e. it is valid to define a
// document with query fragments first) we would need to parse it which is a much heavier
// operation, we may opt to do it in the future releases
if (!isWebSocketRequest(request) && request.getDocument().startsWith("subscription")) {
return parseSubscriptionCallbackExtension(request.getExtensions())
.flatMap(
callback -> {
if (logger.isDebugEnabled()) {
logger.debug("Starting subscription using callback: " + callback);
}
return this.subscriptionCallbackHandler
.handleSubscriptionUsingCallback(request, callback)
.map(response -> callbackResponse(request, response));
})
.onErrorResume(
(error) -> {
if (logger.isErrorEnabled()) {
logger.error("Unable to start subscription using callback protocol", error);
}
return Mono.error(new ResponseStatusException(HttpStatus.BAD_REQUEST));
});
} else {
return chain.next(request);
}
}

private boolean isWebSocketRequest(WebGraphQlRequest request) {
return request instanceof WebSocketGraphQlRequest;
}

private WebGraphQlResponse callbackResponse(
WebGraphQlRequest request, ExecutionResult callbackResult) {
var callbackExecutionResponse =
new DefaultExecutionGraphQlResponse(request.toExecutionInput(), callbackResult);
var callbackGraphQLResponse = new WebGraphQlResponse(callbackExecutionResponse);
callbackGraphQLResponse
.getResponseHeaders()
.add(SUBSCRIPTION_PROTOCOL_HEADER, SUBSCRIPTION_PROTOCOL_HEADER_VALUE);
return callbackGraphQLResponse;
}

private WebGraphQlResponse errorCallbackResponse(WebGraphQlRequest request) {
var errorCallbackResult =
ExecutionResult.newExecutionResult()
.addError(
GraphQLError.newError()
.message("Unable to start subscription using callback protocol")
.build())
.build();
return callbackResponse(request, errorCallbackResult);
}

@Override
public int getOrder() {
return order;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.apollographql.subscription.callback;

import com.apollographql.subscription.exception.CallbackInitializationFailedException;
import com.apollographql.subscription.exception.InactiveSubscriptionException;
import com.apollographql.subscription.message.CallbackMessageCheck;
import com.apollographql.subscription.message.CallbackMessageComplete;
Expand All @@ -14,7 +15,7 @@
import org.apache.commons.logging.LogFactory;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import org.springframework.graphql.server.WebGraphQlHandler;
import org.springframework.graphql.ExecutionGraphQlService;
import org.springframework.graphql.server.WebGraphQlRequest;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
Expand All @@ -30,20 +31,20 @@ public class SubscriptionCallbackHandler {
public static final String SUBSCRIPTION_PROTOCOL_HEADER = "subscription-protocol";
public static final String SUBSCRIPTION_PROTOCOL_HEADER_VALUE = "callback/1.0";

private final WebGraphQlHandler graphQlHandler;
private final ExecutionGraphQlService graphQlService;
private final Scheduler scheduler;

public SubscriptionCallbackHandler(WebGraphQlHandler graphQlHandler) {
this(graphQlHandler, Schedulers.boundedElastic());
public SubscriptionCallbackHandler(ExecutionGraphQlService graphQlService) {
this(graphQlService, Schedulers.boundedElastic());
}

public SubscriptionCallbackHandler(WebGraphQlHandler graphQlHandler, Scheduler scheduler) {
this.graphQlHandler = graphQlHandler;
public SubscriptionCallbackHandler(ExecutionGraphQlService graphQlService, Scheduler scheduler) {
this.graphQlService = graphQlService;
this.scheduler = scheduler;
}

@NotNull
public Mono<Boolean> handleSubscriptionUsingCallback(
public Mono<ExecutionResult> handleSubscriptionUsingCallback(
@NotNull WebGraphQlRequest graphQlRequest, @NotNull SubscriptionCallback callback) {
if (logger.isDebugEnabled()) {
logger.debug("Starting subscription callback: " + callback);
Expand All @@ -62,33 +63,31 @@ public Mono<Boolean> handleSubscriptionUsingCallback(
.exchangeToMono(
checkResponse -> {
var responseStatusCode = checkResponse.statusCode();
var subscriptionProtocol =
checkResponse.headers().header(SUBSCRIPTION_PROTOCOL_HEADER);
// var subscriptionProtocol =
// checkResponse.headers().header(SUBSCRIPTION_PROTOCOL_HEADER);

if (responseStatusCode.is2xxSuccessful()) {
// && !subscriptionProtocol.isEmpty() &&
// && !subscriptionProtocol.isEmpty() &&
// "callback".equals(subscriptionProtocol.get(0)))
if (logger.isDebugEnabled()) {
logger.debug("Subscription callback init successful: " + callback);
}

Flux<SubscritionCallbackMessage> subscription =
startSubscription(client, graphQlRequest, callback);
return Mono.just(true)
return Mono.just(emptyResult())
.publishOn(scheduler)
.doOnSubscribe((subscribed) -> subscription.subscribe());
} else {
if (logger.isWarnEnabled()) {
logger.warn(
"Subscription callback failed initialization: "
+ callback
+ ", server responded with: "
+ responseStatusCode.value());
}
return Mono.just(false);
return Mono.error(
new CallbackInitializationFailedException(
callback, responseStatusCode.value()));
}
})
.onErrorReturn(false);
});
}

private ExecutionResult emptyResult() {
return ExecutionResult.newExecutionResult().data(null).build();
}

@NotNull
Expand All @@ -107,8 +106,8 @@ protected Flux<SubscritionCallbackMessage> startSubscription(

// subscription data flux
Flux<SubscritionCallbackMessage> subscriptionFlux =
this.graphQlHandler
.handleRequest(graphQlRequest)
this.graphQlService
.execute(graphQlRequest)
.flatMapMany(
(subscriptionData) -> {
Flux<Map<String, Object>> responseFlux;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.apollographql.subscription.exception;

import com.apollographql.subscription.callback.SubscriptionCallback;

/** Exception thrown when callback initialization fails. */
public class CallbackInitializationFailedException extends RuntimeException {

public CallbackInitializationFailedException(SubscriptionCallback callback, int statusCode) {
super(
"Subscription callback failed initialization: "
+ callback
+ ", server responded with: "
+ statusCode);
}
}
Loading

0 comments on commit 028dd64

Please sign in to comment.