RabbitMQ之死信队列解读
目录
RabbitConfigDeal 配置类:创建队列及交换机并进行绑定
主启动类RabbitMq01Application:实现ApplicationRunner接口
基本介绍
什么是死信交换机
在定义业务队列的时候,要考虑指定一个死信交换机,死信交换机可以和任何一个普通的队列进行绑定,然后在业务队列出现死信的时候就会将数据发送到死信队列。
什么是死信队列
死信队列实际上就是一个普通的队列,只是这个队列跟死信交换机进行了绑定,用来存放死信而已
RabbitMQ 中有一种交换器叫 DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它会被重新发送到另外一个交换器中,这个交换器就是 DLX,绑定在 DLX 上的队列就称之为死信队列。
要注意的是,DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性。当这个队列存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。
消息进入到死信队列的情况
消息过期
队列过期
TTL: Time to Live的简称,过期时间
队列达到最大长度(先入队的消息会被发送到DLX)
消费者拒绝消息不进行重新投递
从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列。
application.yml 启动手动确认
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
- deliveryTag:消息的一个数字标签
- multiple:翻译成中文是多个的意思,如果是true表示对小于deliveryTag标签下的消息都进行Nack不确认,false表示只对当前deliveryTag标签的消息Nack
- requeue:如果是true表示消息被Nack后,重新发送到队列,如果是false,消息被Nack后,不会重新发送到队列
消费者拒绝消息
开启手动确认模式,并拒绝消息,不重新投递,则进入死信队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
- basicReject是否定的交付,一般在消费消息时出现异常等的时候执行。可以将该消息丢弃或重排序去重新处理消息
- 参数1: 消费消息的index
- 参数2: 对异常消息的处理,true表示重排序,false表示丢弃
- Reject 在拒绝消息时,可以使用 requeue 标识,告诉 RabbitMQ 是否需要重新发送给别的消费者。如果是 false 则不重新发送,一般这个消息就会被RabbitMQ 丢弃。Reject 一次只能拒绝一条消息。如果是 true 则消息发生了重新投递。
- Nack 跟 Reject 类似,只是它可以一次性拒绝多个消息。也可以使用 requeue 标识,这是 RabbitMQ 对 AMQP 规范的一个扩展。
- 通过 RejectRequeuConsumer 可以看到无论是使用 Reject 方式还是 Nack 方式,当 requeue
- 参数设置为 true 时,消息发生了重新投递。当 requeue 参数设置为 false 时,消息丢失了。
springboot代码实战
实战架构
如上图,消息到达正常的交换机exchange.nomal.a,通过与正常的队列queue.noaml.a绑定,消息会到达正常队列,如果消息变为死消息以后则会转发到与正常队列绑定的死信交换机中,死信交换机会转发到与其绑定的死信队列queue.deal.a。
工程概述
工程采用springboot架构,主要用到的依赖为:
application.yml配置文件如下:
RabbitConfigDeal 配置类:创建队列及交换机并进行绑定
创建正常交换机
创建死信交换机
创建死信队列
创建正常队列,设置他的绑定死信交换机,以及对应绑定的路由key为order
绑定正常交换机和正常队列
绑定死信交换机和死信队列
MessageService业务类:发送消息及接收消息
发送消息方法
这里用的路由key为info
MessageConvert
- 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
- RabbitMQ 的序列化是指
Message
的body
属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有SimpleMessageConverter
(默认)、Jackson2JsonMessageConverter
等
接受消息
Message
在消息传递的过程中,实际上传递的对象为
org.springframework.amqp.core.Message
,它主要由两部分组成:
MessageProperties // 消息属性
byte[] body // 消息内容
@RabbitListener
使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理
消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)
消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:
- application/octet-stream:二进制字节数组存储,使用 byte[]
- application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
- text/plain:文本数据类型存储,使用 String
- application/json:JSON 格式,使用 Object、相应类型
主启动类RabbitMq01Application:实现ApplicationRunner接口
在SpringBoot中,提供了一个接口:ApplicationRunner。 该接口中,只有一个run方法,他执行的时机是:spring容器启动完成之后,就会紧接着执行这个接口实现类的run方法。
由于该方法是在容器启动完成之后,才执行的,所以,这里可以从spring容器中拿到其他已经注入的bean。
启动主启动类后查看控制台:
我们在这里可以看见17s的时候发送了消息,在经过了20s,即到37s的时候我们在死信队列queue.dead.a接受到了消息。
- 点赞
- 收藏
- 关注作者
评论(0)