Skip to content

Commit

Permalink
Add max inbound message size to ConnectionFactory
Browse files Browse the repository at this point in the history
To avoid OOM with a very large message.
The default value is 64 MiB.

Fixes #1062

(cherry picked from commit 9ed45fd)

Conflicts:
	src/main/java/com/rabbitmq/client/impl/Frame.java
	src/test/java/com/rabbitmq/client/test/TestUtils.java
  • Loading branch information
acogoluegnes committed Jun 29, 2023
1 parent efd0c09 commit 97590ab
Show file tree
Hide file tree
Showing 17 changed files with 282 additions and 45 deletions.
33 changes: 30 additions & 3 deletions src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -205,6 +205,13 @@ public class ConnectionFactory implements Cloneable {

private CredentialsRefreshService credentialsRefreshService;

/**
* Maximum body size of inbound (received) messages in bytes.
*
* <p>Default value is 67,108,864 (64 MiB).
*/
private int maxInboundMessageBodySize = 1_048_576 * 64;

/** @return the default host to use for connections */
public String getHost() {
return host;
Expand Down Expand Up @@ -970,11 +977,15 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO
if(this.nioParams.getNioExecutor() == null && this.nioParams.getThreadFactory() == null) {
this.nioParams.setThreadFactory(getThreadFactory());
}
this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(connectionTimeout, nioParams, isSSL(), sslContextFactory);
this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(
connectionTimeout, nioParams, isSSL(), sslContextFactory,
this.maxInboundMessageBodySize);
}
return this.frameHandlerFactory;
} else {
return new SocketFrameHandlerFactory(connectionTimeout, socketFactory, socketConf, isSSL(), this.shutdownExecutor, sslContextFactory);
return new SocketFrameHandlerFactory(connectionTimeout, socketFactory,
socketConf, isSSL(), this.shutdownExecutor, sslContextFactory,
this.maxInboundMessageBodySize);
}

}
Expand Down Expand Up @@ -1273,6 +1284,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
result.setRecoveredQueueNameSupplier(recoveredQueueNameSupplier);
result.setTrafficListener(trafficListener);
result.setCredentialsRefreshService(credentialsRefreshService);
result.setMaxInboundMessageBodySize(maxInboundMessageBodySize);
return result;
}

Expand Down Expand Up @@ -1556,6 +1568,21 @@ public int getChannelRpcTimeout() {
return channelRpcTimeout;
}

/**
* Maximum body size of inbound (received) messages in bytes.
*
* <p>Default value is 67,108,864 (64 MiB).
*
* @param maxInboundMessageBodySize the maximum size of inbound messages
*/
public void setMaxInboundMessageBodySize(int maxInboundMessageBodySize) {
if (maxInboundMessageBodySize <= 0) {
throw new IllegalArgumentException("Max inbound message body size must be greater than 0: "
+ maxInboundMessageBodySize);
}
this.maxInboundMessageBodySize = maxInboundMessageBodySize;
}

/**
* The factory to create SSL contexts.
* This provides more flexibility to create {@link SSLContext}s
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/com/rabbitmq/client/impl/AMQChannel.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -62,7 +62,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
private final int _channelNumber;

/** Command being assembled */
private AMQCommand _command = new AMQCommand();
private AMQCommand _command;

/** The current outstanding RPC request, if any. (Could become a queue in future.) */
private RpcWrapper _activeRpc = null;
Expand All @@ -76,6 +76,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
private final boolean _checkRpcResponseType;

private final TrafficListener _trafficListener;
private final int maxInboundMessageBodySize;

/**
* Construct a channel on the given connection, with the given channel number.
Expand All @@ -91,6 +92,8 @@ public AMQChannel(AMQConnection connection, int channelNumber) {
this._rpcTimeout = connection.getChannelRpcTimeout();
this._checkRpcResponseType = connection.willCheckRpcResponseType();
this._trafficListener = connection.getTrafficListener();
this.maxInboundMessageBodySize = connection.getMaxInboundMessageBodySize();
this._command = new AMQCommand(this.maxInboundMessageBodySize);
}

/**
Expand All @@ -110,7 +113,7 @@ public int getChannelNumber() {
public void handleFrame(Frame frame) throws IOException {
AMQCommand command = _command;
if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
_command = new AMQCommand(); // prepare for the next one
_command = new AMQCommand(this.maxInboundMessageBodySize); // prepare for the next one
handleCompleteInboundCommand(command);
}
}
Expand Down
24 changes: 20 additions & 4 deletions src/main/java/com/rabbitmq/client/impl/AMQCommand.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -44,17 +44,21 @@ public class AMQCommand implements Command {
/** The assembler for this command - synchronised on - contains all the state */
private final CommandAssembler assembler;

