Skip to content

RocketMQ en

format edited this page Aug 2, 2019 · 1 revision

Spring Cloud Alibaba RocketMQ Binder

Introduction of RocketMQ

RocketMQ is an open-source distributed message system. It is based on highly available distributed cluster technologies and provides message publishing and subscription service with low latency and high stability. RocketMQ is widely used in a variety of industries, such as decoupling of asynchronous communication, enterprise sulotions, financial settlements, telecommunication, e-commerce, logistics, marketing, social media, instant messaging, mobile applications, mobile games, vedios, IoT, and Internet of Vehicles.

It has the following features:

  • Strict order of message sending and consumption

  • Rich modes of message pulling

  • Horizontal scalability of consumers

  • Real-time message subscription

  • Billion-level message accumulation capability

RocketMQ Usages

  • Download RocketMQ

Download Latest Binary File of RocketMQ, and decompress it.

The decompressed directory is as follows:

apache-rocketmq
├── LICENSE
├── NOTICE
├── README.md
├── benchmark
├── bin
├── conf
└── lib
  • Start NameServer

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
  • Start Broker

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
  • Send and Receive Messages

Send messages:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

Output when the message is successfuly sent: SendResult [sendStatus=SEND_OK, msgId= …​

Receive messages:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

Output when the message is successfully received: ConsumeMessageThread_%d Receive New Messages: [MessageExt…​

  • Disable Server

sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

Introduction of Spring Cloud Stream

Spring Cloud Stream is a microservice framework used to build architectures based on messages. It helps you to create production-ready single-server Spring applications based on SpringBoot, and connects with Broker using Spring Integration.

Spring Cloud Stream provides unified abstractions of message middleware configurations, and puts forward concepts such as publish-subscribe, consumer groups and partition.

There are two concepts in Spring Cloud Stream: Binder and Binding

  • Binder: A component used to integrate with external message middleware, and is used to create binding. Different message middleware products have their own binder implementations.

For example, Kafka uses KafkaMessageChannelBinder, RabbitMQ uses RabbitMessageChannelBinder, while RocketMQ uses RocketMQMessageChannelBinder.

  • Binding: Includes Input Binding and Output Binding.

Binding serves as a bridge between message middleware and the provider and consumer of the applications. Developers only need to use the Provider or Consumer to produce or consume data, and do not need to worry about the interactions with the message middleware.

SCSt overview
Figure 1. Spring Cloud Stream

Now let’s use Spring Cloud Stream to write a simple code for sending and receiving messages:

MessageChannel messageChannel = new DirectChannel();

// Message subscription
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
    @Override
    public void handleMessage(Message<? > message) throws MessagingException {
        System.out.println("receive msg: " + message.getPayload());
    }
});

// Message sending
messageChannel.send(MessageBuilder.withPayload("simple msg").build());

All the message types in this code are provided by the `spring-messaging`module. It shields the lower-layer implementations of message middleware. If you would like to change the message middleware, you only need to configure the related message middleware information in the configuration file and modify the binder dependency.

The lower layer of Spring Cloud Stream also implements various code abstractions based on the previous code.

How to use Spring Cloud Alibaba RocketMQ Binder

For using the Spring Cloud Alibaba RocketMQ Binder, you just need to add it to your Spring Cloud Stream application, using the following Maven coordinates:

<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
</dependency>

Alternatively, you can also use the Spring Cloud Stream RocketMQ Starter:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

How Spring Cloud Alibaba RocketMQ Binder Works

This is the implementation architecture of Spring Cloud Stream RocketMQ Binder:

TB1v8rcbUY1gK0jSZFCXXcwqXXa 1236 773
Figure 2. SCS RocketMQ Binder

The implementation of RocketMQ Binder depend on the RocketMQ-Spring framework.

RocketMQ Spring framework is an integration of RocketMQ and Spring Boot. It provides three main features:

  1. RocketMQTemplate: Sending messages, including synchronous, asynchronous, and transactional messages.

  2. @RocketMQTransactionListener: Listen and check for transaction messages.

  3. @RocketMQMessageListener: Consume messages.

RocketMQMessageChannelBinder is a standard implementation of Binder, it will build RocketMQInboundChannelAdapter and RocketMQMessageHandler internally.

RocketMQMessageHandler will construct RocketMQTemplate based on the Binding configuration. RocketMQTemplate will convert the org.springframework.messaging.Message message class of spring-messaging module to the RocketMQ message class org.apache.rocketmq.common .message.Message internally, then send it out.

RocketMQInboundChannelAdapter will also construct RocketMQListenerBindingContainer based on the Binding configuration, and RocketMQListenerBindingContainer will start the RocketMQ Consumer to receive the messages.

Note
RocketMQ Binder Application can also be used to configure rocketmq.** to trigger RocketMQ Spring related AutoConfiguration

Currently Binder supports setting the relevant key in Header to set the properties of the RocketMQ message.

For example, TAGS, DELAY, TRANSACTIONAL_ARG, KEYS, WAIT_STORE_MSG_OK, FLAG represent the labels corresponding to the RocketMQ message.

MessageBuilder builder = MessageBuilder.withPayload(msg)
    .setHeader(RocketMQHeaders.TAGS, "binder")
    .setHeader(RocketMQHeaders.KEYS, "my-key")
    .setHeader("DELAY", "1");
Message message = builder.build();
output().send(message);

Support MessageSource

SCS RocketMQ Binder support MessageSource,which can receive messages by pull mode:

@SpringBootApplication
@EnableBinding(MQApplication.PolledProcessor.class)
public class MQApplication {

  private final Logger logger =
  	  LoggerFactory.getLogger(MQApplication.class);

  public static void main(String[] args) {
    SpringApplication.run(MQApplication.class, args);
  }

  @Bean
  public ApplicationRunner runner(PollableMessageSource source,
  	    MessageChannel dest) {
    return args -> {
      while (true) {
        boolean result = source.poll(m -> {
          String payload = (String) m.getPayload();
          logger.info("Received: " + payload);
          dest.send(MessageBuilder.withPayload(payload.toUpperCase())
              .copyHeaders(m.getHeaders())
              .build());
        }, new ParameterizedTypeReference<String>() { });
        if (result) {
          logger.info("Processed a message");
        }
        else {
          logger.info("Nothing to do");
        }
        Thread.sleep(5_000);
      }
    };
  }

  public static interface PolledProcessor {

    @Input
    PollableMessageSource source();

    @Output
    MessageChannel dest();

  }

}

Configuration Options

RocketMQ Binder Properties

spring.cloud.stream.rocketmq.binder.name-server

The name server of RocketMQ Server(Older versions use the namesrv-addr configuration item).

Default: 127.0.0.1:9876.

spring.cloud.stream.rocketmq.binder.access-key

The AccessKey of Alibaba Cloud Account.

Default: null.

spring.cloud.stream.rocketmq.binder.secret-key

The SecretKey of Alibaba Cloud Account.

Default: null.

spring.cloud.stream.rocketmq.binder.enable-msg-trace

Enable Message Trace feature for all producers and consumers.

Default: true.

spring.cloud.stream.rocketmq.binder.customized-trace-topic

The trace topic for message trace.

Default: RMQ_SYS_TRACE_TOPIC.

RocketMQ Consumer Properties

The following properties are available for RocketMQ producers only and must be prefixed with spring.cloud.stream.rocketmq.bindings.<channelName>.consumer..

enable

Enable Consumer Binding.

Default: true.

tags

Consumer subscription tags expression, tags split by ||.

Default: empty.

sql

Consumer subscription sql expression.

Default: empty.

broadcasting

Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.

Default: false.

orderly

Receiving message concurrently or orderly.

Default: false.

delayLevelWhenNextConsume

Message consume retry strategy for concurrently consume:

  • -1,no retry,put into DLQ directly

  • 0,broker control retry frequency

  • >0,client control retry frequency

    Default: 0.

suspendCurrentQueueTimeMillis

Time interval of message consume retry for orderly consume.

Default: 1000.

RocketMQ Provider Properties

The following properties are available for RocketMQ producers only and must be prefixed with spring.cloud.stream.rocketmq.bindings.<channelName>.producer..

enable

Enable Producer Binding.

Default: true.

group

Producer group name.

Default: empty.

maxMessageSize

Maximum allowed message size in bytes.

Default: 8249344.

transactional

Send Transactional Message.

Default: false.

sync

Send message in synchronous mode.

Default: false.

vipChannelEnabled

Send message with vip channel.

Default: true.

sendMessageTimeout

Millis of send message timeout.

Default: 3000.

compressMessageBodyThreshold

Compress message body threshold, namely, message body larger than 4k will be compressed on default.

Default: 4096.

retryTimesWhenSendFailed

Maximum number of retry to perform internally before claiming sending failure in synchronous mode.

Default: 2.

retryTimesWhenSendAsyncFailed

Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.

Default: 2.

retryNextServer

Indicate whether to retry another broker on sending failure internally.

Default: false.

Clone this wiki locally