Skip to content

Commit

Permalink
[ISSUE #8137] Support pop consumption for light message queue
Browse files Browse the repository at this point in the history
  • Loading branch information
HScarb authored Aug 27, 2024
1 parent 63b9fbf commit 9e6bbf7
Show file tree
Hide file tree
Showing 6 changed files with 358 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,14 @@ private Set<MessageQueue> doLoadBalance(final String topic, final String consume
break;
}
case CLUSTERING: {
Set<MessageQueue> mqSet = topicRouteInfoManager.getTopicSubscribeInfo(topic);
Set<MessageQueue> mqSet;
if (MixAll.isLmq(topic)) {
mqSet = new HashSet<>();
mqSet.add(new MessageQueue(
topic, brokerController.getBrokerConfig().getBrokerName(), (int)MixAll.LMQ_QUEUE_ID));
} else {
mqSet = topicRouteInfoManager.getTopicSubscribeInfo(topic);
}
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("QueryLoad: no assignment for group[{}], the topic[{}] does not exist.", consumerGroup, topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import com.google.common.collect.ImmutableSet;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
Expand Down Expand Up @@ -126,6 +128,24 @@ public void testSetMessageRequestMode_RetryTopic() throws Exception {
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
}

@Test
public void testDoLoadBalance() throws Exception {
Method method = queryAssignmentProcessor.getClass()
.getDeclaredMethod("doLoadBalance", String.class, String.class, String.class, MessageModel.class,
String.class, SetMessageRequestModeRequestBody.class, ChannelHandlerContext.class);
method.setAccessible(true);

Set<MessageQueue> mqs1 = (Set<MessageQueue>) method.invoke(
queryAssignmentProcessor, MixAll.LMQ_PREFIX + topic, group, "127.0.0.1", MessageModel.CLUSTERING,
new AllocateMessageQueueAveragely().getName(), new SetMessageRequestModeRequestBody(), handlerContext);
Set<MessageQueue> mqs2 = (Set<MessageQueue>) method.invoke(
queryAssignmentProcessor, MixAll.LMQ_PREFIX + topic, group, "127.0.0.2", MessageModel.CLUSTERING,
new AllocateMessageQueueAveragely().getName(), new SetMessageRequestModeRequestBody(), handlerContext);

assertThat(mqs1).hasSize(1);
assertThat(mqs2).isEmpty();
}

@Test
public void testAllocate4Pop() {
testAllocate4Pop(new AllocateMessageQueueAveragely());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.simple;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class LMQProducer {
public static final String PRODUCER_GROUP = "ProducerGroupName";

public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";

public static final String TOPIC = "TopicLMQParent";

public static final String TAG = "TagA";

public static final String LMQ_TOPIC_1 = MixAll.LMQ_PREFIX + "123";

public static final String LMQ_TOPIC_2 = MixAll.LMQ_PREFIX + "456";

public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);

// Uncomment the following line while debugging, namesrvAddr should be set to your local address
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message(TOPIC, TAG, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH /* "INNER_MULTI_DISPATCH" */,
String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, LMQ_TOPIC_1, LMQ_TOPIC_2) /* "%LMQ%123,%LMQ%456" */);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}

producer.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.simple;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

@SuppressWarnings("deprecation")
public class LMQPullConsumer {
public static final String BROKER_NAME = "broker-a";

public static final String CONSUMER_GROUP = "CID_LMQ_PULL_1";

public static final String TOPIC = "TopicLMQParent";

public static final String LMQ_TOPIC = MixAll.LMQ_PREFIX + "123";

public static final String NAMESRV_ADDR = "127.0.0.1:9876";

public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(NAMESRV_ADDR);
consumer.setRegisterTopics(new HashSet<>(Arrays.asList(TOPIC)));
consumer.start();

// use parent topic to fill up broker addr table
consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
.updateTopicRouteInfoFromNameServer(TOPIC);

final MessageQueue lmq = new MessageQueue(LMQ_TOPIC, BROKER_NAME, (int) MixAll.LMQ_QUEUE_ID);
long offset = consumer.minOffset(lmq);

consumer.pullBlockIfNotFound(lmq, "*", offset, 32, new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
List<MessageExt> list = pullResult.getMsgFoundList();
if (list == null || list.isEmpty()) {
return;
}

for (MessageExt msg : list) {
System.out.printf("%s Pull New Messages: %s %n", Thread.currentThread().getName(), msg);
}
}

@Override
public void onException(Throwable e) {

}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.simple;

import com.google.common.collect.Lists;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;

public class LMQPushConsumer {
public static final String CLUSTER_NAME = "DefaultCluster";

public static final String BROKER_NAME = "broker-a";

public static final String TOPIC = "TopicLMQParent";

public static final String LMQ_TOPIC = MixAll.LMQ_PREFIX + "123";

public static final String CONSUMER_GROUP = "CID_LMQ_1";

public static final String NAMESRV_ADDR = "127.0.0.1:9876";

public static final HashMap<Long, String> BROKER_ADDR_MAP = new HashMap<Long, String>() {
{
put(MixAll.MASTER_ID, "127.0.0.1:10911");
}
};

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(NAMESRV_ADDR);
consumer.subscribe(LMQ_TOPIC, "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();

// use parent topic to fill up broker addr table
consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(TOPIC);

final TopicRouteData topicRouteData = new TopicRouteData();
final BrokerData brokerData = new BrokerData();
brokerData.setCluster(CLUSTER_NAME);
brokerData.setBrokerName(BROKER_NAME);
brokerData.setBrokerAddrs(BROKER_ADDR_MAP);
topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData));
// compensate LMQ topic route for MQClientInstance#findBrokerAddrByTopic
consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getTopicRouteTable().put(LMQ_TOPIC, topicRouteData);
// compensate for RebalanceImpl#topicSubscribeInfoTable
consumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(LMQ_TOPIC,
new HashSet<>(Arrays.asList(new MessageQueue(LMQ_TOPIC, BROKER_NAME, (int) MixAll.LMQ_QUEUE_ID))));
// re-balance immediately to start pulling messages
consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().doRebalance();

System.out.printf("Consumer Started.%n");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.simple;

import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

public class LMQPushPopConsumer {
public static final String CLUSTER_NAME = "DefaultCluster";

public static final String BROKER_NAME = "broker-a";

public static final String TOPIC = "TopicLMQParent";

public static final String LMQ_TOPIC = MixAll.LMQ_PREFIX + "456";

public static final String NAMESRV_ADDR = "127.0.0.1:9876";

public static final String CONSUMER_GROUP = "CID_LMQ_POP_1";

public static final HashMap<Long, String> BROKER_ADDR_MAP = new HashMap<Long, String>() {
{
put(MixAll.MASTER_ID, "127.0.0.1:10911");
}
};

public static void main(String[] args) throws Exception {
switchPop();

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(NAMESRV_ADDR);
consumer.subscribe(LMQ_TOPIC, "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// use server side rebalance
consumer.setClientRebalance(false);
consumer.start();

// use parent topic to fill up broker addr table
consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(TOPIC);

final TopicRouteData topicRouteData = new TopicRouteData();
final BrokerData brokerData = new BrokerData();
brokerData.setCluster(CLUSTER_NAME);
brokerData.setBrokerName(BROKER_NAME);
brokerData.setBrokerAddrs(BROKER_ADDR_MAP);
topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData));
// compensate LMQ topic route for MQClientInstance#findBrokerAddrByTopic
consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getTopicRouteTable().put(LMQ_TOPIC, topicRouteData);
// re-balance immediately to start pulling messages
consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().doRebalance();

System.out.printf("Consumer Started.%n");
}

private static void switchPop() throws Exception {
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.setNamesrvAddr(NAMESRV_ADDR);
mqAdminExt.start();
List<BrokerData> brokerDatas = mqAdminExt.examineTopicRouteInfo(TOPIC).getBrokerDatas();
for (BrokerData brokerData : brokerDatas) {
Set<String> brokerAddrs = new HashSet<>(brokerData.getBrokerAddrs().values());
for (String brokerAddr : brokerAddrs) {
mqAdminExt.setMessageRequestMode(brokerAddr, LMQ_TOPIC, CONSUMER_GROUP, MessageRequestMode.POP, 8,
3_000);
}
}
}
}

0 comments on commit 9e6bbf7

Please sign in to comment.