AMQCommand(int maxBodyLength) {
this(null, null, null, maxBodyLength);
}

/** Construct a command ready to fill in by reading frames */
public AMQCommand() {
this(null, null, null);
this(null, null, null, Integer.MAX_VALUE);
}

/**
* Construct a command with just a method, and without header or body.
* @param method the wrapped method
*/
public AMQCommand(com.rabbitmq.client.Method method) {
this(method, null, null);
this(method, null, null, Integer.MAX_VALUE);
}

/**
Expand All @@ -64,7 +68,19 @@ public AMQCommand(com.rabbitmq.client.Method method) {
* @param body the message body data
*/
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body) {
this.assembler = new CommandAssembler((Method) method, contentHeader, body);
this.assembler = new CommandAssembler((Method) method, contentHeader, body, Integer.MAX_VALUE);
}

/**
* Construct a command with a specified method, header and body.
* @param method the wrapped method
* @param contentHeader the wrapped content header
* @param body the message body data
* @param maxBodyLength the maximum size for an inbound message body
*/
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body,
int maxBodyLength) {
this.assembler = new CommandAssembler((Method) method, contentHeader, body, maxBodyLength);
}

/** Public API - {@inheritDoc} */
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/rabbitmq/client/impl/AMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public static Map<String, Object> defaultClientProperties() {
private volatile ChannelManager _channelManager;
/** Saved server properties field from connection.start */
private volatile Map<String, Object> _serverProperties;
private final int maxInboundMessageBodySize;

/**
* Protected API - respond, in the driver thread, to a ShutdownSignal.
Expand Down Expand Up @@ -244,6 +245,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics

this.credentialsRefreshService = params.getCredentialsRefreshService();


this._channel0 = createChannel0();

this._channelManager = null;
Expand All @@ -257,6 +259,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler, Metrics
this.errorOnWriteListener = params.getErrorOnWriteListener() != null ? params.getErrorOnWriteListener() :
(connection, exception) -> { throw exception; }; // we just propagate the exception for non-recoverable connections
this.workPoolTimeout = params.getWorkPoolTimeout();
this.maxInboundMessageBodySize = params.getMaxInboundMessageBodySize();
}

AMQChannel createChannel0() {
Expand Down Expand Up @@ -1198,4 +1201,8 @@ public boolean willCheckRpcResponseType() {
public TrafficListener getTrafficListener() {
return trafficListener;
}

int getMaxInboundMessageBodySize() {
return maxInboundMessageBodySize;
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
// Copyright (c) 2016-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.

package com.rabbitmq.client.impl;

import com.rabbitmq.client.SocketConfigurator;
Expand All @@ -10,10 +25,13 @@ public abstract class AbstractFrameHandlerFactory implements FrameHandlerFactory
protected final int connectionTimeout;
protected final SocketConfigurator configurator;
protected final boolean ssl;
protected final int maxInboundMessageBodySize;

protected AbstractFrameHandlerFactory(int connectionTimeout, SocketConfigurator configurator, boolean ssl) {
protected AbstractFrameHandlerFactory(int connectionTimeout, SocketConfigurator configurator,
boolean ssl, int maxInboundMessageBodySize) {
this.connectionTimeout = connectionTimeout;
this.configurator = configurator;
this.ssl = ssl;
this.maxInboundMessageBodySize = maxInboundMessageBodySize;
}
}
20 changes: 16 additions & 4 deletions src/main/java/com/rabbitmq/client/impl/CommandAssembler.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand All @@ -21,6 +21,7 @@

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.UnexpectedFrameError;
import static java.lang.String.format;

/**
* Class responsible for piecing together a command from a series of {@link Frame}s.
Expand Down Expand Up @@ -52,12 +53,16 @@ private enum CAState {
/** No bytes of content body not yet accumulated */
private long remainingBodyBytes;

public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body) {
private final int maxBodyLength;

public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body,
int maxBodyLength) {
this.method = method;
this.contentHeader = contentHeader;
this.bodyN = new ArrayList<byte[]>(2);
this.bodyN = new ArrayList<>(2);
this.bodyLength = 0;
this.remainingBodyBytes = 0;
this.maxBodyLength = maxBodyLength;
appendBodyFragment(body);
if (method == null) {
this.state = CAState.EXPECTING_METHOD;
Expand Down Expand Up @@ -99,7 +104,14 @@ private void consumeMethodFrame(Frame f) throws IOException {
private void consumeHeaderFrame(Frame f) throws IOException {
if (f.type == AMQP.FRAME_HEADER) {
this.contentHeader = AMQImpl.readContentHeaderFrom(f.getInputStream());
this.remainingBodyBytes = this.contentHeader.getBodySize();
long bodySize = this.contentHeader.getBodySize();
if (bodySize >= this.maxBodyLength) {
throw new IllegalStateException(format(
"Message body is too large (%d), maximum size is %d",
bodySize, this.maxBodyLength
));
}
this.remainingBodyBytes = bodySize;
updateContentBodyState();
} else {
throw new UnexpectedFrameError(f, AMQP.FRAME_HEADER);
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -64,6 +64,8 @@ public class ConnectionParams {

private CredentialsRefreshService credentialsRefreshService;

private int maxInboundMessageBodySize;

public ConnectionParams() {}

public CredentialsProvider getCredentialsProvider() {
Expand Down Expand Up @@ -297,4 +299,12 @@ public void setCredentialsRefreshService(CredentialsRefreshService credentialsRe
public CredentialsRefreshService getCredentialsRefreshService() {
return credentialsRefreshService;
}

public int getMaxInboundMessageBodySize() {
return maxInboundMessageBodySize;
}

public void setMaxInboundMessageBodySize(int maxInboundMessageBodySize) {
this.maxInboundMessageBodySize = maxInboundMessageBodySize;
}
}
11 changes: 9 additions & 2 deletions src/main/java/com/rabbitmq/client/impl/Frame.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -26,6 +26,7 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import static java.lang.String.format;

/**
* Represents an AMQP wire-protocol frame, with frame type, channel number, and payload bytes.
Expand Down Expand Up @@ -83,7 +84,7 @@ public static Frame fromBodyFragment(int channelNumber, byte[] body, int offset,
*
* @return a new Frame if we read a frame successfully, otherwise null
*/
public static Frame readFrom(DataInputStream is) throws IOException {
public static Frame readFrom(DataInputStream is, int maxPayloadSize) throws IOException {
int type;
int channel;

Expand All @@ -109,6 +110,12 @@ public static Frame readFrom(DataInputStream is) throws IOException {

channel = is.readUnsignedShort();
int payloadSize = is.readInt();
if (payloadSize >= maxPayloadSize) {
throw new IllegalStateException(format(
"Frame body is too large (%d), maximum size is %d",
payloadSize, maxPayloadSize
));
}
byte[] payload = new byte[payloadSize];
is.readFully(payload);

Expand Down
12 changes: 8 additions & 4 deletions src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -52,22 +52,26 @@ public class SocketFrameHandler implements FrameHandler {
/** Socket's outputstream - data to the broker - synchronized on */
private final DataOutputStream _outputStream;

private final int maxInboundMessageBodySize;

/** Time to linger before closing the socket forcefully. */
public static final int SOCKET_CLOSING_TIMEOUT = 1;

/**
* @param socket the socket to use
*/
public SocketFrameHandler(Socket socket) throws IOException {
this(socket, null);
this(socket, null, Integer.MAX_VALUE);
}

/**
* @param socket the socket to use
*/
public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor) throws IOException {
public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor,
int maxInboundMessageBodySize) throws IOException {
_socket = socket;
_shutdownExecutor = shutdownExecutor;
this.maxInboundMessageBodySize = maxInboundMessageBodySize;

_inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
_outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
Expand Down Expand Up @@ -181,7 +185,7 @@ public void initialize(AMQConnection connection) {
@Override
public Frame readFrame() throws IOException {
synchronized (_inputStream) {
return Frame.readFrom(_inputStream);
return Frame.readFrom(_inputStream, this.maxInboundMessageBodySize);
}
}

Expand Down
Loading

0 comments on commit 97590ab

Please sign in to comment.