Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reactive messaging Emitter stops working correctly in dev and test modes #40118

Closed
johnaohara opened this issue Apr 17, 2024 · 30 comments
Closed

Comments

@johnaohara
Copy link
Member

Describe the bug

We have an application that uses AMQ broker for async message processing and were experiencing test failures in our test suite where messages are not being passed to the AMQ broker via quarkus-smallrye-reactive-messaging-amqp client

Our test suite uses an AMQ broker that is automatically provisioned by dev services

What I noticed was when we called org.eclipse.microprofile.reactive.messaging.Emitter.send() with a msg payload, the messages were being enqueued in a buffer, but not delivered to the underlying AMQ client. Therefore the messages were not delivered to the broker.

In order to reproduce this issue, I created a sample application from code.quarkus.io, just selecting the Messaging - AMQP Connector [quarkus-messaging-amqp] extension.

I found that if I start that application in dev mode the messages are processed as expected, but if i restart dev mode 3-4 times (by pressing s in the dev console) the messages are no longer delivered to the broker and they are buffered in a queue, in the same way our test suites behaves.

There appears to be a race/bug where the requested counter in io.smallrye.mutiny.operators.multi.builders.BufferItemMultiEmitter is set to 0 during a restart and in test mode and prevents the call to drain() from emitting the messages

This does not appear to happen on all machines. I see this issue in Fedora 39 on x86_64 , but our CI environment (github) or Mac M2 does not demonstrate this behaviour.

Expected behavior

The messages in the sample app to be outputted every time quarkus is restarted in dev mode:

__  ____  __  _____   ___  __ ____  ______ 
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ 
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \   
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/   
2024-04-17 12:31:03,893 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:33036 for channel words-out
2024-04-17 12:31:03,917 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-04-17 12:31:03,929 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-04-17 12:31:03,932 INFO  [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 3.9.3) started in 6.970s. 
2024-04-17 12:31:03,933 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2024-04-17 12:31:03,933 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, messaging, messaging-amqp, smallrye-context-propagation, vertx]
2024-04-17 12:31:04,176 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16213: Connection with AMQP broker established
2024-04-17 12:31:04,186 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16213: Connection with AMQP broker established
2024-04-17 12:31:04,250 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16203: AMQP Receiver listening address words
>> HELLO
>> WITH
>> QUARKUS
>> MESSAGING
>> MESSAGE

Actual behavior

After 1-2 restarts, the messages are no longer dispatched to the AMQ broker:

