Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace usages of Atomic* for Atomic*FieldUpdater #195

Merged
merged 1 commit into from
Feb 10, 2017

Conversation

sschepens
Copy link
Contributor

Replaced some usages of AtomicInteger, AtomicLong and AtomicReference for their Atomic*FieldUpdater counterpart on classes which get instantiated a lot, this should reduce memory usage for the broker and clients which use lots of topics and consumers.

@merlimat
Copy link
Contributor

merlimat commented Feb 8, 2017

Change looks good, though there are few tests failing in travis build. eg:

2017-02-08 17:29:48,710 - INFO  - [main:TestListener@53] - @@@@@@@@@@@@@ STARTING TEST - com.yahoo.pulsar.broker.service.ResendRequestTest / testExclusiveCumulativeAckedNormalTopic -- attrs: []
2017-02-08 17:30:19,760 - ERROR - [main:TestListener@41] - ------------ Test Failed - 

@sschepens
Copy link
Contributor Author

@merlimat couldn't reproduce this errors on my machine, but I did realize that Travis is running extremely slow, maybe this is producing the failures?

@saandrews
Copy link
Contributor

Do you know of any performance impact in switching to this, since it uses reflection?
For the build issue, Can you restart and see if it succeeds? Travis build env is super slow.

@sschepens
Copy link
Contributor Author

sschepens commented Feb 8, 2017

@saandrews it shouldn't have any performance impact that I know of, in fact it should benefit performance, since it reduces memory usage, and so gc pauses.

I cannot restart the build in Travis, maybe @merlimat can

Edit: I'm going to run a comparison of tests time of this branch against master on my machine to see if I get any significant difference.

@merlimat
Copy link
Contributor

merlimat commented Feb 8, 2017

Do you know of any performance impact in switching to this, since it uses reflection?

@saandrews The reflection is only used at static load time of the class. Using the updater is actually faster because is treated as an intrinsic operation by jvm (apart from the spared allocation).

@rdhabalia
Copy link
Contributor

@sschepens

I cannot restart the build in Travis, maybe @merlimat can

After restarting the build: It took around 49 mins and then it has been terminated as it reached max time-limit of build-job: https://api.travis-ci.org/jobs/199668485/log.txt?deansi=true
Do you want to check it or I can trigger another time.

@sschepens
Copy link
Contributor Author

@rdhabalia I will indeed check it

@sschepens
Copy link
Contributor Author

@saandrews @merlimat @rdhabalia the issue seemed to be that I accidentally defaulted OwnedBundles to isActive = false.
Tests should run now.

@sschepens
Copy link
Contributor Author

sschepens commented Feb 9, 2017

@merlimat @rdhabalia I'm investigating test failues.
Apparently the issue seems to be that we're creating spies of Producer
This creates a new instance of a class, and so when updating the state it actually updates the internal state of the spy and not of the original producer.

Maybe we can change the test to not spy on the producer, or have all changes to state go through methods so they get proxied to the original instance.

What do you guys think?

EDIT: this did not happen before, because the spy got a copy of the AtomicReference and modified the same object.

@sschepens sschepens force-pushed the atomic_field_updaters branch 4 times, most recently from 9d6f715 to 9c17842 Compare February 9, 2017 19:15
Replaced some usages of AtomicInteger, AtomicLong and AtomicReference
for their Atomic*FieldUpdater counterpart on classes which get
instantiated a lot, this should reduce memory usage for the broker
and clients which use lots of topics and consumers.
@sschepens
Copy link
Contributor Author

@merlimat @rdhabalia I managed to fix the failing tests making spies proxy method calls to original instances, please check the tests I modified

Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@merlimat merlimat added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Feb 9, 2017
@merlimat merlimat added this to the 1.17 milestone Feb 9, 2017
Copy link
Contributor

@rdhabalia rdhabalia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@merlimat merlimat merged commit fc9b34c into apache:master Feb 10, 2017
@sschepens sschepens deleted the atomic_field_updaters branch February 10, 2017 01:53
rdhabalia pushed a commit that referenced this pull request Feb 24, 2017
Replaced some usages of AtomicInteger, AtomicLong and AtomicReference
for their Atomic*FieldUpdater counterpart on classes which get
instantiated a lot, this should reduce memory usage for the broker
and clients which use lots of topics and consumers.
sijie pushed a commit to sijie/pulsar that referenced this pull request Mar 4, 2018
hangc0276 pushed a commit to hangc0276/pulsar that referenced this pull request May 26, 2021
Add Kafka client docker images that cover version 1.x.0 to 2.y.0 (x = 0 to 1 y = 0 to 6) for integration tests.
hangc0276 pushed a commit to hangc0276/pulsar that referenced this pull request May 26, 2021
Master issue: apache#138

