推或拉? rabbitMQ 消费模式该如何选择

举报
战斧 发表于 2023/09/19 11:11:25 2023/09/19
【摘要】 前言在前面的选型对比中,我们提到了rabbitMQ同时支持推和拉的消息投递方式,那么什么是消息的推和拉?我们又该如何选择呢?今天我们就一起来看下吧一、推拉两种模式的概念MQ 是一个非常重要的消息传递架构,它可以实现解耦并且提高系统的可靠性和吞吐量。 在很多MQ组件中,消息可以通过推和拉模式进行传递。推模式在推模式下,当一个生产者发布消息到队列时,队列会立即将这条消息发送给所有订阅了该队列的消...

    前言

    在前面的选型对比中,我们提到了rabbitMQ同时支持推和拉的消息投递方式,那么什么是消息的推和拉?我们又该如何选择呢?今天我们就一起来看下吧


    一、推拉两种模式的概念

    MQ 是一个非常重要的消息传递架构,它可以实现解耦并且提高系统的可靠性和吞吐量。 在很多MQ组件中,消息可以通过推和拉模式进行传递。

    • 推模式
      在推模式下,当一个生产者发布消息到队列时,队列会立即将这条消息发送给所有订阅了该队列的消费者
    • 拉模式
      在拉模式下,当生产者发布消息到队列时,队列不会立即发送消息给消费者,而是等待消费者请求消息后才发送

    二、推模式的使用及优势

    1. 使用

    代码如下(示例):

    package com.zhanfu.springboot.demo.mq;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    
    public class PushConsumer {
        private final static String QUEUE_NAME = "myqueue";
    
        public static void main(String[] args) throws Exception {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            // 创建连接
            Connection connection = factory.newConnection();
            // 创建信道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 定义消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received '" + message + "'");
                }
            };
            // 开始消费消息,第二个参数为是否自动ack
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    

    我们定义了一个consumer 消费者,然后把该消费者使用basicConsume方法来订阅某个队列的消息。当有消息到达队列时,consumer 里的handleDelivery方法就会被调用

    2. 优劣

    1. 优势:这种方式可以实现实时通信
    2. 劣势:如果消费者的处理能力跟不上生产者的速度,就会在消费者处造成消息堆积

    三、拉模式的使用及优势

    1. 使用

    代码如下(示例):

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.*;
    
    public class RabbitMQPullConsumer {
        private static final String QUEUE_NAME = "queue_name";
        private static final String HOST_NAME = "host_name";
        private static final String USERNAME = "username";
        private static final String PASSWORD = "password";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接和通道
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(HOST_NAME);
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);
    
            // 消费消息
            while (true) {
            	// 手动从队列中获取一条消息,第二个参数为是否自动ack
                GetResponse response = channel.basicGet(QUEUE_NAME, false);
                if (response == null) {
                    // 没有消息,则等待一段时间再继续检查
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    String message = new String(response.getBody(), "UTF-8");
                    System.out.println("Received message: " + message);
                    channel.basicAck(response.getEnvelope().getDeliveryTag(), false); // 手动Ack
                }
            }
        }
    }
    
    

    如果获取的GetResponse对象为null,则表示队列中没有消息。我们将等待一段时间(这里是1秒),然后再继续检查队列中是否有新消息。

    如果获取的GetResponse对象不为null,则表示有新消息。我们将从GetResponse对象中提取消息内容,然后输出它。如果我们设置basicGet()方法的第二个参数为true,则表示自动ack,即在获取消息后直接将消息从队列中删除。

    这是一个基本的拉模式的RabbitMQ消费者实现。当然,生产上如果采用拉模式,我们更推荐使用多个线程异步的方式处理,一个线程负责循环拉取消息后,存入一个长度有限的阻塞队列,另一个/一些线程从阻塞队列中取出消息,处理完一条则手动Ack一条,这样更有效率。

    2. 优劣

    1. 优势:消费端可以按照自己的处理速度来消费,避免产生在消费端产生消息堆积。
    2. 劣势:消息传递方面可能会有一些延迟,当处理速度长时间小于消息发布速度时,容易造成大量消息堆积在rabbitMQ服务器,一些时间敏感的队列,可能会使内部的消息失效

    四、消费端Ack模式与Qos

    1. Ack模式

    我们在上面的代码中,不难发现,不管是推模式还是拉模式,方法的入参中都有一个布尔变量

    // 推模式,直接订阅
    channel.basicConsume(QUEUE_NAME, true, consumer);
    // 拉模式,一次拉取一条
    channel.basicGet(QUEUE_NAME, true);

    这个布尔变量其实就是是否自动Ack,其实我们在《RabbitMQ 能保证消息可靠性吗》一文中就提到过,这是一种消息确认机制:

    • 自动ACK
      即意味着,我们收到消息时就会通知RabbitMQ服务器,服务器就会将该消息已经被消费,进而删除。
    • 手动ACK
      -即意味着在某个我觉得可以通知RabbitMQ服务器时,我才会通知。

    那么两种模式我们该怎么取舍?

    从实际生产的角度,我强烈建议将Ack模式设置为手动,这样可以保证消费者处理消息失败时,消息不会立即从队列中删除,而是需要重新分配给其他消费者,增强了系统的容错性和可靠性。同时,建议在消费者处理消息时,对消息进行异常处理,确保消息能够正确地被处理

    2. Qos 服务质量

    在rabbitMQ的API中,我们不难发现有一项叫做 Qos(quality of service—— 服务质量) 的参数设置
    在这里插入图片描述

    These settings impose limits on the amount of data the server will deliver to consumers before requiring acknowledgements.Thus they provide a means of consumer-initiated flow control.
    这些设置对服务器在接收到Ack之前将传递给消费者的数据量进行了限制。因此,它们提供了一种由消费者发起的流量控制方式。

    通俗的讲,这个Qos参数是配合Ack 用来控制消费端的流量的(即限流:自动ACK不需要设置Qos;手动ACK模式时,则可以设置Qo,控制消费者处理消息的速度,它拥有三个参数

    1. prefetchSize:服务器将传递的最大内容量(以八位字节为单位),如果不受限制,则为0
    2. prefetchCount:服务器将传递的最大消息条数,如果不受限制,则为0
    3. global:是否将该设置应用到整个信道,还是仅此一个消费者

    在上面拉模式的代码中,我们就使用了一个 channel.basicQos(1); ,也是最常用的限制方法,即以消息条数为单位限制,其实设置的就是最大消息条数

        /**
         * Request a specific prefetchCount "quality of service" settings
         * for this channel.
         * <p>
         * Note the prefetch count must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
         *
         * @param prefetchCount maximum number of messages that the server
         *                      will deliver, 0 if unlimited
         * @throws java.io.IOException if an error is encountered
         * @see #basicQos(int, int, boolean)
         */
        void basicQos(int prefetchCount) throws IOException;
    

    我们可以设置QoS,控制消费者处理消息的速度。通过合理设置预取计数或预取字节数,可以确保消费者处理消息的速度与RabbitMQ服务器发送消息的速度相匹配,避免队列中积压过多的未确认消息需要注意的是,这个prefetchCount的取值是一个经验值,太大容易导致消费端内存消耗过高,太小则效率低下,一般建议在100以下


    五、总结

    综合来看,推模式适合实时通信且生产者和消费者的速度相当的场景,而且对于利于压榨出消费者端的处理潜力;拉模式适合大量数据的场景,并且可以更好地控制消息的消费进度。但是,实际应用中更多地是将两种模式结合使用,以达到更好的效果。

    【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
    • 点赞
    • 收藏
    • 关注作者

    评论(0

    0/1000
    抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。