__  ____  __  _____   ___  __ ____  ______ 
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ 
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \   
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/   
2024-04-17 12:31:03,893 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:33036 for channel words-out
2024-04-17 12:31:03,917 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-04-17 12:31:03,929 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-04-17 12:31:03,932 INFO  [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 3.9.3) started in 6.970s. 
2024-04-17 12:31:03,933 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2024-04-17 12:31:03,933 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, messaging, messaging-amqp, smallrye-context-propagation, vertx]
2024-04-17 12:31:04,176 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16213: Connection with AMQP broker established
2024-04-17 12:31:04,186 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16213: Connection with AMQP broker established
2024-04-17 12:31:04,250 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16203: AMQP Receiver listening address words
>> HELLO
>> WITH
>> QUARKUS
>> MESSAGING
>> MESSAGE
2024-04-17 12:31:07,800 INFO  [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Restarting as requested by the user.
2024-04-17 12:31:07,808 INFO  [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer stopped in 0.006s
2024-04-17 12:31:07,886 INFO  [io.qua.sma.dep.processor] (build-2) Configuring the channel 'words-in' to be managed by the connector 'smallrye-amqp'
2024-04-17 12:31:07,886 INFO  [io.qua.sma.dep.processor] (build-2) Configuring the channel 'words-out' to be managed by the connector 'smallrye-amqp'
__  ____  __  _____   ___  __ ____  ______ 
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ 
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \   
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/   
2024-04-17 12:31:08,071 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:33036 for channel words-in
2024-04-17 12:31:08,072 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:33036 for channel words-out
2024-04-17 12:31:08,074 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-04-17 12:31:08,075 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-04-17 12:31:08,076 INFO  [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 3.9.3) started in 0.266s. 
2024-04-17 12:31:08,076 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2024-04-17 12:31:08,076 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, messaging, messaging-amqp, smallrye-context-propagation, vertx]
2024-04-17 12:31:08,076 INFO  [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Live reload total time: 0.278s 
2024-04-17 12:31:08,082 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-1) SRMSG16213: Connection with AMQP broker established
2024-04-17 12:31:08,083 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-1) SRMSG16213: Connection with AMQP broker established
2024-04-17 12:31:08,086 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-1) SRMSG16203: AMQP Receiver listening address words
2024-04-17 12:31:09,292 INFO  [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Restarting as requested by the user.
2024-04-17 12:31:09,294 INFO  [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer stopped in 0.001s
2024-04-17 12:31:09,375 INFO  [io.qua.sma.dep.processor] (build-19) Configuring the channel 'words-in' to be managed by the connector 'smallrye-amqp'
2024-04-17 12:31:09,375 INFO  [io.qua.sma.dep.processor] (build-19) Configuring the channel 'words-out' to be managed by the connector 'smallrye-amqp'
__  ____  __  _____   ___  __ ____  ______ 
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ 
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \   
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/   
2024-04-17 12:31:09,528 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:33036 for channel words-in
2024-04-17 12:31:09,529 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:33036 for channel words-out
2024-04-17 12:31:09,531 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-04-17 12:31:09,532 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-04-17 12:31:09,533 INFO  [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 3.9.3) started in 0.238s. 
2024-04-17 12:31:09,533 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2024-04-17 12:31:09,533 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, messaging, messaging-amqp, smallrye-context-propagation, vertx]
2024-04-17 12:31:09,533 INFO  [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Live reload total time: 0.243s 
2024-04-17 12:31:09,539 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-2) SRMSG16213: Connection with AMQP broker established
2024-04-17 12:31:09,540 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-2) SRMSG16213: Connection with AMQP broker established
2024-04-17 12:31:09,543 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-2) SRMSG16203: AMQP Receiver listening address words
2024-04-17 12:31:12,554 INFO  [io.quarkus] (Shutdown thread) amq-client-msg-buffer stopped in 0.002s

How to Reproduce?

  1. Create a new Application from code.quakus.io
  2. Select Messaging - AMQP Connector [quarkus-messaging-amqp] extension
  3. Download and start application in dev mode
  4. restart the application in dev mode by pressing s in the terminal

Output of uname -a or ver

Linux fedora 6.8.4-200.fc39.x86_64 #1 SMP PREEMPT_DYNAMIC Thu Apr 4 20:45:21 UTC 2024 x86_64 GNU/Linux

Output of java -version

openjdk version "21.0.1" 2023-10-17 LTS OpenJDK Runtime Environment Temurin-21.0.1+12 (build 21.0.1+12-LTS) OpenJDK 64-Bit Server VM Temurin-21.0.1+12 (build 21.0.1+12-LTS, mixed mode, sharing)

Quarkus version or git rev

No response

Build tool (ie. output of mvnw --version or gradlew --version)

3.9.3

Additional information

No response

Copy link

quarkus-bot bot commented Apr 17, 2024

/cc @cescoffier (reactive-messaging), @ozangunalp (reactive-messaging)

@franz1981
Copy link
Contributor

franz1981 commented Apr 19, 2024

@jponge too

thanks @johnaohara for reporting, I see you already have searched where it doesn't make progress

@franz1981
Copy link
Contributor

I've tried to reproduce it locally on 2 different machines with no success; I have to try with a new laptop and see if I'm lucky

@cescoffier
Copy link
Member

I don't believe we changed anything. @ozangunalp any idea?

@ozangunalp
Copy link
Contributor

@johnaohara Thanks for the detailed explanation.

There weren't any changes to the emitter, except the Mutiny update :)

In AMQP you need to have a listener for the address for produced messages to get delivered, this may happen if during restart producer sends messages before the consumer starts listening.
But you are saying that emitted messages are enqueued in the buffer and not sent to the underlying client, so there must be something else.

@MikkoKauhanen
Copy link

We have similar problem where our applications emitted messages are enqueued into the buffer of BufferItemMultiEmitter class queue but those are not emitted to the downstream AmqpCreditBasedSender. This problem seems to happen rarely after there has been connection issues to message broker Amazon MQ (Active MQ variant)

After a lot of debugging I believe the issue is that the BufferItemMultiEmitter.requested atomic long is decremented while emitting the messages from the queue to downstream to zero but the AmqpCreditBasedSender.requested atomic long is not in sync and has a bigger value.

The AmqpCreditBasedSender.onNext(Message<?> message) only requests new messages once the requested field value is zero.

.subscribe().with(
                        tuple -> {
                            if (tuple != null) { // Serialization issue
                                subscriber.onNext(tuple.getItem2());
                                if (requested.decrementAndGet() == 0) { // no more credit, request more
                                    onNoMoreCredit(tuple.getItem1());
                                }
                            }
                        },
                        subscriber::onError);

Why the AmqpCreditBasedSender.requested gets out of sync seems to happen when there is connection issue after the AmqpSenderImpl.doSend method has invoked sender.send(updated.unwrap(), ack) The ack handler did not get invoked for this message that triggered the doSend.

One way I can reproduce the issue is to suspend the application in ProtonSenderImpl method:

@Override
  public ProtonDelivery send(byte[] tag, Message message, Handler<ProtonDelivery> onUpdated) {
    if (anonymousSender && message.getAddress() == null) {
      throw new IllegalArgumentException("Message must have an address when using anonymous sender.");
    }
    // TODO: prevent odd combination of onRecieved callback + SenderSettleMode.SETTLED, or just allow it?

    Delivery delivery = sender().delivery(tag); // start a new delivery..
    ProtonWritableBufferImpl buffer = new ProtonWritableBufferImpl();
    MessageImpl msg = (MessageImpl) message;
    msg.encode(buffer);
    ReadableBuffer encoded = new ProtonReadableBufferImpl(buffer.getBuffer());

    sender().sendNoCopy(encoded);

and stop the activemq and let the application run and start the activeMQ again.

@johnaohara
Copy link
Member Author

I am coming back to this now.

In our case BufferItemMultiEmitter requested is 0 on the first request, I do not even get far enough down the stack to call any methods on AmqpCreditBasedSender

image

There is nothing complicated in the messaging setup atm. I basically have the following configuration in application.properties:

# schema-sync incoming
mp.messaging.incoming.run-upload-in.connector=smallrye-amqp
mp.messaging.incoming.run-upload-in.address=run-upload
mp.messaging.incoming.run-upload-in.durable=true
mp.messaging.incoming.run-upload-in.container-id=horreum-broker
mp.messaging.incoming.run-upload-in.link-name=run-upload
# schema-sync outgoing
mp.messaging.outgoing.run-upload-out.connector=smallrye-amqp
mp.messaging.outgoing.run-upload-out.address=run-upload
mp.messaging.outgoing.run-upload-out.durable=true
mp.messaging.outgoing.run-upload-out.container-id=horreum-broker
mp.messaging.outgoing.run-upload-out.link-name=run-upload

and an org.eclipse.microprofile.reactive.messaging.Emitter:

    @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000)
    @Channel("run-upload-out")
    Emitter<Integer> runUploadEmitter;

I am calling the emitter with;

runUploadEmitter.send(uploadID);

I don't think this is a recent behaviour. our CI has been failing intermittently for a while with errors related to this. My machine was working again, but is consistently failing atm.

I will spend some more time digging into what is happening

Thanks

@ozangunalp
Copy link
Contributor

I've been spending some time to reproduce this.
I was thinking this may happen if the send acknowledgement never returns from the AMQP broker, due to the reconnection, but I couldn't reproduce it. I think overall having container-id set helps with that.

I can reproduce some reconnection scenarios, there is indeed one of them which throws an IllegalStateException with the message send not allowed after the sender is closed. because of the retry mechanism we apply on reactive messaging.
It does retry the send operation without retrying to establish the connection, and thus fails.

I think #40592 is also a similar problem.

BTW I observe that because of the credit-based flow control, when a reconnection happens the order of outgoing messages are no longer respected, even with or without our retry mechanism.

