RocketMQ(七):消费者如何保证顺序消费?
RocketMQ(七):消费者如何保证顺序消费?
之前的文章已经描述过消费消息有并发、顺序两种方式,其中并发消费常用于无序消息中,而顺序消息用于有序消息
顺序消费是确保消息严格有序的前提,当需要确保消息有序时需要采用顺序消费,否则会可能打破消息的有序性
顺序消费较为复杂,会涉及到多种锁来保证顺序消费
本篇文章就来描述顺序消费的原理,来看看RocketMQ是如何保证顺序消费的,导图如下:
往期好文:
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)
RocketMQ(三):面对高并发请求,如何高效持久化消息?(核心存储文件、持久化核心原理、源码解析)
RocketMQ(六):Consumer Rebalanc原理(运行流程、触发时机、导致的问题)
顺序消费原理
再平衡时加分布式锁
在顺序消费下,再平衡机制为了让每个队列都只分配到一个消费者,要向Broker获取该队列的分布式锁
再平衡更新ProcessQueue时,调用updateProcessQueueTableInRebalance
新增时,如果是顺序的再平衡要先判断内存队列processQueue是否加分布式锁:
for (MessageQueue mq : mqSet) {
//本地没有则要新增
if (!this.processQueueTable.containsKey(mq)) {
//如果顺序 要判断队列是否加锁, 未获取说明其他消费者已经获取由它负责,这里跳过
if (isOrder && !this.lock(mq)) {
continue;
}
//新增略
}
}
RebalanceImpl.lock
加锁的流程:获取Broker信息、调用lockBatchMQ向Broker申请给(批量)队列加分布式锁、将拿到锁队列对应的ProcessQueue设置为已加锁并更新获取锁的时间用于判断锁是否过期
public boolean lock(final MessageQueue mq) {
//获取Broker
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.getMqSet().add(mq);
try {
//向Broker请求要加锁的队列 返回的结果就是获取到锁的队列
Set<MessageQueue> lockedMq =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
//将获取到锁的队列对应的ProcessQueue的lock字段设置为true标识获取到分布式锁,并更新锁时间用于判断是否过期
for (MessageQueue mmqq : lockedMq) {
ProcessQueue processQueue = this.processQueueTable.get(mmqq);
if (processQueue != null) {
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
boolean lockOK = lockedMq.contains(mq);
log.info("the message queue lock {}, {} {}",
lockOK ? "OK" : "Failed",
this.consumerGroup,
mq);
return lockOK;
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
}
}
return false;
}
同时在ConsumeMessageOrderlyService组件启动时会开启定时任务(默认20S)调用lockAll
向Broker获取当前负责processQueue的分布式锁(如果已持有则相当于续期)
顺序消费流程
前文已经说过:PullRequest(拉取消息)与MessageQueue、ProcessQueue(内存中存储、消费消息)一一对应,关系密切
在消费端流程中涉及三把锁,MessageQueue的本地锁、ProcessQueue的分布式锁和本地锁
PullRequest拉取消息到ProcessQueue后会提交异步消费消息,封装ConsumeRequest提交到线程池,其集群模式下的顺序消费流程为:
- 获取该队列messageQueue本地锁加锁(防止线程池并发消费同一队列)
- 校验是否持有processQueue分布式锁,如果未持有调用
tryLockLaterAndReconsume
延迟尝试加processQueue分布式锁并提交消费请求【未获取到锁,说明其他节点已获取分布式锁,当前节点先延时3s再进行消费(可能是再平衡机制将该队列分配给其他节点)】 - 循环开始顺序消费消息,每次消费设置的最大消费消息数量,如果消费成功就循环消费,期间校验是否持有processQueue分布式锁,以及是否超时(默认60S,超时延时提交消费请求,期间还会封装上下文和执行消费前后的钩子)
- 真正调用消息监听器进行消息消费时,需要获取processQueue的本地锁(再平衡如果将队列分配给其他消费者,会删除该队列,加锁防止在删除的过程中可能并发进行消费,防止多节点的重复消费)
- 最后处理消费后的结果
processConsumeResult
@Override
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
//获取该队列本地内存锁的锁对象
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
//队列加锁
synchronized (objLock) {
//集群模式要获取分布式锁并且不过期 否则调用tryLockLaterAndReconsume延迟处理
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
//消费成功循环处理 直到失败延迟处理
for (boolean continueConsume = true; continueConsume; ) {
//再次校验 没获取锁 或者 超时 都会调用tryLockLaterAndReconsume延迟处理
//...
//如果循环超时则延时提交消费请求,后续重新消费
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
//每次最大消费消息数量
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
//从内存队列中获取要消费的消息
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
//消息可能是重试的,重置为正常的topic
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
//顺序上下文
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
//执行后的状态
ConsumeOrderlyStatus status = null;
//构建上下文 行前的钩子..
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
//消费前加processQueue本地锁
this.processQueue.getConsumeLock().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
//顺序消费消息
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
hasException = true;
} finally {
this.processQueue.getConsumeLock().unlock();
}
//处理状态..
//执行后的钩子..
ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
//处理消费后的状态
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
顺序消费过程中部分流程也与并发消费类似,只不过需要通过加锁的方式保证顺序消费
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
processQueue 存储消息的内存队列是由两个TreeMap实现的,Key为消息的偏移量作为顺序,优先消费先持久化的消息(偏移量小)
其中 msgTreeMap
存储拉取到内存的消息,consumingMsgOrderlyTreeMap
在顺序消费时才使用,取出消息时将消息存入该容器,消费失败时再将消息放回 msgTreeMap
后续重复进行消费
public List<MessageExt> takeMessages(final int batchSize) {
List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
final long now = System.currentTimeMillis();
try {
//取出加写锁
this.treeMapLock.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!this.msgTreeMap.isEmpty()) {
for (int i = 0; i < batchSize; i++) {
//取出首个节点 也就是偏移量小的消息
Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
if (entry != null) {
result.add(entry.getValue());
//加入consumingMsgOrderlyTreeMap
consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
} else {
break;
}
}
}
if (result.isEmpty()) {
consuming = false;
}
} finally {
this.treeMapLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("take Messages exception", e);
}
return result;
}
ProcessQueue.commit
处理消费成功的结果时会调用 ProcessQueue.commit
进行更新msgCount、msgSize等字段并清空consumingMsgOrderlyTreeMap,最后返回偏移量后续更新消费偏移量
public long commit() {
try {
this.treeMapLock.writeLock().lockInterruptibly();
try {
Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
//维护 msgCount、msgSize
msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
msgSize.addAndGet(0 - msg.getBody().length);
}
//清空
this.consumingMsgOrderlyTreeMap.clear();
if (offset != null) {
return offset + 1;
}
} finally {
this.treeMapLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("commit exception", e);
}
return -1;
}
ProcessQueue.makeMessageToConsumeAgain
处理失败则会调用 ProcessQueue.makeMessageToConsumeAgain
将取出的消息重新放回msgTreeMap,延迟后再尝试消费
public void makeMessageToConsumeAgain(List<MessageExt> msgs) {
try {
this.treeMapLock.writeLock().lockInterruptibly();
try {
for (MessageExt msg : msgs) {
//消息放回msgTreeMap
this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
this.msgTreeMap.put(msg.getQueueOffset(), msg);
}
} finally {
this.treeMapLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("makeMessageToCosumeAgain exception", e);
}
}
processConsumeResult
processConsumeResult
根据消费状态处理结果大致分成成功与失败的情况:
如果是成功会取出消息偏移量并进行更新(调用commit
)
//获取偏移量
commitOffset = consumeRequest.getProcessQueue().commit();
//更新
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}
更新流程与并发消费使用的组件相同(这里不再说明,之前的文章已描述),也是定时向Broker更新消费偏移量的
如果是失败则会把消息放回内存队列(调用makeMessageToConsumeAgain
),然后再调用submitConsumeRequestLater
延时提交消费请求(延时重新消费)
与并发消费不同的是:并发消费延时会放回Broker并且随着消费失败延时时间也会增加,而顺序消费一直都在内存中延时重试,如果一直消费失败会“卡住”导致消息堆积
总结顺序消费流程如下:
Broker处理获取分布式锁
broker维护mqLockTable的双层Map,其中第一层Key为消费者组名,第二层为队列,值为锁实体
ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable
队列的锁实体字段包含持有锁的客户端ID和获取锁的时间,用于判断某客户端是否持有锁
static class LockEntry {
private String clientId;
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
}
获取分布式锁的请求码为LOCK_BATCH_MQ
,Broker使用AdminBrokerProcessor调用RebalanceLockManager的tryLockBatch进行处理:
-
遍历需要加锁的队列,调用
isLocked
判断消费者(客户端)是否已持有队列的锁(获取到队列对应的锁实体,通过锁实体记录的客户端ID与当前客户端ID是否相同,持有锁时间是否过期(60S)来判断当前是否为持有锁的状态,如果持有锁相当于获取锁成功并更新获取锁的时间,加入加锁队列集合,否则加入未加锁队列集合)
-
如果(客户端)有队列当前未持有锁,则要尝试获取锁(操作期间为复合操作,broker使用本地锁保证原子性)
-
获取队列对应的锁实体判断是否持有锁(为空说明为第一次获取锁直接创建,isLocked比较客户端ID并且未过期才算获取锁,isExpired已过期也可以获取锁,其他情况就是别的客户端已经获取锁)
-
返回获取锁的队列集合
public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
final String clientId) {
//加锁队列集合
Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
//未加锁队列集合
Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
for (MessageQueue mq : mqs) {
//判断队列上次加锁的客户端是否为当前客户端
if (this.isLocked(group, mq, clientId)) {
lockedMqs.add(mq);
} else {
notLockedMqs.add(mq);
}
}
//如果有的队列上次加锁的客户端不是当前客户端 则要尝试加锁(该期间可能别的客户端还在持有锁)
if (!notLockedMqs.isEmpty()) {
try {
//加锁保证原子性 (这里的锁是broker本地锁保证复合操作的原子性)
this.lock.lockInterruptibly();
try {
//根据消费组拿到对应队列与锁
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (null == groupValue) {
//初始化
groupValue = new ConcurrentHashMap<>(32);
this.mqLockTable.put(group, groupValue);
}
//遍历需要加锁的集合
for (MessageQueue mq : notLockedMqs) {
LockEntry lockEntry = groupValue.get(mq);
//锁实体为空说明第一次获取直接创建
if (null == lockEntry) {
lockEntry = new LockEntry();
//设置获取锁的客户端ID (类似偏向锁)
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
}
//如果持有锁的客户端ID相同并且未过期(60S)则更新持有锁时间
if (lockEntry.isLocked(clientId)) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
//加锁成功加入
lockedMqs.add(mq);
continue;
}
//当前持有锁的客户端ID
String oldClientId = lockEntry.getClientId();
//如果已过期 则设置当前客户端为新获取锁的客户端
if (lockEntry.isExpired()) {
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
//加锁成功加入
lockedMqs.add(mq);
continue;
}
//其他情况 则有其他客户端已获取锁
}
} finally {
this.lock.unlock();
}
} catch (InterruptedException e) {
log.error("putMessage exception", e);
}
}
return lockedMqs;
}
解锁流程同理,都是操作mqLockTable
分布式锁过期时间在服务端Broker与客户端(消费者)不同:
客户端默认加锁时间为30S rocketmq.client.rebalance.lockMaxLiveTime
客户端定时任务默认每20S进行锁续期 rocketmq.client.rebalance.lockInterval
服务端默认加锁时间为60S rocketmq.broker.rebalance.lockMaxLiveTime
总结
顺序消费的流程与并发消息流程的类似,但为了确保消息有序、依次进行消费,期间会需要加多种锁
顺序消费流程中会先对messageQueue加本地锁,这是为了确保线程池执行ConsumeRequest任务时只有一个线程执行
然后检查要消费的队列processQueue是否持有分布式锁,这是为了确保再平衡机制时被多个节点的消费者重复消费消息
如果未持有分布式锁会向Broker尝试加锁,并延时提交消费请求,后续重试
如果持有分布式锁会开始循环消费,期间也会检查持有分布式锁、超时等情况,不满足条件就延时重试
监听器消费消息时,还要持有processQueue的本地锁,这是为了防止当前消费者不再负责该队列的情况下会删除,不加锁并发删除时会导致重复消费
使用消息监听器消费完消息后根据状态进行处理结果,如果成功则在内存中更新消费偏移量,后续再定时向Broker更新(与并发消费相同)
如果失败则会将消息放回processQueue并延时提交消费请求后续重试,与并发消费不同(并发消费失败会延时,重投入重试队列再进行重试或加入死信队列,而顺序消息是一直在内存中重试,会阻塞后续消息)
再平衡机制新增processQueue时,如果是顺序消费就会去尝试获取它的分布式锁(默认30S过期),并且有定时任务默认每20S进行分布式锁续期
Broker使用锁实体作为processQueue的分布式锁,记录持有分布式锁的客户端以及过期时间(默认60S过期)
每次需要对哪些队列进行加锁,只需要判断队列对应的锁实体客户端ID以及过期时间即可
最后(点赞、收藏、关注求求啦~)
我是菜菜,热爱技术交流、分享与写作,喜欢图文并茂、通俗易懂的输出知识
本篇文章被收入专栏 消息中间件,感兴趣的同学可以持续关注喔
本篇文章笔记以及案例被收入 Gitee-CaiCaiJava、 Github-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~
有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~
关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜
- 点赞
- 收藏
- 关注作者
评论(0)