-
Notifications
You must be signed in to change notification settings - Fork 8.4k
RocketMQ en
RocketMQ is an open-source distributed message system. It is based on highly available distributed cluster technologies and provides message publishing and subscription service with low latency and high stability. RocketMQ is widely used in a variety of industries, such as decoupling of asynchronous communication, enterprise sulotions, financial settlements, telecommunication, e-commerce, logistics, marketing, social media, instant messaging, mobile applications, mobile games, vedios, IoT, and Internet of Vehicles.
It has the following features:
-
Strict order of message sending and consumption
-
Rich modes of message pulling
-
Horizontal scalability of consumers
-
Real-time message subscription
-
Billion-level message accumulation capability
-
Download RocketMQ
Download Latest Binary File of RocketMQ, and decompress it.
The decompressed directory is as follows:
apache-rocketmq
├── LICENSE
├── NOTICE
├── README.md
├── benchmark
├── bin
├── conf
└── lib
-
Start NameServer
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
-
Start Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
-
Send and Receive Messages
Send messages:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
Output when the message is successfuly sent: SendResult [sendStatus=SEND_OK, msgId= …
Receive messages:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
Output when the message is successfully received: ConsumeMessageThread_%d Receive New Messages: [MessageExt…
-
Disable Server
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
Spring Cloud Stream is a microservice framework used to build architectures based on messages. It helps you to create production-ready single-server Spring applications based on SpringBoot, and connects with Broker using Spring Integration
.
Spring Cloud Stream provides unified abstractions of message middleware configurations, and puts forward concepts such as publish-subscribe, consumer groups and partition.
There are two concepts in Spring Cloud Stream: Binder and Binding
-
Binder: A component used to integrate with external message middleware, and is used to create binding. Different message middleware products have their own binder implementations.
For example, Kafka
uses KafkaMessageChannelBinder
, RabbitMQ
uses RabbitMessageChannelBinder
, while RocketMQ
uses RocketMQMessageChannelBinder
.
-
Binding: Includes Input Binding and Output Binding.
Binding serves as a bridge between message middleware and the provider and consumer of the applications. Developers only need to use the Provider or Consumer to produce or consume data, and do not need to worry about the interactions with the message middleware.
Now let’s use Spring Cloud Stream to write a simple code for sending and receiving messages:
MessageChannel messageChannel = new DirectChannel();
// Message subscription
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<? > message) throws MessagingException {
System.out.println("receive msg: " + message.getPayload());
}
});
// Message sending
messageChannel.send(MessageBuilder.withPayload("simple msg").build());
All the message types in this code are provided by the `spring-messaging`module. It shields the lower-layer implementations of message middleware. If you would like to change the message middleware, you only need to configure the related message middleware information in the configuration file and modify the binder dependency.
The lower layer of Spring Cloud Stream also implements various code abstractions based on the previous code.
For using the Spring Cloud Alibaba RocketMQ Binder, you just need to add it to your Spring Cloud Stream application, using the following Maven coordinates:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
</dependency>
Alternatively, you can also use the Spring Cloud Stream RocketMQ Starter:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
This is the implementation architecture of Spring Cloud Stream RocketMQ Binder:
The implementation of RocketMQ Binder depend on the RocketMQ-Spring framework.
RocketMQ Spring framework is an integration of RocketMQ and Spring Boot. It provides three main features:
-
RocketMQTemplate
: Sending messages, including synchronous, asynchronous, and transactional messages. -
@RocketMQTransactionListener
: Listen and check for transaction messages. -
@RocketMQMessageListener
: Consume messages.
RocketMQMessageChannelBinder
is a standard implementation of Binder, it will build RocketMQInboundChannelAdapter
and RocketMQMessageHandler
internally.
RocketMQMessageHandler
will construct RocketMQTemplate
based on the Binding configuration. RocketMQTemplate
will convert the org.springframework.messaging.Message
message class of spring-messaging
module to the RocketMQ message class org.apache.rocketmq.common .message.Message
internally, then send it out.
RocketMQInboundChannelAdapter
will also construct RocketMQListenerBindingContainer
based on the Binding configuration, and RocketMQListenerBindingContainer
will start the RocketMQ Consumer
to receive the messages.
Note
|
RocketMQ Binder Application can also be used to configure rocketmq.** to trigger RocketMQ Spring related AutoConfiguration |
Currently Binder supports setting the relevant key in Header
to set the properties of the RocketMQ message.
For example, TAGS
, DELAY
, TRANSACTIONAL_ARG
, KEYS
, WAIT_STORE_MSG_OK
, FLAG
represent the labels corresponding to the RocketMQ message.
MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(RocketMQHeaders.TAGS, "binder")
.setHeader(RocketMQHeaders.KEYS, "my-key")
.setHeader("DELAY", "1");
Message message = builder.build();
output().send(message);
SCS RocketMQ Binder support MessageSource
,which can receive messages by pull mode:
@SpringBootApplication
@EnableBinding(MQApplication.PolledProcessor.class)
public class MQApplication {
private final Logger logger =
LoggerFactory.getLogger(MQApplication.class);
public static void main(String[] args) {
SpringApplication.run(MQApplication.class, args);
}
@Bean
public ApplicationRunner runner(PollableMessageSource source,
MessageChannel dest) {
return args -> {
while (true) {
boolean result = source.poll(m -> {
String payload = (String) m.getPayload();
logger.info("Received: " + payload);
dest.send(MessageBuilder.withPayload(payload.toUpperCase())
.copyHeaders(m.getHeaders())
.build());
}, new ParameterizedTypeReference<String>() { });
if (result) {
logger.info("Processed a message");
}
else {
logger.info("Nothing to do");
}
Thread.sleep(5_000);
}
};
}
public static interface PolledProcessor {
@Input
PollableMessageSource source();
@Output
MessageChannel dest();
}
}
- spring.cloud.stream.rocketmq.binder.name-server
-
The name server of RocketMQ Server(Older versions use the namesrv-addr configuration item).
Default:
127.0.0.1:9876
. - spring.cloud.stream.rocketmq.binder.access-key
-
The AccessKey of Alibaba Cloud Account.
Default: null.
- spring.cloud.stream.rocketmq.binder.secret-key
-
The SecretKey of Alibaba Cloud Account.
Default: null.
- spring.cloud.stream.rocketmq.binder.enable-msg-trace
-
Enable Message Trace feature for all producers and consumers.
Default:
true
. - spring.cloud.stream.rocketmq.binder.customized-trace-topic
-
The trace topic for message trace.
Default:
RMQ_SYS_TRACE_TOPIC
.
The following properties are available for RocketMQ producers only and must be prefixed with spring.cloud.stream.rocketmq.bindings.<channelName>.consumer.
.
- enable
-
Enable Consumer Binding.
Default:
true
. - tags
-
Consumer subscription tags expression, tags split by
||
.Default: empty.
- sql
-
Consumer subscription sql expression.
Default: empty.
- broadcasting
-
Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
Default:
false
. - orderly
-
Receiving message concurrently or orderly.
Default:
false
. - delayLevelWhenNextConsume
-
Message consume retry strategy for concurrently consume:
-
-1,no retry,put into DLQ directly
-
0,broker control retry frequency
-
>0,client control retry frequency
Default:
0
.
-
- suspendCurrentQueueTimeMillis
-
Time interval of message consume retry for orderly consume.
Default:
1000
.
The following properties are available for RocketMQ producers only and must be prefixed with spring.cloud.stream.rocketmq.bindings.<channelName>.producer.
.
- enable
-
Enable Producer Binding.
Default:
true
. - group
-
Producer group name.
Default: empty.
- maxMessageSize
-
Maximum allowed message size in bytes.
Default:
8249344
. - transactional
-
Send Transactional Message.
Default:
false
. - sync
-
Send message in synchronous mode.
Default:
false
. - vipChannelEnabled
-
Send message with vip channel.
Default:
true
. - sendMessageTimeout
-
Millis of send message timeout.
Default:
3000
. - compressMessageBodyThreshold
-
Compress message body threshold, namely, message body larger than 4k will be compressed on default.
Default:
4096
. - retryTimesWhenSendFailed
-
Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
Default:
2
. - retryTimesWhenSendAsyncFailed
-
Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.
Default:
2
. - retryNextServer
-
Indicate whether to retry another broker on sending failure internally.
Default:
false
.
- 文档
- Documents
- Open Source components
- Commercial components
- Example
- awesome spring cloud alibaba