@johnaohara having requested to 0 would be expected if there isn't any requests from the downstream (AmqpCreditBasedSender). If we understand and fix what is happening on reconnection I think the flow control in mutiny will work correctly.

@johnaohara
Copy link
Member Author

Hi @ozangunalp thank you for looking into this.

It is worth noting that this problem does not just occur for me on restart, if I start our quarkus based app in dev mode (with the amq broker instantiated by dev services), the first invocation fails as well

@ozangunalp
Copy link
Contributor

@johnaohara I can't reproduce the problem with the dev mode, using the code.quarkus.io amqp 1.0 messaging example (which sends messages at startup using an emitter)

@johnaohara
Copy link
Member Author

@ozangunalp it does not happen in all environments, and is not always reproducible. For example, my machine has been "working" (i.e. tests in our project work etc) for the past couple of weeks, but occasionally fails without any changes to the messaging code paths, or upgrades. Our CI fails occasionally often with messaging related tests where messages are not delivered, but if I re-run the tests they pass.

I looks like a race cond at startup, when BaseMultiEmitter.requested initial value is set, but that property is an AtomicLong so is protected by mem barriers, but idk about the other startup logic. I am going to spend some time digging today

@johnaohara
Copy link
Member Author

@ozangunalp I have noticed that we are currently running on an older quarkus release (3.8.4) and smallrye-reactive-messaging-amqp has been upgrade from 4.18.0 to 4.21.0 between quarkus 3.8.4 and 3.10.1

The implementation of io.smallrye.mutiny.operators.multi.builders.BufferItemMultiEmitter.drain() has been re-factored between those releases.

I will try and upgrade to quarkus 3.10.0 and see if we still have the problem.

@ozangunalp
Copy link
Contributor

I've tested the dev mode both on 3.8.4 and 3.10.1 with a raspberrypi (for a change). I couldn't reproduce it.

There were some instances when forcing restart on dev mode, and not using the container-id, the newly created app would create a new queue for the address, therefore some messages were missing. But when I force the container-id, I no longer have that issue.

@johnaohara During your tests, were you able to check on the Artemis UI whether messages are queued and not delivered to the consumer? Or were messages stuck on the emitter buffer?

@johnaohara
Copy link
Member Author

@ozangunalp yes I checked the Aretmis UI and there are not messages in the queue.

I can see the messages are all backed up in memory in BufferItemMultiEmitter.queue

@johnaohara
Copy link
Member Author

Trying to recreate this consistently has been tricky. I have a very simple application that exhibits the behaviour on the machine that tends to fail: https://github.com/johnaohara/quarkus-issue-40118

This application works as expected on my laptop.

I have not been able to recreate the issue where the emitter fails on the first startup of the application, only on restart

desktop:

$ uname -a
Linux fedora 6.8.8-200.fc39.x86_64 #1 SMP PREEMPT_DYNAMIC Sat Apr 27 17:42:13 UTC 2024 x86_64 GNU/Linux

$ java --version
openjdk 21.0.1 2023-10-17 LTS
OpenJDK Runtime Environment Temurin-21.0.1+12 (build 21.0.1+12-LTS)
OpenJDK 64-Bit Server VM Temurin-21.0.1+12 (build 21.0.1+12-LTS, mixed mode, sharing)

$ docker --version
Docker version 24.0.7, build afdd53b

To reproduce;

  1. Build and start that project in dev mode
$ mvn clean package -DksipTests -DskipITs
$ mvn quarkus:dev

  1. Send a http command to the rest api
    $ curl -X POST http://localhost:8080/api
    You should see some messages written to the console that has been sent through the broker

image

  1. Type "s" into the dev mode console to restart the application
  2. Send another http command to the rest api, on the machine that has problems this fails

image

with error message

2024-05-17 09:59:06,104 ERROR [io.sma.rea.mes.amqp] (executor-thread-1) SRMSG16225: Failure reported for channel `run-upload-out`, closing client: io.smallrye.mutiny.subscription.BackPressureFailure: Could not emit value due to lack of requests
	at io.smallrye.mutiny.operators.multi.builders.EmitterBasedMulti$ErrorOnOverflowMultiEmitter.onOverflow(EmitterBasedMulti.java:134)
	at io.smallrye.mutiny.operators.multi.builders.EmitterBasedMulti$NoOverflowBaseMultiEmitter.emit(EmitterBasedMulti.java:105)
	at io.smallrye.mutiny.operators.multi.builders.SerializedMultiEmitter.onItem(SerializedMultiEmitter.java:50)
	at io.smallrye.mutiny.operators.multi.builders.SerializedMultiEmitter.emit(SerializedMultiEmitter.java:140)
	at io.smallrye.reactive.messaging.providers.extension.AbstractEmitter.emit(AbstractEmitter.java:176)
	at io.smallrye.reactive.messaging.providers.extension.EmitterImpl.send(EmitterImpl.java:31)
	at org.acme.MyMessagingApplication.sendMsg(MyMessagingApplication.java:20)
	at org.acme.MyMessagingApplication_ClientProxy.sendMsg(Unknown Source)
	at org.acme.ApiService.testMessaging(ApiService.java:18)
	at org.acme.ApiService$quarkusrestinvoker$testMessaging_c559ec23349c7474752cecba71550f3d34c5016d.invoke(Unknown Source)
	at org.jboss.resteasy.reactive.server.handlers.InvocationHandler.handle(InvocationHandler.java:29)
	at io.quarkus.resteasy.reactive.server.runtime.QuarkusResteasyReactiveRequestContext.invokeHandler(QuarkusResteasyReactiveRequestContext.java:141)
	at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:147)
	at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:599)
	at org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2516)
	at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2495)
	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1521)
	at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:11)
	at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:11)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)


java.lang.Exception: Missing onFailure/onError handler in the subscriber
	at io.smallrye.mutiny.subscription.Subscribers.lambda$static$0(Subscribers.java:18)
	at io.smallrye.mutiny.subscription.Subscribers$CallbackBasedSubscriber.onFailure(Subscribers.java:93)
	at io.smallrye.mutiny.operators.multi.MultiOnFailureInvoke$MultiOnFailureInvokeProcessor.onFailure(MultiOnFailureInvoke.java:50)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onError(MultiSubscriber.java:73)
	at io.smallrye.reactive.messaging.amqp.AmqpCreditBasedSender.onError(AmqpCreditBasedSender.java:238)
	at io.smallrye.reactive.messaging.providers.helpers.MultiUtils$1.onFailure(MultiUtils.java:83)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onError(MultiSubscriber.java:73)
	at io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor$2.onError(DevModeSupportConnectorFactoryInterceptor.java:115)
	at io.smallrye.mutiny.subscription.MultiSubscriberAdapter.onFailure(MultiSubscriberAdapter.java:32)
	at io.smallrye.mutiny.operators.multi.builders.BaseMultiEmitter.failed(BaseMultiEmitter.java:89)
	at io.smallrye.mutiny.operators.multi.builders.BaseMultiEmitter.fail(BaseMultiEmitter.java:78)
	at io.smallrye.mutiny.operators.multi.builders.EmitterBasedMulti$ErrorOnOverflowMultiEmitter.onOverflow(EmitterBasedMulti.java:134)
	at io.smallrye.mutiny.operators.multi.builders.EmitterBasedMulti$NoOverflowBaseMultiEmitter.emit(EmitterBasedMulti.java:105)
	at io.smallrye.mutiny.operators.multi.builders.SerializedMultiEmitter.onItem(SerializedMultiEmitter.java:50)
	at io.smallrye.mutiny.operators.multi.builders.SerializedMultiEmitter.emit(SerializedMultiEmitter.java:140)
	at io.smallrye.reactive.messaging.providers.extension.AbstractEmitter.emit(AbstractEmitter.java:176)
	at io.smallrye.reactive.messaging.providers.extension.EmitterImpl.send(EmitterImpl.java:31)
	at org.acme.MyMessagingApplication.sendMsg(MyMessagingApplication.java:20)
	at org.acme.MyMessagingApplication_ClientProxy.sendMsg(Unknown Source)
	at org.acme.ApiService.testMessaging(ApiService.java:18)
	at org.acme.ApiService$quarkusrestinvoker$testMessaging_c559ec23349c7474752cecba71550f3d34c5016d.invoke(Unknown Source)
	at org.jboss.resteasy.reactive.server.handlers.InvocationHandler.handle(InvocationHandler.java:29)
	at io.quarkus.resteasy.reactive.server.runtime.QuarkusResteasyReactiveRequestContext.invokeHandler(QuarkusResteasyReactiveRequestContext.java:141)
	at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:147)
	at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:599)
	at org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2516)
	at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2495)
	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1521)
	at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:11)
	at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:11)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: io.smallrye.mutiny.subscription.BackPressureFailure: Could not emit value due to lack of requests