This PR adds tests for images of apache#195 .

In addition, it fixes the local test error when running tests on MacOS. Because the containers cannot connect to the host's KoP or Kafka service listened on 127.0.0.1.



* Fix local tests error by listening to site local address

* Add tests for Kafka Java clients

* Remove the comments
codelipenghui added a commit to codelipenghui/incubator-pulsar that referenced this pull request Jun 27, 2022
### Motivation

While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will
affect the message publish and subsequent consumer creation.

```
"pulsar-io-15-24" apache#195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry  [0x0000700019642000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207)
	- waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

```
"pulsar-io-15-8" apache#157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable  [0x00007000171d5000]
   java.lang.Thread.State: RUNNABLE
	at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360)
	at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234)
	at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307)
	at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896)
	at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888)
	- locked <0x00001000158237d8> (a java.lang.Object)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159)
	- locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287)
	- locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

### Modification

- Sort the consumer list only if the new consumer with high priority than the last element in the consumer list,
  this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent).
merlimat pushed a commit that referenced this pull request Jun 28, 2022
…16243)

### Motivation

While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will
affect the message publish and subsequent consumer creation.

```
"pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry  [0x0000700019642000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207)
	- waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

```
"pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable  [0x00007000171d5000]
   java.lang.Thread.State: RUNNABLE
	at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360)
	at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234)
	at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307)
	at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896)
	at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888)
	- locked <0x00001000158237d8> (a java.lang.Object)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159)
	- locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287)
	- locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

### Modification

- Sort the consumer list only if the new consumer with high priority than the last element in the consumer list,
  this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent).
merlimat pushed a commit that referenced this pull request Jun 28, 2022
…16243)

### Motivation

While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will
affect the message publish and subsequent consumer creation.

```
"pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry  [0x0000700019642000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207)
	- waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

```
"pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable  [0x00007000171d5000]
   java.lang.Thread.State: RUNNABLE
	at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360)
	at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234)
	at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307)
	at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896)
	at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888)
	- locked <0x00001000158237d8> (a java.lang.Object)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159)
	- locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287)
	- locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

### Modification

- Sort the consumer list only if the new consumer with high priority than the last element in the consumer list,
  this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent).
nicoloboschi referenced this pull request in datastax/pulsar Jun 28, 2022
…pache#16243)

### Motivation

While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will
affect the message publish and subsequent consumer creation.

```
"pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry  [0x0000700019642000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207)
	- waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

```
"pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable  [0x00007000171d5000]
   java.lang.Thread.State: RUNNABLE
	at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360)
	at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234)
	at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307)
	at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896)
	at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888)
	- locked <0x00001000158237d8> (a java.lang.Object)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159)
	- locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287)
	- locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

### Modification

- Sort the consumer list only if the new consumer with high priority than the last element in the consumer list,
  this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent).

(cherry picked from commit 4bdaa32)
congbobo184 pushed a commit that referenced this pull request Nov 10, 2022
…16243)

### Motivation

While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will
affect the message publish and subsequent consumer creation.

```
"pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry  [0x0000700019642000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207)
	- waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

```
"pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable  [0x00007000171d5000]
   java.lang.Thread.State: RUNNABLE
	at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360)
	at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234)
	at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307)
	at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896)
	at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888)
	- locked <0x00001000158237d8> (a java.lang.Object)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159)
	- locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287)
	- locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

### Modification

- Sort the consumer list only if the new consumer with high priority than the last element in the consumer list,
  this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent).

(cherry picked from commit 291fedc)
congbobo184 pushed a commit that referenced this pull request Nov 26, 2022
…16243)

### Motivation

While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will
affect the message publish and subsequent consumer creation.

```
"pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry  [0x0000700019642000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207)
	- waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

```
"pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable  [0x00007000171d5000]
   java.lang.Thread.State: RUNNABLE
	at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360)
	at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234)
	at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307)
	at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896)
	at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888)
	- locked <0x00001000158237d8> (a java.lang.Object)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159)
	- locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287)
	- locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
	at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187)
	at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309)
	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
	at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684)
	at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662)
	at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168)
	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
```

### Modification

- Sort the consumer list only if the new consumer with high priority than the last element in the consumer list,
  this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent).

(cherry picked from commit 291fedc)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants