diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 3039cf5c97ce..28bd25491453 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -2062,7 +2062,7 @@ private RemotingCommand resetOffsetInner(String topic, String group, int queueId Map queueOffsetMap = new HashMap<>(); // Reset offset for all queues belonging to the specified topic - TopicConfig topicConfig = brokerController.getTopicConfigManager().getTopicConfigTable().get(topic); + TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topic); if (null == topicConfig) { response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("Topic " + topic + " does not exist"); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index bcfe29bd4f66..c1e3ee33dc1d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.exception.MQBrokerException; @@ -43,6 +44,7 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.utils.NetworkUtil; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -199,7 +201,7 @@ public long searchOffset(MessageQueue mq, long timestamp, BoundaryType boundaryT if (brokerAddr != null) { try { return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, timestamp, - boundaryType, timeoutMillis); + boundaryType, timeoutMillis); } catch (Exception e) { throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e); } @@ -277,13 +279,20 @@ public MessageExt viewMessage(String topic, String msgId) public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, InterruptedException { - return queryMessage(topic, key, maxNum, begin, end, false); + return queryMessage(null, topic, key, maxNum, begin, end, false); } public QueryResult queryMessageByUniqKey(String topic, String uniqKey, int maxNum, long begin, long end) throws MQClientException, InterruptedException { - return queryMessage(topic, uniqKey, maxNum, begin, end, true); + return queryMessage(null, topic, uniqKey, maxNum, begin, end, true); + } + + public QueryResult queryMessageByUniqKey(String clusterName, String topic, String uniqKey, int maxNum, long begin, + long end) + throws MQClientException, InterruptedException { + + return queryMessage(clusterName, topic, uniqKey, maxNum, begin, end, true); } public MessageExt queryMessageByUniqKey(String topic, @@ -311,25 +320,29 @@ public MessageExt queryMessageByUniqKey(String clusterName, String topic, } } - protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end, + public QueryResult queryMessage(String clusterName, String topic, String key, int maxNum, long begin, long end, boolean isUniqKey) throws MQClientException, InterruptedException { - return queryMessage(null, topic, key, maxNum, begin, end, isUniqKey); - } + boolean isLmq = MixAll.isLmq(topic); + + String routeTopic = topic; + // if topic is lmq ,then use clusterName as lmq parent topic + // Use clusterName or lmq parent topic to get topic route for lmq or rmq_sys_wheel_timer + if (!StringUtils.isEmpty(topic) && (isLmq || topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer")) + && !StringUtils.isEmpty(clusterName)) { + routeTopic = clusterName; + } - protected QueryResult queryMessage(String clusterName, String topic, String key, int maxNum, long begin, long end, - boolean isUniqKey) throws MQClientException, - InterruptedException { - TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic); + TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(routeTopic); if (null == topicRouteData) { - this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); - topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic); + this.mQClientFactory.updateTopicRouteInfoFromNameServer(routeTopic); + topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(routeTopic); } if (topicRouteData != null) { List brokerAddrs = new LinkedList<>(); for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { - if (clusterName != null && !clusterName.isEmpty() + if (!isLmq && clusterName != null && !clusterName.isEmpty() && !clusterName.equals(brokerData.getCluster())) { continue; } @@ -347,7 +360,11 @@ protected QueryResult queryMessage(String clusterName, String topic, String key, for (String addr : brokerAddrs) { try { QueryMessageRequestHeader requestHeader = new QueryMessageRequestHeader(); - requestHeader.setTopic(topic); + if (isLmq) { + requestHeader.setTopic(clusterName); + } else { + requestHeader.setTopic(topic); + } requestHeader.setKey(key); requestHeader.setMaxNum(maxNum); requestHeader.setBeginTimestamp(begin); @@ -436,7 +453,7 @@ public void operationFail(Throwable throwable) { String[] keyArray = keys.split(MessageConst.KEY_SEPARATOR); for (String k : keyArray) { // both topic and key must be equal at the same time - if (Objects.equals(key, k) && Objects.equals(topic, msgTopic)) { + if (Objects.equals(key, k) && (isLmq || Objects.equals(topic, msgTopic))) { matched = true; break; } diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java index 3663df24d650..f52aba2dc00b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java @@ -165,7 +165,7 @@ public void assertQueryMessage() throws InterruptedException, MQClientException, callback.operationSucceed(response); return null; }).when(mQClientAPIImpl).queryMessage(anyString(), any(), anyLong(), any(InvokeCallback.class), any()); - QueryResult actual = mqAdminImpl.queryMessage(defaultTopic, "keys", 100, 1L, 50L, false); + QueryResult actual = mqAdminImpl.queryMessage(defaultTopic, "keys", 100, 1L, 50L); assertNotNull(actual); assertEquals(1, actual.getMessageList().size()); assertEquals(defaultTopic, actual.getMessageList().get(0).getTopic()); diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LMQProducer.java b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQProducer.java similarity index 97% rename from example/src/main/java/org/apache/rocketmq/example/simple/LMQProducer.java rename to example/src/main/java/org/apache/rocketmq/example/lmq/LMQProducer.java index 81ef2e13859e..5fee94802877 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/LMQProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQProducer.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.example.simple; +package org.apache.rocketmq.example.lmq; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -47,6 +47,7 @@ public static void main(String[] args) throws MQClientException, InterruptedExce for (int i = 0; i < 128; i++) { try { Message msg = new Message(TOPIC, TAG, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); + msg.setKeys("Key" + i); msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH /* "INNER_MULTI_DISPATCH" */, String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, LMQ_TOPIC_1, LMQ_TOPIC_2) /* "%LMQ%123,%LMQ%456" */); SendResult sendResult = producer.send(msg); diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPullConsumer.java similarity index 98% rename from example/src/main/java/org/apache/rocketmq/example/simple/LMQPullConsumer.java rename to example/src/main/java/org/apache/rocketmq/example/lmq/LMQPullConsumer.java index 7b1bdc39215f..931dd96b48f4 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPullConsumer.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.example.simple; +package org.apache.rocketmq.example.lmq; import java.util.Arrays; import java.util.HashSet; diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushConsumer.java similarity index 98% rename from example/src/main/java/org/apache/rocketmq/example/simple/LMQPushConsumer.java rename to example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushConsumer.java index efe37d868169..f8926a05dfd6 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushConsumer.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.example.simple; +package org.apache.rocketmq.example.lmq; import com.google.common.collect.Lists; diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPushPopConsumer.java b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushPopConsumer.java similarity index 99% rename from example/src/main/java/org/apache/rocketmq/example/simple/LMQPushPopConsumer.java rename to example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushPopConsumer.java index 2044057b2af0..517eb12b7d2f 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/LMQPushPopConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/lmq/LMQPushPopConsumer.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.example.simple; +package org.apache.rocketmq.example.lmq; import com.google.common.collect.Lists; import java.util.HashMap; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 5be6d24ff764..6ebee1d0dd13 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -153,6 +153,12 @@ public QueryResult queryMessage(String topic, String key, int maxNum, long begin return defaultMQAdminExtImpl.queryMessage(topic, key, maxNum, begin, end); } + + public QueryResult queryMessage(String clusterName, String topic, String key, int maxNum, long begin, long end) + throws MQClientException, InterruptedException, RemotingException { + return defaultMQAdminExtImpl.queryMessage(clusterName, topic, key, maxNum, begin, end); + } + @Override public void start() throws MQClientException { defaultMQAdminExtImpl.start(); @@ -196,7 +202,8 @@ public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws R } @Override - public void createAndUpdateTopicConfigList(String addr, List topicConfigList) throws InterruptedException, RemotingException, MQClientException { + public void createAndUpdateTopicConfigList(String addr, + List topicConfigList) throws InterruptedException, RemotingException, MQClientException { defaultMQAdminExtImpl.createAndUpdateTopicConfigList(addr, topicConfigList); } @@ -300,6 +307,12 @@ public ConsumeStats examineConsumeStats( return examineConsumeStats(consumerGroup, null); } + @Override + public ConsumeStats examineConsumeStats(String clusterName, String consumerGroup, + String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return defaultMQAdminExtImpl.examineConsumeStats(clusterName, consumerGroup, topic); + } + @Override public ConsumeStats examineConsumeStats(String consumerGroup, String topic) throws RemotingException, MQClientException, @@ -459,16 +472,35 @@ public List resetOffsetByTimestampOld(String consumerGroup, Strin return defaultMQAdminExtImpl.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force); } + public List resetOffsetByTimestampOld(String clusterName, String consumerGroup, String topic, long timestamp, + boolean force) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return defaultMQAdminExtImpl.resetOffsetByTimestampOld(clusterName, consumerGroup, topic, timestamp, force); + } + + @Override + public Map resetOffsetByTimestamp(String clusterName, String topic, String group, + long timestamp, boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return defaultMQAdminExtImpl.resetOffsetByTimestamp(clusterName, topic, group, timestamp, isForce); + } + @Override public Map resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { return resetOffsetByTimestamp(topic, group, timestamp, isForce, false); } + public Map resetOffsetByTimestamp(String clusterName, String topic, String group, + long timestamp, boolean isForce, boolean isC) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return defaultMQAdminExtImpl.resetOffsetByTimestamp(clusterName, topic, group, timestamp, isForce, isC); + } + + public Map resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce, boolean isC) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - return defaultMQAdminExtImpl.resetOffsetByTimestamp(topic, group, timestamp, isForce, isC); + return defaultMQAdminExtImpl.resetOffsetByTimestamp(null, topic, group, timestamp, isForce, isC); } @Override @@ -589,10 +621,19 @@ public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String c @Override public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId, - final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + final String topic, + final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); } + @Override + public ConsumeMessageDirectlyResult consumeMessageDirectly(final String clusterName, final String consumerGroup, + final String clientId, + final String topic, + final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return defaultMQAdminExtImpl.consumeMessageDirectly(clusterName, consumerGroup, clientId, topic, msgId); + } + @Override public List messageTrackDetail( MessageExt msg) throws RemotingException, MQClientException, InterruptedException, @@ -796,10 +837,10 @@ public void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset) this.defaultMQAdminExtImpl.resetMasterFlushOffset(brokerAddr, masterFlushOffset); } - public QueryResult queryMessageByUniqKey(String topic, String key, int maxNum, long begin, long end) + public QueryResult queryMessageByUniqKey(String clusterName, String topic, String key, int maxNum, long begin, + long end) throws MQClientException, InterruptedException { - - return defaultMQAdminExtImpl.queryMessageByUniqKey(topic, key, maxNum, begin, end); + return defaultMQAdminExtImpl.queryMessageByUniqKey(clusterName, topic, key, maxNum, begin, end); } public DefaultMQAdminExtImpl getDefaultMQAdminExtImpl() { @@ -831,13 +872,14 @@ public void updateControllerConfig(Properties properties, @Override public Pair electMaster(String controllerAddr, String clusterName, - String brokerName, Long brokerId) throws RemotingException, InterruptedException, MQBrokerException { + String brokerName, Long brokerId) throws RemotingException, InterruptedException, MQBrokerException { return this.defaultMQAdminExtImpl.electMaster(controllerAddr, clusterName, brokerName, brokerId); } @Override public void cleanControllerBrokerData(String controllerAddr, String clusterName, String brokerName, - String brokerControllerIdsToClean, boolean isCleanLivingBroker) throws RemotingException, InterruptedException, MQBrokerException { + String brokerControllerIdsToClean, + boolean isCleanLivingBroker) throws RemotingException, InterruptedException, MQBrokerException { this.defaultMQAdminExtImpl.cleanControllerBrokerData(controllerAddr, clusterName, brokerName, brokerControllerIdsToClean, isCleanLivingBroker); } @@ -876,13 +918,15 @@ public void createUser(String brokerAddr, } @Override - public void createUser(String brokerAddr, String username, String password, String userType) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + public void createUser(String brokerAddr, String username, String password, + String userType) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { defaultMQAdminExtImpl.createUser(brokerAddr, username, password, userType); } @Override public void updateUser(String brokerAddr, String username, - String password, String userType, String userStatus) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + String password, String userType, + String userStatus) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { defaultMQAdminExtImpl.updateUser(brokerAddr, username, password, userType, userStatus); } @@ -912,38 +956,45 @@ public List listUser(String brokerAddr, @Override public void createAcl(String brokerAddr, String subject, List resources, List actions, - List sourceIps, String decision) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + List sourceIps, + String decision) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { defaultMQAdminExtImpl.createAcl(brokerAddr, subject, resources, actions, sourceIps, decision); } @Override - public void createAcl(String brokerAddr, AclInfo aclInfo) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + public void createAcl(String brokerAddr, + AclInfo aclInfo) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { defaultMQAdminExtImpl.createAcl(brokerAddr, aclInfo); } @Override public void updateAcl(String brokerAddr, String subject, List resources, List actions, - List sourceIps, String decision) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + List sourceIps, + String decision) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { defaultMQAdminExtImpl.updateAcl(brokerAddr, subject, resources, actions, sourceIps, decision); } @Override - public void updateAcl(String brokerAddr, AclInfo aclInfo) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + public void updateAcl(String brokerAddr, + AclInfo aclInfo) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { defaultMQAdminExtImpl.updateAcl(brokerAddr, aclInfo); } @Override - public void deleteAcl(String brokerAddr, String subject, String resource) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + public void deleteAcl(String brokerAddr, String subject, + String resource) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { defaultMQAdminExtImpl.deleteAcl(brokerAddr, subject, resource); } @Override - public AclInfo getAcl(String brokerAddr, String subject) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + public AclInfo getAcl(String brokerAddr, + String subject) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { return defaultMQAdminExtImpl.getAcl(brokerAddr, subject); } @Override - public List listAcl(String brokerAddr, String subjectFilter, String resourceFilter) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + public List listAcl(String brokerAddr, String subjectFilter, + String resourceFilter) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { return defaultMQAdminExtImpl.listAcl(brokerAddr, subjectFilter, resourceFilter); } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 9546235d3e87..dc4d35e7049b 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -422,14 +422,21 @@ public KVTable fetchBrokerRuntimeStats( return this.mqClientInstance.getMQClientAPIImpl().getBrokerRuntimeInfo(brokerAddr, timeoutMillis); } + @Override + public ConsumeStats examineConsumeStats( + String consumerGroup, + String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + return examineConsumeStats(null, consumerGroup, topic); + } + @Override public ConsumeStats examineConsumeStats( String consumerGroup) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { - return examineConsumeStats(consumerGroup, null); + return examineConsumeStats(null, consumerGroup, null); } @Override - public ConsumeStats examineConsumeStats(String consumerGroup, + public ConsumeStats examineConsumeStats(String clusterName, String consumerGroup, String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { TopicRouteData topicRouteData = null; List routeTopics = new ArrayList<>(); @@ -438,6 +445,12 @@ public ConsumeStats examineConsumeStats(String consumerGroup, routeTopics.add(topic); routeTopics.add(KeyBuilder.buildPopRetryTopic(topic, consumerGroup)); } + + // Use clusterName topic to get topic route for lmq or rmq_sys_wheel_timer + if (!StringUtils.isEmpty(topic) && (MixAll.isLmq(topic) || topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer")) && !StringUtils.isEmpty(clusterName)) { + routeTopics.add(clusterName); + } + for (int i = 0; i < routeTopics.size(); i++) { try { topicRouteData = this.examineTopicRouteInfo(routeTopics.get(i)); @@ -467,25 +480,33 @@ public ConsumeStats examineConsumeStats(String consumerGroup, topics.add(messageQueue.getTopic()); } - ConsumeStats staticResult = new ConsumeStats(); - staticResult.setConsumeTps(result.getConsumeTps()); - // for topic, we put the physical stats, how about group? - // staticResult.getOffsetTable().putAll(result.getOffsetTable()); - - for (String currentTopic : topics) { - TopicRouteData currentRoute = this.examineTopicRouteInfo(currentTopic); - if (currentRoute.getTopicQueueMappingByBroker() == null - || currentRoute.getTopicQueueMappingByBroker().isEmpty()) { - //normal topic - for (Map.Entry entry : result.getOffsetTable().entrySet()) { - if (entry.getKey().getTopic().equals(currentTopic)) { - staticResult.getOffsetTable().put(entry.getKey(), entry.getValue()); + ConsumeStats staticResult = null; + + if (StringUtils.isEmpty(clusterName)) { + + staticResult = new ConsumeStats(); + staticResult.setConsumeTps(result.getConsumeTps()); + // for topic, we put the physical stats, how about group? + // staticResult.getOffsetTable().putAll(result.getOffsetTable()); + + for (String currentTopic : topics) { + TopicRouteData currentRoute = this.examineTopicRouteInfo(currentTopic); + if (currentRoute.getTopicQueueMappingByBroker() == null + || currentRoute.getTopicQueueMappingByBroker().isEmpty()) { + //normal topic + for (Map.Entry entry : result.getOffsetTable().entrySet()) { + if (entry.getKey().getTopic().equals(currentTopic)) { + staticResult.getOffsetTable().put(entry.getKey(), entry.getValue()); + } } } + Map brokerConfigMap = MQAdminUtils.examineTopicConfigFromRoute(currentTopic, currentRoute, defaultMQAdminExt); + ConsumeStats consumeStats = MQAdminUtils.convertPhysicalConsumeStats(brokerConfigMap, result); + staticResult.getOffsetTable().putAll(consumeStats.getOffsetTable()); } - Map brokerConfigMap = MQAdminUtils.examineTopicConfigFromRoute(currentTopic, currentRoute, defaultMQAdminExt); - ConsumeStats consumeStats = MQAdminUtils.convertPhysicalConsumeStats(brokerConfigMap, result); - staticResult.getOffsetTable().putAll(consumeStats.getOffsetTable()); + + } else { + staticResult = result; } if (staticResult.getOffsetTable().isEmpty()) { @@ -811,10 +832,16 @@ public void deleteKvConfig(String namespace, this.mqClientInstance.getMQClientAPIImpl().deleteKVConfigValue(namespace, key, timeoutMillis); } - @Override - public List resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, + public List resetOffsetByTimestampOld(String clusterName, String consumerGroup, String topic, + long timestamp, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); + String routeTopic = topic; + // Use clusterName topic to get topic route for lmq or rmq_sys_wheel_timer + if (!StringUtils.isEmpty(topic) && (MixAll.isLmq(topic) || topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer")) + && !StringUtils.isEmpty(clusterName)) { + routeTopic = clusterName; + } + TopicRouteData topicRouteData = this.examineTopicRouteInfo(routeTopic); List rollbackStatsList = new ArrayList<>(); Map topicRouteMap = new HashMap<>(); for (QueueData queueData : topicRouteData.getQueueDatas()) { @@ -829,6 +856,12 @@ public List resetOffsetByTimestampOld(String consumerGroup, Strin return rollbackStatsList; } + @Override + public List resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, + boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return resetOffsetByTimestampOld(null, consumerGroup, topic, timestamp, force); + } + private List resetOffsetByTimestampOld(String brokerAddr, QueueData queueData, String consumerGroup, String topic, long timestamp, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { @@ -864,7 +897,7 @@ private List resetOffsetByTimestampOld(String brokerAddr, QueueDa @Override public Map resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - return resetOffsetByTimestamp(topic, group, timestamp, isForce, false); + return resetOffsetByTimestamp(null, topic, group, timestamp, isForce, false); } @Override @@ -951,9 +984,16 @@ public void run() { }); } - public Map resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce, + public Map resetOffsetByTimestamp(String clusterName, String topic, String group, + long timestamp, boolean isForce, boolean isC) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); + String routeTopic = topic; + // Use clusterName topic to get topic route for lmq or rmq_sys_wheel_timer + if (!StringUtils.isEmpty(topic) && (MixAll.isLmq(topic) || topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer")) + && !StringUtils.isEmpty(clusterName)) { + routeTopic = clusterName; + } + TopicRouteData topicRouteData = this.examineTopicRouteInfo(routeTopic); List brokerDatas = topicRouteData.getBrokerDatas(); Map allOffsetTable = new HashMap<>(); if (brokerDatas != null) { @@ -1325,7 +1365,8 @@ public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String c @Override public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId, - final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + final String topic, + final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { MessageExt msg = this.viewMessage(topic, msgId); if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) { return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(NetworkUtil.socketAddress2String(msg.getStoreHost()), consumerGroup, clientId, topic, msgId, timeoutMillis); @@ -1335,6 +1376,20 @@ public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumer } } + @Override + public ConsumeMessageDirectlyResult consumeMessageDirectly(final String clusterName, final String consumerGroup, + final String clientId, + final String topic, + final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { + MessageExt msg = this.queryMessage(clusterName, topic, msgId); + if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) { + return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(NetworkUtil.socketAddress2String(msg.getStoreHost()), consumerGroup, clientId, topic, msgId, timeoutMillis); + } else { + MessageClientExt msgClient = (MessageClientExt) msg; + return this.mqClientInstance.getMQClientAPIImpl().consumeMessageDirectly(NetworkUtil.socketAddress2String(msg.getStoreHost()), consumerGroup, clientId, topic, msgClient.getOffsetMsgId(), timeoutMillis); + } + } + @Override public List messageTrackDetail( MessageExt msg) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { @@ -1664,10 +1719,10 @@ public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, f while (iterator.hasNext()) { TopicConfig topicConfig = iterator.next().getValue(); if (topicList.getTopicList().contains(topicConfig.getTopicName()) - || TopicValidator.isSystemTopic(topicConfig.getTopicName())) { + || TopicValidator.isSystemTopic(topicConfig.getTopicName())) { iterator.remove(); } else if (!specialTopic && StringUtils.startsWithAny(topicConfig.getTopicName(), - MixAll.RETRY_GROUP_TOPIC_PREFIX, MixAll.DLQ_GROUP_TOPIC_PREFIX)) { + MixAll.RETRY_GROUP_TOPIC_PREFIX, MixAll.DLQ_GROUP_TOPIC_PREFIX)) { iterator.remove(); } else if (!PermName.isValid(topicConfig.getPerm())) { iterator.remove(); @@ -1726,6 +1781,11 @@ public QueryResult queryMessage(String topic, String key, int maxNum, long begin return this.mqClientInstance.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); } + public QueryResult queryMessage(String clusterName, String topic, String key, int maxNum, long begin, + long end) throws MQClientException, InterruptedException, RemotingException { + return this.mqClientInstance.getMQAdminImpl().queryMessage(clusterName, topic, key, maxNum, begin, end, false); + } + @Override public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, long offset) throws RemotingException, InterruptedException, MQBrokerException { @@ -1783,10 +1843,9 @@ public long searchOffset(final String brokerAddr, final String topicName, final return this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr, topicName, queueId, timestamp, timeoutMillis); } - public QueryResult queryMessageByUniqKey(String topic, String key, int maxNum, long begin, + public QueryResult queryMessageByUniqKey(String clusterName, String topic, String key, int maxNum, long begin, long end) throws MQClientException, InterruptedException { - - return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, key, maxNum, begin, end); + return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(clusterName, topic, key, maxNum, begin, end); } @Override @@ -1812,6 +1871,12 @@ public void resetOffsetByQueueId(final String brokerAddr, final String consumeGr } } + @Override + public Map resetOffsetByTimestamp(String clusterName, String topic, String group, + long timestamp, boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return resetOffsetByTimestamp(clusterName, topic, group, timestamp, isForce, false); + } + @Override public HARuntimeInfo getBrokerHAStatus( String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { @@ -1844,7 +1909,7 @@ public void resetMasterFlushOffset(String brokerAddr, @Override public Pair electMaster(String controllerAddr, String clusterName, - String brokerName, Long brokerId) throws RemotingException, InterruptedException, MQBrokerException { + String brokerName, Long brokerId) throws RemotingException, InterruptedException, MQBrokerException { return this.mqClientInstance.getMQClientAPIImpl().electMaster(controllerAddr, clusterName, brokerName, brokerId); } @@ -1930,20 +1995,23 @@ public void createUser(String brokerAddr, } @Override - public void createUser(String brokerAddr, String username, String password, String userType) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + public void createUser(String brokerAddr, String username, String password, + String userType) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { UserInfo userInfo = UserInfo.of(username, password, userType); this.createUser(brokerAddr, userInfo); } @Override public void updateUser(String brokerAddr, String username, - String password, String userType, String userStatus) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + String password, String userType, + String userStatus) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { UserInfo userInfo = UserInfo.of(username, password, userType, userStatus); this.mqClientInstance.getMQClientAPIImpl().updateUser(brokerAddr, userInfo, timeoutMillis); } @Override - public void updateUser(String brokerAddr, UserInfo userInfo) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + public void updateUser(String brokerAddr, + UserInfo userInfo) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { this.mqClientInstance.getMQClientAPIImpl().updateUser(brokerAddr, userInfo, timeoutMillis); } @@ -1967,40 +2035,47 @@ public List listUser(String brokerAddr, @Override public void createAcl(String brokerAddr, String subject, List resources, List actions, - List sourceIps, String decision) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + List sourceIps, + String decision) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { AclInfo aclInfo = AclInfo.of(subject, resources, actions, sourceIps, decision); this.createAcl(brokerAddr, aclInfo); } @Override - public void createAcl(String brokerAddr, AclInfo aclInfo) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + public void createAcl(String brokerAddr, + AclInfo aclInfo) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { this.mqClientInstance.getMQClientAPIImpl().createAcl(brokerAddr, aclInfo, timeoutMillis); } @Override public void updateAcl(String brokerAddr, String subject, List resources, List actions, - List sourceIps, String decision) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + List sourceIps, + String decision) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { AclInfo aclInfo = AclInfo.of(subject, resources, actions, sourceIps, decision); this.updateAcl(brokerAddr, aclInfo); } @Override - public void updateAcl(String brokerAddr, AclInfo aclInfo) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + public void updateAcl(String brokerAddr, + AclInfo aclInfo) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { this.mqClientInstance.getMQClientAPIImpl().updateAcl(brokerAddr, aclInfo, timeoutMillis); } @Override - public void deleteAcl(String brokerAddr, String subject, String resource) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + public void deleteAcl(String brokerAddr, String subject, + String resource) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { this.mqClientInstance.getMQClientAPIImpl().deleteAcl(brokerAddr, subject, resource, timeoutMillis); } @Override - public AclInfo getAcl(String brokerAddr, String subject) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + public AclInfo getAcl(String brokerAddr, + String subject) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { return this.mqClientInstance.getMQClientAPIImpl().getAcl(brokerAddr, subject, timeoutMillis); } @Override - public List listAcl(String brokerAddr, String subjectFilter, String resourceFilter) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + public List listAcl(String brokerAddr, String subjectFilter, + String resourceFilter) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { return this.mqClientInstance.getMQClientAPIImpl().listAcl(brokerAddr, subjectFilter, resourceFilter, timeoutMillis); } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 9dff3cbab959..ff78f22c704a 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -152,6 +152,10 @@ ConsumeStats examineConsumeStats(final String consumerGroup, final String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; + ConsumeStats examineConsumeStats(final String clusterName, final String consumerGroup, + final String topic) throws RemotingException, MQClientException, + InterruptedException, MQBrokerException; + ConsumeStats examineConsumeStats(final String brokerAddr, final String consumerGroup, final String topicName, final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException; @@ -232,6 +236,9 @@ List resetOffsetByTimestampOld(String consumerGroup, String topic Map resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + Map resetOffsetByTimestamp(String clusterName, String topic, String group, long timestamp, boolean isForce) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; @@ -293,6 +300,11 @@ ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String topic, String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; + ConsumeMessageDirectlyResult consumeMessageDirectly(String clusterName, String consumerGroup, + String clientId, + String topic, + String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; + List messageTrackDetail( MessageExt msg) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java index c489cad68495..b638dcf61f35 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java @@ -72,6 +72,10 @@ public Options buildCommandlineOptions(Options options) { optionShowClientIP.setRequired(false); options.addOption(optionShowClientIP); + opt = new Option("c", "cluster", true, "Cluster name or lmq parent topic, lmq is used to find the route."); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -109,6 +113,8 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t boolean showClientIP = commandLine.hasOption('s') && "true".equalsIgnoreCase(commandLine.getOptionValue('s')); + String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null; + if (commandLine.hasOption('g')) { String consumerGroup = commandLine.getOptionValue('g').trim(); String topicName = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : null; @@ -116,7 +122,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t if (topicName == null) { consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup); } else { - consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup, topicName); + consumeStats = defaultMQAdminExt.examineConsumeStats(clusterName, consumerGroup, topicName); } List mqList = new LinkedList<>(consumeStats.getOffsetTable().keySet()); Collections.sort(mqList); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java index 5245ca089ffe..e83029eed31d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java @@ -44,9 +44,10 @@ import org.apache.rocketmq.tools.command.SubCommandException; public class QueryMsgByIdSubCommand implements SubCommand { - public static void queryById(final DefaultMQAdminExt admin, final String topic, final String msgId, final Charset msgBodyCharset) throws MQClientException, + public static void queryById(final DefaultMQAdminExt admin, final String clusterName, final String topic, + final String msgId, final Charset msgBodyCharset) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, IOException { - MessageExt msg = admin.viewMessage(topic, msgId); + MessageExt msg = admin.queryMessage(clusterName, topic, msgId); printMsg(admin, msg, msgBodyCharset); } @@ -55,7 +56,8 @@ public static void printMsg(final DefaultMQAdminExt admin, final MessageExt msg) printMsg(admin, msg, null); } - public static void printMsg(final DefaultMQAdminExt admin, final MessageExt msg, final Charset msgBodyCharset) throws IOException { + public static void printMsg(final DefaultMQAdminExt admin, final MessageExt msg, + final Charset msgBodyCharset) throws IOException { if (msg == null) { System.out.printf("%nMessage not found!"); return; @@ -219,6 +221,10 @@ public Options buildCommandlineOptions(Options options) { opt.setRequired(false); options.addOption(opt); + opt = new Option("c", "cluster", true, "Cluster name or lmq parent topic, lmq is used to find the route."); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -244,13 +250,14 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t final String msgIds = commandLine.getOptionValue('i').trim(); final String[] msgIdArr = StringUtils.split(msgIds, ","); + String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null; if (commandLine.hasOption('g') && commandLine.hasOption('d')) { final String consumerGroup = commandLine.getOptionValue('g').trim(); final String clientId = commandLine.getOptionValue('d').trim(); for (String msgId : msgIdArr) { if (StringUtils.isNotBlank(msgId)) { - pushMsg(defaultMQAdminExt, consumerGroup, clientId, topic, msgId.trim()); + pushMsg(defaultMQAdminExt, clusterName, consumerGroup, clientId, topic, msgId.trim()); } } } else if (commandLine.hasOption('s')) { @@ -258,7 +265,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t if (resend) { for (String msgId : msgIdArr) { if (StringUtils.isNotBlank(msgId)) { - sendMsg(defaultMQAdminExt, defaultMQProducer, topic, msgId.trim()); + sendMsg(defaultMQAdminExt, clusterName, defaultMQProducer, topic, msgId.trim()); } } } @@ -269,7 +276,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t } for (String msgId : msgIdArr) { if (StringUtils.isNotBlank(msgId)) { - queryById(defaultMQAdminExt, topic, msgId.trim(), msgBodyCharset); + queryById(defaultMQAdminExt, clusterName, topic, msgId.trim(), msgBodyCharset); } } @@ -282,13 +289,14 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t } } - private void pushMsg(final DefaultMQAdminExt defaultMQAdminExt, final String consumerGroup, final String clientId, + private void pushMsg(final DefaultMQAdminExt defaultMQAdminExt, final String clusterName, + final String consumerGroup, final String clientId, final String topic, final String msgId) { try { ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false, false); if (consumerRunningInfo != null && ConsumerRunningInfo.isPushType(consumerRunningInfo)) { ConsumeMessageDirectlyResult result = - defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); + defaultMQAdminExt.consumeMessageDirectly(clusterName, consumerGroup, clientId, topic, msgId); System.out.printf("%s", result); } else { System.out.printf("this %s client is not push consumer ,not support direct push \n", clientId); @@ -298,10 +306,11 @@ private void pushMsg(final DefaultMQAdminExt defaultMQAdminExt, final String con } } - private void sendMsg(final DefaultMQAdminExt defaultMQAdminExt, final DefaultMQProducer defaultMQProducer, + private void sendMsg(final DefaultMQAdminExt defaultMQAdminExt, final String clusterName, + final DefaultMQProducer defaultMQProducer, final String topic, final String msgId) { try { - MessageExt msg = defaultMQAdminExt.viewMessage(topic, msgId); + MessageExt msg = defaultMQAdminExt.queryMessage(clusterName, topic, msgId); if (msg != null) { // resend msg by id System.out.printf("prepare resend msg. originalMsgId=%s", msgId); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java index 64627fd19fae..02961c3bb501 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java @@ -23,6 +23,8 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; + import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; @@ -41,7 +43,7 @@ public String commandDesc() { @Override public Options buildCommandlineOptions(Options options) { - Option opt = new Option("t", "topic", true, "topic name"); + Option opt = new Option("t", "topic", true, "Topic name"); opt.setRequired(true); options.addOption(opt); @@ -57,7 +59,11 @@ public Options buildCommandlineOptions(Options options) { opt.setRequired(false); options.addOption(opt); - opt = new Option("c", "maxNum", true, "The maximum number of messages returned by the query, default:64"); + opt = new Option("m", "maxNum", true, "The maximum number of messages returned by the query, default:64"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("c", "cluster", true, "Cluster name or lmq parent topic, lmq is used to find the route."); opt.setRequired(false); options.addOption(opt); @@ -77,16 +83,20 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t long beginTimestamp = 0; long endTimestamp = Long.MAX_VALUE; int maxNum = 64; + String clusterName = null; if (commandLine.hasOption("b")) { beginTimestamp = Long.parseLong(commandLine.getOptionValue("b").trim()); } if (commandLine.hasOption("e")) { endTimestamp = Long.parseLong(commandLine.getOptionValue("e").trim()); } + if (commandLine.hasOption("m")) { + maxNum = Integer.parseInt(commandLine.getOptionValue("m").trim()); + } if (commandLine.hasOption("c")) { - maxNum = Integer.parseInt(commandLine.getOptionValue("c").trim()); + clusterName = commandLine.getOptionValue("c").trim(); } - this.queryByKey(defaultMQAdminExt, topic, key, maxNum, beginTimestamp, endTimestamp); + this.queryByKey(defaultMQAdminExt, clusterName, topic, key, maxNum, beginTimestamp, endTimestamp); } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); } finally { @@ -94,12 +104,13 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t } } - private void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key, int maxNum, long begin, + private void queryByKey(final DefaultMQAdminExt admin, final String cluster, final String topic, final String key, int maxNum, long begin, long end) - throws MQClientException, InterruptedException { + throws MQClientException, InterruptedException, RemotingException { admin.start(); - QueryResult queryResult = admin.queryMessage(topic, key, maxNum, begin, end); + QueryResult queryResult = admin.queryMessage(cluster, topic, key, maxNum, begin, end); + System.out.printf("%-50s %4s %40s%n", "#Message ID", "#QID", diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java index b71cee901605..5295d91cc305 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java @@ -25,13 +25,11 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.rocketmq.client.QueryResult; -import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; @@ -51,19 +49,18 @@ private DefaultMQAdminExt createMQAdminExt(RPCHook rpcHook) throws SubCommandExc defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); try { defaultMQAdminExt.start(); - } - catch (Exception e) { + } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); } return defaultMQAdminExt; } } - public static void queryById(final DefaultMQAdminExt admin, final String topic, final String msgId, - final boolean showAll) throws MQClientException, - RemotingException, MQBrokerException, InterruptedException, IOException { + public static void queryById(final DefaultMQAdminExt admin, final String clusterName, final String topic, + final String msgId, + final boolean showAll) throws MQClientException, InterruptedException, IOException { - QueryResult queryResult = admin.queryMessageByUniqKey(topic, msgId, 32, 0, Long.MAX_VALUE); + QueryResult queryResult = admin.queryMessageByUniqKey(clusterName, topic, msgId, 32, 0, Long.MAX_VALUE); assert queryResult != null; List list = queryResult.getMessageList(); if (list == null || list.size() == 0) { @@ -94,7 +91,7 @@ private static void showMessage(final DefaultMQAdminExt admin, MessageExt msg, i System.out.printf(strFormat, "Store Host:", RemotingHelper.parseSocketAddressAddr(msg.getStoreHost())); System.out.printf(intFormat, "System Flag:", msg.getSysFlag()); System.out.printf(strFormat, "Properties:", - msg.getProperties() != null ? msg.getProperties().toString() : ""); + msg.getProperties() != null ? msg.getProperties().toString() : ""); System.out.printf(strFormat, "Message Body Path:", bodyTmpFilePath); try { @@ -166,6 +163,10 @@ public Options buildCommandlineOptions(Options options) { opt.setRequired(false); options.addOption(opt); + opt = new Option("c", "cluster", true, "Cluster name or lmq parent topic, lmq is used to find the route."); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -173,10 +174,11 @@ public Options buildCommandlineOptions(Options options) { public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { try { - defaultMQAdminExt = createMQAdminExt(rpcHook); + defaultMQAdminExt = createMQAdminExt(rpcHook); final String msgId = commandLine.getOptionValue('i').trim(); final String topic = commandLine.getOptionValue('t').trim(); + String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null; final boolean showAll = commandLine.hasOption('a'); if (commandLine.hasOption('g') && commandLine.hasOption('d')) { final String consumerGroup = commandLine.getOptionValue('g').trim(); @@ -189,14 +191,14 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t } if (consumerRunningInfo != null && ConsumerRunningInfo.isPushType(consumerRunningInfo)) { ConsumeMessageDirectlyResult result = - defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); + defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); System.out.printf("%s", result); } else { - System.out.printf("get consumer info failed or this %s client is not push consumer ,not support direct push \n", clientId); + System.out.printf("get consumer info failed or this %s client is not push consumer, not support direct push \n", clientId); } } else { - queryById(defaultMQAdminExt, topic, msgId, showAll); + queryById(defaultMQAdminExt, clusterName, topic, msgId, showAll); } } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java index 993fa5018755..84a301bd60cc 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java @@ -77,6 +77,10 @@ public Options buildCommandlineOptions(Options options) { opt.setRequired(false); options.addOption(opt); + opt = new Option("c", "cluster", true, "Cluster name or lmq parent topic, lmq is used to find the route."); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -88,6 +92,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t String group = commandLine.getOptionValue("g").trim(); String topic = commandLine.getOptionValue("t").trim(); String timeStampStr = commandLine.getOptionValue("s").trim(); + String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null; long timestamp = "now".equals(timeStampStr) ? System.currentTimeMillis() : 0; try { @@ -129,7 +134,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t if (brokerAddr != null && queueId >= 0) { System.out.printf("start reset consumer offset by specified, " + "group[%s], topic[%s], queueId[%s], broker[%s], timestamp(string)[%s], timestamp(long)[%s]%n", - group, topic, queueId, brokerAddr, timeStampStr, timestamp); + group, topic, queueId, brokerAddr, timeStampStr, timestamp); long resetOffset = null != offset ? offset : defaultMQAdminExt.searchOffset(brokerAddr, topic, queueId, timestamp, 3000); @@ -143,11 +148,11 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t Map offsetTable; try { - offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force, isC); + offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(clusterName, topic, group, timestamp, force, isC); } catch (MQClientException e) { - // if consumer not online, use old command to reset reset + // if consumer not online, use old command to reset if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) { - ResetOffsetByTimeOldCommand.resetOffset(defaultMQAdminExt, group, topic, timestamp, force, timeStampStr); + ResetOffsetByTimeOldCommand.resetOffset(defaultMQAdminExt, clusterName, group, topic, timestamp, force, timeStampStr); return; } throw e; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java index 7984bb8c39f9..c179c5c80513 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java @@ -34,12 +34,13 @@ public class ResetOffsetByTimeOldCommand implements SubCommand { - public static void resetOffset(DefaultMQAdminExt defaultMQAdminExt, String consumerGroup, String topic, + public static void resetOffset(DefaultMQAdminExt defaultMQAdminExt, String clusterName, String consumerGroup, + String topic, long timestamp, boolean force, String timeStampStr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { List rollbackStatsList = - defaultMQAdminExt.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force); + defaultMQAdminExt.resetOffsetByTimestampOld(clusterName, consumerGroup, topic, timestamp, force); System.out.printf("reset consumer offset by specified " + "consumerGroup[%s], topic[%s], force[%s], timestamp(string)[%s], timestamp(long)[%s]%n", @@ -93,6 +94,11 @@ public Options buildCommandlineOptions(Options options) { opt = new Option("f", "force", true, "set the force rollback by timestamp switch[true|false]"); opt.setRequired(false); options.addOption(opt); + + opt = new Option("c", "cluster", true, "Cluster name or lmq parent topic, lmq is used to find the route."); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -104,6 +110,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t String consumerGroup = commandLine.getOptionValue("g").trim(); String topic = commandLine.getOptionValue("t").trim(); String timeStampStr = commandLine.getOptionValue("s").trim(); + String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null; long timestamp = 0; try { timestamp = Long.parseLong(timeStampStr); @@ -123,7 +130,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t force = Boolean.parseBoolean(commandLine.getOptionValue("f").trim()); } defaultMQAdminExt.start(); - resetOffset(defaultMQAdminExt, consumerGroup, topic, timestamp, force, timeStampStr); + resetOffset(defaultMQAdminExt, clusterName, consumerGroup, topic, timestamp, force, timeStampStr); } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java index b22491a5918f..8f2ac2e1e141 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/SkipAccumulationSubCommand.java @@ -57,6 +57,10 @@ public Options buildCommandlineOptions(Options options) { opt = new Option("f", "force", true, "set the force rollback by timestamp switch[true|false]"); opt.setRequired(false); options.addOption(opt); + + opt = new Option("c", "cluster", true, "Cluster name or lmq parent topic, lmq is used to find the route."); + opt.setRequired(false); + options.addOption(opt); return options; } @@ -68,6 +72,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t try { String group = commandLine.getOptionValue("g").trim(); String topic = commandLine.getOptionValue("t").trim(); + String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null; boolean force = true; if (commandLine.hasOption('f')) { force = Boolean.valueOf(commandLine.getOptionValue("f").trim()); @@ -76,7 +81,7 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) t defaultMQAdminExt.start(); Map offsetTable; try { - offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force); + offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(clusterName, topic, group, timestamp, force); } catch (MQClientException e) { if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) { List rollbackStatsList = defaultMQAdminExt.resetOffsetByTimestampOld(group, topic, timestamp, force); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java index a1619ecedfdb..47ca761d1f6f 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java @@ -27,6 +27,8 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.protocol.admin.TopicOffset; import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; @@ -48,6 +50,10 @@ public Options buildCommandlineOptions(Options options) { Option opt = new Option("t", "topic", true, "topic name"); opt.setRequired(true); options.addOption(opt); + + opt = new Option("c", "cluster", true, "cluster name or lmq parent topic, lmq is used to find the route."); + opt.setRequired(false); + options.addOption(opt); return options; } @@ -58,10 +64,26 @@ public void execute(final CommandLine commandLine, final Options options, defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + try { + TopicStatsTable topicStatsTable = new TopicStatsTable(); defaultMQAdminExt.start(); String topic = commandLine.getOptionValue('t').trim(); - TopicStatsTable topicStatsTable = defaultMQAdminExt.examineTopicStats(topic); + + if (commandLine.hasOption('c')) { + String cluster = commandLine.getOptionValue('c').trim(); + TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(cluster); + + for (BrokerData bd : topicRouteData.getBrokerDatas()) { + String addr = bd.selectBrokerAddr(); + if (addr != null) { + TopicStatsTable tst = defaultMQAdminExt.examineTopicStats(addr, topic); + topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable()); + } + } + } else { + topicStatsTable = defaultMQAdminExt.examineTopicStats(topic); + } List mqList = new LinkedList<>(); mqList.addAll(topicStatsTable.getOffsetTable().keySet()); diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java index fc5405e7472d..b24bd22db8f2 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java @@ -127,7 +127,7 @@ public void before() throws NoSuchFieldException, IllegalAccessException, Interr when(mQAdminImpl.queryMessageByUniqKey(anyString(), anyString())).thenReturn(retMsgExt); QueryResult queryResult = new QueryResult(0, Lists.newArrayList(retMsgExt)); - when(defaultMQAdminExtImpl.queryMessageByUniqKey(anyString(), anyString(), anyInt(), anyLong(), anyLong())).thenReturn(queryResult); + when(mQAdminImpl.queryMessageByUniqKey(anyString(), anyString(), anyString(), anyInt(), anyLong(), anyLong())).thenReturn(queryResult); TopicRouteData topicRouteData = new TopicRouteData(); List brokerDataList = new ArrayList<>(); @@ -194,7 +194,7 @@ public void testExecuteConsumeActively() throws SubCommandException, Interrupted Options options = ServerUtil.buildCommandlineOptions(new Options()); - String[] args = new String[] {"-t myTopicTest", "-i msgId"}; + String[] args = new String[] {"-t myTopicTest", "-i msgId", "-c DefaultCluster"}; CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new DefaultParser()); cmd.execute(commandLine, options, null); @@ -218,7 +218,7 @@ public void testExecuteConsumePassively() throws SubCommandException, Interrupte Options options = ServerUtil.buildCommandlineOptions(new Options()); - String[] args = new String[] {"-t myTopicTest", "-i 7F000001000004D20000000000000066"}; + String[] args = new String[] {"-t myTopicTest", "-i 7F000001000004D20000000000000066", "-c DefaultCluster"}; CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new DefaultParser()); cmd.execute(commandLine, options, null); @@ -230,7 +230,7 @@ public void testExecuteWithConsumerGroupAndClientId() throws SubCommandException Options options = ServerUtil.buildCommandlineOptions(new Options()); - String[] args = new String[] {"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId"}; + String[] args = new String[] {"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId", "-c DefaultCluster"}; CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new DefaultParser()); cmd.execute(commandLine, options, null); @@ -241,13 +241,13 @@ public void testExecute() throws SubCommandException { System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876"); - String[] args = new String[]{"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000"}; + String[] args = new String[]{"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-c DefaultCluster"}; Options options = ServerUtil.buildCommandlineOptions(new Options()); CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new DefaultParser()); cmd.execute(commandLine, options, null); - args = new String[] {"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId"}; + args = new String[] {"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000", "-g producerGroupName", "-d clientId", "-c DefaultCluster"}; commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new DefaultParser()); cmd.execute(commandLine, options, null);