-
Notifications
You must be signed in to change notification settings - Fork 896
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix the issue of sending message "No route information of this topic: xxx" when the producer does not configure the namespace #619
Conversation
Do you have detailed error information? The judgment of namespance is done in rocketmq-client org.apache.rocketmq.common.protocol.NamespaceUtil#wrapNamespace |
1、My MQ instance has been released and I am currently unable to test and obtain detailed error information. 5、For ease of understanding, you can refer to the following program to manually build RocketMQTemplate instances to reproduce the problem public class Test3 {
public static void main(String[] args) throws Exception {
String groupId = "xxxx";
String accessKey = "xxx";
String secretKey = "xxx";
String onsAddr = "xxx";
RocketMQProperties properties = new RocketMQProperties();
RocketMQProperties.Producer p = new RocketMQProperties.Producer();
p.setGroup(groupId);
p.setAccessKey(accessKey);
p.setSecretKey(secretKey);
properties.setNameServer(onsAddr);
properties.setProducer(p);
DefaultMQProducer producer = defaultMQProducer(properties);
producer.start();
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setProducer(producer);
rocketMQTemplate.setMessageConverter(new MappingFastJsonMessageConverter());
SendResult sendResult = rocketMQTemplate.syncSend("TP_TEST1", "hello");
System.out.println("msgId:" + sendResult);
}
/**
* This method is derived from:org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration#defaultMQProducer(org.apache.rocketmq.spring.autoconfigure.RocketMQProperties)
* @param rocketMQProperties
* @return
*/
public static DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = producerConfig.getGroup();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
String accessChannel = rocketMQProperties.getAccessChannel();
String ak = rocketMQProperties.getProducer().getAccessKey();
String sk = rocketMQProperties.getProducer().getSecretKey();
boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace();
String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic();
DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);
producer.setNamesrvAddr(nameServer);
if (StringUtils.hasLength(accessChannel)) {
producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
producer.setUseTLS(producerConfig.isTlsEnable());
producer.setNamespace(producerConfig.getNamespace());
producer.setInstanceName(producerConfig.getInstanceName());
return producer;
}
} |
In the Default MQProducer, this line of code will be executed before sending a message: "msg. setTopic (this. with Namespace (msg. getTopic()); ". If "org. apache. locketmq. client. ClientConfig# namespaceInitialized" is false, it means that the namespace has not been initialized and the program will automatically initialize. But because in "org. apache. locketmq. spring. autoconfiguration. RocketMQAutoConfiguration # defaultMQProducer", even if a namespace is not configured, the value will still be affected by executing the code: "producer. setNamespace (productConfiguration. getNamespace())" And set to true. As a result, when sending messages, the defaultMQProducer mistakenly thought of the namespace. |
What is the purpose of the change
Solve the problem of sending message errors when producers do not configure namespaces。
In the current situation, when the namespace is not configured, the method of setting the namespace is called when instantiating the bean, causing the producer to initialize the namespace to be empty, but 'ClientConfig # namespaceInitialized' is set to true. This leads to the producer sending an error message: "No route information of this topic: xxx"
Brief changelog
When instantiating beans ("defaultMQProducer", "defaultLitePullConsumer"), the set namespace method is called only when the namespace is configured。
Verifying this change
Verified!
Follow this checklist to help us incorporate your contribution quickly and easily. Notice,
it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR
.[ISSUE #123] Fix UnknownException when host config not exist
. Each commit in the pull request should have a meaningful subject line and body.mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle
to make sure basic checks pass. Runmvn clean install -DskipITs
to make sure unit-test pass. Runmvn clean test-compile failsafe:integration-test
to make sure integration-test pass.