Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent pinned CarrierThreads on JDK-21 while using Virtual Threads #1119

Merged
merged 2 commits into from
Sep 25, 2023

Conversation

rjbaucells
Copy link
Contributor

Proposed Changes

This PR replaces some of the synchronized() blocks with ReentrantLock equivalents to prevent pinned CarrierThreads when using JDK-21 Virtual Threads.

When creating a Connection using the ConnectionFactory a consumer application can configure the new connection to use a custom ExecutorService instance:

// create connection factory
var factory = new ConnectionFactory();

// ...

// create executor service (using a new virtual thread / task)
var executorService = Executors.newVirtualThreadPerTaskExecutor();

// create connection
var connection = factory.newConnection(executorService, "my-connection")

RabbitMQ messages received by the application via Channel.basicConsume(...) will execute in a new Virtual Thread created by the provided ExecutorService.

Sending messages via Channel.basicPublish() from one of the Virtual Threads will Pin CarrierThreads preventing the re-use of the Carrier Thread to execute another Virtual Thread. See the following traces logged by using the -Djdk.tracePinnedThreads=full command line argument to the java command:

Thread[#36,ForkJoinPool-1-worker-1,5,CarrierThreads]
    java.base/java.lang.VirtualThread$VThreadContinuation.onPinned(VirtualThread.java:185)
    java.base/jdk.internal.vm.Continuation.onPinned0(Continuation.java:393)
    java.base/java.lang.VirtualThread.park(VirtualThread.java:592)
    java.base/java.lang.System$2.parkVirtualThread(System.java:2639)
    java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54)
    java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:219)
    java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:754)
    java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:990)
    java.base/java.util.concurrent.locks.ReentrantLock$Sync.lock(ReentrantLock.java:153)
    java.base/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:322)
    java.base/jdk.internal.misc.InternalLock.lock(InternalLock.java:74)
    java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:157)
    java.base/java.io.DataOutputStream.writeByte(DataOutputStream.java:161)
    com.rabbitmq.client.impl.Frame.writeTo(Frame.java:193)
    com.rabbitmq.client.impl.SocketFrameHandler.writeFrame(SocketFrameHandler.java:195) <== monitors:1
    com.rabbitmq.client.impl.AMQConnection.writeFrame(AMQConnection.java:634)
    com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:133) <== monitors:1
    com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:458) <== monitors:1
    com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:431) <== monitors:1
    com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:713)
    com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:217)
    ...
Thread[#50,ForkJoinPool-1-worker-2,5,CarrierThreads]
    java.base/java.lang.VirtualThread$VThreadContinuation.onPinned(VirtualThread.java:185)
    java.base/jdk.internal.vm.Continuation.onPinned0(Continuation.java:393)
    java.base/java.lang.VirtualThread.park(VirtualThread.java:592)
    java.base/java.lang.System$2.parkVirtualThread(System.java:2639)
    java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54)
    java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:219)
    java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:754)
    java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:990)
    java.base/java.util.concurrent.locks.ReentrantLock$Sync.lock(ReentrantLock.java:153)
    java.base/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:322)
    java.base/jdk.internal.misc.InternalLock.lock(InternalLock.java:74)
    java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:157)
    java.base/java.io.DataOutputStream.writeByte(DataOutputStream.java:161)
    com.rabbitmq.client.impl.Frame.writeTo(Frame.java:193)
    com.rabbitmq.client.impl.SocketFrameHandler.writeFrame(SocketFrameHandler.java:195) <== monitors:1
    com.rabbitmq.client.impl.AMQConnection.writeFrame(AMQConnection.java:634)
    com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:133) <== monitors:1
    com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:458) <== monitors:1
    com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:431) <== monitors:1
    com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:713)
    com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:217)
    ...
