RocketMQ(七):消费者如何保证顺序消费?

举报
菜菜的后端私房菜 发表于 2024/11/12 09:12:09 2024/11/12
【摘要】 RocketMQ(七):消费者如何保证顺序消费?之前的文章已经描述过消费消息有并发、顺序两种方式,其中并发消费常用于无序消息中,而顺序消息用于有序消息顺序消费是确保消息严格有序的前提,当需要确保消息有序时需要采用顺序消费,否则会可能打破消息的有序性顺序消费较为复杂,会涉及到多种锁来保证顺序消费本篇文章就来描述顺序消费的原理,来看看RocketMQ是如何保证顺序消费的,导图如下:往期好文:R...

RocketMQ(七):消费者如何保证顺序消费?

之前的文章已经描述过消费消息有并发、顺序两种方式,其中并发消费常用于无序消息中,而顺序消息用于有序消息

顺序消费是确保消息严格有序的前提,当需要确保消息有序时需要采用顺序消费,否则会可能打破消息的有序性

顺序消费较为复杂,会涉及到多种锁来保证顺序消费

本篇文章就来描述顺序消费的原理,来看看RocketMQ是如何保证顺序消费的,导图如下:

本文导图

往期好文:

RocketMQ(一):消息中间件缘起,一览整体架构及核心组件

RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)

RocketMQ(三):面对高并发请求,如何高效持久化消息?(核心存储文件、持久化核心原理、源码解析)

RocketMQ(四):消费前如何拉取消息?(长轮询机制)

RocketMQ(五):揭秘高吞吐量并发消费原理

RocketMQ(六):Consumer Rebalanc原理(运行流程、触发时机、导致的问题)

顺序消费原理

再平衡时加分布式锁

在顺序消费下,再平衡机制为了让每个队列都只分配到一个消费者,要向Broker获取该队列的分布式锁

再平衡更新ProcessQueue时,调用updateProcessQueueTableInRebalance新增时,如果是顺序的再平衡要先判断内存队列processQueue是否加分布式锁:

		for (MessageQueue mq : mqSet) {
        	//本地没有则要新增
            if (!this.processQueueTable.containsKey(mq)) {
                //如果顺序 要判断队列是否加锁, 未获取说明其他消费者已经获取由它负责,这里跳过
                if (isOrder && !this.lock(mq)) {
                    continue;
                }

               //新增略
            }
        }

RebalanceImpl.lock

加锁的流程:获取Broker信息、调用lockBatchMQ向Broker申请给(批量)队列加分布式锁、将拿到锁队列对应的ProcessQueue设置为已加锁并更新获取锁的时间用于判断锁是否过期

public boolean lock(final MessageQueue mq) {
    //获取Broker
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
    if (findBrokerResult != null) {
        LockBatchRequestBody requestBody = new LockBatchRequestBody();
        requestBody.setConsumerGroup(this.consumerGroup);
        requestBody.setClientId(this.mQClientFactory.getClientId());
        requestBody.getMqSet().add(mq);

        try {
            //向Broker请求要加锁的队列 返回的结果就是获取到锁的队列
            Set<MessageQueue> lockedMq =
                this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
            
            //将获取到锁的队列对应的ProcessQueue的lock字段设置为true标识获取到分布式锁,并更新锁时间用于判断是否过期
            for (MessageQueue mmqq : lockedMq) {
                ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                if (processQueue != null) {
                    processQueue.setLocked(true);
                    processQueue.setLastLockTimestamp(System.currentTimeMillis());
                }
            }

            boolean lockOK = lockedMq.contains(mq);
            log.info("the message queue lock {}, {} {}",
                lockOK ? "OK" : "Failed",
                this.consumerGroup,
                mq);
            return lockOK;
        } catch (Exception e) {
            log.error("lockBatchMQ exception, " + mq, e);
        }
    }

    return false;
}

同时在ConsumeMessageOrderlyService组件启动时会开启定时任务(默认20S)调用lockAll向Broker获取当前负责processQueue的分布式锁(如果已持有则相当于续期)

顺序消费流程

前文已经说过:PullRequest(拉取消息)与MessageQueue、ProcessQueue(内存中存储、消费消息)一一对应,关系密切

