欢迎来到飞鸟慕鱼博客,开始您的技术之旅!
当前位置: 首页知识笔记正文

rabbitmq详细教程,rabbitmq入门到精通

终极管理员 知识笔记 129阅读

日升时奋斗日落时自省 

目录

1、MQ基本概念

2、RabbitMQ简介

3、RabbitMQ模式

3.1、RabbitMQ安装linux

3.1.1、直接安装

3.1.2、docker拉取

3.2、项目搭建

3.2.1、创建项目

3.2.2、配置

3.3、简单模式

3.3、Work queues 工作队列模式

3.4、Pub/Sub 订阅模式

3.4.1、fanout类型

3.4.2、direct类型

3.4.3、topic类型

3.5、死信队列

3.6、Lazy Queue

4、确认机制

5、重试机制


注该博客有点长重复内容不多希望友友们慢慢观看

1、MQ基本概念

1应用解耦

服务与服务之前进行远程调用时不需要直接调度降低了关联性服务直接给中间件发送消息通过MQ消息给调度服务做到降低服务与服务之间耦合性

2异步提速

如何做到异步提速呢主要就是MQ作为消息中间人进行转发告诉其他服务这边已经好了你们可以开始了为啥说这里的异步快因为Rabbitmq请求速度是微妙级别的速度相比远程调用的请求速度是要快的多的

3削峰填谷

先来理解一下削峰填谷图有点抽象麻烦友友们慢慢看一下暂时也没有想到很突显的图

如何做到削峰填谷呢

MQ的产品

2、RabbitMQ简介

AMQP 即 Advanced Message Queuing Protocol高级消息队列协议是一个网络协议是应用层协议的一个开放标准为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息并不受客户端/中间件不同产品不同的开发语言等条件的限制。 

RabbitMQ基础架构图

下面来解释以下怎么看这个图producer就是提供者通过channel频道连接到Exchange交换机交换机通过虚拟用户绑定一个Queue队列consumer就是消费者通过channel频道直接去Queue队列里拿消息前提是要有消息

 Virtual host 出于多租户和安全因素设计的把 AMQP 的基本组件划分到一个虚拟的分组中类似于网络中的 namespace 进行隔离概念。当多个不同的用户使用同一个 RabbitMQ server提供的服务时可以划分出多个vhost每个用户在自己的 vhost 创建 exchange queue 等

Connection publisher consumer 和 broker 之间的 TCP 连接

Channel 如果每一次访问 RabbitMQ 都建立一个 Connection在消息量大的时候建立 TCP Connection的开销将是巨大的效率也较低。 Channel 是在 connection 内部建立的逻辑连接如果应用程序支持多线程通常每个thread创建单独的 channel 进行通讯 AMQP method 包含了channel id 帮助客户端和message broker 识别 channel所以 channel 之间是完全隔离的。 Channel 作为轻量级的 Connection极大减少了操作系统建立 TCP connection 的开销

Exchange message 到达 broker 的第一站根据分发规则匹配查询表中的 routing key分发消息到queue 中去。常用的类型有 direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue 消息最终被送到这里等待 consumer 取走
Binding exchange 和 queue 之间的虚拟连接 binding 中可以包含 routing key。 Binding 信息被保存到 exchange 中的查询表中用于 message 的分发依据

3、RabbitMQ模式

去官网上看有以下六种模式友友们可以直接来官网上看RabbitMQ Tutorials — RabbitMQ

3.1、RabbitMQ安装linux 3.1.1、直接安装

RabbitMQ安装在linux上需要一定的依赖环境

这里直接给友友们 配上如果是Ubuntu的直接yum换成apt就行

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c kernel-devel m4 ncurses-devel tk tc xz

还需要一个前置环境Erlang从下面进行去安装包也可以去对应官网取 

链接: 提取码: 1234 

rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm

rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

开启管理界面

rabbitmq-plugins enable rabbitmq_management

修改默认配置信息

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app

注进来后找这个位置只改这一行改成下面的样子就可以了一会要靠这个登录呢

