Skip to content

Commit

Permalink
Merge pull request #1119 from rjbaucells/virtual-threads
Browse files Browse the repository at this point in the history
Prevent pinned CarrierThreads on JDK-21 while using Virtual Threads
  • Loading branch information
michaelklishin authored Sep 25, 2023
2 parents fb64c5f + 8e855a3 commit 06346ed
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 30 deletions.
78 changes: 60 additions & 18 deletions src/main/java/com/rabbitmq/client/impl/AMQChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/**
Expand All @@ -54,7 +56,8 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
* so that clients can themselves use the channel to synchronize
* on.
*/
protected final Object _channelMutex = new Object();
protected final ReentrantLock _channelLock = new ReentrantLock();
protected final Condition _channelLockCondition = _channelLock.newCondition();

/** The connection this channel is associated with. */
private final AMQConnection _connection;
Expand Down Expand Up @@ -191,14 +194,17 @@ public void handleCompleteInboundCommand(AMQCommand command) throws IOException
// so it must be a response to an earlier RPC.

if (_checkRpcResponseType) {
synchronized (_channelMutex) {
_channelLock.lock();
try {
// check if this reply command is intended for the current waiting request before calling nextOutstandingRpc()
if (_activeRpc != null && !_activeRpc.canHandleReply(command)) {
// this reply command is not intended for the current waiting request
// most likely a previous request timed out and this command is the reply for that.
// Throw this reply command away so we don't stop the current request from waiting for its reply
return;
}
} finally {
_channelLock.unlock();
}
}
final RpcWrapper nextOutstandingRpc = nextOutstandingRpc();
Expand All @@ -220,11 +226,12 @@ public void enqueueAsyncRpc(Method method, CompletableFuture<Command> future) {
}

private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
synchronized (_channelMutex) {
_channelLock.lock();
try {
boolean waitClearedInterruptStatus = false;
while (_activeRpc != null) {
try {
_channelMutex.wait();
_channelLockCondition.await();
} catch (InterruptedException e) { //NOSONAR
waitClearedInterruptStatus = true;
// No Sonar: we re-interrupt the thread later
Expand All @@ -234,23 +241,31 @@ private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
Thread.currentThread().interrupt();
}
_activeRpc = rpcWrapperSupplier.get();
} finally {
_channelLock.unlock();
}
}

public boolean isOutstandingRpc()
{
synchronized (_channelMutex) {
_channelLock.lock();
try {
return (_activeRpc != null);
} finally {
_channelLock.unlock();
}
}

public RpcWrapper nextOutstandingRpc()
{
synchronized (_channelMutex) {
_channelLock.lock();
try {
RpcWrapper result = _activeRpc;
_activeRpc = null;
_channelMutex.notifyAll();
_channelLockCondition.signalAll();
return result;
} finally {
_channelLock.unlock();
}
}

Expand Down Expand Up @@ -344,36 +359,48 @@ private AMQCommand privateRpc(Method m, int timeout)
public void rpc(Method m, RpcContinuation k)
throws IOException
{
synchronized (_channelMutex) {
_channelLock.lock();
try {
ensureIsOpen();
quiescingRpc(m, k);
} finally {
_channelLock.unlock();
}
}

public void quiescingRpc(Method m, RpcContinuation k)
throws IOException
{
synchronized (_channelMutex) {
_channelLock.lock();
try {
enqueueRpc(k);
quiescingTransmit(m);
} finally {
_channelLock.unlock();
}
}

public void asyncRpc(Method m, CompletableFuture<Command> future)
throws IOException
{
synchronized (_channelMutex) {
_channelLock.lock();
try {
ensureIsOpen();
quiescingAsyncRpc(m, future);
} finally {
_channelLock.unlock();
}
}

public void quiescingAsyncRpc(Method m, CompletableFuture<Command> future)
throws IOException
{
synchronized (_channelMutex) {
_channelLock.lock();
try {
enqueueAsyncRpc(m, future);
quiescingTransmit(m);
} finally {
_channelLock.unlock();
}
}

Expand Down Expand Up @@ -402,13 +429,16 @@ public void processShutdownSignal(ShutdownSignalException signal,
boolean ignoreClosed,
boolean notifyRpc) {
try {
synchronized (_channelMutex) {
_channelLock.lock();
try {
if (!setShutdownCauseIfOpen(signal)) {
if (!ignoreClosed)
throw new AlreadyClosedException(getCloseReason());
}

_channelMutex.notifyAll();
_channelLockCondition.signalAll();
} finally {
_channelLock.unlock();
}
} finally {
if (notifyRpc)
Expand All @@ -424,30 +454,40 @@ public void notifyOutstandingRpc(ShutdownSignalException signal) {
}

public void transmit(Method m) throws IOException {
synchronized (_channelMutex) {
_channelLock.lock();
try {
transmit(new AMQCommand(m));
} finally {
_channelLock.unlock();
}
}

public void transmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
_channelLock.lock();
try {
ensureIsOpen();
quiescingTransmit(c);
} finally {
_channelLock.unlock();
}
}

public void quiescingTransmit(Method m) throws IOException {
synchronized (_channelMutex) {
_channelLock.lock();
try {
quiescingTransmit(new AMQCommand(m));
} finally {
_channelLock.unlock();
}
}

public void quiescingTransmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
_channelLock.lock();
try {
if (c.getMethod().hasContent()) {
while (_blockContent) {
try {
_channelMutex.wait();
_channelLockCondition.await();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
Expand All @@ -460,6 +500,8 @@ public void quiescingTransmit(AMQCommand c) throws IOException {
}
this._trafficListener.write(c);
c.transmit(this);
} finally {
_channelLock.unlock();
}
}

Expand Down
12 changes: 10 additions & 2 deletions src/main/java/com/rabbitmq/client/impl/AMQCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Command;
Expand All @@ -43,6 +44,7 @@ public class AMQCommand implements Command {

/** The assembler for this command - synchronised on - contains all the state */
private final CommandAssembler assembler;
private final ReentrantLock assemblerLock = new ReentrantLock();

AMQCommand(int maxBodyLength) {
this(null, null, null, maxBodyLength);
Expand Down Expand Up @@ -115,7 +117,8 @@ public void transmit(AMQChannel channel) throws IOException {
int channelNumber = channel.getChannelNumber();
AMQConnection connection = channel.getConnection();

synchronized (assembler) {
assemblerLock.lock();
try {
Method m = this.assembler.getMethod();
if (m.hasContent()) {
byte[] body = this.assembler.getContentBody();
Expand Down Expand Up @@ -145,6 +148,8 @@ public void transmit(AMQChannel channel) throws IOException {
} else {
connection.writeFrame(m.toFrame(channelNumber));
}
} finally {
assemblerLock.unlock();
}

connection.flush();
Expand All @@ -155,7 +160,8 @@ public void transmit(AMQChannel channel) throws IOException {
}

public String toString(boolean suppressBody){
synchronized (assembler) {
assemblerLock.lock();
try {
return new StringBuilder()
.append('{')
.append(this.assembler.getMethod())
Expand All @@ -165,6 +171,8 @@ public String toString(boolean suppressBody){
.append(contentBodyStringBuilder(
this.assembler.getContentBody(), suppressBody))
.append('}').toString();
} finally {
assemblerLock.unlock();
}
}

Expand Down
28 changes: 22 additions & 6 deletions src/main/java/com/rabbitmq/client/impl/ChannelN.java
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,13 @@ private void releaseChannel() {
return true;
} else if (method instanceof Channel.Flow) {
Channel.Flow channelFlow = (Channel.Flow) method;
synchronized (_channelMutex) {
_channelLock.lock();
try {
_blockContent = !channelFlow.getActive();
transmit(new Channel.FlowOk(!_blockContent));
_channelMutex.notifyAll();
_channelLockCondition.signalAll();
} finally {
_channelLock.unlock();
}
return true;
} else if (method instanceof Basic.Ack) {
Expand Down Expand Up @@ -524,7 +527,8 @@ private void asyncShutdown(Command command) throws IOException {
false,
command.getMethod(),
this);
synchronized (_channelMutex) {
_channelLock.lock();
try {
try {
processShutdownSignal(signal, true, false);
quiescingTransmit(new Channel.CloseOk());
Expand All @@ -533,6 +537,9 @@ private void asyncShutdown(Command command) throws IOException {
notifyOutstandingRpc(signal);
}
}
finally {
_channelLock.unlock();
}
notifyListeners();
}

Expand Down Expand Up @@ -612,9 +619,12 @@ public AMQCommand transformReply(AMQCommand command) {
try {
// Synchronize the block below to avoid race conditions in case
// connection wants to send Connection-CloseOK
synchronized (_channelMutex) {
_channelLock.lock();
try {
startProcessShutdownSignal(signal, !initiatedByApplication, true);
quiescingRpc(reason, k);
} finally {
_channelLock.unlock();
}

// Now that we're in quiescing state, channel.close was sent and
Expand Down Expand Up @@ -1602,16 +1612,22 @@ public CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOEx

@Override
public void enqueueRpc(RpcContinuation k) {
synchronized (_channelMutex) {
_channelLock.lock();
try {
super.enqueueRpc(k);
dispatcher.setUnlimited(true);
} finally {
_channelLock.unlock();
}
}

@Override
protected void markRpcFinished() {
synchronized (_channelMutex) {
_channelLock.lock();
try {
dispatcher.setUnlimited(false);
} finally {
_channelLock.unlock();
}
}

Expand Down
Loading

0 comments on commit 06346ed

Please sign in to comment.