在消费端流程中涉及三把锁,MessageQueue的本地锁、ProcessQueue的分布式锁和本地锁

PullRequest拉取消息到ProcessQueue后会提交异步消费消息,封装ConsumeRequest提交到线程池,其集群模式下的顺序消费流程为:

  1. 获取该队列messageQueue本地锁加锁(防止线程池并发消费同一队列)
  2. 校验是否持有processQueue分布式锁,如果未持有调用 tryLockLaterAndReconsume 延迟尝试加processQueue分布式锁并提交消费请求【未获取到锁,说明其他节点已获取分布式锁,当前节点先延时3s再进行消费(可能是再平衡机制将该队列分配给其他节点)】
  3. 循环开始顺序消费消息,每次消费设置的最大消费消息数量,如果消费成功就循环消费,期间校验是否持有processQueue分布式锁,以及是否超时(默认60S,超时延时提交消费请求,期间还会封装上下文和执行消费前后的钩子)
  4. 真正调用消息监听器进行消息消费时,需要获取processQueue的本地锁(再平衡如果将队列分配给其他消费者,会删除该队列,加锁防止在删除的过程中可能并发进行消费,防止多节点的重复消费)
  5. 最后处理消费后的结果 processConsumeResult

顺序消费流程

@Override
public void run() {
    if (this.processQueue.isDropped()) {
        log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
    }

    //获取该队列本地内存锁的锁对象
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    //队列加锁
    synchronized (objLock) {
        //集群模式要获取分布式锁并且不过期 否则调用tryLockLaterAndReconsume延迟处理
        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
            || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
            final long beginTime = System.currentTimeMillis();
            //消费成功循环处理 直到失败延迟处理
            for (boolean continueConsume = true; continueConsume; ) {
				
                //再次校验 没获取锁 或者 超时 都会调用tryLockLaterAndReconsume延迟处理 
                //...

                //如果循环超时则延时提交消费请求,后续重新消费
                long interval = System.currentTimeMillis() - beginTime;
                if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                    break;
                }

                
                //每次最大消费消息数量
                final int consumeBatchSize =
                    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                //从内存队列中获取要消费的消息
                List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                //消息可能是重试的,重置为正常的topic
                defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                if (!msgs.isEmpty()) {
                    //顺序上下文
                    final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
					//执行后的状态
                    ConsumeOrderlyStatus status = null;

                    //构建上下文 行前的钩子..

                    long beginTimestamp = System.currentTimeMillis();
                    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                    boolean hasException = false;
                    try {
                        //消费前加processQueue本地锁
                        this.processQueue.getConsumeLock().lock();
                        if (this.processQueue.isDropped()) {
                            log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                this.messageQueue);
                            break;
                        }

                        //顺序消费消息
                        status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                    } catch (Throwable e) {
                        hasException = true;
                    } finally {
                        this.processQueue.getConsumeLock().unlock();
                    }

					//处理状态..
                    
                    //执行后的钩子..

                    ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                        .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                    //处理消费后的状态
                    continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                } else {
                    continueConsume = false;
                }
            }
        } else {
            if (this.processQueue.isDropped()) {
                log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }

            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
        }
    }
}

顺序消费过程中部分流程也与并发消费类似,只不过需要通过加锁的方式保证顺序消费

List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);

processQueue 存储消息的内存队列是由两个TreeMap实现的,Key为消息的偏移量作为顺序,优先消费先持久化的消息(偏移量小)

其中 msgTreeMap 存储拉取到内存的消息,consumingMsgOrderlyTreeMap 在顺序消费时才使用,取出消息时将消息存入该容器,消费失败时再将消息放回 msgTreeMap 后续重复进行消费

