rabbitmq详细教程,rabbitmq入门到精通
终极管理员 知识笔记 129阅读
日升时奋斗日落时自省
目录

1、MQ基本概念
2、RabbitMQ简介

3、RabbitMQ模式
3.1、RabbitMQ安装linux
3.1.1、直接安装
3.1.2、docker拉取
3.2、项目搭建
3.2.1、创建项目
3.2.2、配置
3.3、简单模式
3.3、Work queues 工作队列模式
3.4、Pub/Sub 订阅模式
3.4.1、fanout类型
3.4.2、direct类型
3.4.3、topic类型
3.5、死信队列
3.6、Lazy Queue
4、确认机制
5、重试机制
注该博客有点长重复内容不多希望友友们慢慢观看
1、MQ基本概念1应用解耦
服务与服务之前进行远程调用时不需要直接调度降低了关联性服务直接给中间件发送消息通过MQ消息给调度服务做到降低服务与服务之间耦合性
2异步提速
如何做到异步提速呢主要就是MQ作为消息中间人进行转发告诉其他服务这边已经好了你们可以开始了为啥说这里的异步快因为Rabbitmq请求速度是微妙级别的速度相比远程调用的请求速度是要快的多的
3削峰填谷
先来理解一下削峰填谷图有点抽象麻烦友友们慢慢看一下暂时也没有想到很突显的图
如何做到削峰填谷呢
MQ的产品
2、RabbitMQ简介AMQP 即 Advanced Message Queuing Protocol高级消息队列协议是一个网络协议是应用层协议的一个开放标准为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息并不受客户端/中间件不同产品不同的开发语言等条件的限制。
RabbitMQ基础架构图
下面来解释以下怎么看这个图producer就是提供者通过channel频道连接到Exchange交换机交换机通过虚拟用户绑定一个Queue队列consumer就是消费者通过channel频道直接去Queue队列里拿消息前提是要有消息
Virtual host 出于多租户和安全因素设计的把 AMQP 的基本组件划分到一个虚拟的分组中类似于网络中的 namespace 进行隔离概念。当多个不同的用户使用同一个 RabbitMQ server提供的服务时可以划分出多个vhost每个用户在自己的 vhost 创建 exchange queue 等
Connection publisher consumer 和 broker 之间的 TCP 连接
Channel 如果每一次访问 RabbitMQ 都建立一个 Connection在消息量大的时候建立 TCP Connection的开销将是巨大的效率也较低。 Channel 是在 connection 内部建立的逻辑连接如果应用程序支持多线程通常每个thread创建单独的 channel 进行通讯 AMQP method 包含了channel id 帮助客户端和message broker 识别 channel所以 channel 之间是完全隔离的。 Channel 作为轻量级的 Connection极大减少了操作系统建立 TCP connection 的开销
Exchange message 到达 broker 的第一站根据分发规则匹配查询表中的 routing key分发消息到queue 中去。常用的类型有 direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue 消息最终被送到这里等待 consumer 取走
Binding exchange 和 queue 之间的虚拟连接 binding 中可以包含 routing key。 Binding 信息被保存到 exchange 中的查询表中用于 message 的分发依据
去官网上看有以下六种模式友友们可以直接来官网上看RabbitMQ Tutorials — RabbitMQ
3.1、RabbitMQ安装linux 3.1.1、直接安装RabbitMQ安装在linux上需要一定的依赖环境
这里直接给友友们 配上如果是Ubuntu的直接yum换成apt就行
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c kernel-devel m4 ncurses-devel tk tc xz
还需要一个前置环境Erlang从下面进行去安装包也可以去对应官网取
链接: 提取码: 1234
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
开启管理界面
rabbitmq-plugins enable rabbitmq_management
修改默认配置信息
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
注进来后找这个位置只改这一行改成下面的样子就可以了一会要靠这个登录呢
启动RabbitMQ
service rabbitmq-server start
访问地址ip:15672 如果界面打不开可能是防火墙没有关或者云服务器安全组没有开放针对问题稍微百度一下即可
注15672是RabbitMQ的Web管理界面的默认监听端口5672是AMQP协议的默认端口25672是RabbitMQ集群间节点通信的默认端口当前我们使用管理页面访问15672接口即可
注账号密码都是guest
3.1.2、docker拉取mq.tar文件还是从
docker直接拉取也挺好用的先把tar文件加载一下
docker load -i mq.tar
然后创建一个网络
docker network create rabbtimq-maxxub
剩下的跑起来就行 -e 是环境变量其实就是设置登录名和密码
到时候登录名maxxub 密码就是1223456
docker run \
-e RABBITMQ_DEFAULT_USERmaxxub \
-e RABBITMQ_DEFAULT_PASS123456 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network rabbtimq-maxxub \
-d \
rabbitmq:3.8-management
3.2、项目搭建 3.2.1、创建项目1父工程
直接上springboot先创建父工程目起个名字我这里叫mq-demo以下是需要的依赖
<modules> <module>publisher</module> <module>consumer</module> </modules> <packaging>pom</packaging> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.12</version> <relativePath/> </parent> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!--AMQP依赖包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--单元测试--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </dependency> </dependencies>
2创建消费者模块和提供者模块子模块不需要依赖
3.2.2、配置application.yml 消费者和提供者都可以使用同一份复制过去就行了
下面需要友友们知道有虚拟用户这个东西
logging: pattern: dateformat: MM-dd HH:mm:ss:SSS level: com.itheima: debugspring: rabbitmq: host: 这里写友友们自己的虚拟机ip或者云服务器ip port: 5672 virtual-host: / #虚拟用户 username: maxxub #登录rabbitmq的账号 password: 123456 #登录rabbitmq的密码 listener: simple: prefetch: 1 acknowledge-mode: auto #模式自动 进行一定的重试次数 retry: enabled: true # 超时重试 initial-interval: 1000ms multiplier: 2 #连接失败的重试 max-attempts: 3 #最大重试次数的 stateless: true #针对事务默认为false true针对上下文有定义保存是一个状态
3.3、简单模式 P表示producer 提供者
红色框Queue 队列
C表示consumer 消费者
简介叙述直接通过队列进行比较简单没有啥条件限制
提供者这里直接就在测试进行开始写了
消息发送依赖于RabbitTemplate类这个类直接注入就行了
Slf4jSpringBootTestpublic class SpringAmqpTest { Autowired private RabbitTemplate rabbitTemplate; Test void testSendMessage2Queue(){ String queueNamesimple.queue; //我们这里写队列名称一会在管理页面创建 String msghello,ampq!; //消息 rabbitTemplate.convertAndSend(queueName,msg); //参数队列 消息 }}
消费者
先去创建一个队列直接到rabbitmq管理页面进行创建就行了ip:15672 直接登录既可以
消费者的代码
SpringBootApplicationpublic class ConsumerApplication { //启动类 public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); }}
Slf4jComponentpublic class MqListener { RabbitListener(queues simple.queue) //使用该注解监听队列消息 public void listenSimpleQueue(String msg){ System.out.println(消费者收到simple.queue的消息msg); }}
注先启动消费者的启动类 再启动提供者的测试代码消费者会收到消息并且直接打印
3.3、Work queues 工作队列模式P表示producer 提供者
红色框Queue 队列
C表示consumer 消费者
注这次有两个消费者其实也可以是多个消费者
与简单模式没有很大的区别使用同一个队列
消费者很简单没有任何区别就是添加了一个方法 该方法还是写在MqListener 类中
Slf4jComponentpublic class MqListener { RabbitListener(queues simple.queue) public void listenSimpleQueue1(String msg) throws InterruptedException { System.out.println(消费者1收到simple.queue的消息msg); Thread.sleep(20); } RabbitListener(queues simple.queue) public void listenSimpleQueue2(String msg) throws InterruptedException { System.err.println(消费者2收到simple.queue的消息msg); Thread.sleep(200); }}
注这里加了睡眠时间是为了下面演示做准备
提供者测试一下写在producer测试代码 其实也没用改啥就是给这个队列多发点消息
Slf4jSpringBootTestpublic class SpringAmqpTest { Autowired private RabbitTemplate rabbitTemplate; Test void testSendMessage2Queue() throws InterruptedException { String queueNamesimple.queue; for(int i1;i<50;i){ String msghello,ampq!; rabbitTemplate.convertAndSend(queueName,msg); Thread.sleep(20); } }}
注先启动消费者启动类在启动提供者测试类
listener: simple: prefetch: 1 #每次每次只处理一个消息 acknowledge-mode: auto retry: enabled: true # 超时重试 initial-interval: 1000ms #设定重试时间 multiplier: 2 #连接失败的重试 max-attempts: 3 #最大重试次数 stateless: true
这里给消费者2设置等待时间是200ms秒消费者设置等待时间是20ms处理是及时处理也就是一部分是消费者1一部分是消费者2
这里展示的结果跟我们前面的配置参数有关
listener: simple: prefetch: 1 #每次每次只处理一个消息 acknowledge-mode: auto retry: enabled: true # 超时重试 initial-interval: 1000ms #设定重试时间 multiplier: 2 #连接失败的重试 max-attempts: 3 #最大重试次数 stateless: true
如果把prefetch: 1该参数去掉消费者2等待时间为200ms>>20ms所以会进行等待处理导致消息处理饥饿
3.4、Pub/Sub 订阅模式P生产者也就是要发送消息的程序但是不再发送到队列中而是发给X交换机
C消费者消息的接收者会一直等待消息到来
Queue消息队列接收消息、缓存消息
Exchange交换机X一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。 Exchange有常见以下3种类型
简单说三种常见交换机类型fanout、direct、topic
有交换机和队列了我们简单教友友们了解一下如何绑定交换机和队列关系在管理页面
后面就开始给友友们代码直接创建交换机和队列并且进行绑定这是比较好的操作
3.4.1、fanout类型下面这个位置创建一个类
直接上代码代码创建交换机和队列并且进行绑定这里创建队列与管理页面创建效果都是一样的就是更快也不容易出问题消费者跑起来以后可以去管理页面看交换机和队列都是存在的
Configurationpublic class FanoutConfiguration { Bean //创建FanoutExchange类型交换机 以下是两种方法 两个选择一个就行 public FanoutExchange fanoutExchange(){// ExchangeBuilder.fanoutExchange(maxxub.fanout).build(); //使用Builder创建 return new FanoutExchange(maxxub.fanout2); //直接new一个交换机对象 参数填写交换机名称 } Bean //创建Queue类型队列 以下是两种方法 两个选择一个就行 public Queue fanoutQueue3(){// QueueBuilder.durable(fanout.queue3).build(); //使用Builder创建 return new Queue(fanout.queue3); //直接new一个队列对象 参数填写队列名称 } Bean //进行队列和交换机绑定 使用绑定构建 public Binding fanoutBinding3(Queue fanoutQueue3,FanoutExchange fanoutExchange){ //直接看 BindingBuilder 就是 bind绑定fanoutQueue3队列 to到 fanoutExchange交换机上 return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange); } Bean public Queue fanoutQueue4(){// QueueBuilder.durable(fanout.queue4).build(); return new Queue(fanout.queue4); } Bean public Binding fanoutBinding4(){ //可以直接调用方法 return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange()); }}
消费者MqListener类中写就行 还是同样的监听
RabbitListener(queues fanout.queue3) public void listenFanoutQueue3(String msg) throws InterruptedException { System.out.println(消费者收到fanout.queue3的消息msg); Thread.sleep(200); } RabbitListener(queues fanout.queue4) public void listenFanoutQueue4(String msg) throws InterruptedException { System.err.println(消费者收到fanout.queue4的消息msg); Thread.sleep(200); }
提供者写在producer的测试类中
注fanout刻画类型是没有路由的所这里就是null我们本身也没有设置路由
Test void testFanoutQueue(){ //前面创建的是maxxub.fanout2交换机所以这里也绑定maxxub.fanout2 String exchangeNamemaxxub.fanout2; //交换机名称 String msghello , amqp; //消息 //参数交换机、路由、消息 rabbitTemplate.convertAndSend(exchangeName,null,msg); }
注先启动消费者启动类 再启动提供者测试方法效果如下
3.4.2、direct类型就拿这个图来分析一下这里的P提供者连接X交换机设置不同Routing路由给两个队列相当于给两个队列加上了特殊的标签black路由标签和green路由标签给了Q2队列这个队列就负责接收blackgreen标签的消息其他的消息不归我管orange路由标签给Q1队列这个队列就负责接收orange标签的消息
创建Direct交换机和队列并设置路由进行绑定 不用的时候把Configuration注解注释掉
Configurationpublic class DirectConfiguration { Bean //创建DirectExchange交换机 以下是两种方法 进行创建 public DirectExchange directExchange(){ //参数就是 交换机的名称// ExchangeBuilder.fanoutExchange(maxxub.direct).build(); //使用Builder进行创建交换机 return new DirectExchange(maxxub.direct); } Bean //创建 队列 public Queue directQueue3(){ //填写参数就是 队列名称// QueueBuilder.durable(direct.queue1).build(); return new Queue(direct.queue1); } Bean //进行交换机和队列的绑定 同时添加上路由 public Binding directBinding1(Queue directQueue3,DirectExchange directExchange){ //解释 BindingBuilder bind绑定directQueue3队列 to到directExchange with带上路由red return BindingBuilder.bind(directQueue3).to(directExchange).with(red); } Bean public Binding directBinding2(Queue directQueue3,DirectExchange directExchange){ //解释 BindingBuilder bind绑定directQueue3队列 to到directExchange with带上路由blue return BindingBuilder.bind(directQueue3).to(directExchange).with(blue); } Bean public Queue directQueue4(){// QueueBuilder.durable(direct.queue2).build(); return new Queue(direct.queue2); } Bean public Binding directBinding4(){ //可以直接调用方法 于上面是雷同的交换机一个就够了这里再给交换机绑定队列和路由就行了 return BindingBuilder.bind(directQueue4()).to(directExchange()).with(red); } Bean public Binding directBinding3(){ //可以直接调用方法 return BindingBuilder.bind(directQueue4()).to(directExchange()).with(yellow); }}
注这种方法比较麻烦如果需要绑定多个路由或者队列还是很不方便这里只作为演示下面开始使用注解进行创建交换机和队列进行绑定并且带上路由
消费者MqListener类中写就行 还是同样的监听
参数解释
bindings就是绑定参数
QueueBinding注解中的参数value就是需要绑定的队列name队列名称durable就是消息持久化
QueueBinding注解中的参数exchange就是被绑定的交换机name交换机名称type交换机类型
参数key就是路由{可以直接写多个路由参数}
注好处就是能节省很多代码很简单并且很很清晰
RabbitListener(bindings QueueBinding( value Queue(name direct.queue1,durable true), exchange Exchange(name maxxub.direct,type ExchangeTypes.DIRECT), key {red,blue})) public void listenDirectQueue1(String msg) throws InterruptedException { System.out.println(消费者1收到direct.queue1的消息msg); Thread.sleep(200); } RabbitListener(bindings QueueBinding( value Queue(name direct.queue2,durable true), exchange Exchange(name maxxub.direct,type ExchangeTypes.DIRECT), key {red,yellow})) public void listenDirectQueue2(String msg) throws InterruptedException { System.out.println(消费者2收到direct.queue2的消息msg); Thread.sleep(200); }
提供者写在producer的测试类中参数如果真的不知道怎么办了ctrlp就会有提示
Test void testDirectQueue(){ String exchangeNamemaxxub.direct; String msg红色警报; //参数交换机路由消息 rabbitTemplate.convertAndSend(exchangeName,red,msg); }
注先启动消费者启动类在启动提供者的测试方法因为两个队列都有路由“red”所以两个对哦都会收到消息
3.4.3、topic类型路由使用通配符的形式进行
*代表一个字符
#代表0个或者多个字符
英文大概意思就是这里路由是通过带有通配符进行的和direct类型不同的就是通配符能更灵活的设置路由类似标签
消费者MqListener类中写就行 还是同样的监听 这里没有啥改变基本就是key改了一点点
RabbitListener(bindings QueueBinding( value Queue(name topic.queue1,durable true), exchange Exchange(name maxxub.topic,type ExchangeTypes.TOPIC), key {#.new})) public void listenTopicQueue1(String msg) throws InterruptedException { System.out.println(消费者1收到topic.queue1的消息); Thread.sleep(200); } RabbitListener(bindings QueueBinding( value Queue(name topic.queue2,durable true), exchange Exchange(name maxxub.topic,type ExchangeTypes.TOPIC), key {china.#})) public void listenTopicQueue2(String msg) throws InterruptedException { System.out.println(消费者2收到topic.queue2的消息msg); Thread.sleep(200); }
提供者写在producer的测试类中参数如果真的不知道怎么办了ctrlp就会有提示
Test void testTopicQueue(){ String exchangeNamemaxxub.topic; String msg中国繁荣; rabbitTemplate.convertAndSend(exchangeName,china.strong,msg); }
注先启动消费者启动类在启动提供者的测试方法因为两个队列都有路由“red”所以两个对哦都会收到消息
解释这里china.#的路由才能满足条件所以只有topic.queue2队列接收到了消息
3.5、死信队列死信队列英文缩写 DLX 。 Dead Letter Exchange死信交换机当消息成为Dead message后可以被重新发送到另一个交换机这个交换机就是DLX
主要就是为了处理消息丢失的现象如果消息长时间处理不带或者确认机制设置的问题导致消息丢失了怎么办不能说丢了就丢了吧死信队列就是一个比较好的办法去解决
就是给普通的消息队列设置一个消息过期时间在一定时间内没有被处理掉的那就把它放到私信队列中
思路我们给simple.queue队列发消息设置过期时间10000ms但是这个队列没有提供者同时给他绑定一个死信交换机和死信队列消息在10000ms以后会流入死信队列中
先做前置操作
上面是一步一步的过程下面是基本操作流程
消费者MqListener类中写就行这里只写死信队列的就可以了simple队列是故意不让他拿到消息的为了能够超过过期时间
RabbitListener(queues dlx.queue) public void listenDlxQueue2(String msg) throws InterruptedException { log.info(消费者收到dlx.queue的消息msg); Thread.sleep(200); }
提供者写在producer的测试类中参数如果真的不知道怎么办了ctrlp就会有提示
参数没有很多的变化消息队列、路由、消息、一个消息处理器的对象进行设置消息过期时间
//延迟队列 Test void testSendTTLMessage(){ rabbitTemplate.convertAndSend(simple.direct, hi, hello, new MessagePostProcessor() { Override public Message postProcessMessage(Message message) throws AmqpException { //设置消息拿到消息参数进行设置过期时间 message.getMessageProperties().setExpiration(10000); return message; } }); //下面打印一下消息是执行成功的 log.info(消息发送成功); }
注先启动消费者启动类在启动提供者的测试方法 (这里不好具体展示)
3.6、Lazy QueueRabbitmq在3.6.0版本开始就添加了Lazy Queue概念惰性队列
惰性队列的特征如下
接收到消息后直接存入磁盘而非内存内存中只保留最近的消息默认是2048条
消费者要消费消息时才会从磁盘中读取并加载到内存
支持数百万条的消息存储
但是现在会直接进行默认Lazy Queue模式
官方的大体意思就是3.12版本开始这个参数就可以进行忽略了开始默认为Lazy Queue队列但是仍然有用户是哟弄过CQv1队列也就是经典队列这里是可以通过参数配置进行调整为CQv2配置在rabbitmq的配置文件中
classic_queue.default_version2
我这里使用的是rabbitmq3.8版本所以这里展示还是通过代码方式给友友们展示一下
在管理界面添加一个lazy.queue队列
按照以上步骤会有这么个提示将鼠标放上去
提供者写在producer的测试类中参数如果真的不知道怎么办了ctrlp就会有提示
Test void testPageOut(){ Message msg MessageBuilder .withBody(hello.getBytes(StandardCharsets.UTF_8)) //带有消息参数消息只能以byte类型发送所以这里自动转化一下 .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build(); //设置一下队列的模式这里NON_PERSISTENT表示是数据不是持久的重启mq消息会丢失 //尝试发送10万条消息 for(int i0;i<100000;i){ rabbitTemplate.convertAndSend(lazy.queue,msg); } }
会有以下这两个点需要友友们去看 消息处理时间很快并且也大量数据情况下消息不会断
当然了也不仅仅就是了看消息处理的有多快的也是为了实用的前面说去配置配置文件也是可以简经典队列调节为lazy队列的如果想要仅仅只是个别创建队列能够是lazy队列Java代码也可以是可以实现的以下就是Lazy队列的使用代码创建的方法同时可以直接接收消息效果是一样的
RabbitListener(queuesToDeclare Queue( name lazy.queue, //设置队列名称 durable true, //设置持久化 arguments Argument(name x-queue.mode,value lazy) //设置队列模式 )) public void LazyQueue(String msg){ log.info(接收到 lazy.queue对的消息 :msg); }
4、确认机制 RabbitMQ提供了Publisher Confirm和Publisher Return两种确认机制开启确认机制会消耗一定资源也就会降低消息传输的速度返回确认消息给生产者返回的结果有这么几种
消息投递到了MQ但是路由失败此时会通过PublisherReturn返回路由异常原因然后返回Ack
临时消息入队成功就可以返回Ack
持久消息完成持久化就返回Ack
其他情况也就是失败情况返回NACK投递失败
上面的不要记住那么多简单分为成功就是Ack不成功就是Nack
直接在yml配置文件中配置的即可有以下三种模式
none关闭confirm机制默认机制
simple同步阻塞等待MQ的回执消息
correlatedMQ异步回调方式返回回执消息
下面直接上步骤
每个RabbitTemplate只能配置一个ReturnCallback在项目创建之前就需要配置好
这个配置是给提供者的是提供者消费发送的成功与失败进行返回消息
提供者这里只是配置一个消息回调
Slf4jConfigurationpublic class MqConfirmConfig implements ApplicationContextAware { Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RabbitTemplate rabbitTemplate applicationContext.getBean(RabbitTemplate.class); //配置回调 这里配置confirm是无效的idea会给你提示confirm的方法但是效果不是这里需要的是回调 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { Override public void returnedMessage(ReturnedMessage returnedMessage) { log.debug(收到消息的return callback, returnedMessage.getExchange(),returnedMessage.getRoutingKey(), returnedMessage.getMessage(),returnedMessage.getReplyCode(), returnedMessage.getReplyText()); } }); }}
提供者下面开始发送消息进行确认
下面我们根据需要的参数进行写这个id代码
Test void testConfirmCallback() throws InterruptedException { //创建一个CorrelationData 对象 我们只取id 即可 调用一个带参的构造函数 CorrelationData cd new CorrelationData(UUID.randomUUID().toString()); /* * 添加 CorrelationData 对象中提供了一个异步处理方法也就是多线程可以看见的方法getFuture() * 这里给添加一个回调方法 参数是一个对象里面内部方法需要重写里面的 失败和成功的方法 * 但是这里失败是spring操作的失败不是Rabbitmq的失败所以不会返回Nack * */ cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { Override public void onFailure(Throwable ex) { log.debug(消息回调失败,ex); } Override public void onSuccess(CorrelationData.Confirm result) { log.debug(收到confirm callback回执); /* * 这里才是排定rabbitmq的消息是否成功 判断一下结果是否是有ack * 如果是的话 返回一个我们自己打印的消息 * */ if(result.isAck()){ log.debug(消息发送成功 收到ack); }else{ log.error(消息发送失败 收到nackresult.getReason()); } } }); // CorrelationData 对象创建好了 其实这里也可以说是id创建好了添加进发送的消息中 rabbitTemplate.convertAndSend(maxxub.direct,red2,hello,cd); Thread.sleep(2000); }
注这样的情况一般使用于检查问题因为需要一定的开销会大大降低rabbitmq处理消息的速度
5、重试机制针对这部分参数前面已经给友友们提供了
listener: simple: prefetch: 1 acknowledge-mode: auto #模式自动 进行一定的重试次数 retry: enabled: true # 超时重试 initial-interval: 1000ms multiplier: 2 #连接失败的重试 max-attempts: 3 #最大重试次数的 stateless: true #针对事务默认为false true针对上下文有定义保存是一个状态
前面都是直接给友友们提供了配置参数这里稍微具体说一下 MessageRecoverer就是消息恢复
继承该接口类有三个
1RejectAndDontRequeueRecoverer :重试耗尽后消息会被直接扔掉
2ImmediateRequeueMessageRecoverer重试耗尽后会重新进入队列消耗太大了
3RepublishMessageRecoverer重试耗尽后会去进入指定队列先存着
最优的当然是第三种了对消息做到了保护作用同时也不会无限进入队列4
下面就来实现一下
消费者
Slf4jConfiguration//ConditionalOnProperty(prefix spring,name rabbitmq.listener.simple.retry.enabled,havingValue true)public class ErrorConfiguration { /*创建一个交换机 队列 绑定一下并设置路由*/ Bean public DirectExchange directExchange(){ return new DirectExchange(error.direct); } Bean public Queue errorQueue(){ return new Queue(error.queue); } Bean public Binding errorBinding(DirectExchange directExchange,Queue errorQueue){ return BindingBuilder.bind(errorQueue).to(directExchange).with(error); } //这里 使用 MessageRecoverer 进行接收 Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){ log.debug(MessageRecoverer消息); //调用 RepublishMessageRecoverer 返回机制 前面说了只能绑定一个rabbitTemplate 第一个参数填写即可 //第二个参数 交换机 刚刚创建的指定交换机 和 指定交换机的路由 return new RepublishMessageRecoverer(rabbitTemplate,error.direct,error); }}
注ConditionalOnProperty注解指定配置参数没有弄好但是这里是可以使用的如果有佬们能够很好的使用请评论下怎么弄很感谢