启动RabbitMQ

service rabbitmq-server start

访问地址ip:15672 如果界面打不开可能是防火墙没有关或者云服务器安全组没有开放针对问题稍微百度一下即可

注15672是RabbitMQ的Web管理界面的默认监听端口5672是AMQP协议的默认端口25672是RabbitMQ集群间节点通信的默认端口当前我们使用管理页面访问15672接口即可

注账号密码都是guest

3.1.2、docker拉取

mq.tar文件还是从

docker直接拉取也挺好用的先把tar文件加载一下

docker load -i mq.tar

然后创建一个网络

docker network create rabbtimq-maxxub

剩下的跑起来就行 -e 是环境变量其实就是设置登录名和密码

到时候登录名maxxub 密码就是1223456

docker run \

 -e RABBITMQ_DEFAULT_USERmaxxub \

 -e RABBITMQ_DEFAULT_PASS123456 \

 -v mq-plugins:/plugins \

 --name mq \

 --hostname mq \

 -p 15672:15672 \

 -p 5672:5672 \

 --network rabbtimq-maxxub \

 -d \

 rabbitmq:3.8-management

3.2、项目搭建 3.2.1、创建项目

1父工程

直接上springboot先创建父工程目起个名字我这里叫mq-demo以下是需要的依赖

    <modules>        <module>publisher</module>        <module>consumer</module>    </modules>    <packaging>pom</packaging>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.7.12</version>        <relativePath/>    </parent>    <properties>        <maven.compiler.source>8</maven.compiler.source>        <maven.compiler.target>8</maven.compiler.target>    </properties>    <dependencies>        <dependency>            <groupId>org.projectlombok</groupId>            <artifactId>lombok</artifactId>        </dependency>        <!--AMQP依赖包含RabbitMQ-->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId>        </dependency>        <!--单元测试-->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-logging</artifactId>        </dependency>    </dependencies>

2创建消费者模块和提供者模块子模块不需要依赖

3.2.2、配置

application.yml 消费者和提供者都可以使用同一份复制过去就行了

下面需要友友们知道有虚拟用户这个东西

logging:  pattern:    dateformat: MM-dd HH:mm:ss:SSS  level:    com.itheima: debugspring:  rabbitmq:    host: 这里写友友们自己的虚拟机ip或者云服务器ip    port: 5672    virtual-host: /       #虚拟用户    username: maxxub      #登录rabbitmq的账号    password: 123456      #登录rabbitmq的密码    listener:      simple:        prefetch: 1        acknowledge-mode: auto  #模式自动 进行一定的重试次数        retry:          enabled: true  #  超时重试          initial-interval: 1000ms          multiplier: 2 #连接失败的重试          max-attempts: 3 #最大重试次数的          stateless: true #针对事务默认为false true针对上下文有定义保存是一个状态
3.3、简单模式

P表示producer 提供者

红色框Queue 队列

C表示consumer 消费者

简介叙述直接通过队列进行比较简单没有啥条件限制

提供者这里直接就在测试进行开始写了

消息发送依赖于RabbitTemplate类这个类直接注入就行了

