diff --git a/webserver/websocket/pom.xml b/webserver/websocket/pom.xml index e81bd0bf3db..35ce6a67a75 100644 --- a/webserver/websocket/pom.xml +++ b/webserver/websocket/pom.xml @@ -29,6 +29,10 @@ helidon-webserver-websocket Helidon WebServer WebSocket + + -Dio.netty.leakDetectionLevel=paranoid + + io.helidon.webserver diff --git a/webserver/websocket/src/main/java/io/helidon/webserver/websocket/WebSocketHandler.java b/webserver/websocket/src/main/java/io/helidon/webserver/websocket/WebSocketHandler.java index 971aa2d6b42..951d3ddb6b9 100644 --- a/webserver/websocket/src/main/java/io/helidon/webserver/websocket/WebSocketHandler.java +++ b/webserver/websocket/src/main/java/io/helidon/webserver/websocket/WebSocketHandler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Oracle and/or its affiliates. + * Copyright (c) 2022, 2023 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,23 +15,10 @@ */ package io.helidon.webserver.websocket; -import java.io.IOException; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.logging.Level; -import java.util.logging.Logger; -import java.util.stream.Collectors; - import io.helidon.common.http.Parameters; import io.helidon.common.http.UriComponent; import io.helidon.common.reactive.BufferedEmittingPublisher; import io.helidon.common.reactive.Multi; - import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; @@ -39,6 +26,7 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpHeaders; +import io.netty.util.ReferenceCountUtil; import jakarta.websocket.CloseReason; import jakarta.websocket.DeploymentException; import jakarta.websocket.Extension; @@ -52,6 +40,18 @@ import org.glassfish.tyrus.spi.WebSocketEngine; import org.glassfish.tyrus.spi.Writer; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + import static jakarta.websocket.CloseReason.CloseCodes.UNEXPECTED_CONDITION; class WebSocketHandler extends SimpleChannelInboundHandler { @@ -69,12 +69,12 @@ class WebSocketHandler extends SimpleChannelInboundHandler { private final TyrusServerContainer tyrusServerContainer; private volatile Connection connection; private final WebSocketEngine.UpgradeInfo upgradeInfo; - private final BufferedEmittingPublisher emitter; + private final BufferedEmittingPublisher emitter; WebSocketHandler(ChannelHandlerContext ctx, String path, - FullHttpRequest upgradeRequest, - HttpHeaders upgradeResponseHeaders, - WebSocketRouting webSocketRouting) { + FullHttpRequest upgradeRequest, + HttpHeaders upgradeResponseHeaders, + WebSocketRouting webSocketRouting) { int k = path.indexOf('?'); if (k > 0) { this.path = path.substring(0, k); @@ -154,23 +154,28 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf byteBuf) { - emitter.emit(byteBuf.copy().nioBuffer()); + emitter.emit(byteBuf.copy()); } } - private void sendBytesToTyrus(ChannelHandlerContext ctx, ByteBuffer nioBuffer) { - // Pass all data to Tyrus spi - int retries = MAX_RETRIES; - while (nioBuffer.remaining() > 0 && retries-- > 0) { - connection.getReadHandler().handle(nioBuffer); - } + private void sendBytesToTyrus(ChannelHandlerContext ctx, ByteBuf byteBuf) { + try { + ByteBuffer nioBuffer = byteBuf.nioBuffer(); + // Pass all data to Tyrus spi + int retries = MAX_RETRIES; + while (nioBuffer.remaining() > 0 && retries-- > 0) { + connection.getReadHandler().handle(nioBuffer); + } - // If we can't push all data to Tyrus, cancel and report problem - if (retries == 0) { - ctx.close(); - connection.close( - new CloseReason(UNEXPECTED_CONDITION, "Tyrus did not consume all data after " + MAX_RETRIES + " retries") - ); + // If we can't push all data to Tyrus, cancel and report problem + if (retries == 0) { + ctx.close(); + connection.close( + new CloseReason(UNEXPECTED_CONDITION, "Tyrus did not consume all data after " + MAX_RETRIES + " retries") + ); + } + } finally { + ReferenceCountUtil.release(byteBuf); } } @@ -223,20 +228,20 @@ public void write(ByteBuffer byteBuffer, CompletionHandler completio return ctx; }, webSocketRouting.getExecutorService()).thenAccept(c -> Multi.create(emitter) .observeOn(webSocketRouting.getExecutorService()) - .forEach(byteBuffer -> sendBytesToTyrus(c, byteBuffer)) + .forEach(byteBuf -> sendBytesToTyrus(c, byteBuf)) .onError(this::logError) ); } else { this.connection = upgradeInfo.createConnection(writer, WebSocketHandler::close); Multi.create(emitter) - .forEach(byteBuffer -> sendBytesToTyrus(ctx, byteBuffer)) + .forEach(byteBuf -> sendBytesToTyrus(ctx, byteBuf)) .onError(this::logError); } } - private void logError(Throwable throwable){ + private void logError(Throwable throwable) { LOGGER.log(Level.SEVERE, "WS handler ERROR ", throwable); }