Skip to content

Commit

Permalink
[ISSUE apache#8591] Preliminary support for key commands of LMQ (apac…
Browse files Browse the repository at this point in the history
…he#8590)

* Preliminary support for key commands of LMQ

* Preliminary support for key commands of LMQ

* Optimize some code

* Fix some bugs and UTs for lmq support

* Fix UTs can not pass

* Fix UTs can not pass

* Add some check to prevent NPE
  • Loading branch information
RongtongJin authored and lizhanhui committed Aug 30, 2024
1 parent 9448590 commit 9fb650f
Show file tree
Hide file tree
Showing 19 changed files with 348 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2062,7 +2062,7 @@ private RemotingCommand resetOffsetInner(String topic, String group, int queueId
Map<Integer, Long> queueOffsetMap = new HashMap<>();

// Reset offset for all queues belonging to the specified topic
TopicConfig topicConfig = brokerController.getTopicConfigManager().getTopicConfigTable().get(topic);
TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("Topic " + topic + " does not exist");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
Expand All @@ -43,6 +44,7 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -199,7 +201,7 @@ public long searchOffset(MessageQueue mq, long timestamp, BoundaryType boundaryT
if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp,
boundaryType, timeoutMillis);
boundaryType, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
Expand Down Expand Up @@ -277,13 +279,20 @@ public MessageExt viewMessage(String topic, String msgId)
public QueryResult queryMessage(String topic, String key, int maxNum, long begin,
long end) throws MQClientException,
InterruptedException {
return queryMessage(topic, key, maxNum, begin, end, false);
return queryMessage(null, topic, key, maxNum, begin, end, false);
}

public QueryResult queryMessageByUniqKey(String topic, String uniqKey, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {

return queryMessage(topic, uniqKey, maxNum, begin, end, true);
return queryMessage(null, topic, uniqKey, maxNum, begin, end, true);
}

public QueryResult queryMessageByUniqKey(String clusterName, String topic, String uniqKey, int maxNum, long begin,
long end)
throws MQClientException, InterruptedException {

return queryMessage(clusterName, topic, uniqKey, maxNum, begin, end, true);
}

public MessageExt queryMessageByUniqKey(String topic,
Expand Down Expand Up @@ -311,25 +320,29 @@ public MessageExt queryMessageByUniqKey(String clusterName, String topic,
}
}

protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end,
public QueryResult queryMessage(String clusterName, String topic, String key, int maxNum, long begin, long end,
boolean isUniqKey) throws MQClientException,
InterruptedException {
return queryMessage(null, topic, key, maxNum, begin, end, isUniqKey);
}
boolean isLmq = MixAll.isLmq(topic);

String routeTopic = topic;
// if topic is lmq ,then use clusterName as lmq parent topic
// Use clusterName or lmq parent topic to get topic route for lmq or rmq_sys_wheel_timer
if (!StringUtils.isEmpty(topic) && (isLmq || topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer"))
&& !StringUtils.isEmpty(clusterName)) {
routeTopic = clusterName;
}

protected QueryResult queryMessage(String clusterName, String topic, String key, int maxNum, long begin, long end,
boolean isUniqKey) throws MQClientException,
InterruptedException {
TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(routeTopic);
if (null == topicRouteData) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
this.mQClientFactory.updateTopicRouteInfoFromNameServer(routeTopic);
topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(routeTopic);
}

if (topicRouteData != null) {
List<String> brokerAddrs = new LinkedList<>();
for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
if (clusterName != null && !clusterName.isEmpty()
if (!isLmq && clusterName != null && !clusterName.isEmpty()
&& !clusterName.equals(brokerData.getCluster())) {
continue;
}
Expand All @@ -347,7 +360,11 @@ protected QueryResult queryMessage(String clusterName, String topic, String key,
for (String addr : brokerAddrs) {
try {
QueryMessageRequestHeader requestHeader = new QueryMessageRequestHeader();
requestHeader.setTopic(topic);
if (isLmq) {
requestHeader.setTopic(clusterName);
} else {
requestHeader.setTopic(topic);
}
requestHeader.setKey(key);
requestHeader.setMaxNum(maxNum);
requestHeader.setBeginTimestamp(begin);
Expand Down Expand Up @@ -436,7 +453,7 @@ public void operationFail(Throwable throwable) {
String[] keyArray = keys.split(MessageConst.KEY_SEPARATOR);
for (String k : keyArray) {
// both topic and key must be equal at the same time
if (Objects.equals(key, k) && Objects.equals(topic, msgTopic)) {
if (Objects.equals(key, k) && (isLmq || Objects.equals(topic, msgTopic))) {
matched = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void assertQueryMessage() throws InterruptedException, MQClientException,
callback.operationSucceed(response);
return null;
}).when(mQClientAPIImpl).queryMessage(anyString(), any(), anyLong(), any(InvokeCallback.class), any());
QueryResult actual = mqAdminImpl.queryMessage(defaultTopic, "keys", 100, 1L, 50L, false);
QueryResult actual = mqAdminImpl.queryMessage(defaultTopic, "keys", 100, 1L, 50L);
assertNotNull(actual);
assertEquals(1, actual.getMessageList().size());
assertEquals(defaultTopic, actual.getMessageList().get(0).getTopic());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.simple;
package org.apache.rocketmq.example.lmq;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
Expand Down Expand Up @@ -47,6 +47,7 @@ public static void main(String[] args) throws MQClientException, InterruptedExce
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message(TOPIC, TAG, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setKeys("Key" + i);
msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH /* "INNER_MULTI_DISPATCH" */,
String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, LMQ_TOPIC_1, LMQ_TOPIC_2) /* "%LMQ%123,%LMQ%456" */);
SendResult sendResult = producer.send(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.simple;
package org.apache.rocketmq.example.lmq;

import java.util.Arrays;
import java.util.HashSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.simple;
package org.apache.rocketmq.example.lmq;

import com.google.common.collect.Lists;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.simple;
package org.apache.rocketmq.example.lmq;

import com.google.common.collect.Lists;
import java.util.HashMap;
Expand Down
Loading

0 comments on commit 9fb650f

Please sign in to comment.