Slf4jSpringBootTestpublic class SpringAmqpTest {    Autowired    private RabbitTemplate rabbitTemplate;    Test    void testSendMessage2Queue(){        String queueNamesimple.queue;   //我们这里写队列名称一会在管理页面创建        String msghello,ampq!;   //消息        rabbitTemplate.convertAndSend(queueName,msg);  //参数队列  消息    }}

消费者

先去创建一个队列直接到rabbitmq管理页面进行创建就行了ip:15672 直接登录既可以

消费者的代码

SpringBootApplicationpublic class ConsumerApplication {    //启动类    public static void main(String[] args) {        SpringApplication.run(ConsumerApplication.class, args);    }}
Slf4jComponentpublic class MqListener {    RabbitListener(queues  simple.queue)   //使用该注解监听队列消息    public void listenSimpleQueue(String msg){        System.out.println(消费者收到simple.queue的消息msg);    }}

注先启动消费者的启动类 再启动提供者的测试代码消费者会收到消息并且直接打印

3.3、Work queues 工作队列模式

P表示producer 提供者

红色框Queue 队列

C表示consumer 消费者

注这次有两个消费者其实也可以是多个消费者

与简单模式没有很大的区别使用同一个队列

消费者很简单没有任何区别就是添加了一个方法 该方法还是写在MqListener 类中

Slf4jComponentpublic class MqListener {    RabbitListener(queues  simple.queue)    public void listenSimpleQueue1(String msg) throws InterruptedException {        System.out.println(消费者1收到simple.queue的消息msg);        Thread.sleep(20);    }    RabbitListener(queues  simple.queue)    public void listenSimpleQueue2(String msg) throws InterruptedException {        System.err.println(消费者2收到simple.queue的消息msg);        Thread.sleep(200);    }}

注这里加了睡眠时间是为了下面演示做准备

提供者测试一下写在producer测试代码 其实也没用改啥就是给这个队列多发点消息

Slf4jSpringBootTestpublic class SpringAmqpTest {    Autowired    private RabbitTemplate rabbitTemplate;    Test    void testSendMessage2Queue() throws InterruptedException {        String queueNamesimple.queue;        for(int i1;i<50;i){            String msghello,ampq!;            rabbitTemplate.convertAndSend(queueName,msg);            Thread.sleep(20);        }    }}

注先启动消费者启动类在启动提供者测试类

    listener:      simple:        prefetch: 1     #每次每次只处理一个消息          acknowledge-mode: auto        retry:          enabled: true  #  超时重试          initial-interval: 1000ms     #设定重试时间          multiplier: 2 #连接失败的重试          max-attempts: 3  #最大重试次数          stateless: true

这里给消费者2设置等待时间是200ms秒消费者设置等待时间是20ms处理是及时处理也就是一部分是消费者1一部分是消费者2

这里展示的结果跟我们前面的配置参数有关

    listener:      simple:        prefetch: 1     #每次每次只处理一个消息        acknowledge-mode: auto        retry:          enabled: true  #  超时重试          initial-interval: 1000ms     #设定重试时间          multiplier: 2 #连接失败的重试          max-attempts: 3  #最大重试次数          stateless: true

如果把prefetch: 1该参数去掉消费者2等待时间为200ms>>20ms所以会进行等待处理导致消息处理饥饿

3.4、Pub/Sub 订阅模式

P生产者也就是要发送消息的程序但是不再发送到队列中而是发给X交换机
C消费者消息的接收者会一直等待消息到来
Queue消息队列接收消息、缓存消息
Exchange交换机X一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。 Exchange有常见以下3种类型

简单说三种常见交换机类型fanout、direct、topic

有交换机和队列了我们简单教友友们了解一下如何绑定交换机和队列关系在管理页面

后面就开始给友友们代码直接创建交换机和队列并且进行绑定这是比较好的操作

3.4.1、fanout类型

下面这个位置创建一个类

直接上代码代码创建交换机和队列并且进行绑定这里创建队列与管理页面创建效果都是一样的就是更快也不容易出问题消费者跑起来以后可以去管理页面看交换机和队列都是存在的

Configurationpublic class FanoutConfiguration {    Bean     //创建FanoutExchange类型交换机 以下是两种方法 两个选择一个就行     public FanoutExchange fanoutExchange(){//        ExchangeBuilder.fanoutExchange(maxxub.fanout).build();   //使用Builder创建        return new FanoutExchange(maxxub.fanout2); //直接new一个交换机对象 参数填写交换机名称     }    Bean      //创建Queue类型队列  以下是两种方法 两个选择一个就行     public Queue fanoutQueue3(){//        QueueBuilder.durable(fanout.queue3).build();   //使用Builder创建        return new Queue(fanout.queue3); //直接new一个队列对象 参数填写队列名称     }    Bean  //进行队列和交换机绑定 使用绑定构建     public Binding fanoutBinding3(Queue fanoutQueue3,FanoutExchange fanoutExchange){        //直接看  BindingBuilder 就是 bind绑定fanoutQueue3队列 to到 fanoutExchange交换机上        return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);    }    Bean    public Queue fanoutQueue4(){//        QueueBuilder.durable(fanout.queue4).build();        return new Queue(fanout.queue4);    }    Bean    public Binding fanoutBinding4(){        //可以直接调用方法        return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());    }}

消费者MqListener类中写就行 还是同样的监听

    RabbitListener(queues  fanout.queue3)    public void listenFanoutQueue3(String msg) throws InterruptedException {        System.out.println(消费者收到fanout.queue3的消息msg);        Thread.sleep(200);    }    RabbitListener(queues  fanout.queue4)    public void listenFanoutQueue4(String msg) throws InterruptedException {        System.err.println(消费者收到fanout.queue4的消息msg);        Thread.sleep(200);    }

提供者写在producer的测试类中

注fanout刻画类型是没有路由的所这里就是null我们本身也没有设置路由

    Test    void testFanoutQueue(){        //前面创建的是maxxub.fanout2交换机所以这里也绑定maxxub.fanout2        String exchangeNamemaxxub.fanout2;  //交换机名称        String msghello , amqp;           //消息        //参数交换机、路由、消息        rabbitTemplate.convertAndSend(exchangeName,null,msg);    }

注先启动消费者启动类 再启动提供者测试方法效果如下

3.4.2、direct类型

就拿这个图来分析一下这里的P提供者连接X交换机设置不同Routing路由给两个队列相当于给两个队列加上了特殊的标签black路由标签和green路由标签给了Q2队列这个队列就负责接收blackgreen标签的消息其他的消息不归我管orange路由标签给Q1队列这个队列就负责接收orange标签的消息

创建Direct交换机和队列并设置路由进行绑定 不用的时候把Configuration注解注释掉

Configurationpublic class DirectConfiguration {    Bean    //创建DirectExchange交换机 以下是两种方法 进行创建    public DirectExchange directExchange(){        //参数就是  交换机的名称//        ExchangeBuilder.fanoutExchange(maxxub.direct).build();  //使用Builder进行创建交换机        return new DirectExchange(maxxub.direct);    }    Bean   //创建  队列    public Queue directQueue3(){        //填写参数就是  队列名称//        QueueBuilder.durable(direct.queue1).build();        return new Queue(direct.queue1);    }    Bean    //进行交换机和队列的绑定 同时添加上路由    public Binding directBinding1(Queue directQueue3,DirectExchange directExchange){        //解释  BindingBuilder bind绑定directQueue3队列 to到directExchange with带上路由red        return BindingBuilder.bind(directQueue3).to(directExchange).with(red);    }    Bean    public Binding directBinding2(Queue directQueue3,DirectExchange directExchange){        //解释  BindingBuilder bind绑定directQueue3队列 to到directExchange with带上路由blue        return BindingBuilder.bind(directQueue3).to(directExchange).with(blue);    }    Bean    public Queue directQueue4(){//        QueueBuilder.durable(direct.queue2).build();        return new Queue(direct.queue2);    }    Bean    public Binding directBinding4(){        //可以直接调用方法 于上面是雷同的交换机一个就够了这里再给交换机绑定队列和路由就行了        return BindingBuilder.bind(directQueue4()).to(directExchange()).with(red);    }    Bean    public Binding directBinding3(){        //可以直接调用方法        return BindingBuilder.bind(directQueue4()).to(directExchange()).with(yellow);    }}

注这种方法比较麻烦如果需要绑定多个路由或者队列还是很不方便这里只作为演示下面开始使用注解进行创建交换机和队列进行绑定并且带上路由

消费者MqListener类中写就行 还是同样的监听

参数解释

bindings就是绑定参数

QueueBinding注解中的参数value就是需要绑定的队列name队列名称durable就是消息持久化

QueueBinding注解中的参数exchange就是被绑定的交换机name交换机名称type交换机类型

参数key就是路由{可以直接写多个路由参数}

注好处就是能节省很多代码很简单并且很很清晰

    RabbitListener(bindings  QueueBinding(            value Queue(name  direct.queue1,durable  true),            exchange  Exchange(name  maxxub.direct,type  ExchangeTypes.DIRECT),            key  {red,blue}))    public void listenDirectQueue1(String msg) throws InterruptedException {        System.out.println(消费者1收到direct.queue1的消息msg);        Thread.sleep(200);    }    RabbitListener(bindings  QueueBinding(            value Queue(name  direct.queue2,durable  true),            exchange  Exchange(name  maxxub.direct,type  ExchangeTypes.DIRECT),            key  {red,yellow}))    public void listenDirectQueue2(String msg) throws InterruptedException {        System.out.println(消费者2收到direct.queue2的消息msg);        Thread.sleep(200);    }

提供者写在producer的测试类中参数如果真的不知道怎么办了ctrlp就会有提示

    Test    void testDirectQueue(){        String exchangeNamemaxxub.direct;        String msg红色警报;        //参数交换机路由消息        rabbitTemplate.convertAndSend(exchangeName,red,msg);    }

注先启动消费者启动类在启动提供者的测试方法因为两个队列都有路由“red”所以两个对哦都会收到消息

3.4.3、topic类型

路由使用通配符的形式进行

*代表一个字符

#代表0个或者多个字符

英文大概意思就是这里路由是通过带有通配符进行的和direct类型不同的就是通配符能更灵活的设置路由类似标签

消费者MqListener类中写就行 还是同样的监听 这里没有啥改变基本就是key改了一点点

    RabbitListener(bindings  QueueBinding(            value Queue(name  topic.queue1,durable  true),            exchange  Exchange(name  maxxub.topic,type  ExchangeTypes.TOPIC),            key  {#.new}))    public void listenTopicQueue1(String msg) throws InterruptedException {        System.out.println(消费者1收到topic.queue1的消息);        Thread.sleep(200);    }    RabbitListener(bindings  QueueBinding(            value Queue(name  topic.queue2,durable  true),            exchange  Exchange(name  maxxub.topic,type  ExchangeTypes.TOPIC),            key  {china.#}))    public void listenTopicQueue2(String msg) throws InterruptedException {        System.out.println(消费者2收到topic.queue2的消息msg);        Thread.sleep(200);    }

提供者写在producer的测试类中参数如果真的不知道怎么办了ctrlp就会有提示

    Test    void testTopicQueue(){        String exchangeNamemaxxub.topic;        String msg中国繁荣;        rabbitTemplate.convertAndSend(exchangeName,china.strong,msg);    }

注先启动消费者启动类在启动提供者的测试方法因为两个队列都有路由“red”所以两个对哦都会收到消息 

解释这里china.#的路由才能满足条件所以只有topic.queue2队列接收到了消息

3.5、死信队列

死信队列英文缩写 DLX 。 Dead Letter Exchange死信交换机当消息成为Dead message后可以被重新发送到另一个交换机这个交换机就是DLX

主要就是为了处理消息丢失的现象如果消息长时间处理不带或者确认机制设置的问题导致消息丢失了怎么办不能说丢了就丢了吧死信队列就是一个比较好的办法去解决

就是给普通的消息队列设置一个消息过期时间在一定时间内没有被处理掉的那就把它放到私信队列中

思路我们给simple.queue队列发消息设置过期时间10000ms但是这个队列没有提供者同时给他绑定一个死信交换机和死信队列消息在10000ms以后会流入死信队列中

先做前置操作

上面是一步一步的过程下面是基本操作流程

消费者MqListener类中写就行这里只写死信队列的就可以了simple队列是故意不让他拿到消息的为了能够超过过期时间

    RabbitListener(queues  dlx.queue)    public void listenDlxQueue2(String msg) throws InterruptedException {        log.info(消费者收到dlx.queue的消息msg);        Thread.sleep(200);    }

提供者写在producer的测试类中参数如果真的不知道怎么办了ctrlp就会有提示

参数没有很多的变化消息队列、路由、消息、一个消息处理器的对象进行设置消息过期时间

    //延迟队列    Test    void testSendTTLMessage(){        rabbitTemplate.convertAndSend(simple.direct, hi, hello, new MessagePostProcessor() {            Override            public Message postProcessMessage(Message message) throws AmqpException {                //设置消息拿到消息参数进行设置过期时间                message.getMessageProperties().setExpiration(10000);                return message;            }        });        //下面打印一下消息是执行成功的        log.info(消息发送成功);    }

注先启动消费者启动类在启动提供者的测试方法 (这里不好具体展示)

3.6、Lazy Queue

Rabbitmq在3.6.0版本开始就添加了Lazy Queue概念惰性队列

惰性队列的特征如下

接收到消息后直接存入磁盘而非内存内存中只保留最近的消息默认是2048条

消费者要消费消息时才会从磁盘中读取并加载到内存

支持数百万条的消息存储

但是现在会直接进行默认Lazy Queue模式

官方的大体意思就是3.12版本开始这个参数就可以进行忽略了开始默认为Lazy Queue队列但是仍然有用户是哟弄过CQv1队列也就是经典队列这里是可以通过参数配置进行调整为CQv2配置在rabbitmq的配置文件中

classic_queue.default_version2

我这里使用的是rabbitmq3.8版本所以这里展示还是通过代码方式给友友们展示一下

在管理界面添加一个lazy.queue队列

按照以上步骤会有这么个提示将鼠标放上去

提供者写在producer的测试类中参数如果真的不知道怎么办了ctrlp就会有提示

    Test    void testPageOut(){        Message msg  MessageBuilder                .withBody(hello.getBytes(StandardCharsets.UTF_8))   //带有消息参数消息只能以byte类型发送所以这里自动转化一下                .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build(); //设置一下队列的模式这里NON_PERSISTENT表示是数据不是持久的重启mq消息会丢失        //尝试发送10万条消息        for(int i0;i<100000;i){            rabbitTemplate.convertAndSend(lazy.queue,msg);        }    }

会有以下这两个点需要友友们去看 消息处理时间很快并且也大量数据情况下消息不会断

当然了也不仅仅就是了看消息处理的有多快的也是为了实用的前面说去配置配置文件也是可以简经典队列调节为lazy队列的如果想要仅仅只是个别创建队列能够是lazy队列Java代码也可以是可以实现的以下就是Lazy队列的使用代码创建的方法同时可以直接接收消息效果是一样的

    RabbitListener(queuesToDeclare  Queue(            name  lazy.queue,  //设置队列名称            durable  true,     //设置持久化            arguments  Argument(name  x-queue.mode,value  lazy) //设置队列模式    ))    public void LazyQueue(String msg){        log.info(接收到 lazy.queue对的消息 :msg);    }
4、确认机制

RabbitMQ提供了Publisher Confirm和Publisher Return两种确认机制开启确认机制会消耗一定资源也就会降低消息传输的速度返回确认消息给生产者返回的结果有这么几种

消息投递到了MQ但是路由失败此时会通过PublisherReturn返回路由异常原因然后返回Ack

临时消息入队成功就可以返回Ack

持久消息完成持久化就返回Ack

其他情况也就是失败情况返回NACK投递失败

上面的不要记住那么多简单分为成功就是Ack不成功就是Nack

直接在yml配置文件中配置的即可有以下三种模式

none关闭confirm机制默认机制

simple同步阻塞等待MQ的回执消息

correlatedMQ异步回调方式返回回执消息

下面直接上步骤

每个RabbitTemplate只能配置一个ReturnCallback在项目创建之前就需要配置好

这个配置是给提供者的是提供者消费发送的成功与失败进行返回消息

提供者这里只是配置一个消息回调

Slf4jConfigurationpublic class MqConfirmConfig implements ApplicationContextAware {    Override    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {        RabbitTemplate rabbitTemplate  applicationContext.getBean(RabbitTemplate.class);        //配置回调 这里配置confirm是无效的idea会给你提示confirm的方法但是效果不是这里需要的是回调        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {            Override            public void returnedMessage(ReturnedMessage returnedMessage) {                log.debug(收到消息的return callback,                        returnedMessage.getExchange(),returnedMessage.getRoutingKey(),                        returnedMessage.getMessage(),returnedMessage.getReplyCode(),                        returnedMessage.getReplyText());            }        });    }}

提供者下面开始发送消息进行确认

下面我们根据需要的参数进行写这个id代码

 

    Test    void testConfirmCallback() throws InterruptedException {        //创建一个CorrelationData 对象 我们只取id 即可 调用一个带参的构造函数        CorrelationData cd  new CorrelationData(UUID.randomUUID().toString());        /*        * 添加 CorrelationData 对象中提供了一个异步处理方法也就是多线程可以看见的方法getFuture()         * 这里给添加一个回调方法 参数是一个对象里面内部方法需要重写里面的 失败和成功的方法         * 但是这里失败是spring操作的失败不是Rabbitmq的失败所以不会返回Nack        * */        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {            Override            public void onFailure(Throwable ex) {                log.debug(消息回调失败,ex);            }            Override            public void onSuccess(CorrelationData.Confirm result) {                log.debug(收到confirm callback回执);                /*                * 这里才是排定rabbitmq的消息是否成功 判断一下结果是否是有ack                 * 如果是的话 返回一个我们自己打印的消息                * */                if(result.isAck()){                    log.debug(消息发送成功 收到ack);                }else{                    log.error(消息发送失败 收到nackresult.getReason());                }            }        });        // CorrelationData 对象创建好了 其实这里也可以说是id创建好了添加进发送的消息中        rabbitTemplate.convertAndSend(maxxub.direct,red2,hello,cd);        Thread.sleep(2000);    }

注这样的情况一般使用于检查问题因为需要一定的开销会大大降低rabbitmq处理消息的速度

5、重试机制

针对这部分参数前面已经给友友们提供了

    listener:      simple:        prefetch: 1        acknowledge-mode: auto  #模式自动 进行一定的重试次数        retry:          enabled: true  #  超时重试          initial-interval: 1000ms          multiplier: 2 #连接失败的重试          max-attempts: 3 #最大重试次数的          stateless: true #针对事务默认为false true针对上下文有定义保存是一个状态

前面都是直接给友友们提供了配置参数这里稍微具体说一下 MessageRecoverer就是消息恢复

继承该接口类有三个

1RejectAndDontRequeueRecoverer :重试耗尽后消息会被直接扔掉

2ImmediateRequeueMessageRecoverer重试耗尽后会重新进入队列消耗太大了

3RepublishMessageRecoverer重试耗尽后会去进入指定队列先存着

最优的当然是第三种了对消息做到了保护作用同时也不会无限进入队列4

下面就来实现一下

消费者

Slf4jConfiguration//ConditionalOnProperty(prefix  spring,name  rabbitmq.listener.simple.retry.enabled,havingValue  true)public class ErrorConfiguration {    /*创建一个交换机 队列 绑定一下并设置路由*/    Bean    public DirectExchange directExchange(){        return new DirectExchange(error.direct);    }    Bean    public Queue errorQueue(){        return new Queue(error.queue);    }    Bean    public Binding errorBinding(DirectExchange directExchange,Queue errorQueue){        return BindingBuilder.bind(errorQueue).to(directExchange).with(error);    }    //这里 使用 MessageRecoverer 进行接收    Bean    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){        log.debug(MessageRecoverer消息);        //调用 RepublishMessageRecoverer 返回机制 前面说了只能绑定一个rabbitTemplate 第一个参数填写即可        //第二个参数 交换机 刚刚创建的指定交换机 和 指定交换机的路由        return new RepublishMessageRecoverer(rabbitTemplate,error.direct,error);    }}

注ConditionalOnProperty注解指定配置参数没有弄好但是这里是可以使用的如果有佬们能够很好的使用请评论下怎么弄很感谢

标签:
声明:无特别说明,转载请标明本文来源!