欢迎来到飞鸟慕鱼博客,开始您的技术之旅!
当前位置: 首页知识笔记正文

rocketmq如何保证消息顺序消费,rocketmq严格顺序消息

终极管理员 知识笔记 140阅读

RocketMQ如何保证消息被有序消费

消费者端如何接收有序消息

队列消费的两种模式

并发消费模式

当同一类消息被送入不同队列且这些消息在处理上并不需要按时序消费时可以考虑使用并发消费模式。

并发消费模式生产者会将消息轮询发送到不同的队列当中这些队列会和消费者实例建立多个连接线程将消息并发送入到不同的消费者因为消费者处理速度有快慢所以并不能保证物流数据会按1~9的顺序依次消费。

并发消费模式处理效率很高但无法保证有序性。

有序消费模式

有序消息是指生产者在产生数据的时候根据Hash规则指定让消息放入哪个队列在消费者消费时会保证不同消费者针对每一个队列只有唯一的连接线程用于消费指定队列。

有序消费模式可以保证消息按队列FIFO顺序依次被消费但因此失去并发性能有序消费模式只有在业务要求必须按顺序消费的场景下才允许使用。

RocketMQ如何实现有序消息

要实现RocketMQ有序消息需要两点调整

生产者端要求按id等唯一标识分配消息队列消费者端采用专用的监听器保证对队列的单线程应用

下面咱们来看一下代码

生产者端

SequenceMessageProvider核心代码是在向Broker发送消息时附加MessageQueueSelector对象在实现select方法时指定存放到哪个队列中。

import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.MessageQueueSelector;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageQueue;import java.nio.charset.StandardCharsets;import java.util.List;Slf4jpublic class SequenceMessageProvider {    public static void main(String[] args) {        // 前置准备代码        DefaultMQProducer producer  new DefaultMQProducer(producer-group);        producer.setNamesrvAddr(127.0.0.1:9876);        try {            producer.start();            // 模拟10笔订单            for (Integer orderId  1; orderId < 10; orderId) {                // 每笔订单要发3条消息1创建订单 2订单库存扣减 3增加积分                for (int i  0; i < 3; i) {                    String data  ;                    switch (i % 3) {                        case 0:                            data  orderId  号创建订单;                            break;                        case 1:                            data  orderId  号订单减少库存;                            break;                        case 2:                            data  orderId  号订单增加积分;                            break;                    }                    // 创建消息对象 topicorder,tagsorder,keyorderId                    Message message  new Message(order, order, orderId.toString(), data.getBytes(StandardCharsets.UTF_8));                    // 发送消息实现MessageQueueSelector接口                    SendResult result  producer.send(message, new MessageQueueSelector() {                        // select方法决定向broker哪一个队列发送消息                        Override                        public MessageQueue select(List<MessageQueue> list, Message msg, Object arg) {                            int orderId  Integer.parseInt(msg.getKeys());                            int index  orderId % list.size();                            MessageQueue messageQueue  list.get(index);                            log.info(id:{}data:{}queue:{}, orderId, new String(msg.getBody()), messageQueue);                            return messageQueue;                        }                    }, null);                }            }        } catch (Exception exception) {            exception.printStackTrace();        } finally {            try {                producer.shutdown();                log.warn(连接已关闭);            } catch (Exception exception) {                exception.printStackTrace();            }        }    }}

消费者端

消费者端最大的变化是registerMessageListener监听器要实例化MessageListenerOrdery对象用于为每一个队列分配唯一的连接线程进行消费。

每一批消息从Broker投递给消费者都会触发consumeMessage()方法实现对消息的消费。

import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;Slf4jpublic class SequenceMessageConsumer {    public static void main(String[] args) throws Exception {        // 声明并初始化一个consumer        // 需要一个consumer group名字作为构造方法的参数        DefaultMQPushConsumer consumer  new DefaultMQPushConsumer(consumer-group);        // 同样也要设置NamesrvAddr地址须要与提供者的地址列表保持一致        consumer.setNamesrvAddr(127.0.0.1:9876);        // 设置consumer所订阅的Top 和 Tag*代表全部的Tag        consumer.subscribe(order, *);        // 注册消息监听者消费者端要增加MessageListenerOrderly监听器用于实现有序队列        consumer.registerMessageListener(new MessageListenerOrderly() {            Override            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {                // 遍历输出                list.forEach(msg -> {                    log.info({}{}{}, msg.getKeys(), new String(msg.getBody()), context.getMessageQueue());                });                return ConsumeOrderlyStatus.SUCCESS;            }        });        // 启动消费者        consumer.start();    }}

如何实现消息全局顺序消费

只需要再生产者固定将所有消息发往到0号队列即可保证全局有序这也意味着全局采用单线程消费执行效率极差。

    Override    public MessageQueue select(List<MessageQueue> list, Message msg, Object arg) {        MessageQueue messageQueue  list.get(0);        return messageQueue;    }

有序消费有什么使用限制吗

有序消费模式只支持集群模式CLUSTERING不支持广播模式BROADCASTING采用广播模式会无法接收到数据。

        // 设置为集群模式        consumer.setMessageModel(MessageModel.CLUSTERING); //支持有序消息默认模式        consumer.setMessageModel(MessageModel.BROADCASTING); //不支持有序消息
标签:
声明:无特别说明,转载请标明本文来源!