Skip to content

Commit

Permalink
simplify integrations
Browse files Browse the repository at this point in the history
  • Loading branch information
dariuszkuc committed Jun 3, 2024
1 parent affc4c0 commit 908a466
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 477 deletions.
10 changes: 8 additions & 2 deletions spring-subscription-callback/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,14 @@ We can enable subscription HTTP callback support using following configuration
public class GraphQLConfiguration {

@Bean
public GraphQlHttpHandler graphQlHttpHandler(WebGraphQlHandler webGraphQlHandler) {
return new CallbackGraphQlHttpHandler(webGraphQlHandler);
public SubscriptionCallbackHandler callbackHandler(ExecutionGraphQlService graphQlService) {
return new SubscriptionCallbackHandler(graphQlService);
}

@Bean
public CallbackWebGraphQLInterceptor callbackGraphQlInterceptor(
SubscriptionCallbackHandler callbackHandler) {
return new CallbackWebGraphQLInterceptor(callbackHandler);
}

// regular federation transform
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.apollographql.subscription;

import static com.apollographql.subscription.callback.SubscriptionCallback.parseSubscriptionCallbackExtension;
import static com.apollographql.subscription.callback.SubscriptionCallbackHandler.SUBSCRIPTION_PROTOCOL_HEADER;
import static com.apollographql.subscription.callback.SubscriptionCallbackHandler.SUBSCRIPTION_PROTOCOL_HEADER_VALUE;

import com.apollographql.subscription.callback.SubscriptionCallbackHandler;
import graphql.ExecutionResult;
import graphql.GraphQLError;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.graphql.server.WebGraphQlInterceptor;
import org.springframework.graphql.server.WebGraphQlRequest;
import org.springframework.graphql.server.WebGraphQlResponse;
import org.springframework.graphql.server.WebSocketGraphQlRequest;
import org.springframework.graphql.support.DefaultExecutionGraphQlResponse;
import reactor.core.publisher.Mono;

public class CallbackWebGraphQLInterceptor implements WebGraphQlInterceptor {

private static final Log logger = LogFactory.getLog(CallbackWebGraphQLInterceptor.class);

private final SubscriptionCallbackHandler subscriptionCallbackHandler;

public CallbackWebGraphQLInterceptor(SubscriptionCallbackHandler subscriptionCallbackHandler) {
this.subscriptionCallbackHandler = subscriptionCallbackHandler;
}

@Override
public Mono<WebGraphQlResponse> intercept(WebGraphQlRequest request, Chain chain) {
// in order to correctly handle parsing of ANY requests (i.e. it is valid to define a
// document with query fragments first) we would need to parse it which is a much heavier
// operation, we may opt to do it in the future releases
if (!isWebSocketRequest(request) && request.getDocument().startsWith("subscription")) {
return parseSubscriptionCallbackExtension(request.getExtensions())
.flatMap(
callback -> {
if (logger.isDebugEnabled()) {
logger.debug("Starting subscription using callback: " + callback);
}
return this.subscriptionCallbackHandler
.handleSubscriptionUsingCallback(request, callback)
.map(response -> callbackResponse(request, response));
})
.onErrorResume(
(error) -> {
if (logger.isErrorEnabled()) {
logger.error("Unable to start subscription using callback protocol", error);
}
return Mono.just(errorCallbackResponse(request));
});
} else {
return chain.next(request);
}
}

private boolean isWebSocketRequest(WebGraphQlRequest request) {
return request instanceof WebSocketGraphQlRequest;
}

private WebGraphQlResponse callbackResponse(
WebGraphQlRequest request, ExecutionResult callbackResult) {
var callbackExecutionResponse =
new DefaultExecutionGraphQlResponse(request.toExecutionInput(), callbackResult);
var callbackGraphQLResponse = new WebGraphQlResponse(callbackExecutionResponse);
callbackGraphQLResponse
.getResponseHeaders()
.add(SUBSCRIPTION_PROTOCOL_HEADER, SUBSCRIPTION_PROTOCOL_HEADER_VALUE);
return callbackGraphQLResponse;
}

private WebGraphQlResponse errorCallbackResponse(WebGraphQlRequest request) {
var errorCallbackResult =
ExecutionResult.newExecutionResult()
.addError(
GraphQLError.newError()
.message("Unable to start subscription using callback protocol")
.build())
.build();
return callbackResponse(request, errorCallbackResult);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.apollographql.subscription.callback;

import com.apollographql.subscription.exception.CallbackInitializationFailedException;
import com.apollographql.subscription.exception.InactiveSubscriptionException;
import com.apollographql.subscription.message.CallbackMessageCheck;
import com.apollographql.subscription.message.CallbackMessageComplete;
Expand All @@ -14,7 +15,7 @@
import org.apache.commons.logging.LogFactory;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import org.springframework.graphql.server.WebGraphQlHandler;
import org.springframework.graphql.ExecutionGraphQlService;
import org.springframework.graphql.server.WebGraphQlRequest;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
Expand All @@ -30,20 +31,20 @@ public class SubscriptionCallbackHandler {
public static final String SUBSCRIPTION_PROTOCOL_HEADER = "subscription-protocol";
public static final String SUBSCRIPTION_PROTOCOL_HEADER_VALUE = "callback/1.0";

private final WebGraphQlHandler graphQlHandler;
private final ExecutionGraphQlService graphQlService;
private final Scheduler scheduler;

public SubscriptionCallbackHandler(WebGraphQlHandler graphQlHandler) {
this(graphQlHandler, Schedulers.boundedElastic());
public SubscriptionCallbackHandler(ExecutionGraphQlService graphQlService) {
this(graphQlService, Schedulers.boundedElastic());
}

public SubscriptionCallbackHandler(WebGraphQlHandler graphQlHandler, Scheduler scheduler) {
this.graphQlHandler = graphQlHandler;
public SubscriptionCallbackHandler(ExecutionGraphQlService graphQlService, Scheduler scheduler) {
this.graphQlService = graphQlService;
this.scheduler = scheduler;
}

@NotNull
public Mono<Boolean> handleSubscriptionUsingCallback(
public Mono<ExecutionResult> handleSubscriptionUsingCallback(
@NotNull WebGraphQlRequest graphQlRequest, @NotNull SubscriptionCallback callback) {
if (logger.isDebugEnabled()) {
logger.debug("Starting subscription callback: " + callback);
Expand All @@ -62,33 +63,31 @@ public Mono<Boolean> handleSubscriptionUsingCallback(
.exchangeToMono(
checkResponse -> {
var responseStatusCode = checkResponse.statusCode();
var subscriptionProtocol =
checkResponse.headers().header(SUBSCRIPTION_PROTOCOL_HEADER);
// var subscriptionProtocol =
// checkResponse.headers().header(SUBSCRIPTION_PROTOCOL_HEADER);

if (responseStatusCode.is2xxSuccessful()) {
// && !subscriptionProtocol.isEmpty() &&
// && !subscriptionProtocol.isEmpty() &&
// "callback".equals(subscriptionProtocol.get(0)))
if (logger.isDebugEnabled()) {
logger.debug("Subscription callback init successful: " + callback);
}

Flux<SubscritionCallbackMessage> subscription =
startSubscription(client, graphQlRequest, callback);
return Mono.just(true)
return Mono.just(emptyResult())
.publishOn(scheduler)
.doOnSubscribe((subscribed) -> subscription.subscribe());
} else {
if (logger.isWarnEnabled()) {
logger.warn(
"Subscription callback failed initialization: "
+ callback
+ ", server responded with: "
+ responseStatusCode.value());
}
return Mono.just(false);
return Mono.error(
new CallbackInitializationFailedException(
callback, responseStatusCode.value()));
}
})
.onErrorReturn(false);
});
}

private ExecutionResult emptyResult() {
return ExecutionResult.newExecutionResult().data(null).build();
}

@NotNull
Expand All @@ -107,8 +106,8 @@ protected Flux<SubscritionCallbackMessage> startSubscription(

// subscription data flux
Flux<SubscritionCallbackMessage> subscriptionFlux =
this.graphQlHandler
.handleRequest(graphQlRequest)
this.graphQlService
.execute(graphQlRequest)
.flatMapMany(
(subscriptionData) -> {
Flux<Map<String, Object>> responseFlux;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.apollographql.subscription.exception;

import com.apollographql.subscription.callback.SubscriptionCallback;

/** Exception thrown when callback initialization fails. */
public class CallbackInitializationFailedException extends RuntimeException {

public CallbackInitializationFailedException(SubscriptionCallback callback, int statusCode) {
super(
"Subscription callback failed initialization: "
+ callback
+ ", server responded with: "
+ statusCode);
}
}
Loading

0 comments on commit 908a466

Please sign in to comment.