-
Notifications
You must be signed in to change notification settings - Fork 2.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Unable to send email when consuming a kafka topic #3223
Comments
@cescoffier I think this one is for you. |
I have simplify the project |
I have found a workaround by using a mailer.send(Mail.withHtml(sendTo, subject, content)).toCompletableFuture().get(); it will fail too (same exception). So I guess we should not lock a kafka consumer. |
I fix this issue by returning a CompletionStage from the ReactiveMailer to the Kafka Consumer. The last commit show it. I guess we could close this issue. But maybe that the documentation could be enrich to explain this behavior ? |
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you! |
@cescoffier can you have a look at this one and see if it's normal or not? |
This prevents a potential deadlock. Fixes quarkusio#3223
This prevents a potential deadlock. Fixes quarkusio#3223
This prevents a potential deadlock. Fixes quarkusio#3223
This prevents a potential deadlock. Fixes quarkusio#3223
This prevents a potential deadlock. Fixes quarkusio#3223
Describe the bug
I want to send a mail from a kafka consumer. However it does not work, the Vertx event loop is contently failing and the mail is not send.
Expected behavior
A mail should be sent when consuming a kafka topic.
Actual behavior
The Vertx event loop infinitely fail by throwing this kind of exception:
AVERTISSEMENT: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 2174 ms, time limit is 2000 ms
io.vertx.core.VertxException: Thread blocked
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1934)
at io.quarkus.mailer.impl.BlockingMailerImpl.send(BlockingMailerImpl.java:21)
at io.quarkus.mailer.impl.BlockingMailerImpl_ClientProxy.send(BlockingMailerImpl_ClientProxy.zig:147)
at com.damdamdeo.email_notifier.infrastructure.MailerEmailNotification.notify(MailerEmailNotification.java:27)
at com.damdamdeo.email_notifier.infrastructure.MailerEmailNotification_ClientProxy.notify(MailerEmailNotification_ClientProxy.zig:146)
at com.damdamdeo.email_notifier.facade.TodoCreatedEvent.handle(TodoCreatedEvent.java:40)
at com.damdamdeo.email_notifier.facade.EventStoreEventConsumer.onMessage(EventStoreEventConsumer.java:62)
at com.damdamdeo.email_notifier.facade.EventStoreEventConsumer_Subclass.onMessage$$superaccessor4(EventStoreEventConsumer_Subclass.zig:262)
at com.damdamdeo.email_notifier.facade.EventStoreEventConsumer_Subclass$$function$$4.apply(EventStoreEventConsumer_Subclass$$function$$4.zig:51)
at io.quarkus.arc.InvocationContextImpl.interceptorChainCompleted(InvocationContextImpl.java:141)
at io.quarkus.arc.InvocationContextImpl.proceed(InvocationContextImpl.java:161)
at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.invokeInOurTx(TransactionalInterceptorBase.java:95)
at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.invokeInOurTx(TransactionalInterceptorBase.java:82)
at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired.doIntercept(TransactionalInterceptorRequired.java:32)
at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorBase.intercept(TransactionalInterceptorBase.java:51)
at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired.intercept(TransactionalInterceptorRequired.java:26)
at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired_Bean.intercept(TransactionalInterceptorRequired_Bean.zig:289)
at io.quarkus.arc.InvocationContextImpl$InterceptorInvocation.invoke(InvocationContextImpl.java:254)
at io.quarkus.arc.InvocationContextImpl.invokeNext(InvocationContextImpl.java:133)
at io.quarkus.arc.InvocationContextImpl.proceed(InvocationContextImpl.java:157)
at com.damdamdeo.email_notifier.facade.EventStoreEventConsumer_Subclass.onMessage(EventStoreEventConsumer_Subclass.zig:107)
at com.damdamdeo.email_notifier.facade.EventStoreEventConsumer_ClientProxy.onMessage(EventStoreEventConsumer_ClientProxy.zig:115)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at io.smallrye.reactive.messaging.AbstractMediator.lambda$initialize$0(AbstractMediator.java:48)
at io.smallrye.reactive.messaging.AbstractMediator$$Lambda$610/1726759945.invoke(Unknown Source)
at io.smallrye.reactive.messaging.AbstractMediator.invoke(AbstractMediator.java:61)
at io.smallrye.reactive.messaging.SubscriberMediator.lambda$processMethodReturningACompletionStage$6(SubscriberMediator.java:161)
at io.smallrye.reactive.messaging.SubscriberMediator$$Lambda$612/705957093.apply(Unknown Source)
at io.smallrye.reactive.streams.stages.FlatMapCompletionStageFactory$FlatMapCompletionStage.lambda$apply$0(FlatMapCompletionStageFactory.java:46)
at io.smallrye.reactive.streams.stages.FlatMapCompletionStageFactory$FlatMapCompletionStage$$Lambda$631/955994360.apply(Unknown Source)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:132)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable$$Lambda$656/2107393518.run(Unknown Source)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
at io.smallrye.context.SmallRyeThreadContext$$Lambda$597/1372646511.execute(Unknown Source)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable$$Lambda$656/2107393518.run(Unknown Source)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
at io.smallrye.context.SmallRyeThreadContext$$Lambda$597/1372646511.execute(Unknown Source)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:501)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:366)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$InnerSubscriber.onNext(FlowableFlatMap.java:665)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable$$Lambda$656/2107393518.run(Unknown Source)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
at io.smallrye.context.SmallRyeThreadContext$$Lambda$597/1372646511.execute(Unknown Source)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
at io.reactivex.internal.subscriptions.DeferredScalarSubscription.complete(DeferredScalarSubscription.java:117)
at io.reactivex.processors.AsyncProcessor.subscribeActual(AsyncProcessor.java:242)
at io.reactivex.Flowable.subscribe(Flowable.java:14826)
at io.reactivex.Flowable.subscribe(Flowable.java:14773)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:163)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable$$Lambda$656/2107393518.run(Unknown Source)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
at io.smallrye.context.SmallRyeThreadContext$$Lambda$597/1372646511.execute(Unknown Source)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable$$Lambda$656/2107393518.run(Unknown Source)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
at io.smallrye.context.SmallRyeThreadContext$$Lambda$597/1372646511.execute(Unknown Source)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
at io.smallrye.reactive.streams.utils.ConnectableProcessor.onNext(ConnectableProcessor.java:124)
at org.eclipse.microprofile.reactive.streams.operators.CompletionSubscriber$1DefaultCompletionSubscriber.onNext(CompletionSubscriber.java:85)
at org.eclipse.microprofile.reactive.streams.operators.CompletionSubscriber$1DefaultCompletionSubscriber.onNext(CompletionSubscriber.java:85)
at io.smallrye.reactive.messaging.SubscriberMediator$1.onNext(SubscriberMediator.java:100)
at io.smallrye.reactive.streams.utils.WrappedSubscriber.onNext(WrappedSubscriber.java:42)
at io.reactivex.subscribers.SafeSubscriber.onNext(SafeSubscriber.java:90)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable$$Lambda$656/2107393518.run(Unknown Source)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
at io.smallrye.context.SmallRyeThreadContext$$Lambda$597/1372646511.execute(Unknown Source)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable$$Lambda$656/2107393518.run(Unknown Source)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
at io.smallrye.context.SmallRyeThreadContext$$Lambda$597/1372646511.execute(Unknown Source)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable$$Lambda$656/2107393518.run(Unknown Source)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
at io.smallrye.context.SmallRyeThreadContext$$Lambda$597/1372646511.execute(Unknown Source)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable$$Lambda$656/2107393518.run(Unknown Source)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
at io.smallrye.context.SmallRyeThreadContext$$Lambda$597/1372646511.execute(Unknown Source)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable$$Lambda$656/2107393518.run(Unknown Source)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
at io.smallrye.context.SmallRyeThreadContext$$Lambda$597/1372646511.execute(Unknown Source)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable$$Lambda$656/2107393518.run(Unknown Source)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
at io.smallrye.context.SmallRyeThreadContext$$Lambda$597/1372646511.execute(Unknown Source)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable$$Lambda$656/2107393518.run(Unknown Source)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
at io.smallrye.context.SmallRyeThreadContext$$Lambda$597/1372646511.execute(Unknown Source)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
at io.reactivex.subscribers.SerializedSubscriber.onNext(SerializedSubscriber.java:100)
at io.reactivex.internal.operators.flowable.FlowableRepeatWhen$WhenSourceSubscriber.onNext(FlowableRepeatWhen.java:160)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable$$Lambda$656/2107393518.run(Unknown Source)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
at io.smallrye.context.SmallRyeThreadContext$$Lambda$597/1372646511.execute(Unknown Source)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.lambda$onNext$2(ContextPropagatorOnFlowableCreateAction.java:50)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable$$Lambda$656/2107393518.run(Unknown Source)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:215)
at io.smallrye.context.SmallRyeThreadContext$$Lambda$597/1372646511.execute(Unknown Source)
at io.smallrye.context.propagators.rxjava2.ContextPropagatorOnFlowableCreateAction$ContextCapturerFlowable.onNext(ContextPropagatorOnFlowableCreateAction.java:50)
at io.vertx.reactivex.impl.FlowableReadStream.lambda$subscribeActual$2(FlowableReadStream.java:83)
at io.vertx.reactivex.impl.FlowableReadStream$$Lambda$660/1627618479.handle(Unknown Source)
at io.vertx.kafka.client.consumer.impl.KafkaConsumerImpl.lambda$handler$1(KafkaConsumerImpl.java:79)
at io.vertx.kafka.client.consumer.impl.KafkaConsumerImpl$$Lambda$661/2000856156.handle(Unknown Source)
at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.run(KafkaReadStreamImpl.java:231)
at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$schedule$8(KafkaReadStreamImpl.java:186)
at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl$$Lambda$663/1637651402.handle(Unknown Source)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:320)
at io.vertx.core.impl.EventLoopContext.lambda$executeAsync$0(EventLoopContext.java:38)
at io.vertx.core.impl.EventLoopContext$$Lambda$664/1742798189.run(Unknown Source)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:495)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
To Reproduce
Steps to reproduce the behavior:
docker rm $(docker ps -aq); docker-compose up
should_consume_todo_created_event_and_todo_marked_as_completed_event
inEventStoreEventConsumerTest
Configuration
# Add your application.properties here, if applicable.
Screenshots
(If applicable, add screenshots to help explain your problem.)
Environment (please complete the following information):
uname -a
orver
: Linux arch-anywhere 5.1.6-arch1-1-ARCH Switch to the Maven distributed copy of the SubstrateVM annotations #1 SMP PREEMPT Fri May 31 15:17:53 UTC 2019 x86_64 GNU/Linuxjava -version
: openjdk version "1.8.0_202"Additional context
By running the test a message is produce and send to the
event
topic.A consumer in
EventStoreEventConsumer
consume this message to store it in a Jpa Entity and send an email.The test
should_send_email
prove that mailing is well set up.Is this problem arise because when sending an email a new VertX event is send while we are already in a VertX event by the kafka topic consumer ? Is there a workaround ?
The text was updated successfully, but these errors were encountered: