Kafka只支持同一个 Partition 内消息的顺序性一样,RocketMQ 中也提供了基于队列(分区)的顺序消费。即同一个队列内的消息可以做到有序,但是不同队列内的消息是无序的

当我们作为 MQ 的生产者需要发送顺序消息时,需要在 send 方法中,传入一个 MessageQueueSelector

MessageQueueSelector 中需要实现一个 select 方法,这个方法就是用来定义要把消息发送到哪个 MessageQueue 的,通常可以使用取模法进行路由:

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
//mqs:该Topic下所有可选的MessageQueue
//msg:待发送的消息
//arg:发送消息时传递的参数
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
//根据参数,计算出一个要接收消息的MessageQueue的下标
int index = id % mqs.size();
//返回这个MessageQueue
return mqs.get(index);
}
}, orderId);

通过以上形式就可以将需要有序的消息发送到同一个队列中。

RocketMQMessageListener 回调函数提供了两种消费模式,有序消费模式 MessageListenerOrderly 和并发消费模式 MessageListenerConcurrently。所以,想要实现顺序消费,需要使用 MessageListenerOrderly 模式接收消息:

consumer.registerMessageListener(new MessageListenerOrderly() {
Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs ,ConsumeOrderlyContext context) {
System.out.printf("Receive order msg:" + new String(msgs.get(0).getBody()));
return ConsumeOrderlyStatus.SUCCESS ;
}
});

当我们用以上方式注册一个消费之后,为了保证同一个队列中的有序消息可以被顺序消费,就要保证 RocketMQBroker 只会把消息发送到同一个消费者上,这时候就需要加锁了。

在实现中,ConsumeMessageOrderlyService 初始化的时候,会启动一个定时任务,会尝试向 Broker 为当前消费者客户端申请分布式锁。如果获取成功,那么后续消息将会只发给这个 Consumer

接下来在消息拉取的过程中,消费者会一次性拉取多条消息的,并且会将拉取到的消息放入 ProcessQueue,同时将消息提交到消费线程池进行执行。

顺序消费存在的问题

RocketMQ 的顺序消费是通过在消费者上多次加锁实现的,这种方式带来的问题就是会降低吞吐量,并且如果前面的消息阻塞,会导致更多消息阻塞。所以,顺序消息需要慎用。