public List<MessageExt> takeMessages(final int batchSize) {
    List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
    final long now = System.currentTimeMillis();
    try {
        //取出加写锁
        this.treeMapLock.writeLock().lockInterruptibly();
        this.lastConsumeTimestamp = now;
        try {
            if (!this.msgTreeMap.isEmpty()) {
                for (int i = 0; i < batchSize; i++) {
                    //取出首个节点 也就是偏移量小的消息
                    Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
                    if (entry != null) {
                        result.add(entry.getValue());
                        //加入consumingMsgOrderlyTreeMap
                        consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
                    } else {
                        break;
                    }
                }
            }

            if (result.isEmpty()) {
                consuming = false;
            }
        } finally {
            this.treeMapLock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("take Messages exception", e);
    }

    return result;
}

ProcessQueue.commit

处理消费成功的结果时会调用 ProcessQueue.commit 进行更新msgCount、msgSize等字段并清空consumingMsgOrderlyTreeMap,最后返回偏移量后续更新消费偏移量

public long commit() {
    try {
        this.treeMapLock.writeLock().lockInterruptibly();
        try {
            Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
            //维护  msgCount、msgSize
            msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
            for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
                msgSize.addAndGet(0 - msg.getBody().length);
            }
            //清空
            this.consumingMsgOrderlyTreeMap.clear();
            if (offset != null) {
                return offset + 1;
            }
        } finally {
            this.treeMapLock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("commit exception", e);
    }

    return -1;
}

ProcessQueue.makeMessageToConsumeAgain

处理失败则会调用 ProcessQueue.makeMessageToConsumeAgain 将取出的消息重新放回msgTreeMap,延迟后再尝试消费

public void makeMessageToConsumeAgain(List<MessageExt> msgs) {
    try {
        this.treeMapLock.writeLock().lockInterruptibly();
        try {
            for (MessageExt msg : msgs) {
                //消息放回msgTreeMap
                this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
                this.msgTreeMap.put(msg.getQueueOffset(), msg);
            }
        } finally {
            this.treeMapLock.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("makeMessageToCosumeAgain exception", e);
    }
}

processConsumeResult

processConsumeResult根据消费状态处理结果大致分成成功与失败的情况:

如果是成功会取出消息偏移量并进行更新(调用commit

//获取偏移量
commitOffset = consumeRequest.getProcessQueue().commit();
//更新
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}

更新流程与并发消费使用的组件相同(这里不再说明,之前的文章已描述),也是定时向Broker更新消费偏移量的

如果是失败则会把消息放回内存队列(调用makeMessageToConsumeAgain),然后再调用submitConsumeRequestLater延时提交消费请求(延时重新消费)

与并发消费不同的是:并发消费延时会放回Broker并且随着消费失败延时时间也会增加,而顺序消费一直都在内存中延时重试,如果一直消费失败会“卡住”导致消息堆积

总结顺序消费流程如下:

顺序消费流程

Broker处理获取分布式锁

broker维护mqLockTable的双层Map,其中第一层Key为消费者组名,第二层为队列,值为锁实体

ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable 

队列的锁实体字段包含持有锁的客户端ID和获取锁的时间,用于判断某客户端是否持有锁

static class LockEntry {
    private String clientId;
    private volatile long lastUpdateTimestamp = System.currentTimeMillis();
}

获取分布式锁的请求码为LOCK_BATCH_MQ,Broker使用AdminBrokerProcessor调用RebalanceLockManager的tryLockBatch进行处理:

  1. 遍历需要加锁的队列,调用isLocked判断消费者(客户端)是否已持有队列的锁

    (获取到队列对应的锁实体,通过锁实体记录的客户端ID与当前客户端ID是否相同,持有锁时间是否过期(60S)来判断当前是否为持有锁的状态,如果持有锁相当于获取锁成功并更新获取锁的时间,加入加锁队列集合,否则加入未加锁队列集合)

  2. 如果(客户端)有队列当前未持有锁,则要尝试获取锁(操作期间为复合操作,broker使用本地锁保证原子性)

  3. 获取队列对应的锁实体判断是否持有锁(为空说明为第一次获取锁直接创建,isLocked比较客户端ID并且未过期才算获取锁,isExpired已过期也可以获取锁,其他情况就是别的客户端已经获取锁)

  4. 返回获取锁的队列集合

Broker处理批量加锁

public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
    final String clientId) {
    //加锁队列集合
    Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
    //未加锁队列集合
    Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());

    for (MessageQueue mq : mqs) {
        //判断队列上次加锁的客户端是否为当前客户端
        if (this.isLocked(group, mq, clientId)) {
            lockedMqs.add(mq);
        } else {
            notLockedMqs.add(mq);
        }
    }

    //如果有的队列上次加锁的客户端不是当前客户端 则要尝试加锁(该期间可能别的客户端还在持有锁)
    if (!notLockedMqs.isEmpty()) {
        try {
            //加锁保证原子性 (这里的锁是broker本地锁保证复合操作的原子性)
            this.lock.lockInterruptibly();
            try {
                //根据消费组拿到对应队列与锁
                ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
                if (null == groupValue) {
                    //初始化
                    groupValue = new ConcurrentHashMap<>(32);
                    this.mqLockTable.put(group, groupValue);
                }

                //遍历需要加锁的集合
                for (MessageQueue mq : notLockedMqs) {
                    LockEntry lockEntry = groupValue.get(mq);
                    //锁实体为空说明第一次获取直接创建
                    if (null == lockEntry) {
                        lockEntry = new LockEntry();
                        //设置获取锁的客户端ID (类似偏向锁)
                        lockEntry.setClientId(clientId);
                        groupValue.put(mq, lockEntry);
                    }

                    //如果持有锁的客户端ID相同并且未过期(60S)则更新持有锁时间
                    if (lockEntry.isLocked(clientId)) {
                        lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                        //加锁成功加入
                        lockedMqs.add(mq);
                        continue;
                    }

                    //当前持有锁的客户端ID
                    String oldClientId = lockEntry.getClientId();
					
                    //如果已过期 则设置当前客户端为新获取锁的客户端
                    if (lockEntry.isExpired()) {
                        lockEntry.setClientId(clientId);
                        lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                        //加锁成功加入
                        lockedMqs.add(mq);
                        continue;
                    }

					//其他情况 则有其他客户端已获取锁
                }
            } finally {
                this.lock.unlock();
            }
        } catch (InterruptedException e) {
            log.error("putMessage exception", e);
        }
    }

    return lockedMqs;
}

