Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent pinned CarrierThreads on JDK-21 while using Virtual Threads #1119

Merged
merged 2 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 60 additions & 18 deletions src/main/java/com/rabbitmq/client/impl/AMQChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/**
Expand All @@ -54,7 +56,8 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
* so that clients can themselves use the channel to synchronize
* on.
*/
protected final Object _channelMutex = new Object();
protected final ReentrantLock _channelLock = new ReentrantLock();
protected final Condition _channelLockCondition = _channelLock.newCondition();

/** The connection this channel is associated with. */
private final AMQConnection _connection;
Expand Down Expand Up @@ -191,14 +194,17 @@ public void handleCompleteInboundCommand(AMQCommand command) throws IOException
// so it must be a response to an earlier RPC.

if (_checkRpcResponseType) {
synchronized (_channelMutex) {
_channelLock.lock();
try {
// check if this reply command is intended for the current waiting request before calling nextOutstandingRpc()
if (_activeRpc != null && !_activeRpc.canHandleReply(command)) {
// this reply command is not intended for the current waiting request
// most likely a previous request timed out and this command is the reply for that.
// Throw this reply command away so we don't stop the current request from waiting for its reply
return;
}
} finally {
_channelLock.unlock();
}
}
final RpcWrapper nextOutstandingRpc = nextOutstandingRpc();
Expand All @@ -220,11 +226,12 @@ public void enqueueAsyncRpc(Method method, CompletableFuture<Command> future) {
}

private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
synchronized (_channelMutex) {
_channelLock.lock();
try {
boolean waitClearedInterruptStatus = false;
while (_activeRpc != null) {
try {
_channelMutex.wait();
_channelLockCondition.await();
} catch (InterruptedException e) { //NOSONAR
waitClearedInterruptStatus = true;
// No Sonar: we re-interrupt the thread later
Expand All @@ -234,23 +241,31 @@ private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
Thread.currentThread().interrupt();
}
_activeRpc = rpcWrapperSupplier.get();
} finally {
_channelLock.unlock();
}
}

public boolean isOutstandingRpc()
{
synchronized (_channelMutex) {
_channelLock.lock();
try {
return (_activeRpc != null);
} finally {
_channelLock.unlock();
}
}

public RpcWrapper nextOutstandingRpc()
{
synchronized (_channelMutex) {
_channelLock.lock();
try {
RpcWrapper result = _activeRpc;
_activeRpc = null;
_channelMutex.notifyAll();
_channelLockCondition.signalAll();
return result;
} finally {
_channelLock.unlock();
}
}

Expand Down Expand Up @@ -344,36 +359,48 @@ private AMQCommand privateRpc(Method m, int timeout)
public void rpc(Method m, RpcContinuation k)
throws IOException
{
synchronized (_channelMutex) {
_channelLock.lock();
try {
ensureIsOpen();
quiescingRpc(m, k);
} finally {
_channelLock.unlock();
}
}

public void quiescingRpc(Method m, RpcContinuation k)
throws IOException
{
synchronized (_channelMutex) {
_channelLock.lock();
try {
enqueueRpc(k);
quiescingTransmit(m);
} finally {
_channelLock.unlock();
}
}

public void asyncRpc(Method m, CompletableFuture<Command> future)
throws IOException
{
synchronized (_channelMutex) {
_channelLock.lock();
try {
ensureIsOpen();
quiescingAsyncRpc(m, future);
} finally {
_channelLock.unlock();
}
}

public void quiescingAsyncRpc(Method m, CompletableFuture<Command> future)
throws IOException
{
synchronized (_channelMutex) {
_channelLock.lock();
try {
enqueueAsyncRpc(m, future);
quiescingTransmit(m);
} finally {
_channelLock.unlock();
}
}

Expand Down Expand Up @@ -402,13 +429,16 @@ public void processShutdownSignal(ShutdownSignalException signal,
boolean ignoreClosed,
boolean notifyRpc) {
try {
synchronized (_channelMutex) {
_channelLock.lock();
try {
if (!setShutdownCauseIfOpen(signal)) {
if (!ignoreClosed)
throw new AlreadyClosedException(getCloseReason());
}

_channelMutex.notifyAll();
_channelLockCondition.signalAll();
} finally {
_channelLock.unlock();
}
} finally {
if (notifyRpc)
Expand All @@ -424,30 +454,40 @@ public void notifyOutstandingRpc(ShutdownSignalException signal) {
}

public void transmit(Method m) throws IOException {
synchronized (_channelMutex) {
_channelLock.lock();
try {
transmit(new AMQCommand(m));
} finally {
_channelLock.unlock();
}
}

public void transmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
_channelLock.lock();
try {
ensureIsOpen();
quiescingTransmit(c);
} finally {
_channelLock.unlock();
}
}

public void quiescingTransmit(Method m) throws IOException {
synchronized (_channelMutex) {
_channelLock.lock();
try {
quiescingTransmit(new AMQCommand(m));
} finally {
_channelLock.unlock();
}
}

public void quiescingTransmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
_channelLock.lock();
try {
if (c.getMethod().hasContent()) {
while (_blockContent) {
try {
_channelMutex.wait();
_channelLockCondition.await();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
Expand All @@ -460,6 +500,8 @@ public void quiescingTransmit(AMQCommand c) throws IOException {
}
this._trafficListener.write(c);
c.transmit(this);
} finally {
_channelLock.unlock();
}
}

Expand Down
12 changes: 10 additions & 2 deletions src/main/java/com/rabbitmq/client/impl/AMQCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Command;
Expand All @@ -43,6 +44,7 @@ public class AMQCommand implements Command {

/** The assembler for this command - synchronised on - contains all the state */
private final CommandAssembler assembler;
private final ReentrantLock assemblerLock = new ReentrantLock();

AMQCommand(int maxBodyLength) {
this(null, null, null, maxBodyLength);
Expand Down Expand Up @@ -115,7 +117,8 @@ public void transmit(AMQChannel channel) throws IOException {
int channelNumber = channel.getChannelNumber();
AMQConnection connection = channel.getConnection();

synchronized (assembler) {
assemblerLock.lock();
try {
Method m = this.assembler.getMethod();
if (m.hasContent()) {
byte[] body = this.assembler.getContentBody();
Expand Down Expand Up @@ -145,6 +148,8 @@ public void transmit(AMQChannel channel) throws IOException {
} else {
connection.writeFrame(m.toFrame(channelNumber));
}
} finally {
assemblerLock.unlock();
}

connection.flush();
Expand All @@ -155,7 +160,8 @@ public void transmit(AMQChannel channel) throws IOException {
}

public String toString(boolean suppressBody){
synchronized (assembler) {
assemblerLock.lock();
try {
return new StringBuilder()
.append('{')
.append(this.assembler.getMethod())
Expand All @@ -165,6 +171,8 @@ public String toString(boolean suppressBody){
.append(contentBodyStringBuilder(
this.assembler.getContentBody(), suppressBody))
.append('}').toString();
} finally {
assemblerLock.unlock();
}
}

Expand Down
28 changes: 22 additions & 6 deletions src/main/java/com/rabbitmq/client/impl/ChannelN.java
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,13 @@ private void releaseChannel() {
return true;
} else if (method instanceof Channel.Flow) {
Channel.Flow channelFlow = (Channel.Flow) method;
synchronized (_channelMutex) {
_channelLock.lock();
try {
_blockContent = !channelFlow.getActive();
transmit(new Channel.FlowOk(!_blockContent));
_channelMutex.notifyAll();
_channelLockCondition.signalAll();
} finally {
_channelLock.unlock();
}
return true;
} else if (method instanceof Basic.Ack) {
Expand Down Expand Up @@ -524,7 +527,8 @@ private void asyncShutdown(Command command) throws IOException {
false,
command.getMethod(),
this);
synchronized (_channelMutex) {
_channelLock.lock();
try {
try {
processShutdownSignal(signal, true, false);
quiescingTransmit(new Channel.CloseOk());
Expand All @@ -533,6 +537,9 @@ private void asyncShutdown(Command command) throws IOException {
notifyOutstandingRpc(signal);
}
}
finally {
_channelLock.unlock();
}
notifyListeners();
}

Expand Down Expand Up @@ -612,9 +619,12 @@ public AMQCommand transformReply(AMQCommand command) {
try {
// Synchronize the block below to avoid race conditions in case
// connection wants to send Connection-CloseOK
synchronized (_channelMutex) {
_channelLock.lock();
try {
startProcessShutdownSignal(signal, !initiatedByApplication, true);
quiescingRpc(reason, k);
} finally {
_channelLock.unlock();
}

// Now that we're in quiescing state, channel.close was sent and
Expand Down Expand Up @@ -1602,16 +1612,22 @@ public CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOEx

@Override
public void enqueueRpc(RpcContinuation k) {
synchronized (_channelMutex) {
_channelLock.lock();
try {
super.enqueueRpc(k);
dispatcher.setUnlimited(true);
} finally {
_channelLock.unlock();
}
}

@Override
protected void markRpcFinished() {
synchronized (_channelMutex) {
_channelLock.lock();
try {
dispatcher.setUnlimited(false);
} finally {
_channelLock.unlock();
}
}

Expand Down
Loading