Thread[#36,ForkJoinPool-1-worker-1,5,CarrierThreads]
    java.base/java.lang.VirtualThread$VThreadContinuation.onPinned(VirtualThread.java:185)
    java.base/jdk.internal.vm.Continuation.onPinned0(Continuation.java:393)
    java.base/java.lang.VirtualThread.park(VirtualThread.java:592)
    java.base/java.lang.System$2.parkVirtualThread(System.java:2639)
    java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54)
    java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:219)
    java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:754)
    java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:990)
    java.base/java.util.concurrent.locks.ReentrantLock$Sync.lock(ReentrantLock.java:153)
    java.base/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:322)
    com.rabbitmq.client.impl.SocketFrameHandler.writeFrame(SocketFrameHandler.java:206)
    com.rabbitmq.client.impl.AMQConnection.writeFrame(AMQConnection.java:649)
    com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:133) <== monitors:1
    com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:502)
    com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:469)
    com.rabbitmq.client.impl.ChannelN.lambda$basicPublish$1(ChannelN.java:729)
    com.rabbitmq.client.observation.NoOpObservationCollector.publish(NoOpObservationCollector.java:32)
    com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:731)
    com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:217)
    ...
Thread[#66,ForkJoinPool-1-worker-4,5,CarrierThreads]
    java.base/java.lang.VirtualThread$VThreadContinuation.onPinned(VirtualThread.java:185)
    java.base/jdk.internal.vm.Continuation.onPinned0(Continuation.java:393)
    java.base/java.lang.VirtualThread.park(VirtualThread.java:592)
    java.base/java.lang.System$2.parkVirtualThread(System.java:2639)
    java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54)
    java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:219)
    java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:754)
    java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:990)
    java.base/java.util.concurrent.locks.ReentrantLock$Sync.lock(ReentrantLock.java:153)
    java.base/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:322)
    java.base/jdk.internal.misc.InternalLock.lock(InternalLock.java:74)
    java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:157)
    java.base/java.io.DataOutputStream.writeByte(DataOutputStream.java:161)
    com.rabbitmq.client.impl.Frame.writeTo(Frame.java:193)
    com.rabbitmq.client.impl.SocketFrameHandler.writeFrame(SocketFrameHandler.java:208)
    com.rabbitmq.client.impl.AMQConnection.writeFrame(AMQConnection.java:649)
    com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:133) <== monitors:1
    com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:502)
    com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:469)
    com.rabbitmq.client.impl.ChannelN.lambda$basicPublish$1(ChannelN.java:729)
    com.rabbitmq.client.observation.NoOpObservationCollector.publish(NoOpObservationCollector.java:32)
    com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:731)
    com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:217)
    ...

Types of Changes

What types of changes does your code introduce to this project?
Put an x in the boxes that apply

  • Bugfix (non-breaking change which fixes issue #NNNN)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation (correction or otherwise)
  • Cosmetics (whitespace, appearance)

Checklist

Put an x in the boxes that apply. You can also fill these out after creating
the PR. If you're unsure about any of them, don't hesitate to ask on the
mailing list. We're here to help! This is simply a reminder of what we are
going to look for before merging your code.

  • I have read the CONTRIBUTING.md document
  • I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq)
  • All tests pass locally with my changes
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)
  • Any dependent changes have been merged and published in related repositories

@michaelklishin
Copy link
Member

@rjbaucells thank you. This looks like a pretty faithful reimplementation.

@acogoluegnes I assume we can ship this kind of change in 5.x? It does not affect Java 8 compatibility as far as I can tell.

@rjbaucells
Copy link
Contributor Author

Yes, the code is 100% Java 8. The ReentrantLock is in JDK 1.8 and it is the recommended lock structure to replace synchronized(monitor) { IO } statement.

I have been running one of my production servers (in sandbox) with the Virtual Thread Executor and those traces I provided in my previous messages were fixed by the change in this PR.

@michaelklishin michaelklishin merged commit 06346ed into rabbitmq:5.x.x-stable Sep 25, 2023
3 checks passed
@michaelklishin
Copy link
Member

Thank you!

@acogoluegnes acogoluegnes added this to the 5.19.0 milestone Sep 25, 2023
acogoluegnes added a commit that referenced this pull request Sep 25, 2023
Not ReentrantLock.

References #1119
acogoluegnes added a commit that referenced this pull request Sep 25, 2023
Not ReentrantLock.

References #1119

(cherry picked from commit 3f72657)
@rjbaucells rjbaucells deleted the virtual-threads branch September 25, 2023 14:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants