RocketMQ如何保证消息的顺序性
和Kafka只支持同一个 Partition 内消息的顺序性一样,RocketMQ 中也提供了基于队列(分区)的顺序消费。即同一个队列内的消息可以做到有序,但是不同队列内的消息是无序的
当我们作为 MQ 的生产者需要发送顺序消息时,需要在 send 方法中,传入一个 MessageQueueSelector。
MessageQueueSelector 中需要实现一个 select 方法,这个方法就是用来定义要把消息发送到哪个 MessageQueue 的,通常可以使用取模法进行路由:
SendResult sendResult = producer.send(msg, new MessageQueueSelector() { |
通过以上形式就可以将需要有序的消息发送到同一个队列中。
RocketMQ 的 MessageListener 回调函数提供了两种消费模式,有序消费模式 MessageListenerOrderly 和并发消费模式 MessageListenerConcurrently。所以,想要实现顺序消费,需要使用 MessageListenerOrderly 模式接收消息:
consumer.registerMessageListener(new MessageListenerOrderly() { |
当我们用以上方式注册一个消费之后,为了保证同一个队列中的有序消息可以被顺序消费,就要保证 RocketMQ 的 Broker 只会把消息发送到同一个消费者上,这时候就需要加锁了。
在实现中,ConsumeMessageOrderlyService 初始化的时候,会启动一个定时任务,会尝试向 Broker 为当前消费者客户端申请分布式锁。如果获取成功,那么后续消息将会只发给这个 Consumer。
接下来在消息拉取的过程中,消费者会一次性拉取多条消息的,并且会将拉取到的消息放入 ProcessQueue,同时将消息提交到消费线程池进行执行。
顺序消费存在的问题
RocketMQ 的顺序消费是通过在消费者上多次加锁实现的,这种方式带来的问题就是会降低吞吐量,并且如果前面的消息阻塞,会导致更多消息阻塞。所以,顺序消息需要慎用。
