From 3aeca63127d0f404555f2d8ca7b414e9b54e62a4 Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Wed, 21 Jun 2023 17:00:43 -0400 Subject: [PATCH] Release Netty ByteBuf after it is consumed by Tyrus. Issue #7002. Signed-off-by: Santiago Pericasgeertsen --- webserver/websocket/pom.xml | 4 ++++ .../webserver/websocket/WebSocketHandler.java | 14 ++++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/webserver/websocket/pom.xml b/webserver/websocket/pom.xml index e81bd0bf3db..35ce6a67a75 100644 --- a/webserver/websocket/pom.xml +++ b/webserver/websocket/pom.xml @@ -29,6 +29,10 @@ helidon-webserver-websocket Helidon WebServer WebSocket + + -Dio.netty.leakDetectionLevel=paranoid + + io.helidon.webserver diff --git a/webserver/websocket/src/main/java/io/helidon/webserver/websocket/WebSocketHandler.java b/webserver/websocket/src/main/java/io/helidon/webserver/websocket/WebSocketHandler.java index 971aa2d6b42..0a81fd80371 100644 --- a/webserver/websocket/src/main/java/io/helidon/webserver/websocket/WebSocketHandler.java +++ b/webserver/websocket/src/main/java/io/helidon/webserver/websocket/WebSocketHandler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Oracle and/or its affiliates. + * Copyright (c) 2022, 2023 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -69,7 +69,7 @@ class WebSocketHandler extends SimpleChannelInboundHandler { private final TyrusServerContainer tyrusServerContainer; private volatile Connection connection; private final WebSocketEngine.UpgradeInfo upgradeInfo; - private final BufferedEmittingPublisher emitter; + private final BufferedEmittingPublisher emitter; WebSocketHandler(ChannelHandlerContext ctx, String path, FullHttpRequest upgradeRequest, @@ -154,16 +154,18 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf byteBuf) { - emitter.emit(byteBuf.copy().nioBuffer()); + emitter.emit(byteBuf.copy()); } } - private void sendBytesToTyrus(ChannelHandlerContext ctx, ByteBuffer nioBuffer) { + private void sendBytesToTyrus(ChannelHandlerContext ctx, ByteBuf byteBuf) { // Pass all data to Tyrus spi + ByteBuffer nioBuffer = byteBuf.nioBuffer(); int retries = MAX_RETRIES; while (nioBuffer.remaining() > 0 && retries-- > 0) { connection.getReadHandler().handle(nioBuffer); } + byteBuf.release(); // If we can't push all data to Tyrus, cancel and report problem if (retries == 0) { @@ -223,13 +225,13 @@ public void write(ByteBuffer byteBuffer, CompletionHandler completio return ctx; }, webSocketRouting.getExecutorService()).thenAccept(c -> Multi.create(emitter) .observeOn(webSocketRouting.getExecutorService()) - .forEach(byteBuffer -> sendBytesToTyrus(c, byteBuffer)) + .forEach(byteBuf -> sendBytesToTyrus(c, byteBuf)) .onError(this::logError) ); } else { this.connection = upgradeInfo.createConnection(writer, WebSocketHandler::close); Multi.create(emitter) - .forEach(byteBuffer -> sendBytesToTyrus(ctx, byteBuffer)) + .forEach(byteBuf -> sendBytesToTyrus(ctx, byteBuf)) .onError(this::logError); }