From aa7584d2522face93a387d24c3fde24c23da3a77 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 21 Jan 2021 20:16:21 +0000 Subject: [PATCH] Support for Jetty 10 Closes gh-26123 --- .../reactive/JettyClientHttpResponse.java | 52 +++++- .../reactive/JettyHttpHandlerAdapter.java | 64 +++++-- .../Jetty10WebSocketHandlerAdapter.java | 147 ++++++++++++++++ .../socket/client/JettyWebSocketClient.java | 93 ++++++++-- .../support/HandshakeWebSocketService.java | 18 +- .../Jetty10RequestUpgradeStrategy.java | 154 ++++++++++++++++ .../jetty/Jetty10WebSocketHandlerAdapter.java | 137 +++++++++++++++ .../adapter/jetty/JettyWebSocketSession.java | 144 ++++++++++++--- .../client/jetty/JettyWebSocketClient.java | 88 +++++++++- .../jetty/Jetty10RequestUpgradeStrategy.java | 165 ++++++++++++++++++ .../support/AbstractHandshakeHandler.java | 16 +- 11 files changed, 1006 insertions(+), 72 deletions(-) create mode 100644 spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/Jetty10WebSocketHandlerAdapter.java create mode 100644 spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/Jetty10RequestUpgradeStrategy.java create mode 100644 spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/Jetty10WebSocketHandlerAdapter.java create mode 100644 spring-websocket/src/main/java/org/springframework/web/socket/server/jetty/Jetty10RequestUpgradeStrategy.java diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpResponse.java index 80a1260bb6b7..1382ff2d696d 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,11 +16,13 @@ package org.springframework.http.client.reactive; +import java.lang.reflect.Method; import java.net.HttpCookie; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.reactive.client.ReactiveResponse; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -30,9 +32,11 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; import org.springframework.lang.Nullable; +import org.springframework.util.ClassUtils; import org.springframework.util.CollectionUtils; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import org.springframework.util.ReflectionUtils; /** * {@link ClientHttpResponse} implementation for the Jetty ReactiveStreams HTTP client. @@ -46,6 +50,11 @@ class JettyClientHttpResponse implements ClientHttpResponse { private static final Pattern SAMESITE_PATTERN = Pattern.compile("(?i).*SameSite=(Strict|Lax|None).*"); + private static final ClassLoader loader = JettyClientHttpResponse.class.getClassLoader(); + + private static final boolean jetty10Present = ClassUtils.isPresent( + "org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", loader); + private final ReactiveResponse reactiveResponse; @@ -58,8 +67,11 @@ public JettyClientHttpResponse(ReactiveResponse reactiveResponse, Publisher adapter = new JettyHeadersAdapter(reactiveResponse.getHeaders()); - this.headers = HttpHeaders.readOnlyHttpHeaders(adapter); + MultiValueMap headers = (jetty10Present ? + Jetty10HttpFieldsHelper.getHttpHeaders(reactiveResponse) : + new JettyHeadersAdapter(reactiveResponse.getHeaders())); + + this.headers = HttpHeaders.readOnlyHttpHeaders(headers); } @@ -110,4 +122,38 @@ public HttpHeaders getHeaders() { return this.headers; } + + private static class Jetty10HttpFieldsHelper { + + private static final Method getHeadersMethod; + + private static final Method getNameMethod; + + private static final Method getValueMethod; + + static { + try { + getHeadersMethod = Response.class.getMethod("getHeaders"); + Class type = loader.loadClass("org.eclipse.jetty.http.HttpField"); + getNameMethod = type.getMethod("getName"); + getValueMethod = type.getMethod("getValue"); + } + catch (ClassNotFoundException | NoSuchMethodException ex) { + throw new IllegalStateException("No compatible Jetty version found", ex); + } + } + + public static HttpHeaders getHttpHeaders(ReactiveResponse response) { + HttpHeaders headers = new HttpHeaders(); + Iterable iterator = (Iterable) + ReflectionUtils.invokeMethod(getHeadersMethod, response.getResponse()); + for (Object field : iterator) { + headers.add( + (String) ReflectionUtils.invokeMethod(getNameMethod, field), + (String) ReflectionUtils.invokeMethod(getValueMethod, field)); + } + return headers; + } + } + } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java index 22842c35546b..ddf9022da367 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyHttpHandlerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.MultiValueMap; /** @@ -49,6 +50,10 @@ */ public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter { + private static final boolean jetty10Present = ClassUtils.isPresent( + "org.eclipse.jetty.http.CookieCutter", JettyHttpHandlerAdapter.class.getClassLoader()); + + public JettyHttpHandlerAdapter(HttpHandler httpHandler) { super(httpHandler); } @@ -58,16 +63,29 @@ public JettyHttpHandlerAdapter(HttpHandler httpHandler) { protected ServletServerHttpRequest createRequest(HttpServletRequest request, AsyncContext context) throws IOException, URISyntaxException { + // TODO: need to compile against Jetty 10 to use HttpFields (class->interface) + if (jetty10Present) { + return super.createRequest(request, context); + } + Assert.notNull(getServletPath(), "Servlet path is not initialized"); - return new JettyServerHttpRequest(request, context, getServletPath(), getDataBufferFactory(), getBufferSize()); + return new JettyServerHttpRequest( + request, context, getServletPath(), getDataBufferFactory(), getBufferSize()); } @Override protected ServletServerHttpResponse createResponse(HttpServletResponse response, AsyncContext context, ServletServerHttpRequest request) throws IOException { - return new JettyServerHttpResponse( - response, context, getDataBufferFactory(), getBufferSize(), request); + // TODO: need to compile against Jetty 10 to use HttpFields (class->interface) + if (jetty10Present) { + return new BaseJettyServerHttpResponse( + response, context, getDataBufferFactory(), getBufferSize(), request); + } + else { + return new JettyServerHttpResponse( + response, context, getDataBufferFactory(), getBufferSize(), request); + } } @@ -87,7 +105,34 @@ private static MultiValueMap createHeaders(HttpServletRequest re } - private static final class JettyServerHttpResponse extends ServletServerHttpResponse { + private static class BaseJettyServerHttpResponse extends ServletServerHttpResponse { + + BaseJettyServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext, + DataBufferFactory bufferFactory, int bufferSize, ServletServerHttpRequest request) + throws IOException { + + super(response, asyncContext, bufferFactory, bufferSize, request); + } + + BaseJettyServerHttpResponse(HttpHeaders headers, HttpServletResponse response, AsyncContext asyncContext, + DataBufferFactory bufferFactory, int bufferSize, ServletServerHttpRequest request) + throws IOException { + + super(headers, response, asyncContext, bufferFactory, bufferSize, request); + } + + @Override + protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException { + ByteBuffer input = dataBuffer.asByteBuffer(); + int len = input.remaining(); + ServletResponse response = getNativeResponse(); + ((HttpOutput) response.getOutputStream()).write(input); + return len; + } + } + + + private static final class JettyServerHttpResponse extends BaseJettyServerHttpResponse { JettyServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext, DataBufferFactory bufferFactory, int bufferSize, ServletServerHttpRequest request) @@ -124,15 +169,6 @@ protected void applyHeaders() { response.setContentLengthLong(contentLength); } } - - @Override - protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException { - ByteBuffer input = dataBuffer.asByteBuffer(); - int len = input.remaining(); - ServletResponse response = getNativeResponse(); - ((HttpOutput) response.getOutputStream()).write(input); - return len; - } } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/Jetty10WebSocketHandlerAdapter.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/Jetty10WebSocketHandlerAdapter.java new file mode 100644 index 000000000000..02358a7bf9d8 --- /dev/null +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/Jetty10WebSocketHandlerAdapter.java @@ -0,0 +1,147 @@ +/* + * Copyright 2002-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.adapter; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.function.Function; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.api.extensions.Frame; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.web.reactive.socket.CloseStatus; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.WebSocketMessage.Type; +import org.springframework.web.reactive.socket.WebSocketSession; + +/** + * Identical to {@link JettyWebSocketHandlerAdapter}, only excluding the + * {@code onWebSocketFrame} method, since the {@link Frame} argument has moved + * to a different package in Jetty 10. + * + * @author Rossen Stoyanchev + * @since 5.3.4 + */ +@WebSocket +public class Jetty10WebSocketHandlerAdapter { + + private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]); + + + private final WebSocketHandler delegateHandler; + + private final Function sessionFactory; + + @Nullable + private JettyWebSocketSession delegateSession; + + + public Jetty10WebSocketHandlerAdapter(WebSocketHandler handler, + Function sessionFactory) { + + Assert.notNull(handler, "WebSocketHandler is required"); + Assert.notNull(sessionFactory, "'sessionFactory' is required"); + this.delegateHandler = handler; + this.sessionFactory = sessionFactory; + } + + + @OnWebSocketConnect + public void onWebSocketConnect(Session session) { + this.delegateSession = this.sessionFactory.apply(session); + this.delegateHandler.handle(this.delegateSession) + .checkpoint(session.getUpgradeRequest().getRequestURI() + " [JettyWebSocketHandlerAdapter]") + .subscribe(this.delegateSession); + } + + @OnWebSocketMessage + public void onWebSocketText(String message) { + if (this.delegateSession != null) { + WebSocketMessage webSocketMessage = toMessage(Type.TEXT, message); + this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); + } + } + + @OnWebSocketMessage + public void onWebSocketBinary(byte[] message, int offset, int length) { + if (this.delegateSession != null) { + ByteBuffer buffer = ByteBuffer.wrap(message, offset, length); + WebSocketMessage webSocketMessage = toMessage(Type.BINARY, buffer); + this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); + } + } + +// TODO: onWebSocketFrame can't be declared without compiling against Jetty 10 +// Jetty 10: org.eclipse.jetty.websocket.api.Frame +// Jetty 9: org.eclipse.jetty.websocket.api.extensions.Frame + +// @OnWebSocketFrame +// public void onWebSocketFrame(Frame frame) { +// if (this.delegateSession != null) { +// if (OpCode.PONG == frame.getOpCode()) { +// ByteBuffer buffer = (frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD); +// WebSocketMessage webSocketMessage = toMessage(Type.PONG, buffer); +// this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); +// } +// } +// } + + private WebSocketMessage toMessage(Type type, T message) { + WebSocketSession session = this.delegateSession; + Assert.state(session != null, "Cannot create message without a session"); + if (Type.TEXT.equals(type)) { + byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8); + DataBuffer buffer = session.bufferFactory().wrap(bytes); + return new WebSocketMessage(Type.TEXT, buffer); + } + else if (Type.BINARY.equals(type)) { + DataBuffer buffer = session.bufferFactory().wrap((ByteBuffer) message); + return new WebSocketMessage(Type.BINARY, buffer); + } + else if (Type.PONG.equals(type)) { + DataBuffer buffer = session.bufferFactory().wrap((ByteBuffer) message); + return new WebSocketMessage(Type.PONG, buffer); + } + else { + throw new IllegalArgumentException("Unexpected message type: " + message); + } + } + + @OnWebSocketClose + public void onWebSocketClose(int statusCode, String reason) { + if (this.delegateSession != null) { + this.delegateSession.handleClose(CloseStatus.create(statusCode, reason)); + } + } + + @OnWebSocketError + public void onWebSocketError(Throwable cause) { + if (this.delegateSession != null) { + this.delegateSession.handleError(cause); + } + } + +} diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java index 1074837cbeb2..9bbeb08f845b 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,9 @@ package org.springframework.web.reactive.socket.client; import java.io.IOException; +import java.lang.reflect.Method; import java.net.URI; +import java.util.function.Function; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,9 +34,12 @@ import org.springframework.context.Lifecycle; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.http.HttpHeaders; +import org.springframework.util.ClassUtils; +import org.springframework.util.ReflectionUtils; import org.springframework.web.reactive.socket.HandshakeInfo; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.adapter.ContextWebSocketHandler; +import org.springframework.web.reactive.socket.adapter.Jetty10WebSocketHandlerAdapter; import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter; import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession; @@ -53,6 +58,16 @@ */ public class JettyWebSocketClient implements WebSocketClient, Lifecycle { + private static ClassLoader loader = JettyWebSocketClient.class.getClassLoader(); + + private static final boolean jetty10Present; + + static { + jetty10Present = ClassUtils.isPresent( + "org.eclipse.jetty.websocket.client.JettyUpgradeListener", loader); + } + + private static final Log logger = LogFactory.getLog(JettyWebSocketClient.class); @@ -60,6 +75,9 @@ public class JettyWebSocketClient implements WebSocketClient, Lifecycle { private final boolean externallyManaged; + private final UpgradeHelper upgradeHelper = + (jetty10Present ? new Jetty10UpgradeHelper() : new Jetty9UpgradeHelper()); + /** * Default constructor that creates and manages an instance of a Jetty @@ -147,22 +165,19 @@ private Mono executeInternal(URI url, HttpHeaders headers, WebSocketHandle url, ContextWebSocketHandler.decorate(handler, contextView), completionSink); ClientUpgradeRequest request = new ClientUpgradeRequest(); request.setSubProtocols(handler.getSubProtocols()); - UpgradeListener upgradeListener = new DefaultUpgradeListener(headers); - try { - this.jettyClient.connect(jettyHandler, url, request, upgradeListener); - return completionSink.asMono(); - } - catch (IOException ex) { - return Mono.error(ex); - } + return this.upgradeHelper.upgrade( + this.jettyClient, jettyHandler, url, request, headers, completionSink); }); } private Object createHandler(URI url, WebSocketHandler handler, Sinks.Empty completion) { - return new JettyWebSocketHandlerAdapter(handler, session -> { + Function sessionFactory = session -> { HandshakeInfo info = createHandshakeInfo(url, session); return new JettyWebSocketSession(session, info, DefaultDataBufferFactory.sharedInstance, completion); - }); + }; + return (jetty10Present ? + new Jetty10WebSocketHandlerAdapter(handler, sessionFactory) : + new JettyWebSocketHandlerAdapter(handler, sessionFactory)); } private HandshakeInfo createHandshakeInfo(URI url, Session jettySession) { @@ -173,6 +188,34 @@ private HandshakeInfo createHandshakeInfo(URI url, Session jettySession) { } + /** + * Encapsulate incompatible changes between Jetty 9.4 and 10. + */ + private interface UpgradeHelper { + + Mono upgrade(org.eclipse.jetty.websocket.client.WebSocketClient jettyClient, + Object jettyHandler, URI url, ClientUpgradeRequest request, HttpHeaders headers, + Sinks.Empty completionSink); + } + + + private static class Jetty9UpgradeHelper implements UpgradeHelper { + + @Override + public Mono upgrade(org.eclipse.jetty.websocket.client.WebSocketClient jettyClient, + Object jettyHandler, URI url, ClientUpgradeRequest request, HttpHeaders headers, + Sinks.Empty completionSink) { + + try { + jettyClient.connect(jettyHandler, url, request, new DefaultUpgradeListener(headers)); + return completionSink.asMono(); + } + catch (IOException ex) { + return Mono.error(ex); + } + } + } + private static class DefaultUpgradeListener implements UpgradeListener { private final HttpHeaders headers; @@ -192,4 +235,32 @@ public void onHandshakeResponse(UpgradeResponse response) { } } + private static class Jetty10UpgradeHelper implements UpgradeHelper { + + // On Jetty 9 returns Future, on Jetty 10 returns CompletableFuture + private static final Method connectMethod; + + static { + try { + Class type = loader.loadClass("org.eclipse.jetty.websocket.client.WebSocketClient"); + connectMethod = type.getMethod("connect", Object.class, URI.class, ClientUpgradeRequest.class); + } + catch (ClassNotFoundException | NoSuchMethodException ex) { + throw new IllegalStateException("No compatible Jetty version found", ex); + } + } + + @Override + public Mono upgrade(org.eclipse.jetty.websocket.client.WebSocketClient jettyClient, + Object jettyHandler, URI url, ClientUpgradeRequest request, HttpHeaders headers, + Sinks.Empty completionSink) { + + // TODO: pass JettyUpgradeListener argument to set headers from HttpHeaders (like we do for Jetty 9) + // which would require a JDK Proxy since it is new in Jetty 10 + + ReflectionUtils.invokeMethod(connectMethod, jettyClient, jettyHandler, url, request); + return completionSink.asMono(); + } + } + } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java index d497667e4b53..9854e7578d10 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -68,16 +68,19 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle { private static final boolean jettyPresent; + private static final boolean jetty10Present; + private static final boolean undertowPresent; private static final boolean reactorNettyPresent; static { - ClassLoader classLoader = HandshakeWebSocketService.class.getClassLoader(); - tomcatPresent = ClassUtils.isPresent("org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader); - jettyPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.WebSocketServerFactory", classLoader); - undertowPresent = ClassUtils.isPresent("io.undertow.websockets.WebSocketProtocolHandshakeHandler", classLoader); - reactorNettyPresent = ClassUtils.isPresent("reactor.netty.http.server.HttpServerResponse", classLoader); + ClassLoader loader = HandshakeWebSocketService.class.getClassLoader(); + tomcatPresent = ClassUtils.isPresent("org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", loader); + jettyPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.WebSocketServerFactory", loader); + jetty10Present = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", loader); + undertowPresent = ClassUtils.isPresent("io.undertow.websockets.WebSocketProtocolHandshakeHandler", loader); + reactorNettyPresent = ClassUtils.isPresent("reactor.netty.http.server.HttpServerResponse", loader); } @@ -117,6 +120,9 @@ private static RequestUpgradeStrategy initUpgradeStrategy() { else if (jettyPresent) { className = "JettyRequestUpgradeStrategy"; } + else if (jetty10Present) { + className = "Jetty10RequestUpgradeStrategy"; + } else if (undertowPresent) { className = "UndertowRequestUpgradeStrategy"; } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/Jetty10RequestUpgradeStrategy.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/Jetty10RequestUpgradeStrategy.java new file mode 100644 index 000000000000..c757fe259aa5 --- /dev/null +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/Jetty10RequestUpgradeStrategy.java @@ -0,0 +1,154 @@ +/* + * Copyright 2002-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.server.upgrade; + +import java.lang.reflect.Method; +import java.util.function.Supplier; + +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.aopalliance.intercept.MethodInterceptor; +import org.aopalliance.intercept.MethodInvocation; +import reactor.core.publisher.Mono; + +import org.springframework.aop.framework.ProxyFactory; +import org.springframework.aop.target.EmptyTargetSource; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequestDecorator; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpResponseDecorator; +import org.springframework.lang.NonNull; +import org.springframework.lang.Nullable; +import org.springframework.util.ReflectionUtils; +import org.springframework.web.reactive.socket.HandshakeInfo; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.ContextWebSocketHandler; +import org.springframework.web.reactive.socket.adapter.Jetty10WebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession; +import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; +import org.springframework.web.server.ServerWebExchange; + +/** + * A {@link RequestUpgradeStrategy} for use with Jetty 10. + * + * @author Rossen Stoyanchev + * @since 5.3.4 + */ +public class Jetty10RequestUpgradeStrategy implements RequestUpgradeStrategy { + + private static final Class webSocketCreatorClass; + + private static final Method getContainerMethod; + + private static final Method upgradeMethod; + + private static final Method setAcceptedSubProtocol; + + static { + ClassLoader loader = Jetty10RequestUpgradeStrategy.class.getClassLoader(); + try { + webSocketCreatorClass = loader.loadClass("org.eclipse.jetty.websocket.server.JettyWebSocketCreator"); + + Class type = loader.loadClass("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer"); + getContainerMethod = type.getMethod("getContainer", ServletContext.class); + upgradeMethod = ReflectionUtils.findMethod(type, "upgrade", (Class[]) null); + + type = loader.loadClass("org.eclipse.jetty.websocket.server.JettyServerUpgradeResponse"); + setAcceptedSubProtocol = type.getMethod("setAcceptedSubProtocol", String.class); + } + catch (Exception ex) { + throw new IllegalStateException("No compatible Jetty version found", ex); + } + } + + + @Override + public Mono upgrade( + ServerWebExchange exchange, WebSocketHandler handler, + @Nullable String subProtocol, Supplier handshakeInfoFactory) { + + ServerHttpRequest request = exchange.getRequest(); + ServerHttpResponse response = exchange.getResponse(); + + HttpServletRequest servletRequest = ServerHttpRequestDecorator.getNativeRequest(request); + HttpServletResponse servletResponse = ServerHttpResponseDecorator.getNativeResponse(response); + ServletContext servletContext = servletRequest.getServletContext(); + + HandshakeInfo handshakeInfo = handshakeInfoFactory.get(); + DataBufferFactory factory = response.bufferFactory(); + + // Trigger WebFlux preCommit actions and upgrade + return exchange.getResponse().setComplete() + .then(Mono.deferContextual(contextView -> { + Jetty10WebSocketHandlerAdapter adapter = new Jetty10WebSocketHandlerAdapter( + ContextWebSocketHandler.decorate(handler, contextView), + session -> new JettyWebSocketSession(session, handshakeInfo, factory)); + + try { + Object creator = createJettyWebSocketCreator(adapter, subProtocol); + Object container = ReflectionUtils.invokeMethod(getContainerMethod, null, servletContext); + ReflectionUtils.invokeMethod(upgradeMethod, container, creator, servletRequest, servletResponse); + } + catch (Exception ex) { + return Mono.error(ex); + } + return Mono.empty(); + })); + } + + private static Object createJettyWebSocketCreator( + Jetty10WebSocketHandlerAdapter adapter, @Nullable String protocol) { + + ProxyFactory factory = new ProxyFactory(EmptyTargetSource.INSTANCE); + factory.addInterface(webSocketCreatorClass); + factory.addAdvice(new WebSocketCreatorInterceptor(adapter, protocol)); + return factory.getProxy(); + } + + + /** + * Proxy for a JettyWebSocketCreator to supply the WebSocket handler and set the sub-protocol. + */ + private static class WebSocketCreatorInterceptor implements MethodInterceptor { + + private final Jetty10WebSocketHandlerAdapter adapter; + + @Nullable + private final String protocol; + + + public WebSocketCreatorInterceptor( + Jetty10WebSocketHandlerAdapter adapter, @Nullable String protocol) { + + this.adapter = adapter; + this.protocol = protocol; + } + + @Nullable + @Override + public Object invoke(@NonNull MethodInvocation invocation) { + if (this.protocol != null) { + ReflectionUtils.invokeMethod( + setAcceptedSubProtocol, invocation.getArguments()[2], this.protocol); + } + return this.adapter; + } + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/Jetty10WebSocketHandlerAdapter.java b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/Jetty10WebSocketHandlerAdapter.java new file mode 100644 index 000000000000..c6a4c9f19cda --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/Jetty10WebSocketHandlerAdapter.java @@ -0,0 +1,137 @@ +/* + * Copyright 2002-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.socket.adapter.jetty; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.api.extensions.Frame; + +import org.springframework.util.Assert; +import org.springframework.web.socket.BinaryMessage; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator; + +/** + * Identical to {@link JettyWebSocketHandlerAdapter}, only excluding the + * {@code onWebSocketFrame} method, since the {@link Frame} argument has moved + * to a different package in Jetty 10. + * + * @author Rossen Stoyanchev + * @since 5.3.4 + */ +@WebSocket +public class Jetty10WebSocketHandlerAdapter { + + private static final Log logger = LogFactory.getLog(Jetty10WebSocketHandlerAdapter.class); + + + private final WebSocketHandler webSocketHandler; + + private final JettyWebSocketSession wsSession; + + + public Jetty10WebSocketHandlerAdapter(WebSocketHandler webSocketHandler, JettyWebSocketSession wsSession) { + Assert.notNull(webSocketHandler, "WebSocketHandler must not be null"); + Assert.notNull(wsSession, "WebSocketSession must not be null"); + this.webSocketHandler = webSocketHandler; + this.wsSession = wsSession; + } + + + @OnWebSocketConnect + public void onWebSocketConnect(Session session) { + try { + this.wsSession.initializeNativeSession(session); + this.webSocketHandler.afterConnectionEstablished(this.wsSession); + } + catch (Exception ex) { + ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); + } + } + + @OnWebSocketMessage + public void onWebSocketText(String payload) { + TextMessage message = new TextMessage(payload); + try { + this.webSocketHandler.handleMessage(this.wsSession, message); + } + catch (Exception ex) { + ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); + } + } + + @OnWebSocketMessage + public void onWebSocketBinary(byte[] payload, int offset, int length) { + BinaryMessage message = new BinaryMessage(payload, offset, length, true); + try { + this.webSocketHandler.handleMessage(this.wsSession, message); + } + catch (Exception ex) { + ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); + } + } + +// TODO: onWebSocketFrame can't be declared without compiling against Jetty 10 +// Jetty 10: org.eclipse.jetty.websocket.api.Frame +// Jetty 9: org.eclipse.jetty.websocket.api.extensions.Frame + +// @OnWebSocketFrame +// public void onWebSocketFrame(Frame frame) { +// if (OpCode.PONG == frame.getOpCode()) { +// ByteBuffer payload = frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD; +// PongMessage message = new PongMessage(payload); +// try { +// this.webSocketHandler.handleMessage(this.wsSession, message); +// } +// catch (Exception ex) { +// ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); +// } +// } +// } + + @OnWebSocketClose + public void onWebSocketClose(int statusCode, String reason) { + CloseStatus closeStatus = new CloseStatus(statusCode, reason); + try { + this.webSocketHandler.afterConnectionClosed(this.wsSession, closeStatus); + } + catch (Exception ex) { + if (logger.isWarnEnabled()) { + logger.warn("Unhandled exception after connection closed for " + this, ex); + } + } + } + + @OnWebSocketError + public void onWebSocketError(Throwable cause) { + try { + this.webSocketHandler.handleTransportError(this.wsSession, cause); + } + catch (Exception ex) { + ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); + } + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/JettyWebSocketSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/JettyWebSocketSession.java index ff4b880981ae..31cfc1addeba 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/JettyWebSocketSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/JettyWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.web.socket.adapter.jetty; import java.io.IOException; +import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.URI; import java.security.Principal; @@ -27,13 +28,14 @@ import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.WebSocketException; import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig; import org.springframework.http.HttpHeaders; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.CollectionUtils; +import org.springframework.util.ReflectionUtils; import org.springframework.web.socket.BinaryMessage; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.PingMessage; @@ -54,6 +56,12 @@ */ public class JettyWebSocketSession extends AbstractWebSocketSession { + private static final ClassLoader loader = JettyWebSocketSession.class.getClassLoader(); + + private static final boolean jetty10Present = ClassUtils.isPresent( + "org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", loader); + + private final String id; @Nullable @@ -71,6 +79,8 @@ public class JettyWebSocketSession extends AbstractWebSocketSession { @Nullable private Principal user; + private final SessionHelper sessionHelper; + /** * Create a new {@link JettyWebSocketSession} instance. @@ -91,6 +101,7 @@ public JettyWebSocketSession(Map attributes, @Nullable Principal super(attributes); this.id = idGenerator.generateId().toString(); this.user = user; + this.sessionHelper = (jetty10Present ? new Jetty10SessionHelper() : new Jetty9SessionHelper()); } @@ -141,28 +152,32 @@ public InetSocketAddress getRemoteAddress() { return getNativeSession().getRemoteAddress(); } + /** + * This method is a no-op for Jetty. As per {@link Session#getPolicy()}, the + * returned {@code WebSocketPolicy} is read-only and changing it has no effect. + */ @Override public void setTextMessageSizeLimit(int messageSizeLimit) { - checkNativeSessionInitialized(); - getNativeSession().getPolicy().setMaxTextMessageSize(messageSizeLimit); } @Override public int getTextMessageSizeLimit() { checkNativeSessionInitialized(); - return getNativeSession().getPolicy().getMaxTextMessageSize(); + return this.sessionHelper.getTextMessageSizeLimit(getNativeSession()); } + /** + * This method is a no-op for Jetty. As per {@link Session#getPolicy()}, the + * returned {@code WebSocketPolicy} is read-only and changing it has no effect. + */ @Override public void setBinaryMessageSizeLimit(int messageSizeLimit) { - checkNativeSessionInitialized(); - getNativeSession().getPolicy().setMaxBinaryMessageSize(messageSizeLimit); } @Override public int getBinaryMessageSizeLimit() { checkNativeSessionInitialized(); - return getNativeSession().getPolicy().getMaxBinaryMessageSize(); + return this.sessionHelper.getBinaryMessageSizeLimit(getNativeSession()); } @Override @@ -178,22 +193,14 @@ public void initializeNativeSession(Session session) { this.uri = session.getUpgradeRequest().getRequestURI(); HttpHeaders headers = new HttpHeaders(); - headers.putAll(session.getUpgradeRequest().getHeaders()); + Map> nativeHeaders = session.getUpgradeRequest().getHeaders(); + if (!CollectionUtils.isEmpty(nativeHeaders)) { + headers.putAll(nativeHeaders); + } this.headers = HttpHeaders.readOnlyHttpHeaders(headers); this.acceptedProtocol = session.getUpgradeResponse().getAcceptedSubProtocol(); - - List jettyExtensions = session.getUpgradeResponse().getExtensions(); - if (!CollectionUtils.isEmpty(jettyExtensions)) { - List extensions = new ArrayList<>(jettyExtensions.size()); - for (ExtensionConfig jettyExtension : jettyExtensions) { - extensions.add(new WebSocketExtension(jettyExtension.getName(), jettyExtension.getParameters())); - } - this.extensions = Collections.unmodifiableList(extensions); - } - else { - this.extensions = Collections.emptyList(); - } + this.extensions = this.sessionHelper.getExtensions(session); if (this.user == null) { this.user = session.getUpgradeRequest().getUserPrincipal(); @@ -221,13 +228,8 @@ protected void sendPongMessage(PongMessage message) throws IOException { getRemoteEndpoint().sendPong(message.getPayload()); } - private RemoteEndpoint getRemoteEndpoint() throws IOException { - try { - return getNativeSession().getRemote(); - } - catch (WebSocketException ex) { - throw new IOException("Unable to obtain RemoteEndpoint in session " + getId(), ex); - } + private RemoteEndpoint getRemoteEndpoint() { + return getNativeSession().getRemote(); } @Override @@ -235,4 +237,90 @@ protected void closeInternal(CloseStatus status) throws IOException { getNativeSession().close(status.getCode(), status.getReason()); } + + /** + * Encapsulate incompatible changes between Jetty 9.4 and 10. + */ + private interface SessionHelper { + + List getExtensions(Session session); + + int getTextMessageSizeLimit(Session session); + + int getBinaryMessageSizeLimit(Session session); + } + + + private static class Jetty9SessionHelper implements SessionHelper { + + @Override + public List getExtensions(Session session) { + List configs = session.getUpgradeResponse().getExtensions(); + if (!CollectionUtils.isEmpty(configs)) { + List result = new ArrayList<>(configs.size()); + for (ExtensionConfig config : configs) { + result.add(new WebSocketExtension(config.getName(), config.getParameters())); + } + return Collections.unmodifiableList(result); + } + return Collections.emptyList(); + } + + @Override + public int getTextMessageSizeLimit(Session session) { + return session.getPolicy().getMaxTextMessageSize(); + } + + @Override + public int getBinaryMessageSizeLimit(Session session) { + return session.getPolicy().getMaxBinaryMessageSize(); + } + } + + + private static class Jetty10SessionHelper implements SessionHelper { + + private static final Method getTextMessageSizeLimitMethod; + + private static final Method getBinaryMessageSizeLimitMethod; + + static { + try { + Class type = loader.loadClass("org.eclipse.jetty.websocket.api.WebSocketPolicy"); + getTextMessageSizeLimitMethod = type.getMethod("getMaxTextMessageSize"); + getBinaryMessageSizeLimitMethod = type.getMethod("getMaxBinaryMessageSize"); + } + catch (ClassNotFoundException | NoSuchMethodException ex) { + throw new IllegalStateException("No compatible Jetty version found", ex); + } + } + + // TODO: Extension info can't be accessed without compiling against Jetty 10 + // Jetty 10: org.eclipse.jetty.websocket.api.ExtensionConfig + // Jetty 9: org.eclipse.jetty.websocket.api.extensions.ExtensionConfig + + @Override + public List getExtensions(Session session) { + return Collections.emptyList(); + } + + // TODO: WebSocketPolicy can't be accessed without compiling against Jetty 10 (class -> interface) + + @Override + @SuppressWarnings("ConstantConditions") + public int getTextMessageSizeLimit(Session session) { + long result = (long) ReflectionUtils.invokeMethod(getTextMessageSizeLimitMethod, session.getPolicy()); + Assert.state(result <= Integer.MAX_VALUE, "textMessageSizeLimit is larger than Integer.MAX_VALUE"); + return (int) result; + } + + @Override + @SuppressWarnings("ConstantConditions") + public int getBinaryMessageSizeLimit(Session session) { + long result = (long) ReflectionUtils.invokeMethod(getBinaryMessageSizeLimitMethod, session.getPolicy()); + Assert.state(result <= Integer.MAX_VALUE, "binaryMessageSizeLimit is larger than Integer.MAX_VALUE"); + return (int) result; + } + } + } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/client/jetty/JettyWebSocketClient.java b/spring-websocket/src/main/java/org/springframework/web/socket/client/jetty/JettyWebSocketClient.java index 4c8b2d37426a..982bbfa61e44 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/client/jetty/JettyWebSocketClient.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/client/jetty/JettyWebSocketClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.web.socket.client.jetty; +import java.io.IOException; +import java.lang.reflect.Method; import java.net.URI; import java.security.Principal; import java.util.List; @@ -34,11 +36,14 @@ import org.springframework.core.task.TaskExecutor; import org.springframework.http.HttpHeaders; import org.springframework.lang.Nullable; +import org.springframework.util.ClassUtils; +import org.springframework.util.ReflectionUtils; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureTask; import org.springframework.web.socket.WebSocketExtension; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.adapter.jetty.Jetty10WebSocketHandlerAdapter; import org.springframework.web.socket.adapter.jetty.JettyWebSocketHandlerAdapter; import org.springframework.web.socket.adapter.jetty.JettyWebSocketSession; import org.springframework.web.socket.adapter.jetty.WebSocketToJettyExtensionConfigAdapter; @@ -60,11 +65,32 @@ */ public class JettyWebSocketClient extends AbstractWebSocketClient implements Lifecycle { + private static ClassLoader loader = JettyWebSocketClient.class.getClassLoader(); + + private static final boolean jetty10Present; + + private static final Method setHeadersMethod; + + static { + jetty10Present = ClassUtils.isPresent( + "org.eclipse.jetty.websocket.client.JettyUpgradeListener", loader); + try { + setHeadersMethod = ClientUpgradeRequest.class.getMethod("setHeaders", Map.class); + } + catch (NoSuchMethodException ex) { + throw new IllegalStateException("No compatible Jetty version found", ex); + } + } + + private final org.eclipse.jetty.websocket.client.WebSocketClient client; @Nullable private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); + private final UpgradeHelper upgradeHelper = + (jetty10Present ? new Jetty10UpgradeHelper() : new Jetty9UpgradeHelper()); + /** * Default constructor that creates an instance of @@ -148,14 +174,15 @@ public ListenableFuture doHandshakeInternal(WebSocketHandler w request.addExtensions(new WebSocketToJettyExtensionConfigAdapter(e)); } - headers.forEach(request::setHeader); + // Jetty 9: setHeaders declared in UpgradeRequestAdapter base class + // Jetty 10: setHeaders declared in ClientUpgradeRequest + ReflectionUtils.invokeMethod(setHeadersMethod, request, headers); Principal user = getUser(); - final JettyWebSocketSession wsSession = new JettyWebSocketSession(attributes, user); - final JettyWebSocketHandlerAdapter listener = new JettyWebSocketHandlerAdapter(wsHandler, wsSession); + JettyWebSocketSession wsSession = new JettyWebSocketSession(attributes, user); Callable connectTask = () -> { - Future future = this.client.connect(listener, uri, request); + Future future = this.upgradeHelper.connect(this.client, uri, request, wsHandler, wsSession); future.get(this.client.getConnectTimeout() + 2000, TimeUnit.MILLISECONDS); return wsSession; }; @@ -179,4 +206,55 @@ protected Principal getUser() { return null; } + + /** + * Encapsulate incompatible changes between Jetty 9.4 and 10. + */ + private interface UpgradeHelper { + + Future connect(WebSocketClient client, URI url, ClientUpgradeRequest request, + WebSocketHandler handler, JettyWebSocketSession session) throws IOException; + } + + + private static class Jetty9UpgradeHelper implements UpgradeHelper { + + @Override + public Future connect(WebSocketClient client, URI url, ClientUpgradeRequest request, + WebSocketHandler handler, JettyWebSocketSession session) throws IOException { + + JettyWebSocketHandlerAdapter adapter = new JettyWebSocketHandlerAdapter(handler, session); + return client.connect(adapter, url, request); + } + } + + + private static class Jetty10UpgradeHelper implements UpgradeHelper { + + // On Jetty 9 returns Future, on Jetty 10 returns CompletableFuture + private static final Method connectMethod; + + static { + try { + Class type = loader.loadClass("org.eclipse.jetty.websocket.client.WebSocketClient"); + connectMethod = type.getMethod("connect", Object.class, URI.class, ClientUpgradeRequest.class); + } + catch (ClassNotFoundException | NoSuchMethodException ex) { + throw new IllegalStateException("No compatible Jetty version found", ex); + } + } + + @Override + @SuppressWarnings({"ConstantConditions", "unchecked"}) + public Future connect(WebSocketClient client, URI url, ClientUpgradeRequest request, + WebSocketHandler handler, JettyWebSocketSession session) { + + Jetty10WebSocketHandlerAdapter adapter = new Jetty10WebSocketHandlerAdapter(handler, session); + + // TODO: pass JettyUpgradeListener argument to set headers from HttpHeaders (like we do for Jetty 9) + // which would require a JDK Proxy since it is new in Jetty 10 + return (Future) ReflectionUtils.invokeMethod(connectMethod, client, adapter, url, request); + } + } + } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/server/jetty/Jetty10RequestUpgradeStrategy.java b/spring-websocket/src/main/java/org/springframework/web/socket/server/jetty/Jetty10RequestUpgradeStrategy.java new file mode 100644 index 000000000000..5be9bd925bb3 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/server/jetty/Jetty10RequestUpgradeStrategy.java @@ -0,0 +1,165 @@ +/* + * Copyright 2002-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.socket.server.jetty; + +import java.lang.reflect.Method; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.Principal; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.aopalliance.intercept.MethodInterceptor; +import org.aopalliance.intercept.MethodInvocation; +import org.eclipse.jetty.websocket.server.HandshakeRFC6455; + +import org.springframework.aop.framework.ProxyFactory; +import org.springframework.aop.target.EmptyTargetSource; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; +import org.springframework.http.server.ServletServerHttpRequest; +import org.springframework.http.server.ServletServerHttpResponse; +import org.springframework.lang.NonNull; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.ReflectionUtils; +import org.springframework.web.socket.WebSocketExtension; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.adapter.jetty.Jetty10WebSocketHandlerAdapter; +import org.springframework.web.socket.adapter.jetty.JettyWebSocketSession; +import org.springframework.web.socket.server.HandshakeFailureException; +import org.springframework.web.socket.server.RequestUpgradeStrategy; + +/** + * A {@link RequestUpgradeStrategy} for Jetty 10. + * + * @author Rossen Stoyanchev + * @since 5.3.4 + */ +public class Jetty10RequestUpgradeStrategy implements RequestUpgradeStrategy { + + private static final String[] SUPPORTED_VERSIONS = new String[] { String.valueOf(HandshakeRFC6455.VERSION) }; + + private static final Class webSocketCreatorClass; + + private static final Method getContainerMethod; + + private static final Method upgradeMethod; + + private static final Method setAcceptedSubProtocol; + + static { + ClassLoader loader = Jetty10RequestUpgradeStrategy.class.getClassLoader(); + try { + webSocketCreatorClass = loader.loadClass("org.eclipse.jetty.websocket.server.JettyWebSocketCreator"); + + Class type = loader.loadClass("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer"); + getContainerMethod = type.getMethod("getContainer", ServletContext.class); + upgradeMethod = ReflectionUtils.findMethod(type, "upgrade", (Class[]) null); + + type = loader.loadClass("org.eclipse.jetty.websocket.server.JettyServerUpgradeResponse"); + setAcceptedSubProtocol = type.getMethod("setAcceptedSubProtocol", String.class); + } + catch (Exception ex) { + throw new IllegalStateException("No compatible Jetty version found", ex); + } + } + + + @Override + public String[] getSupportedVersions() { + return SUPPORTED_VERSIONS; + } + + @Override + public List getSupportedExtensions(ServerHttpRequest request) { + return Collections.emptyList(); + } + + + @Override + public void upgrade(ServerHttpRequest request, ServerHttpResponse response, + @Nullable String selectedProtocol, List selectedExtensions, + @Nullable Principal user, WebSocketHandler handler, Map attributes) + throws HandshakeFailureException { + + Assert.isInstanceOf(ServletServerHttpRequest.class, request, "ServletServerHttpRequest required"); + HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest(); + ServletContext servletContext = servletRequest.getServletContext(); + + Assert.isInstanceOf(ServletServerHttpResponse.class, response, "ServletServerHttpResponse required"); + HttpServletResponse servletResponse = ((ServletServerHttpResponse) response).getServletResponse(); + + JettyWebSocketSession session = new JettyWebSocketSession(attributes, user); + Jetty10WebSocketHandlerAdapter handlerAdapter = new Jetty10WebSocketHandlerAdapter(handler, session); + + try { + Object creator = createJettyWebSocketCreator(handlerAdapter, selectedProtocol); + Object container = ReflectionUtils.invokeMethod(getContainerMethod, null, servletContext); + ReflectionUtils.invokeMethod(upgradeMethod, container, creator, servletRequest, servletResponse); + } + catch (UndeclaredThrowableException ex) { + throw new HandshakeFailureException("Failed to upgrade", ex.getUndeclaredThrowable()); + } + catch (Exception ex) { + throw new HandshakeFailureException("Failed to upgrade", ex); + } + } + + private static Object createJettyWebSocketCreator( + Jetty10WebSocketHandlerAdapter adapter, @Nullable String protocol) { + + ProxyFactory factory = new ProxyFactory(EmptyTargetSource.INSTANCE); + factory.addInterface(webSocketCreatorClass); + factory.addAdvice(new WebSocketCreatorInterceptor(adapter, protocol)); + return factory.getProxy(); + } + + + /** + * Proxy for a JettyWebSocketCreator to supply the WebSocket handler and set the sub-protocol. + */ + private static class WebSocketCreatorInterceptor implements MethodInterceptor { + + private final Jetty10WebSocketHandlerAdapter adapter; + + @Nullable + private final String protocol; + + + public WebSocketCreatorInterceptor( + Jetty10WebSocketHandlerAdapter adapter, @Nullable String protocol) { + + this.adapter = adapter; + this.protocol = protocol; + } + + @Nullable + @Override + public Object invoke(@NonNull MethodInvocation invocation) { + if (this.protocol != null) { + ReflectionUtils.invokeMethod( + setAcceptedSubProtocol, invocation.getArguments()[2], this.protocol); + } + return this.adapter; + } + } + +} diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/server/support/AbstractHandshakeHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/server/support/AbstractHandshakeHandler.java index 1f62d257b9d8..8ca56429a4e8 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/server/support/AbstractHandshakeHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/server/support/AbstractHandshakeHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,9 +71,11 @@ */ public abstract class AbstractHandshakeHandler implements HandshakeHandler, Lifecycle { + private static final boolean tomcatWsPresent; + private static final boolean jettyWsPresent; - private static final boolean tomcatWsPresent; + private static final boolean jetty10WsPresent; private static final boolean undertowWsPresent; @@ -85,10 +87,12 @@ public abstract class AbstractHandshakeHandler implements HandshakeHandler, Life static { ClassLoader classLoader = AbstractHandshakeHandler.class.getClassLoader(); - jettyWsPresent = ClassUtils.isPresent( - "org.eclipse.jetty.websocket.server.WebSocketServerFactory", classLoader); tomcatWsPresent = ClassUtils.isPresent( "org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader); + jetty10WsPresent = ClassUtils.isPresent( + "org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", classLoader); + jettyWsPresent = ClassUtils.isPresent( + "org.eclipse.jetty.websocket.server.WebSocketServerFactory", classLoader); undertowWsPresent = ClassUtils.isPresent( "io.undertow.websockets.jsr.ServerWebSocketContainer", classLoader); glassfishWsPresent = ClassUtils.isPresent( @@ -97,7 +101,6 @@ public abstract class AbstractHandshakeHandler implements HandshakeHandler, Life "weblogic.websocket.tyrus.TyrusServletWriter", classLoader); websphereWsPresent = ClassUtils.isPresent( "com.ibm.websphere.wsoc.WsWsocServerContainer", classLoader); - } @@ -137,6 +140,9 @@ private static RequestUpgradeStrategy initRequestUpgradeStrategy() { else if (jettyWsPresent) { className = "org.springframework.web.socket.server.jetty.JettyRequestUpgradeStrategy"; } + else if (jetty10WsPresent) { + className = "org.springframework.web.socket.server.jetty.Jetty10RequestUpgradeStrategy"; + } else if (undertowWsPresent) { className = "org.springframework.web.socket.server.standard.UndertowRequestUpgradeStrategy"; }