解锁流程同理,都是操作mqLockTable

分布式锁过期时间在服务端Broker与客户端(消费者)不同:

客户端默认加锁时间为30S rocketmq.client.rebalance.lockMaxLiveTime

客户端定时任务默认每20S进行锁续期 rocketmq.client.rebalance.lockInterval

服务端默认加锁时间为60S rocketmq.broker.rebalance.lockMaxLiveTime

总结

顺序消费的流程与并发消息流程的类似,但为了确保消息有序、依次进行消费,期间会需要加多种锁

顺序消费流程中会先对messageQueue加本地锁,这是为了确保线程池执行ConsumeRequest任务时只有一个线程执行

然后检查要消费的队列processQueue是否持有分布式锁,这是为了确保再平衡机制时被多个节点的消费者重复消费消息

如果未持有分布式锁会向Broker尝试加锁,并延时提交消费请求,后续重试

如果持有分布式锁会开始循环消费,期间也会检查持有分布式锁、超时等情况,不满足条件就延时重试

监听器消费消息时,还要持有processQueue的本地锁,这是为了防止当前消费者不再负责该队列的情况下会删除,不加锁并发删除时会导致重复消费

使用消息监听器消费完消息后根据状态进行处理结果,如果成功则在内存中更新消费偏移量,后续再定时向Broker更新(与并发消费相同)

如果失败则会将消息放回processQueue并延时提交消费请求后续重试,与并发消费不同(并发消费失败会延时,重投入重试队列再进行重试或加入死信队列,而顺序消息是一直在内存中重试,会阻塞后续消息)

再平衡机制新增processQueue时,如果是顺序消费就会去尝试获取它的分布式锁(默认30S过期),并且有定时任务默认每20S进行分布式锁续期

Broker使用锁实体作为processQueue的分布式锁,记录持有分布式锁的客户端以及过期时间(默认60S过期)

每次需要对哪些队列进行加锁,只需要判断队列对应的锁实体客户端ID以及过期时间即可

最后(点赞、收藏、关注求求啦~)

我是菜菜,热爱技术交流、分享与写作,喜欢图文并茂、通俗易懂的输出知识

本篇文章被收入专栏 消息中间件,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJavaGithub-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。