rocketmq如何保证消息顺序消费,rocketmq严格顺序消息
终极管理员 知识笔记 140阅读
RocketMQ如何保证消息被有序消费
消费者端如何接收有序消息
队列消费的两种模式
并发消费模式
当同一类消息被送入不同队列且这些消息在处理上并不需要按时序消费时可以考虑使用并发消费模式。
并发消费模式生产者会将消息轮询发送到不同的队列当中这些队列会和消费者实例建立多个连接线程将消息并发送入到不同的消费者因为消费者处理速度有快慢所以并不能保证物流数据会按1~9的顺序依次消费。
并发消费模式处理效率很高但无法保证有序性。
有序消费模式
有序消息是指生产者在产生数据的时候根据Hash规则指定让消息放入哪个队列在消费者消费时会保证不同消费者针对每一个队列只有唯一的连接线程用于消费指定队列。
有序消费模式可以保证消息按队列FIFO顺序依次被消费但因此失去并发性能有序消费模式只有在业务要求必须按顺序消费的场景下才允许使用。
RocketMQ如何实现有序消息
要实现RocketMQ有序消息需要两点调整
生产者端要求按id等唯一标识分配消息队列消费者端采用专用的监听器保证对队列的单线程应用下面咱们来看一下代码
生产者端
SequenceMessageProvider核心代码是在向Broker发送消息时附加MessageQueueSelector对象在实现select方法时指定存放到哪个队列中。
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.MessageQueueSelector;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageQueue;import java.nio.charset.StandardCharsets;import java.util.List;Slf4jpublic class SequenceMessageProvider { public static void main(String[] args) { // 前置准备代码 DefaultMQProducer producer new DefaultMQProducer(producer-group); producer.setNamesrvAddr(127.0.0.1:9876); try { producer.start(); // 模拟10笔订单 for (Integer orderId 1; orderId < 10; orderId) { // 每笔订单要发3条消息1创建订单 2订单库存扣减 3增加积分 for (int i 0; i < 3; i) { String data ; switch (i % 3) { case 0: data orderId 号创建订单; break; case 1: data orderId 号订单减少库存; break; case 2: data orderId 号订单增加积分; break; } // 创建消息对象 topicorder,tagsorder,keyorderId Message message new Message(order, order, orderId.toString(), data.getBytes(StandardCharsets.UTF_8)); // 发送消息实现MessageQueueSelector接口 SendResult result producer.send(message, new MessageQueueSelector() { // select方法决定向broker哪一个队列发送消息 Override public MessageQueue select(List<MessageQueue> list, Message msg, Object arg) { int orderId Integer.parseInt(msg.getKeys()); int index orderId % list.size(); MessageQueue messageQueue list.get(index); log.info(id:{}data:{}queue:{}, orderId, new String(msg.getBody()), messageQueue); return messageQueue; } }, null); } } } catch (Exception exception) { exception.printStackTrace(); } finally { try { producer.shutdown(); log.warn(连接已关闭); } catch (Exception exception) { exception.printStackTrace(); } } }}
消费者端
消费者端最大的变化是registerMessageListener监听器要实例化MessageListenerOrdery对象用于为每一个队列分配唯一的连接线程进行消费。
每一批消息从Broker投递给消费者都会触发consumeMessage()方法实现对消息的消费。
import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;Slf4jpublic class SequenceMessageConsumer { public static void main(String[] args) throws Exception { // 声明并初始化一个consumer // 需要一个consumer group名字作为构造方法的参数 DefaultMQPushConsumer consumer new DefaultMQPushConsumer(consumer-group); // 同样也要设置NamesrvAddr地址须要与提供者的地址列表保持一致 consumer.setNamesrvAddr(127.0.0.1:9876); // 设置consumer所订阅的Top 和 Tag*代表全部的Tag consumer.subscribe(order, *); // 注册消息监听者消费者端要增加MessageListenerOrderly监听器用于实现有序队列 consumer.registerMessageListener(new MessageListenerOrderly() { Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) { // 遍历输出 list.forEach(msg -> { log.info({}{}{}, msg.getKeys(), new String(msg.getBody()), context.getMessageQueue()); }); return ConsumeOrderlyStatus.SUCCESS; } }); // 启动消费者 consumer.start(); }}
如何实现消息全局顺序消费
只需要再生产者固定将所有消息发往到0号队列即可保证全局有序这也意味着全局采用单线程消费执行效率极差。
Override public MessageQueue select(List<MessageQueue> list, Message msg, Object arg) { MessageQueue messageQueue list.get(0); return messageQueue; }
有序消费有什么使用限制吗
有序消费模式只支持集群模式CLUSTERING不支持广播模式BROADCASTING采用广播模式会无法接收到数据。
// 设置为集群模式 consumer.setMessageModel(MessageModel.CLUSTERING); //支持有序消息默认模式 consumer.setMessageModel(MessageModel.BROADCASTING); //不支持有序消息