Skip to content

Commit

Permalink
[ISSUE apache#8695] fix DefaultLitePullConsumer PullThreadNums Parame…
Browse files Browse the repository at this point in the history
…ter not effective. (apache#8696)
  • Loading branch information
luozongle01 authored Sep 14, 2024
1 parent 57d9f8d commit f872b68
Showing 1 changed file with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,6 @@ private enum SubscriptionType {
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook;
this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
this.defaultLitePullConsumer.getPullThreadNums(),
new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
);
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MonitorMessageQueueChangeThread"));
this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
}
Expand Down Expand Up @@ -293,6 +289,8 @@ public synchronized void start() throws MQClientException {
this.defaultLitePullConsumer.changeInstanceNameToPID();
}

initScheduledThreadPoolExecutor();

initMQClientFactory();

initRebalanceImpl();
Expand Down Expand Up @@ -324,6 +322,13 @@ public synchronized void start() throws MQClientException {
}
}

private void initScheduledThreadPoolExecutor() {
this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
this.defaultLitePullConsumer.getPullThreadNums(),
new ThreadFactoryImpl("PullMsgThread-" + this.defaultLitePullConsumer.getConsumerGroup())
);
}

private void initMQClientFactory() throws MQClientException {
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
Expand Down

0 comments on commit f872b68

Please sign in to comment.