@johnaohara
Copy link
Member Author

johnaohara commented May 17, 2024

I have managed to trigger the condition on my "working" machine (although only once)

There appears to be a race condition on org.apache.qpid.proton.engine.impl.LinkImpl._credit where the property is accessed via getCredit() and addCredit(int credit)

I can set a debug breakpoint on setCredit to occasionally trigger the condition in the working env

If I enable debug messages, on the working machine i see the following log;

2024-05-17 11:01:41,062 INFO  [io.qua.sma.dep.processor] (build-7) Configuring the channel 'run-upload-in' to be managed by the connector 'smallrye-amqp'
2024-05-17 11:01:41,072 INFO  [io.qua.sma.dep.processor] (build-7) Configuring the channel 'run-upload-out' to be managed by the connector 'smallrye-amqp'
2024-05-17 11:01:46,292 INFO  [io.qua.sma.rea.amq.dep.AmqpDevServicesProcessor] (build-21) Dev Services for AMQP started. Other Quarkus applications in dev mode will find the broker automatically. For Quarkus applications in production mode, you can connect to this by starting your application with -Damqp.host=localhost -Damqp.port=38345 -Damqp.user=admin -Damqp.password=admin
__  ____  __  _____   ___  __ ____  ______ 
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ 
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \   
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/   
2024-05-17 11:01:46,887 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:38345 for channel run-upload-out
2024-05-17 11:01:46,911 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-05-17 11:01:46,923 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-05-17 11:01:46,995 INFO  [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 3.10.1) started in 6.614s. Listening on: http://localhost:8080
2024-05-17 11:01:46,995 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2024-05-17 11:01:46,996 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, messaging, messaging-amqp, rest, smallrye-context-propagation, vertx]
2024-05-17 11:01:47,094 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16213: Connection with AMQP broker established
2024-05-17 11:01:47,102 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16213: Connection with AMQP broker established
2024-05-17 11:01:47,134 DEBUG [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16222: Retrieved credits for channel `run-upload-out`: 1000
2024-05-17 11:01:47,137 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16203: AMQP Receiver listening address run-upload
2024-05-17 11:01:48,820 INFO  [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Restarting as requested by the user.
2024-05-17 11:01:48,835 INFO  [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer stopped in 0.013s
2024-05-17 11:01:48,945 INFO  [io.qua.sma.dep.processor] (build-35) Configuring the channel 'run-upload-in' to be managed by the connector 'smallrye-amqp'
2024-05-17 11:01:48,945 INFO  [io.qua.sma.dep.processor] (build-35) Configuring the channel 'run-upload-out' to be managed by the connector 'smallrye-amqp'
__  ____  __  _____   ___  __ ____  ______ 
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ 
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \   
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/   
2024-05-17 11:01:49,265 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:38345 for channel run-upload-in
2024-05-17 11:01:49,267 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:38345 for channel run-upload-out
2024-05-17 11:01:49,269 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-05-17 11:01:49,270 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-05-17 11:01:49,271 INFO  [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 3.10.1) started in 0.436s. Listening on: http://localhost:8080
2024-05-17 11:01:49,272 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2024-05-17 11:01:49,272 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, messaging, messaging-amqp, rest, smallrye-context-propagation, vertx]
2024-05-17 11:01:49,273 INFO  [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Live reload total time: 0.457s 
2024-05-17 11:01:49,289 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-3) SRMSG16213: Connection with AMQP broker established
2024-05-17 11:01:49,291 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-3) SRMSG16213: Connection with AMQP broker established
2024-05-17 11:01:49,296 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-3) SRMSG16203: AMQP Receiver listening address run-upload
2024-05-17 11:01:49,298 DEBUG [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-3) SRMSG16222: Retrieved credits for channel `run-upload-out`: 1000

on the failing machine:

2024-05-17 11:00:42,126 INFO  [io.qua.sma.dep.processor] (build-14) Configuring the channel 'run-upload-in' to be managed by the connector 'smallrye-amqp'
2024-05-17 11:00:42,126 INFO  [io.qua.sma.dep.processor] (build-14) Configuring the channel 'run-upload-out' to be managed by the connector 'smallrye-amqp'
__  ____  __  _____   ___  __ ____  ______ 
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ 
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \   
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/   
2024-05-17 11:00:42,369 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:32974 for channel run-upload-in
2024-05-17 11:00:42,370 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:32974 for channel run-upload-out
2024-05-17 11:00:42,372 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-05-17 11:00:42,373 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-05-17 11:00:42,374 INFO  [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 3.10.1) started in 0.347s. Listening on: http://localhost:8080
2024-05-17 11:00:42,374 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2024-05-17 11:00:42,374 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, messaging, messaging-amqp, rest, smallrye-context-propagation, vertx]
2024-05-17 11:00:42,375 INFO  [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Live reload total time: 0.362s 
2024-05-17 11:00:42,381 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-3) SRMSG16213: Connection with AMQP broker established
2024-05-17 11:00:42,382 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-3) SRMSG16213: Connection with AMQP broker established
2024-05-17 11:00:42,383 DEBUG [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-3) SRMSG16222: Retrieved credits for channel `run-upload-out`: 1000
2024-05-17 11:00:42,385 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-3) SRMSG16203: AMQP Receiver listening address run-upload
2024-05-17 11:00:45,034 INFO  [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Restarting as requested by the user.
2024-05-17 11:00:45,037 INFO  [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer stopped in 0.002s
2024-05-17 11:00:45,136 INFO  [io.qua.sma.dep.processor] (build-2) Configuring the channel 'run-upload-in' to be managed by the connector 'smallrye-amqp'
2024-05-17 11:00:45,136 INFO  [io.qua.sma.dep.processor] (build-2) Configuring the channel 'run-upload-out' to be managed by the connector 'smallrye-amqp'
__  ____  __  _____   ___  __ ____  ______ 
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ 
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \   
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/   
2024-05-17 11:00:45,357 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:32974 for channel run-upload-in
2024-05-17 11:00:45,358 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16201: AMQP broker configured to localhost:32974 for channel run-upload-out
2024-05-17 11:00:45,360 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-05-17 11:00:45,361 INFO  [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2024-05-17 11:00:45,361 INFO  [io.quarkus] (Quarkus Main Thread) amq-client-msg-buffer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 3.10.1) started in 0.323s. Listening on: http://localhost:8080
2024-05-17 11:00:45,362 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2024-05-17 11:00:45,362 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, messaging, messaging-amqp, rest, smallrye-context-propagation, vertx]
2024-05-17 11:00:45,362 INFO  [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Live reload total time: 0.329s 
2024-05-17 11:00:45,368 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-4) SRMSG16213: Connection with AMQP broker established
2024-05-17 11:00:45,369 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-4) SRMSG16213: Connection with AMQP broker established
2024-05-17 11:00:45,370 DEBUG [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-4) SRMSG16223: No more credit for channel run-upload-out, requesting more credits
2024-05-17 11:00:45,371 INFO  [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-4) SRMSG16203: AMQP Receiver listening address run-upload

@franz1981
Copy link
Contributor

here appears to be a race condition on org.apache.qpid.proton.engine.impl.LinkImpl._credit where the property is accessed via getCredit() and addCredit(int credit)

In theory the proton stuff should always run from within their Netty event loop threads, @gemmellr Am i right?

@johnaohara
Copy link
Member Author

here appears to be a race condition on org.apache.qpid.proton.engine.impl.LinkImpl._credit where the property is accessed via getCredit() and addCredit(int credit)

In theory the proton stuff should always run from within their Netty event loop threads, @gemmellr Am i right?

They do, I am now a little bit further down the stack, looks like on a restart, we are missing a LINK_FLOW proton event during handshake on the machine that is not working

@gemmellr
Copy link
Contributor

here appears to be a race condition on org.apache.qpid.proton.engine.impl.LinkImpl._credit where the property is accessed via getCredit() and addCredit(int credit)

In theory the proton stuff should always run from within their Netty event loop threads, @gemmellr Am i right?

That is correct, its expressly single threaded, so it cant 'race' unless being mis-used.

@gemmellr
Copy link
Contributor

If you are not seeing a flow event, the most likely reason is a flow frame was not sent. I'd be looking at whether you are falling foul of flow control by the broker to block production (by not flow'ing credit to send anything), e.g due to its current memory and/or disk limits configuration (e.g the broker defaults to a 90% max-disk-usage limit if not otherwise configured).

You can more closely see what is actually sent using env variable PN_TRACE_FRM=true (on client and/or broker sides since both are using proton-j underneath) to provoke a protocol trace to stdout.

@johnaohara
Copy link
Member Author

@gemmellr thank you very much for the info.

wrt the "race"; in quarkus dev mode, when the application is restarted, the i/o processing move from one eventloop thread to another. At start up all buffer processing is running on vert.x-eventloop-thread-01, when it is restarted, the vert.x-eventloop-thread-02 thread processors buffers. That is why I was concerned it might be a race cond. I have since ruled this out as a possible cause by verifying that a restart the thread remains consistent.

I have captured the network packets, and i am seeing the flow packet being retruned from the broker, with the expected number of credits. However, this is not being propagated to ProtonTransport.handleSocketBuffer() (in vert.x)

I am still currently digging through the code path to understand why this packet is not handled correctly.

image

@johnaohara
Copy link
Member Author

So this issue is caused by flow control :(

The disk that was mounted into the broker container was 95% full, the first client connection works, but subsequent connections are not allocated and credits to send requests

@ozangunalp Am wondering if we can capture this state and warn users when this is the case? atm it all appears to work, but there are no credits to send messages. This condition appears to be tested for already in BufferItemMultiEmitter.drain() so could we add a user warning when it detects that there are no credits?

@johnaohara
Copy link
Member Author

Thank you very much @gemmellr for pointing me in the right direction!

@ozangunalp
Copy link
Contributor

@johnaohara So it was the disk mounted to the container that's full?

Actually, we've the message (on debug maybe it can be changed): No more credit for channel run-upload-out, requesting more credits
It tries getting credits every 2 seconds by default.

@johnaohara
Copy link
Member Author

johnaohara commented May 17, 2024

yeah, looking back, it was shown as a debug message:

2024-05-17 11:00:45,370 DEBUG [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-4) SRMSG16223: No more credit for channel run-upload-out, requesting more credits

Maybe this could be a warning? idk what other situations where this could be an expected state.

I think if this message had been a warning I would have started looking into that message

@johnaohara
Copy link
Member Author

@johnaohara So it was the disk mounted to the container that's full?

Yes, that was the root cause, it was at 95% of capacity

@ozangunalp
Copy link
Contributor

I think if this message had been a warning I would have started looking into that message

I agree, let's change that to warn.

@gemmellr
Copy link
Contributor

I wouldnt necessarily do that on every check, you may end up repeatedly emitting the warning any time new credit isnt being granted quite as fast as you could send, which may be entirely expected behaviour, at which point youll instead start getting questions about why it is warning in the course of doing what its meant to be doing (as it actually is now...). I'd either establish a larger time over which it is warned, or say it is actually info.

@johnaohara
Copy link
Member Author

I wouldnt necessarily do that on every check, you may end up repeatedly emitting the warning any time new credit isnt being granted quite as fast as you could send, which may be entirely expected behaviour, at which point youll instead start getting questions about why it is warning in the course of doing what its meant to be doing (as it actually is now...). I'd either establish a larger time over which it is warned, or say it is actually info.

I think that is a fair point, esp. when running in prod mode. When running in Dev/test mode, might we want a logging different level?

@ozangunalp
Copy link
Contributor

Changed the warning in smallrye/smallrye-reactive-messaging#2632, integrated in #41851

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

6 participants