RocketMQ (6): Consumer Rebalance Internals (Workflow, Triggers, and the Problems It Causes)
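The previous articles covered how messages are pulled and how they are consumed concurrently. In both flows a consumer pulls and consumes messages from the queues it is responsible for, and the rebalance mechanism is what decides which queues those are.
By design in RocketMQ, a queue can be consumed by only one consumer within a consumer group, while one consumer can consume several queues at the same time.
Consumer Rebalance is the mechanism that load-balances queues across consumers.
Think of it as resource allocation: the resources are the queues, and the rebalance mechanism decides which consumer owns which queue.
Consumers going online/offline and queues being scaled out/in all trigger a rebalance, which reassigns the resources.
Frequent rebalances can lower throughput and cause consistency problems such as duplicate consumption.
This article looks at how Consumer Rebalance works, the problems it causes, and how to mitigate them. The mind map is as follows: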
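Previous articles:
RocketMQ (1): The Origins of Message Middleware, an Overview of the Architecture and Core Components
RocketMQ (2): Uncovering the Core Principles of Sending Messages (Source Code and Design Analysis)
RocketMQ (3): How to Persist Messages Efficiently Under High Concurrency (Core Storage Files, Persistence Principles, Source Code Analysis)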
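Consumer Rebalance: doRebalance
The component responsible for rebalancing is RebalanceImpl. Matching the push and pull consumption modes, it has push and pull implementations: RebalancePushImpl and RebalanceLitePullImpl.
As before, we read the source through push consumption, i.e. DefaultMQPushConsumerImpl and RebalancePushImpl.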
doRebalance
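doRebalance is the entry point of a rebalance; it rebalances each subscribed Topic in turn
(a queue of a given Topic can be owned by only one consumer, but one consumer can own queues from several Topics)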
public void doRebalance(final boolean isOrder) {
    // Subscription data of the current consumer
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        // Iterate over every Topic
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                // Rebalance this Topic
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
    // Remove ProcessQueues that are no longer needed (their Topic is not subscribed by this consumer)
    this.truncateMessageQueueNotMyTopic();
}
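After rebalancing, truncateMessageQueueNotMyTopic is called to remove the ProcessQueues that are no longer needed.
Before going further, you need to know that PullRequest, ProcessQueue, and MessageQueue correspond one-to-one: a PullRequest is used to pull messages, pulled messages are put into the ProcessQueue to be consumed later, and the MessageQueue identifies which queue is being operated on (pulled from and consumed from). (I like to call them the "golden iron triangle", though they have no Martial Soul fusion skill.)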
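To make the truncation step and the one-to-one mapping concrete, here is a minimal sketch. Mq, Pq, and PullReq are hypothetical stand-ins for MessageQueue, ProcessQueue, and PullRequest (not the real RocketMQ classes), and truncateNotMyTopic only mirrors the idea of truncateMessageQueueNotMyTopic: every ProcessQueue whose Topic is no longer subscribed is marked dropped and removed.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for MessageQueue: identifies one queue of one topic.
record Mq(String topic, int queueId) {}

// Hypothetical stand-in for ProcessQueue: "dropped" tells pulling/consuming to stop.
final class Pq {
    volatile boolean dropped;
}

// Hypothetical stand-in for PullRequest: ties one Mq to its Pq and the next offset to pull.
record PullReq(Mq mq, Pq pq, long nextOffset) {}

class TruncateSketch {
    // One ProcessQueue per MessageQueue, like processQueueTable.
    final Map<Mq, Pq> processQueueTable = new HashMap<>();

    // Take ownership of a queue and return the PullRequest that would be dispatched for it.
    PullReq own(Mq mq, long nextOffset) {
        Pq pq = new Pq();
        processQueueTable.put(mq, pq);
        return new PullReq(mq, pq, nextOffset);
    }

    // Drop every ProcessQueue whose topic is not subscribed; returns how many were removed.
    int truncateNotMyTopic(Set<String> subscribedTopics) {
        int removed = 0;
        Iterator<Map.Entry<Mq, Pq>> it = processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Mq, Pq> entry = it.next();
            if (!subscribedTopics.contains(entry.getKey().topic())) {
                entry.getValue().dropped = true; // stop work on this queue
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```

Because the PullRequest holds the very same Pq instance that sits in the table, setting dropped on the table entry is visible to whoever is still pulling with that request.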
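Rebalancing by Topic (the core method): rebalanceByTopic
Rebalancing by Topic is the core of the mechanism, and it is handled separately for broadcasting and clustering modes.
In broadcasting mode a consumer handles every queue of the Topic, while in clustering mode the queues are assigned by an allocation strategy.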
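The rebalance flow in clustering mode is:
- Get all queues of the Topic with this.topicSubscribeInfoTable.get(topic) (read from memory; the route data comes from the NameServer)
- Ask a Broker for all consumers (client IDs) of the Topic's consumer group with this.mQClientFactory.findConsumerIdList(topic, consumerGroup) (consumers may run on different nodes, but all of them register with the Broker, so the list must come from the Broker)
- Sort all queues and client IDs, then assign queues with the allocation strategy via strategy.allocate; the default is the average allocation strategy (for example, with three queues and three consumers each consumer gets one queue; with four queues, the first consumer gets one extra queue)
- Update the ProcessQueues from the assigned queues with this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder) (remove the ProcessQueues that are no longer owned and add the ones that are newly owned)
- If the ProcessQueues changed, also adjust the queues' flow-control settings and send a heartbeat to all Brokers with this.messageQueueChanged(topic, mqSet, allocateResultSet)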
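Every consumer runs this flow on its own, without coordinating with the others; the sorting step is what lets their independent results fit together. A minimal sketch, assuming queues and consumers are plain strings and using a simple round-robin rule (similar in spirit to RocketMQ's AllocateMessageQueueAveragelyByCircle) instead of the default strategy; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SortedViewSketch {
    // Round-robin rule: the i-th queue goes to the consumer at position i % consumerCount.
    static List<String> allocateByCircle(String myCid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(myCid);
        if (index < 0) {
            return result; // this consumer is not (yet) known to the Broker
        }
        for (int i = index; i < mqAll.size(); i += cidAll.size()) {
            result.add(mqAll.get(i));
        }
        return result;
    }

    // What one consumer does locally: copy, sort both lists, then allocate.
    static Set<String> rebalance(String myCid, Collection<String> queues, Collection<String> consumers) {
        List<String> mqAll = new ArrayList<>(queues);
        List<String> cidAll = new ArrayList<>(consumers);
        Collections.sort(mqAll);
        Collections.sort(cidAll);
        return new HashSet<>(allocateByCircle(myCid, mqAll, cidAll));
    }
}
```

Because both lists are sorted before allocating, c1 and c2 compute complementary halves even when the client ID list arrives in different orders; without the sort, two consumers could both claim the same queues.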
rebalanceByTopic
private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        // Broadcasting mode
        case BROADCASTING: {
            // The queues this consumer owns: in broadcasting mode every consumer owns all queues of the topic
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                // Update the ProcessQueues according to the queues this consumer owns
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    // The ProcessQueues changed, so handle the message-queue change as well
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged {} {} {} {}",
                        consumerGroup,
                        topic,
                        mqSet,
                        mqSet);
                }
            } else {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            break;
        }
        case CLUSTERING: {
            // All queues of the topic
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            // Ask the Broker for the client IDs of all consumers in this group, so queues can be assigned
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (mqSet != null && cidAll != null) {
                // All queues: set -> list
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);
                // Sort queues and client IDs so every consumer sees the same order
                Collections.sort(mqAll);
                Collections.sort(cidAll);
                // Allocation strategy; defaults to AllocateMessageQueueAveragely (average allocation), set when DefaultMQPushConsumer is built
                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                // Ask the strategy which queues go to the current consumer
                List<MessageQueue> allocateResult = null;
                try {
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                        e);
                    return;
                }
                // Allocated queues: list -> set
                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }
                // Update the ProcessQueues according to the allocated queues
                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                if (changed) {
                    // The ProcessQueues changed, so handle the message-queue change too
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
            break;
        }
        default:
            break;
    }
}
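Querying all consumer IDs: findConsumerIdList
Querying a Broker for all consumer IDs of the current Topic and consumer group follows the usual pattern:
- First look up the Broker address through the multi-level cache, going to the NameServer if it is not found locally
- Then send an RPC to the Broker through the API, with request code GET_CONSUMER_LIST_BY_GROUP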
public List<String> findConsumerIdList(final String topic, final String group) {
    // Look up the broker through the multi-level cache
    String brokerAddr = this.findBrokerAddrByTopic(topic);
    if (null == brokerAddr) {
        this.updateTopicRouteInfoFromNameServer(topic);
        brokerAddr = this.findBrokerAddrByTopic(topic);
    }
    if (null != brokerAddr) {
        try {
            // API request
            return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, clientConfig.getMqClientApiTimeout());
        } catch (Exception e) {
            log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
        }
    }
    return null;
}
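The lookup-then-refresh pattern can be sketched as follows. The single HashMap stands in for the client's cached route data and the Function stands in for the NameServer query; both are hypothetical simplifications, not RocketMQ APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class BrokerAddrLookup {
    private final Map<String, String> routeCache = new HashMap<>(); // topic -> broker address
    private final Function<String, String> nameServer;             // hypothetical NameServer query
    int nameServerCalls = 0;

    BrokerAddrLookup(Function<String, String> nameServer) {
        this.nameServer = nameServer;
    }

    // Check the local cache first; on a miss, refresh from the NameServer and look again.
    String findBrokerAddr(String topic) {
        String addr = routeCache.get(topic);
        if (addr == null) {
            nameServerCalls++;
            String fresh = nameServer.apply(topic);
            if (fresh != null) {
                routeCache.put(topic, fresh);
            }
            addr = routeCache.get(topic);
        }
        // null means no Broker is known; findConsumerIdList then returns null and the rebalance skips this topic
        return addr;
    }
}
```

Only the first lookup for a topic pays for the remote query; later lookups are served from memory until the route is refreshed.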
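Average allocation of queues: AllocateMessageQueueAveragely.allocate
The average allocation strategy gives every consumer the same number of contiguous queues; if some queues are left over, the consumers at the front of the sorted list each take one extra queue, in order.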
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {
    // Validation
    // ...
    List<MessageQueue> result = new ArrayList<MessageQueue>();
    // Index of the current consumer
    int index = cidAll.indexOf(currentCID);
    // Remainder: the queues left over after an even split
    int mod = mqAll.size() % cidAll.size();
    // Queues for this consumer: if mod > 0 and index < mod, queues are left over and
    // every consumer before position mod gets one extra queue
    int averageSize =
        mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
            + 1 : mqAll.size() / cidAll.size());
    // Start index
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    // Number of queues to take
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
        result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;
}
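Note that cidAll here is the list of consumer IDs in the same group. If several consumer groups subscribe to the same Topic (with the same tags as well), each group allocates the queues independently, so every group has consumers consuming the messages, i.e. each message is consumed once per group.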
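The index arithmetic is easy to misread, so here is a self-contained copy of the same formula on plain strings, for experimenting (a sketch, not the library class; the class name is hypothetical). It also illustrates the multi-group point: each group passes only its own cidAll, so each group covers every queue.

```java
import java.util.ArrayList;
import java.util.List;

class AverageAllocationSketch {
    // Same arithmetic as AllocateMessageQueueAveragely.allocate above, on a generic element type.
    static <T> List<T> allocate(String currentCid, List<T> mqAll, List<String> cidAll) {
        List<T> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result; // unknown consumer or no queues: nothing to own
        }
        int mod = mqAll.size() % cidAll.size();
        int averageSize = mqAll.size() <= cidAll.size() ? 1
            : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // range can be <= 0 when there are more consumers than queues: this consumer stays idle
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}
```

With 4 queues and 3 consumers, c0 gets q0 and q1 while c1 and c2 get one queue each; with 2 queues and 3 consumers, c2 gets nothing and sits idle. A second group whose only consumer is b0 receives all four queues, so every message is consumed once in each group.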
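Updating ProcessQueues from the allocation: updateProcessQueueTableInRebalance
First, ProcessQueues that are no longer owned (same topic, but their mq is not in the allocated set), and in push mode those whose pulls have expired, are removed.
For each ProcessQueue that needs to be added, the consumption offset of that queue cached locally is removed first, and the latest consumption offset is then requested from the Broker (this flow was covered in the earlier article on pulling messages).
When a ProcessQueue is added and no mapping between that mq and a ProcessQueue existed before, a PullRequest is created as well; finally the PullRequests are dispatched to the pull-request queue so that messages can be pulled afterwards.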
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
    boolean changed = false;
    // Iterator over this consumer's processQueueTable
    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();
        if (mq.getTopic().equals(topic)) {
            // Same topic, but the mq is not in the allocated set: remove it
            if (!mqSet.contains(mq)) {
                pq.setDropped(true);
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                    it.remove();
                    changed = true;
                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                }
            } else if (pq.isPullExpired()) {
                switch (this.consumeType()) {
                    case CONSUME_ACTIVELY:
                        break;
                    case CONSUME_PASSIVELY:
                        // Pull expired (120s by default): remove it in push mode
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                consumerGroup, mq);
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    }
    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        // A newly allocated queue needs its own ProcessQueue
        if (!this.processQueueTable.containsKey(mq)) {
            // First remove the offset cached in memory
            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = -1L;
            try {
                // Get the queue's consumption offset from the Broker
                nextOffset = this.computePullFromWhereWithException(mq);
            } catch (Exception e) {
                log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
                continue;
            }
            if (nextOffset >= 0) {
                // Add the ProcessQueue; if none existed before, also create a PullRequest
                // (PullRequest pulls, ProcessQueue stores, MessageQueue identifies; they map one-to-one)
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }
    // Put the PullRequests into the queue so messages can be pulled
    this.dispatchPullRequest(pullRequestList);
    return changed;
}
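Stripped of logging, topic filtering, and the pull-expiry branch, the core of updateProcessQueueTableInRebalance is a set difference between what this consumer holds and what it was just allocated. A minimal single-topic sketch; ProcessQ, PullReq, and the offset function are hypothetical stand-ins for ProcessQueue, PullRequest, and computePullFromWhereWithException:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.ToLongFunction;

class UpdateTableSketch {
    static final class ProcessQ {
        boolean dropped;
    }

    record PullReq(String mq, long nextOffset) {}

    final Map<String, ProcessQ> processQueueTable = new HashMap<>(); // mq -> ProcessQueue (single topic)
    final List<PullReq> dispatched = new ArrayList<>();              // stands in for dispatchPullRequest
    private final ToLongFunction<String> offsetSource;               // hypothetical offset lookup

    UpdateTableSketch(ToLongFunction<String> offsetSource) {
        this.offsetSource = offsetSource;
    }

    boolean update(Set<String> allocated) {
        boolean changed = false;
        // 1. Drop queues that are no longer allocated to this consumer.
        Iterator<Map.Entry<String, ProcessQ>> it = processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ProcessQ> entry = it.next();
            if (!allocated.contains(entry.getKey())) {
                entry.getValue().dropped = true;
                it.remove();
                changed = true;
            }
        }
        // 2. Add newly allocated queues, each with a PullRequest starting at its offset.
        List<PullReq> newRequests = new ArrayList<>();
        for (String mq : allocated) {
            if (!processQueueTable.containsKey(mq)) {
                long nextOffset = offsetSource.applyAsLong(mq);
                if (nextOffset < 0) {
                    continue; // offset unavailable: not added, so the next rebalance tries again
                }
                processQueueTable.put(mq, new ProcessQ());
                newRequests.add(new PullReq(mq, nextOffset));
                changed = true;
            }
        }
        // 3. Hand the new PullRequests over so pulling can start.
        dispatched.addAll(newRequests);
        return changed;
    }
}
```

Running it twice with the same allocation returns false the second time, which is why a stable group does not keep calling messageQueueChanged.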
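Message queue changes: messageQueueChanged
If any ProcessQueue was removed or any PullRequest was added, the subscription version is updated, the per-queue message-count and message-size flow-control thresholds are recalculated, and all Brokers are notified under a lock,
ultimately by sending a heartbeat to the Brokers through the API with request code HEART_BEAT.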
RebalancePushImpl.messageQueueChanged
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
    /**
     * When rebalance result changed, should update subscription's version to notify broker.
     * Fix: inconsistency subscription may lead to consumer miss messages.
     */
    SubscriptionData subscriptionData = this.subscriptionInner.get(topic);
    long newVersion = System.currentTimeMillis();
    log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
    subscriptionData.setSubVersion(newVersion);
    // Number of queues currently held
    int currentQueueCount = this.processQueueTable.size();
    if (currentQueueCount != 0) {
        // Adjust message-count flow control: the topic-level limit on cached messages
        int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
        if (pullThresholdForTopic != -1) {
            // Messages allowed per queue
            int newVal = Math.max(1, pullThresholdForTopic / currentQueueCount);
            log.info("The pullThresholdForQueue is changed from {} to {}",
                this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(), newVal);
            this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal);
        }
        // Adjust message-size flow control, same as for the count
        int pullThresholdSizeForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic();
        if (pullThresholdSizeForTopic != -1) {
            int newVal = Math.max(1, pullThresholdSizeForTopic / currentQueueCount);
            log.info("The pullThresholdSizeForQueue is changed from {} to {}",
                this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(), newVal);
            this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal);
        }
    }
    // notify broker
    // Notify all brokers under a lock
    this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
}
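When notifying all Brokers, the heartbeat covers every producer and consumer recorded in the MQClientInstance, so a lock is taken to avoid sending duplicate heartbeats concurrently.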
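The flow-control adjustment is plain integer arithmetic: a topic-level budget is split evenly across the queues this consumer currently holds, never going below 1, and -1 disables it. A small sketch of just that formula (the class and method names are hypothetical):

```java
class FlowControlSketch {
    // Per-queue threshold derived from a topic-level threshold, as in messageQueueChanged above.
    // -1 means the topic-level limit is disabled, so the current per-queue value is kept.
    static int perQueueThreshold(int topicThreshold, int currentQueueCount, int currentPerQueue) {
        if (currentQueueCount == 0 || topicThreshold == -1) {
            return currentPerQueue;
        }
        // Integer division, floored at 1 so a queue can always hold at least one message
        return Math.max(1, topicThreshold / currentQueueCount);
    }
}
```

For example, a topic-level limit of 1000 messages over 3 held queues gives 333 per queue; when a rebalance hands this consumer more queues, each queue's share shrinks accordingly.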
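Broker-side handling
During a rebalance, the Broker handles three kinds of requests:
- Querying a queue's consumption offset (analyzed in an earlier article, so skipped here)
- Querying all consumers in a consumer group
- Consumer heartbeats
Let's look at how the Broker handles each of them.
Querying all consumers in a consumer group
The request code is GET_CONSUMER_LIST_BY_GROUP, and it is handled by ConsumerManageProcessor
(we already saw it reading and writing consumption offsets; after this, every request it handles will have been covered)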
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
            return this.getConsumerListByGroup(ctx, request);
        case RequestCode.UPDATE_CONSUMER_OFFSET:
            return this.updateConsumerOffset(ctx, request);
        case RequestCode.QUERY_CONSUMER_OFFSET:
            return this.queryConsumerOffset(ctx, request);
        default:
            break;
    }
    return null;
}
getConsumerListByGroup
It uses ConsumerManager's consumerTable to get the ConsumerGroupInfo of the consumer group:
ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable = new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(
requestHeader.getConsumerGroup());
In ConsumerGroupInfo, each consumer's information is wrapped as a ClientChannelInfo and stored in channelInfoTable, from which the consumer's client ID can be extracted:
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
The implementation is simple, and much of this information is updated by heartbeats, so let's look at the heartbeat flow next.
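The data path is two map lookups: group to ConsumerGroupInfo, then channels to client IDs. A hedged sketch with hypothetical names, modeling a channel as a String ID; in this sketch an unknown group simply yields an empty list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class BrokerConsumerTableSketch {
    // Hypothetical stand-in for ClientChannelInfo.
    record ClientInfo(String channel, String clientId) {}

    // group -> (channel -> client info), mirroring consumerTable -> ConsumerGroupInfo.channelInfoTable
    final ConcurrentMap<String, ConcurrentMap<String, ClientInfo>> consumerTable = new ConcurrentHashMap<>();

    void register(String group, ClientInfo info) {
        consumerTable.computeIfAbsent(group, g -> new ConcurrentHashMap<>()).put(info.channel(), info);
    }

    // What getConsumerListByGroup boils down to: collect the client IDs of the group's channels.
    List<String> consumerIdList(String group) {
        Map<String, ClientInfo> channels = consumerTable.get(group);
        if (channels == null) {
            return Collections.emptyList();
        }
        List<String> ids = new ArrayList<>();
        for (ClientInfo info : channels.values()) {
            ids.add(info.clientId());
        }
        return ids;
    }
}
```

The returned order depends on the map, which is one reason the client sorts cidAll before allocating.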
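Heartbeat
The heartbeat request code is HEART_BEAT; it is handled by ClientManageProcessor, which calls heartBeat.
When processing a heartbeat, the consumer heartbeats are handled first, then the producer heartbeats.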
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // Heartbeat data, split into consumer and producer heartbeats
    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
    // Client channel, used for later communication with the client
    ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
        ctx.channel(),
        heartbeatData.getClientID(),
        request.getLanguage(),
        request.getVersion()
    );
    // Handle consumer heartbeats
    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
        // Subscription config of the consumer group
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                data.getGroupName());
        // Whether change notification is enabled; if so, consumers in the group are told to rebalance on changes
        boolean isNotifyConsumerIdsChangedEnable = true;
        if (null != subscriptionGroupConfig) {
            isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
            int topicSysFlag = 0;
            if (data.isUnitMode()) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            }
            // Create the retry Topic
            String newTopic = MixAll.getRetryTopic(data.getGroupName());
            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        }
        // Register the consumer
        boolean changed = this.brokerController.getConsumerManager().registerConsumer(
            data.getGroupName(),
            clientChannelInfo,
            data.getConsumeType(),
            data.getMessageModel(),
            data.getConsumeFromWhere(),
            data.getSubscriptionDataSet(),
            isNotifyConsumerIdsChangedEnable
        );
        if (changed) {
            log.info("registerConsumer info changed {} {}",
                data.toString(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel())
            );
        }
    }
    // Handle producer heartbeats
    for (ProducerData data : heartbeatData.getProducerDataSet()) {
        this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
            clientChannelInfo);
    }
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
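A consumer heartbeat ends up in registerConsumer,
which checks whether the consumer or the Topic subscription data changed; if so, the consumers in the group are notified.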
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
    // Get the consumer group info
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null == consumerGroupInfo) {
        // Create it if absent
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }
    // Update the consumer's client channel
    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
            consumeFromWhere);
    // Update the subscription data
    boolean r2 = consumerGroupInfo.updateSubscription(subList);
    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            // The group's consumers or subscriptions changed: notify the other consumers
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }
    // Registration event: update the subscription data used by message filters
    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
    return r1 || r2;
}
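Updating the client channel means adding the consumer's channel or overwriting an existing one; only an addition counts as a change and leads to a notification.
The channelInfoTable here is the same one used when querying the consumer IDs of a group.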
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
    MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
    boolean updated = false;
    this.consumeType = consumeType;
    this.messageModel = messageModel;
    this.consumeFromWhere = consumeFromWhere;
    ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
    if (null == infoOld) {
        // Not present: add it
        ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
        if (null == prev) {
            // No previous entry either: a newly registered consumer, so report a change
            log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
                messageModel, infoNew.toString());
            updated = true;
        }
        infoOld = infoNew;
    } else {
        // The channel exists but the client ID differs: overwrite the ClientChannelInfo
        if (!infoOld.getClientId().equals(infoNew.getClientId())) {
            log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
                this.groupName,
                infoOld.toString(),
                infoNew.toString());
            this.channelInfoTable.put(infoNew.getChannel(), infoNew);
        }
    }
    this.lastUpdateTimestamp = System.currentTimeMillis();
    infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
    return updated;
}
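Updating the subscription data means adding the subscribed Topics the group does not have yet and removing the Topics the group no longer subscribes to (a newer subVersion of an existing Topic overwrites the stored entry but is not reported as a change).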
public boolean updateSubscription(final Set<SubscriptionData> subList) {
    boolean updated = false;
    // Topic subscription data carried by the heartbeat
    for (SubscriptionData sub : subList) {
        SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
        if (old == null) {
            SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
            if (null == prev) {
                // Not present before: report a change
                updated = true;
                log.info("subscription changed, add new topic, group: {} {}",
                    this.groupName,
                    sub.toString());
            }
        } else if (sub.getSubVersion() > old.getSubVersion()) {
            if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                log.info("subscription changed, group: {} OLD: {} NEW: {}",
                    this.groupName,
                    old.toString(),
                    sub.toString()
                );
            }
            // Newer version (timestamp): overwrite
            this.subscriptionTable.put(sub.getTopic(), sub);
        }
    }
    // Walk the group's current subscription data
    Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, SubscriptionData> next = it.next();
        String oldTopic = next.getKey();
        boolean exist = false;
        for (SubscriptionData sub : subList) {
            if (sub.getTopic().equals(oldTopic)) {
                exist = true;
                break;
            }
        }
        // Remove Topics that are subscribed now but missing from the heartbeat
        if (!exist) {
            log.warn("subscription changed, group: {} remove topic {} {}",
                this.groupName,
                oldTopic,
                next.getValue().toString()
            );
            it.remove();
            updated = true;
        }
    }
    this.lastUpdateTimestamp = System.currentTimeMillis();
    return updated;
}
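The change rules of registerConsumer, updateChannel, and updateSubscription decide when the group gets notified, so they are worth pinning down. A simplified sketch, assuming channels and client IDs are plain strings and Sub is a hypothetical stand-in for SubscriptionData:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class GroupInfoSketch {
    record Sub(String topic, long subVersion) {}

    final Map<String, String> channelInfoTable = new HashMap<>(); // channel -> clientId
    final Map<String, Sub> subscriptionTable = new HashMap<>();   // topic -> subscription

    // Like updateChannel(): true only for a channel seen for the first time.
    boolean updateChannel(String channel, String clientId) {
        String old = channelInfoTable.get(channel);
        if (old == null) {
            channelInfoTable.put(channel, clientId);
            return true;
        }
        if (!old.equals(clientId)) {
            channelInfoTable.put(channel, clientId); // overwritten, but not reported as a change
        }
        return false;
    }

    // Like updateSubscription(): true when a topic is added or removed;
    // a newer subVersion of an existing topic is stored but not reported.
    boolean updateSubscription(Set<Sub> subList) {
        boolean updated = false;
        Set<String> incoming = new HashSet<>();
        for (Sub sub : subList) {
            incoming.add(sub.topic());
            Sub old = subscriptionTable.get(sub.topic());
            if (old == null) {
                subscriptionTable.put(sub.topic(), sub);
                updated = true;
            } else if (sub.subVersion() > old.subVersion()) {
                subscriptionTable.put(sub.topic(), sub);
            }
        }
        // Removing keys through the keySet view removes the map entries
        if (subscriptionTable.keySet().retainAll(incoming)) {
            updated = true;
        }
        return updated;
    }

    // Like registerConsumer(): both updates always run; the group is notified only if r1 || r2.
    boolean register(String channel, String clientId, Set<Sub> subs) {
        boolean r1 = updateChannel(channel, clientId);
        boolean r2 = updateSubscription(subs);
        return r1 || r2;
    }
}
```

A routine heartbeat from a known consumer is not a change, and neither is a bare subVersion bump; only a new channel or an added/removed Topic returns true and would lead to the CHANGE notification.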
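Finally, notifyConsumerIdsChanged is called only when a channel is added or a Topic subscription is added or removed (and notifyConsumerIdsChangedEnable is on).
The Broker uses the client channels stored for the group to notify the consumers in it, with request code NOTIFY_CONSUMER_IDS_CHANGED.
When a client receives this request, it wakes up the periodic rebalance thread to trigger a rebalance.
Registering a producer likewise maintains the producer's client channel so that RPCs can be made through it; we won't go into it here.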
When a rebalance is triggered
Rebalancing is triggered by the RebalanceService loop, which by default waits 20s between runs.
// Waits 20s by default
private static long waitInterval =
    Long.parseLong(System.getProperty(
        "rocketmq.client.rebalance.waitInterval", "20000"));

public void run() {
    while (!this.isStopped()) {
        // Wait
        this.waitForRunning(waitInterval);
        // Trigger a rebalance
        this.mqClientFactory.doRebalance();
    }
}
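The wait-then-run loop, and the way rebalanceImmediately cuts the wait short, can be modeled with plain wait/notify. This is a simplified analogue written for illustration (RocketMQ's own ServiceThread has its own implementation); class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified analogue of RebalanceService: wait up to an interval, then rebalance;
// wakeup() cuts the wait short, like rebalanceImmediately().
class RebalanceLoopSketch implements Runnable {
    private final long waitIntervalMs;
    private final Runnable rebalance;
    private boolean notified;          // guarded by "this"
    private volatile boolean stopped;

    RebalanceLoopSketch(long waitIntervalMs, Runnable rebalance) {
        this.waitIntervalMs = waitIntervalMs;
        this.rebalance = rebalance;
    }

    synchronized void wakeup() {
        notified = true;
        notifyAll();
    }

    // Stopping also wakes the loop, which then runs the body once more before re-checking the flag.
    void shutdown() {
        stopped = true;
        wakeup();
    }

    private synchronized void waitForRunning() throws InterruptedException {
        long deadline = System.currentTimeMillis() + waitIntervalMs;
        long remaining = waitIntervalMs;
        while (!notified && remaining > 0) { // a wakeup that arrived earlier returns at once
            wait(remaining);
            remaining = deadline - System.currentTimeMillis();
        }
        notified = false; // consume the wakeup so the next round waits again
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                waitForRunning();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            rebalance.run();
        }
    }

    // Demo: with a 60s interval, only wakeup()/shutdown() can trigger rebalances here.
    // Returns the number of rebalances (1 or 2 depending on timing), or -1 on failure.
    static int demo() {
        AtomicInteger rebalances = new AtomicInteger();
        RebalanceLoopSketch loop = new RebalanceLoopSketch(60_000, rebalances::incrementAndGet);
        Thread thread = new Thread(loop, "rebalance-sketch");
        thread.setDaemon(true);
        thread.start();
        try {
            loop.wakeup();
            long deadline = System.currentTimeMillis() + 5_000;
            while (rebalances.get() == 0 && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            loop.shutdown();
            thread.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return thread.isAlive() ? -1 : rebalances.get();
    }
}
```

Without the wakeup, the first rebalance would only happen after the full interval, which is exactly why the consumer's start() ends by calling rebalanceImmediately.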
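Consumer startup/going online: triggered through the periodic task
When DefaultMQPushConsumerImpl starts, it starts the MQClientInstance, which in turn starts the periodic rebalance task RebalanceService.
However, RebalanceService waits before triggering a rebalance, so the last step of consumer startup calls rebalanceImmediately
to wake RebalanceService up and trigger a rebalance.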
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            // ...
            // Start MQClientInstance, which starts RebalanceService
            mQClientFactory.start();
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // Wake up RebalanceService
    this.mQClientFactory.rebalanceImmediately();
}
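Flow: start -> start the periodic rebalance task -> wait -> woken by the last lines of consumer startup -> rebalance -> the new consumer's heartbeat reaches the Broker -> the Broker tells the other consumers in the group to rebalance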
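Consumer shutdown/going offline triggers a rebalance
When a consumer shuts down, it calls the shutdown methods of its components. Three of these operations cause consumers to rebalance, in chronological order:
- mQClientFactory.shutdown: shuts down the MQClientInstance, which shuts down rebalanceService; this wakes rebalanceService and triggers a rebalance
- rebalanceImpl.destroy: clears processQueueTable; after the rebalance the ProcessQueues have changed, so a heartbeat is sent to the Broker
- mQClientFactory.unregisterConsumer: unregisters the consumer; the consumer set recorded by the Broker changes, so the Broker tells the other consumers that this one went offline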
// Unregister the consumer: send an unregister request to the Broker, which tells the other consumers in the group to rebalance
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
The request code is UNREGISTER_CLIENT; on the Broker it is handled by unregisterConsumer:
public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    boolean isNotifyConsumerIdsChangedEnable) {
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null != consumerGroupInfo) {
        // Remove the channel
        consumerGroupInfo.unregisterChannel(clientChannelInfo);
        if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
            ConsumerGroupInfo remove = this.consumerTable.remove(group);
            if (remove != null) {
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
            }
        }
        // Tell the group to rebalance
        if (isNotifyConsumerIdsChangedEnable) {
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }
}
Flow: consumer destroyed -> the Broker tells all consumers in the group to rebalance
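On the Broker side, unregistering is the mirror image of registering. A minimal sketch with hypothetical names, in which the change listener is reduced to returning the client IDs of the consumers that would be notified:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class UnregisterSketch {
    // group -> (channel -> clientId); a hypothetical simplification of consumerTable
    final Map<String, Map<String, String>> consumerTable = new HashMap<>();

    void register(String group, String channel, String clientId) {
        consumerTable.computeIfAbsent(group, g -> new HashMap<>()).put(channel, clientId);
    }

    // Remove the channel and drop the group once it is empty. Returns the client IDs that
    // remain in the group, i.e. the consumers that would be told to rebalance.
    List<String> unregister(String group, String channel) {
        Map<String, String> channels = consumerTable.get(group);
        if (channels == null) {
            return new ArrayList<>();
        }
        channels.remove(channel);
        if (channels.isEmpty()) {
            consumerTable.remove(group);
        }
        return new ArrayList<>(channels.values());
    }
}
```

The remaining consumers then each rebalance and split the departed consumer's queues among themselves.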
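The Broker notifies consumers of changes
When a consumer receives the Broker's notice that the group's consumers changed, it wakes the rebalance thread again, which triggers a rebalance.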
public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    // ...
    // Wake up RebalanceService
    this.mqClientFactory.rebalanceImmediately();
    return null;
}
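Periodic MQClientInstance heartbeats to the Broker
MQClientInstance has a periodic task (every 30s by default) that sends heartbeats to the Brokers; if the consumer's information has changed, the Broker may in turn push a rebalance.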
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
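The flow and trigger timing are summarized in the figure below:
A question left over from the previous article
The previous article on message consumption explained that failed messages are retried: after a delay they are eventually delivered to the retry Topic.
So how does the retry Topic take part in rebalancing and get a PullRequest, so that the rest of the message-pulling flow works for it?
When DefaultMQPushConsumerImpl starts, it calls copySubscription;
in clustering mode this adds the consumer group's retry Topic to rebalanceImpl's internal subscriptions: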
case CLUSTERING:
    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
    break;
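Later, when the rebalance iterates over the Topics, it also covers the retry Topic, so messages in the retry queues can be pulled and their consumption retried.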
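A small sketch of what copySubscription achieves in clustering mode: the set of Topics that doRebalance iterates gains the group's retry Topic. The "%RETRY%" prefix is assumed to be the value of MixAll.RETRY_GROUP_TOPIC_PREFIX and "*" the value of SubscriptionData.SUB_ALL; the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RetrySubscriptionSketch {
    // Assumed value of MixAll.RETRY_GROUP_TOPIC_PREFIX.
    static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
    // Subscribe-to-everything expression, like SubscriptionData.SUB_ALL.
    static final String SUB_ALL = "*";

    static String retryTopic(String consumerGroup) {
        return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
    }

    // topic -> subscription expression, i.e. what the rebalance loop would iterate
    static Map<String, String> subscriptionInner(String group, Map<String, String> userSubscriptions,
                                                 boolean clustering) {
        Map<String, String> inner = new LinkedHashMap<>(userSubscriptions);
        if (clustering) {
            // The retry topic is rebalanced, pulled, and consumed like any other topic
            inner.put(retryTopic(group), SUB_ALL);
        }
        return inner;
    }
}
```

Because the retry Topic is just another entry in the subscription map, it gets queues, ProcessQueues, and PullRequests through exactly the same rebalance path.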
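Problems caused by rebalancing
The flow above shows that rebalancing gives up some consistency in exchange for availability (no step of the flow blocks), and reaches eventual consistency instead.
At program startup, during queue scale-out/scale-in, and when consumers go online/offline, queue assignments may briefly be inconsistent across consumers, so messages may be consumed late and may be consumed more than once.
Making the assignments fully consistent with no duplicate consumption would require stopping pulling and consuming during the rebalance, i.e. trading availability for consistency.
Although an eventually consistent rebalance can briefly produce inconsistent assignments, making consumers idempotent is enough to handle it, and availability is preserved during the rebalance, so processing does not stall.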
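Idempotency means a redelivered message has no additional effect. One common approach, shown here as a hedged sketch, is to deduplicate on a business key before applying the side effect; the key choice and the in-memory set are assumptions, and a real system would need durable storage such as a unique key in a database.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class IdempotentConsumerSketch {
    // In memory for illustration only; it would have to be durable in production.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final List<String> effects = Collections.synchronizedList(new ArrayList<>());

    // Returns true if the message was applied, false if it was a duplicate delivery.
    boolean consume(String businessKey, String body) {
        if (!processedKeys.add(businessKey)) {
            return false; // already handled, e.g. redelivered after a rebalance moved the queue
        }
        try {
            effects.add(body); // the real business side effect would go here
            return true;
        } catch (RuntimeException e) {
            processedKeys.remove(businessKey); // allow a retry if the side effect failed
            throw e;
        }
    }

    int effectCount() {
        return effects.size();
    }
}
```

If the old and new owner of a queue both consume the same message during the brief inconsistency window, the second attempt finds the key already recorded and returns without repeating the side effect.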
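When a large queue scale-out is needed in production during a business peak to increase consumption capacity, the resulting rebalance may reduce throughput and degrade performance.
To avoid this, you can create a new Topic with more queues and temporarily add "forwarding" consumers to the old consumer group that forward messages to the new queues, scaling consumption out horizontally.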
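Summary
The rebalance mechanism load-balances queues across consumers and is the prerequisite for pulling and consuming messages.
Rebalancing gives up some consistency (frequent triggers can leave assignments briefly inconsistent) in exchange for availability, achieving eventual consistency; messages may be consumed more than once in the meantime, so consumption must be idempotent.
When a consumer rebalances, it iterates over its subscribed Topics and rebalances each one: it gets all queues of the Topic, asks the Broker for the other consumers in the same group, assigns queues to itself with the allocation strategy, and updates its ProcessQueues from the assignment; if the ProcessQueues changed, it adjusts the queues' flow control and sends heartbeats to all Brokers.
When the Broker receives a heartbeat, it updates the consumer's channel and subscriptions; if a channel or subscription was added (or a Topic removed), it tells the consumers in the same group to rebalance.
Consumers going online/offline and queues being added or removed all trigger rebalances within the group, and the consumer's periodic task triggers rebalances as well.
If multiple consumer groups subscribe to the same Topic (with the same tags as well), every group has consumers consuming the messages, i.e. each message is consumed once per group.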
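Finally (please like, bookmark, and follow~)
This article is part of the column Message Middleware; follow it if you're interested.
The notes and examples for this article are in Gitee-CaiCaiJava and Github-CaiCaiJava, along with more advanced Java material; star them if you're interested~
If you have questions, feel free to discuss them in the comments. If you think Caicai wrote well, please like, follow, and bookmark~
Follow Caicai for more technical content; WeChat official account: 菜菜的后端私房菜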