RocketMQ (6): How Consumer Rebalance Works (Execution Flow, Trigger Timing, and the Problems It Causes)

菜菜的后端私房菜, published 2024/11/11 09:48:58
[Abstract] An illustrated, step-by-step look at the RocketMQ Consumer Rebalance mechanism


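Earlier articles in this series covered how messages are pulled and consumed concurrently. A consumer pulls and consumes messages from the queues it is responsible for, and the rebalance mechanism is what decides which queues those are.

By design, within a consumer group a queue can be consumed by only one consumer, while a consumer can consume several queues at once.

Consumer Rebalance is the mechanism that load-balances queues across consumers.

Think of it as resource allocation: the queues are the resource, and rebalance decides which consumer is responsible for which queues.

Rebalance is triggered, and queues are reassigned, whenever a consumer goes online or offline or the number of queues is scaled up or down.

Triggering it frequently can reduce throughput and cause consistency problems such as duplicate consumption.

This article covers how Consumer Rebalance works, the problems it causes, and how to guard against them. The mind map is as follows: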

[Figure: mind map]

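Previous articles in this series:

RocketMQ (1): The origins of message middleware, and an overview of the architecture and core components

RocketMQ (2): The core principles of sending messages (source code and design analysis)

RocketMQ (3): How are messages persisted efficiently under high concurrency? (core storage files, persistence principles, source code analysis)

RocketMQ (4): How are messages pulled before consumption? (long polling)

RocketMQ (5): How high-throughput concurrent consumption works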

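Consumer rebalance: doRebalance

The component responsible for rebalance is RebalanceImpl. Matching the push and pull consumption modes, it has a push and a pull implementation: RebalancePushImpl and RebalanceLitePullImpl.

As before, the source walkthrough follows push consumption, that is, DefaultMQPushConsumerImpl and RebalancePushImpl.

doRebalance

doRebalance is the entry point of rebalance. It rebalances each subscribed Topic in turn.

(Within a Topic a queue is handled by only one consumer of the group, but a consumer can be responsible for queues of several Topics.)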

public void doRebalance(final boolean isOrder) {
    // Get the current consumer's subscription data
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        // Iterate over each Topic
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                // Rebalance this Topic
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    // Remove ProcessQueues that are no longer needed (their Topic is not subscribed by this consumer)
    this.truncateMessageQueueNotMyTopic();
}

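After rebalancing, truncateMessageQueueNotMyTopic is called to remove ProcessQueues that are no longer needed.

One thing to know before going further: PullRequest, ProcessQueue, and MessageQueue correspond one to one. A PullRequest is used to pull messages; pulled messages are placed in the ProcessQueue to be consumed later; the MessageQueue identifies which queue is being pulled from and consumed. (I like to call them the "golden triangle", though they have no fusion skill.)

Rebalancing by Topic (the core method): rebalanceByTopic

Rebalancing by Topic is the core of rebalance. Broadcasting and clustering modes are handled separately.

In broadcasting mode every consumer handles all queues of the Topic, while in clustering mode queues are assigned according to an allocation strategy.

The clustering-mode flow is:

  1. Get all queues of the Topic with this.topicSubscribeInfoTable.get(topic) (read from memory; the route data comes from the NameServer)
  2. Ask the Broker for all consumers (client IDs) of the consumer group for this Topic with this.mQClientFactory.findConsumerIdList(topic, consumerGroup) (consumers may run on different nodes, but they all register with the Broker, so the list has to come from the Broker)
  3. Sort the queues and the client IDs, then assign queues with the allocation strategy strategy.allocate, which defaults to averaged allocation (for example, with three queues and three consumers each consumer gets one queue; with four queues the first consumer gets one extra)
  4. Update the ProcessQueues from the assigned queues with this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder) (drop the ProcessQueues no longer owned and add the newly owned ones)
  5. If any ProcessQueue changed, also adjust the per-queue flow-control settings and send a heartbeat to all Brokers with this.messageQueueChanged(topic, mqSet, allocateResultSet)

[Figure: main flow of rebalance in clustering mode]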

rebalanceByTopic

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        // Broadcasting mode
        case BROADCASTING: {
            // All queues of the Topic; in broadcasting mode every consumer is responsible for all of them
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                // Update the ProcessQueues from the queues this consumer is responsible for
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    // ProcessQueues changed, so also update flow control and notify the Brokers
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged {} {} {} {}",
                        consumerGroup,
                        topic,
                        mqSet,
                        mqSet);
                }
            } else {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            break;
        }
        case CLUSTERING: {
            // Get all queues of the Topic
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            // Ask the Broker for the client IDs of all consumers in this group for the Topic, so queues can be assigned
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

            if (mqSet != null && cidAll != null) {
                // All queues: set -> list
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);

                // Sort the queues and client IDs so every consumer computes the assignment from the same ordering
                Collections.sort(mqAll);
                Collections.sort(cidAll);

                // Allocation strategy; the default, AllocateMessageQueueAveragely (averaged allocation), is set when DefaultMQPushConsumer is constructed
                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                // Ask the strategy which queues belong to the current consumer
                List<MessageQueue> allocateResult = null;
                try {
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                        e);
                    return;
                }

                // Assigned queues: list -> set
                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }

                // Update the ProcessQueues from the assigned queues
                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                if (changed) {
                    // ProcessQueues changed, so also update flow control and notify the Brokers
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
            break;
        }
        default:
            break;
    }
}

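Querying all consumer IDs: findConsumerIdList

Querying the Broker for all consumer IDs of the current Topic and consumer group follows the familiar pattern:

  1. Look up the Broker address in the local caches first; if it is not found, fetch the route from the NameServer
  2. Then send an RPC to the Broker through the client API, with request code GET_CONSUMER_LIST_BY_GROUP

public List<String> findConsumerIdList(final String topic, final String group) {
    // Look up the Broker address in the local caches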
    String brokerAddr = this.findBrokerAddrByTopic(topic);
    if (null == brokerAddr) {
        this.updateTopicRouteInfoFromNameServer(topic);
        brokerAddr = this.findBrokerAddrByTopic(topic);
    }

    if (null != brokerAddr) {
        try {
            // Send the request through the client API
            return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, clientConfig.getMqClientApiTimeout());
        } catch (Exception e) {
            log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
        }
    }

    return null;
}

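Averaged queue allocation: AllocateMessageQueueAveragely.allocate

Averaged allocation (it does not involve hashing) first gives every consumer the same number of queues. If queues are left over, consumers receive one extra queue each, in consumer order, until the remainder is used up.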

@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {
    // Parameter validation
    //...
    List<MessageQueue> result = new ArrayList<MessageQueue>();

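    // Index of the current consumer in the sorted consumer list
    int index = cidAll.indexOf(currentCID);
    // Remainder: the queues left over after an even split
    int mod = mqAll.size() % cidAll.size();
    // Queues per consumer: if mod > 0 and index < mod, queues are left over and each consumer whose index is below mod takes one extra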
    int averageSize =
        mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
            + 1 : mqAll.size() / cidAll.size());
    // Start index
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    // Number of queues to take
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
        result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;
}
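
To make the index arithmetic easier to follow, here is a minimal, self-contained sketch of the same averaged allocation. It is a simplification for illustration: queues are plain integers rather than MessageQueue objects, and the class name AverageAllocationSketch is made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified re-implementation of averaged allocation; queues are plain integers (assumption). */
final class AverageAllocationSketch {

    /**
     * @param currentCid client ID of the consumer running the rebalance
     * @param queues     all queues of the topic, already sorted
     * @param cidAll     all client IDs of the group, already sorted
     */
    static List<Integer> allocate(String currentCid, List<Integer> queues, List<String> cidAll) {
        List<Integer> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0) {
            return result; // a consumer that is not in the list gets nothing
        }
        int mod = queues.size() % cidAll.size();
        int averageSize;
        if (queues.size() <= cidAll.size()) {
            averageSize = 1;
        } else if (mod > 0 && index < mod) {
            averageSize = queues.size() / cidAll.size() + 1; // takes one of the leftover queues
        } else {
            averageSize = queues.size() / cidAll.size();
        }
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // range is zero or negative when there are more consumers than queues: the loop is skipped
        int range = Math.min(averageSize, queues.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(queues.get((startIndex + i) % queues.size()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> queues = Arrays.asList(0, 1, 2, 3);
        List<String> consumers = Arrays.asList("c1", "c2", "c3");
        for (String cid : consumers) {
            System.out.println(cid + " -> " + allocate(cid, queues, consumers));
        }
    }
}
```

With four queues and three consumers, c1 receives queues 0 and 1, c2 receives queue 2, and c3 receives queue 3. With two queues and three consumers, the third consumer receives nothing, which is why adding consumers beyond the queue count does not add consumption capacity.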

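Note that cidAll here is the list of consumer IDs within the same group. If several consumer groups subscribe to the same Topic (including the same tags), each group gets its own allocation, so every message is consumed by a consumer in each group.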

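Updating ProcessQueues from the assigned queues (updateProcessQueueTableInRebalance)

First, ProcessQueues that are no longer owned (same topic, but the mq is not among the assigned queues) and ProcessQueues whose pull has expired are removed.

For each ProcessQueue that needs to be added, the consumer first removes the locally cached consume offset for that queue and then requests the queue's latest consume offset from the Broker (this flow was analyzed in the earlier article on pulling messages).

When a ProcessQueue is added and no mapping between the mq and a ProcessQueue existed before, a PullRequest is created as well. Finally the PullRequests are put into the pull request queue so that messages can be pulled later.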

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
    boolean changed = false;
	
    // Iterator over this consumer's processQueueTable
    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();

        if (mq.getTopic().equals(topic)) {
            // Same topic, but the mq is not among the assigned queues: remove it
            if (!mqSet.contains(mq)) {
                pq.setDropped(true);
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                    it.remove();
                    changed = true;
                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                }
            } else if (pq.isPullExpired()) {
                switch (this.consumeType()) {
                    case CONSUME_ACTIVELY:
                        break;
                    case CONSUME_PASSIVELY:
                        // Pull expired (120s by default): removed in push mode
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                consumerGroup, mq);
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    }

    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        // A newly assigned queue needs a matching ProcessQueue
        if (!this.processQueueTable.containsKey(mq)) {
            // First remove the offset cached in memory
            this.removeDirtyOffset(mq);
            
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = -1L;
            try {
                // Ask the Broker for the queue's consume offset
                nextOffset = this.computePullFromWhereWithException(mq);
            } catch (Exception e) {
                log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
                continue;
            }

            if (nextOffset >= 0) {
                // Add the ProcessQueue; if none existed before, also create a PullRequest
                // (PullRequest pulls messages, ProcessQueue stores them; both map one to one to a MessageQueue)
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

    // Put the PullRequests into the queue so messages can be pulled later
    this.dispatchPullRequest(pullRequestList);

    return changed;
}
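
Stripped of offsets and locking, the method above is a set difference between the queues currently held and the queues just assigned. The following sketch illustrates only that idea. The class name ProcessQueueDiffSketch and the string queue names are made up for the example, and the pull-expired branch is left out.

```java
import java.util.Set;
import java.util.TreeSet;

/** Illustrative diff between held and newly assigned queues; queue names are hypothetical strings. */
final class ProcessQueueDiffSketch {

    /** Queues held now but no longer assigned: their ProcessQueue would be dropped. */
    static Set<String> toRemove(Set<String> held, Set<String> assigned) {
        Set<String> removed = new TreeSet<>(held);
        removed.removeAll(assigned);
        return removed;
    }

    /** Queues assigned now but not held yet: each would get a ProcessQueue and a PullRequest. */
    static Set<String> toAdd(Set<String> held, Set<String> assigned) {
        Set<String> added = new TreeSet<>(assigned);
        added.removeAll(held);
        return added;
    }

    /** Mirrors the "changed" flag: true when anything was removed or added. */
    static boolean changed(Set<String> held, Set<String> assigned) {
        return !toRemove(held, assigned).isEmpty() || !toAdd(held, assigned).isEmpty();
    }
}
```

For example, a consumer holding q0, q1, and q2 that is now assigned q2 and q3 would drop q0 and q1 and start pulling from q3, while q2 keeps its existing ProcessQueue and buffered messages.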

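Message queue change: messageQueueChanged

If a ProcessQueue was removed or a PullRequest was added, the per-queue flow-control limits on message count and message size are recalculated, and all Brokers are notified under a lock.

The notification is ultimately a heartbeat sent to the Broker with request code HEART_BEAT.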

RebalancePushImpl.messageQueueChanged

public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
    /**
     * When rebalance result changed, should update subscription's version to notify broker.
     * Fix: inconsistency subscription may lead to consumer miss messages.
     */
    SubscriptionData subscriptionData = this.subscriptionInner.get(topic);
    long newVersion = System.currentTimeMillis();
    log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
    subscriptionData.setSubVersion(newVersion);

    // Number of queues currently held
    int currentQueueCount = this.processQueueTable.size();
    if (currentQueueCount != 0) {
        // Adjust count-based flow control: the maximum number of messages cached for the topic
        int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
        if (pullThresholdForTopic != -1) {
            // Maximum number of messages cached per queue
            int newVal = Math.max(1, pullThresholdForTopic / currentQueueCount);
            log.info("The pullThresholdForQueue is changed from {} to {}",
                this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(), newVal);
            this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal);
        }

        // Adjust size-based flow control, same idea as the count
        int pullThresholdSizeForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic();
        if (pullThresholdSizeForTopic != -1) {
            int newVal = Math.max(1, pullThresholdSizeForTopic / currentQueueCount);
            log.info("The pullThresholdSizeForQueue is changed from {} to {}",
                this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(), newVal);
            this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal);
        }
    }

    // Notify all Brokers with a heartbeat, sent under a lock
    this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
}

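The heartbeat to all Brokers carries every producer and consumer recorded in the MQClientInstance, so it is sent under a lock to keep concurrent heartbeats from overlapping.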
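
The flow-control part of messageQueueChanged is a simple division. The sketch below reproduces only that arithmetic; the class and method names are made up for the example, and -1 is treated as "no topic-level limit configured", which is how the check in the code above reads.

```java
/** Illustrative arithmetic for splitting a topic-level threshold across the queues currently held. */
final class FlowControlSketch {

    /**
     * @param topicThreshold    topic-level limit, or -1 when none is configured
     * @param currentQueueCount number of queues this consumer currently holds
     * @param currentPerQueue   per-queue limit in effect before the rebalance
     * @return the per-queue limit after the rebalance
     */
    static int perQueueThreshold(int topicThreshold, int currentQueueCount, int currentPerQueue) {
        if (currentQueueCount == 0 || topicThreshold == -1) {
            return currentPerQueue; // nothing to recalculate
        }
        // Integer division, but never below 1 so every queue can still buffer something
        return Math.max(1, topicThreshold / currentQueueCount);
    }
}
```

With a topic-level limit of 1000 messages, holding 4 queues yields 250 per queue and holding 3 queues yields 333. Because the divisor is the number of queues held after the rebalance, the per-queue budget shrinks whenever a consumer takes over more queues.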

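Broker-side handling

During rebalance the Broker handles three kinds of requests:

  1. Query the consume offset of a queue (analyzed in an earlier article, so skipped here)
  2. Query all consumers in a consumer group
  3. Consumer heartbeat

The following sections look at how the Broker handles them.

Querying all consumers in a consumer group

The request code is GET_CONSUMER_LIST_BY_GROUP and the handler is ConsumerManageProcessor.

(It was also covered earlier for reading and writing consume offsets, so this nearly completes the set of requests it handles.)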

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
            return this.getConsumerListByGroup(ctx, request);
        case RequestCode.UPDATE_CONSUMER_OFFSET:
            return this.updateConsumerOffset(ctx, request);
        case RequestCode.QUERY_CONSUMER_OFFSET:
            return this.queryConsumerOffset(ctx, request);
        default:
            break;
    }
    return null;
}

getConsumerListByGroup uses ConsumerManager's consumerTable to look up the ConsumerGroupInfo for the consumer group.

ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable = new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);

ConsumerGroupInfo consumerGroupInfo =
    this.brokerController.getConsumerManager().getConsumerGroupInfo(
        requestHeader.getConsumerGroup());

Inside ConsumerGroupInfo, each consumer is wrapped as a ClientChannelInfo and stored in channelInfoTable, from which the consumer's client ID can be extracted.

private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = new ConcurrentHashMap<Channel, ClientChannelInfo>(16);

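[Figure: the Broker looking up consumer IDs in a consumer group]

The implementation is simple, and much of this information is refreshed by heartbeats, so the heartbeat flow comes next.

Heartbeat

The heartbeat request code is HEART_BEAT. It is handled by ClientManageProcessor, which calls heartBeat.

Consumer heartbeats are processed first, then producer heartbeats.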

public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // Heartbeat data, covering both consumers and producers
    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
    // Client channel, kept for later communication
    ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
        ctx.channel(),
        heartbeatData.getClientID(),
        request.getLanguage(),
        request.getVersion()
    );

    // Handle consumer heartbeats
    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
        // Subscription config of the consumer group
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                data.getGroupName());
        // Whether consumer-change notification is enabled; it decides later whether the group is told to rebalance
        boolean isNotifyConsumerIdsChangedEnable = true;
        if (null != subscriptionGroupConfig) {
            isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
            int topicSysFlag = 0;
            if (data.isUnitMode()) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            }
            // Create the group's retry Topic
            String newTopic = MixAll.getRetryTopic(data.getGroupName());
            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        }

        // Register the consumer
        boolean changed = this.brokerController.getConsumerManager().registerConsumer(
            data.getGroupName(),
            clientChannelInfo,
            data.getConsumeType(),
            data.getMessageModel(),
            data.getConsumeFromWhere(),
            data.getSubscriptionDataSet(),
            isNotifyConsumerIdsChangedEnable
        );

        if (changed) {
            log.info("registerConsumer info changed {} {}",
                data.toString(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel())
            );
        }
    }

    // Handle producer heartbeats
    for (ProducerData data : heartbeatData.getProducerDataSet()) {
        this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
            clientChannelInfo);
    }
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

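For a consumer heartbeat, registerConsumer is ultimately called to check whether the consumer or its Topic subscription data has changed. If so, the consumers in the group are notified.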

public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {

    // Get the consumer group info
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null == consumerGroupInfo) {
        // Create it if absent
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }

    // Update the consumer's client channel
    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
            consumeFromWhere);
    // Update the subscription data
    boolean r2 = consumerGroupInfo.updateSubscription(subList);

    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            // A consumer or the group's subscription changed: notify the consumers in the group
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }

    // Fire the REGISTER event so the message filter's subscription data is updated
    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
    return r1 || r2;
}

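Updating the client channel means either adding the consumer's channel or overwriting an existing one. Only an addition counts as a change and leads to a notification.

The channelInfoTable here is the same table used when querying the consumer IDs of a group.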

public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
    MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
    boolean updated = false;
    this.consumeType = consumeType;
    this.messageModel = messageModel;
    this.consumeFromWhere = consumeFromWhere;

    ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
    if (null == infoOld) {
        // Not present: add it
        ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
        if (null == prev) {
            // No previous entry either, so this consumer was not registered before: report a change
            log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
                messageModel, infoNew.toString());
            updated = true;
        }

        infoOld = infoNew;
    } else {
        // The channel exists but the client ID differs: overwrite the ClientChannelInfo
        if (!infoOld.getClientId().equals(infoNew.getClientId())) {
            log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
                this.groupName,
                infoOld.toString(),
                infoNew.toString());
            this.channelInfoTable.put(infoNew.getChannel(), infoNew);
        }
    }

    this.lastUpdateTimestamp = System.currentTimeMillis();
    infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);

    return updated;
}
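
The rule "only a new channel counts as a change" and the later lookup of client IDs can be pictured with a plain map. This is a sketch under simplifying assumptions: strings stand in for the Netty Channel and for ClientChannelInfo, the class name ChannelTableSketch is made up, and the returned IDs are sorted only to make the output deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative channel table: channel id -> client ID (both plain strings in this sketch). */
final class ChannelTableSketch {
    private final Map<String, String> clientIdByChannel = new ConcurrentHashMap<>();

    /** Registers or overwrites a channel; returns true only when the channel was not known before. */
    boolean updateChannel(String channelId, String clientId) {
        String previous = clientIdByChannel.put(channelId, clientId);
        return previous == null;
    }

    /** Removes a channel, as happens when a consumer unregisters. */
    boolean unregisterChannel(String channelId) {
        return clientIdByChannel.remove(channelId) != null;
    }

    /** Client IDs of all registered channels, which is what a consumer asks for during rebalance. */
    List<String> clientIds() {
        List<String> ids = new ArrayList<>(clientIdByChannel.values());
        Collections.sort(ids);
        return ids;
    }
}
```

A repeated heartbeat from the same channel returns false and therefore would not cause a notification, which keeps the periodic heartbeat from triggering a rebalance on every tick.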

Updating the subscription data means adding Topics the group has not subscribed to yet and removing Topics the group no longer subscribes to.

public boolean updateSubscription(final Set<SubscriptionData> subList) {
    boolean updated = false;
    // Topic subscription data from the heartbeat
    for (SubscriptionData sub : subList) {
        SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
        if (old == null) {
            SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
            if (null == prev) {
                // Not present before: report a change
                updated = true;
                log.info("subscription changed, add new topic, group: {} {}",
                    this.groupName,
                    sub.toString());
            }
        } else if (sub.getSubVersion() > old.getSubVersion()) {
            if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                log.info("subscription changed, group: {} OLD: {} NEW: {}",
                    this.groupName,
                    old.toString(),
                    sub.toString()
                );
            }
            // Newer version (timestamp): overwrite
            this.subscriptionTable.put(sub.getTopic(), sub);
        }
    }

    // Iterate over the group's current subscription data
    Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, SubscriptionData> next = it.next();
        String oldTopic = next.getKey();

        boolean exist = false;
        for (SubscriptionData sub : subList) {
            if (sub.getTopic().equals(oldTopic)) {
                exist = true;
                break;
            }
        }

        // Remove Topics that are in the current subscription data but missing from the heartbeat
        if (!exist) {
            log.warn("subscription changed, group: {} remove topic {} {}",
                this.groupName,
                oldTopic,
                next.getValue().toString()
            );

            it.remove();
            updated = true;
        }
    }

    this.lastUpdateTimestamp = System.currentTimeMillis();

    return updated;
}
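
One detail in the method above is easy to miss: a newer subscription version overwrites the stored entry but does not set updated, so only an added or removed Topic is reported as a change. The sketch below models just that behavior. The class name is made up, and a Long version per Topic stands in for SubscriptionData.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative subscription table: topic -> subscription version (a Long stands in for SubscriptionData). */
final class SubscriptionTableSketch {
    private final Map<String, Long> versionByTopic = new TreeMap<>();

    /** Applies the topics carried by one heartbeat; returns true if a topic was added or removed. */
    boolean update(Map<String, Long> heartbeat) {
        boolean updated = false;
        for (Map.Entry<String, Long> sub : heartbeat.entrySet()) {
            Long old = versionByTopic.get(sub.getKey());
            if (old == null) {
                versionByTopic.put(sub.getKey(), sub.getValue());
                updated = true; // new topic
            } else if (sub.getValue() > old) {
                versionByTopic.put(sub.getKey(), sub.getValue()); // refreshed, not reported as a change
            }
        }
        // Drop topics that the heartbeat no longer carries; retainAll reports whether anything was removed
        if (versionByTopic.keySet().retainAll(heartbeat.keySet())) {
            updated = true;
        }
        return updated;
    }

    /** Stored version for a topic, or null when the topic is not subscribed. */
    Long version(String topic) {
        return versionByTopic.get(topic);
    }
}
```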

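Finally, notifyConsumerIdsChanged is called only when a channel is added or the set of subscribed Topics changes. It uses the client channels stored for the group to notify the group's consumers, with request code NOTIFY_CONSUMER_IDS_CHANGED.

When a client receives this request, it wakes up the periodic rebalance thread to trigger a rebalance.

[Figure: the Broker receiving a heartbeat]

Producer registration likewise maintains the producer's client channel, which is kept for RPC communication, so it is not covered in detail here.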

When rebalance is triggered

Rebalance is triggered periodically by RebalanceService in a loop. By default it waits 20s between runs.

// Wait 20s by default
private static long waitInterval =
        Long.parseLong(System.getProperty(
            "rocketmq.client.rebalance.waitInterval", "20000"));

public void run() {
    while (!this.isStopped()) {
        // Wait for the interval, or until woken up
        this.waitForRunning(waitInterval);
        // Trigger rebalance
        this.mqClientFactory.doRebalance();
    }
}
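
The loop above waits for the interval but can also be woken early, which is what rebalanceImmediately relies on. The sketch below shows one way to get that "timed wait with early wakeup" behavior. It is not RocketMQ's ServiceThread implementation: it swaps in a java.util.concurrent.Semaphore, and the class name WakeableWaitSketch is made up for the example.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a periodic thread that can be woken early; built on a Semaphore. */
final class WakeableWaitSketch {
    private final Semaphore signal = new Semaphore(0);

    /** Asks for an immediate run, conceptually what rebalanceImmediately does. */
    void wakeup() {
        signal.release();
    }

    /** Waits up to intervalMillis; returns true if woken early, false on timeout or interrupt. */
    boolean waitForRunning(long intervalMillis) {
        try {
            boolean woken = signal.tryAcquire(intervalMillis, TimeUnit.MILLISECONDS);
            signal.drainPermits(); // several wakeups before one run collapse into a single run
            return woken;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Draining the permits means that a burst of notifications, for example several consumers going online at once, results in one early rebalance rather than one per notification.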

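Consumer startup/online: triggered through the periodic task

When a DefaultMQPushConsumerImpl consumer starts, it starts the MQClientInstance, which in turn starts RebalanceService, the periodic rebalance task.

RebalanceService waits before it triggers the first rebalance, so the last step of consumer startup calls rebalanceImmediately to wake it up and trigger a rebalance right away.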

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            //..
            
            // Start the MQClientInstance, which starts RebalanceService
            mQClientFactory.start();
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // Wake up RebalanceService
    this.mQClientFactory.rebalanceImmediately();
}

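Flow: start -> the periodic rebalance task starts -> it waits -> the last step of consumer startup wakes it -> rebalance is triggered -> the new consumer notifies the Broker -> the Broker tells the other consumers in the group to rebalance

Consumer shutdown/offline triggers rebalance

Shutting down a consumer also calls the shutdown methods of its components. Three of the operations are related to triggering a rebalance:

  1. mQClientFactory.shutdown: shuts down the MQClientInstance, which shuts down rebalanceService and thereby wakes it up to trigger a rebalance
  2. rebalanceImpl.destroy: clears processQueueTable; once a rebalance changes the ProcessQueues, a heartbeat is sent to the Broker
  3. mQClientFactory.unregisterConsumer: unregisters the consumer from the Broker; the group's consumer set changes, so the Broker tells the other consumers that this one has gone offline

// Unregister the consumer: send an unregister request to the Broker, which tells the other consumers in the group to rebalance
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());

The request code is UNREGISTER_CLIENT.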

public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    boolean isNotifyConsumerIdsChangedEnable) {
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null != consumerGroupInfo) {
        // Remove the channel
        consumerGroupInfo.unregisterChannel(clientChannelInfo);
        if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
            ConsumerGroupInfo remove = this.consumerTable.remove(group);
            if (remove != null) {
                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
            }
        }
        // Notify the group to rebalance
        if (isNotifyConsumerIdsChangedEnable) {
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }
}

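Flow: the consumer is unregistered -> the Broker notifies all consumers in the group to rebalance

Broker notification of consumer changes

When a consumer is notified by the Broker that the consumers in its group have changed, it wakes up the rebalance thread, which triggers a rebalance.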

public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    //...
    
    // Wake up RebalanceService
    this.mqClientFactory.rebalanceImmediately();
    return null;
}

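Periodic MQClientInstance heartbeat to the Broker

MQClientInstance runs a scheduled task, every 30s by default, that sends a heartbeat to the Broker. If the consumer has changed, this can also cause the Broker to notify the group to rebalance.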

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

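The flows and trigger points are summarized in the figure below:

[Figure: when rebalance is triggered]

A question left over from the previous article

The previous article described consuming messages: when consumption fails, the message is retried, and after a delay it is eventually delivered to the retry Topic.

So how does the retry Topic take part in rebalance and get a PullRequest, so that the usual pull flow continues?

When DefaultMQPushConsumerImpl starts, it calls copySubscription, which in clustering mode adds the consumer group's retry Topic to rebalanceImpl's internal subscription.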

case CLUSTERING:
    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
    break;

Later, when rebalance iterates over the topics, it also covers the retry Topic, so messages in the retry queues are pulled and consumed again.
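
As a small illustration of why the retry Topic shows up in the rebalance loop, the sketch below builds the set of Topics a consumer would iterate over. The names here are made up for the example, and the "%RETRY%" prefix is assumed to match MixAll.RETRY_GROUP_TOPIC_PREFIX, the constant that doRebalance checks before logging.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Illustrative only: which topics a consumer ends up rebalancing. */
final class RetryTopicSketch {
    // Assumed to mirror MixAll.RETRY_GROUP_TOPIC_PREFIX
    static final String RETRY_PREFIX = "%RETRY%";

    /** Retry topic name for a consumer group: one retry topic per group, not per business topic. */
    static String retryTopic(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    /** User subscriptions plus, in clustering mode only, the group's retry topic. */
    static Set<String> topicsToRebalance(Set<String> subscribed, String consumerGroup, boolean clustering) {
        Set<String> topics = new LinkedHashSet<>(subscribed);
        if (clustering) {
            topics.add(retryTopic(consumerGroup));
        }
        return topics;
    }
}
```

Because the retry Topic is just another entry in the subscription table, it gets its own queue assignment, ProcessQueues, and PullRequests through exactly the same path as a business Topic.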

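Problems caused by rebalance

As the flow shows, rebalance gives up some consistency in exchange for availability (nothing in the flow blocks) and settles for eventual consistency.

At program startup, when queues are scaled up or down, or when consumers go online or offline, the consumers' views of the queue assignment can briefly disagree. As a result messages may be consumed late and may be consumed more than once.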
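
A short simulation makes the inconsistent window concrete. Each consumer computes its own assignment from the consumer list it last saw, so a consumer with a stale list can claim queues that a newer consumer also claims. The sketch uses a compact averaged split over queue ids 0..n-1; the class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Illustrative simulation of two consumers rebalancing with different views of the group. */
final class StaleViewOverlapSketch {

    /** Averaged split of queue ids 0..queueCount-1 for the consumer at position index. */
    static List<Integer> allocate(int index, int consumerCount, int queueCount) {
        int mod = queueCount % consumerCount;
        int base = queueCount / consumerCount;
        int size = queueCount <= consumerCount ? 1 : (index < mod ? base + 1 : base);
        int start = index < mod ? index * size : index * size + mod;
        List<Integer> result = new ArrayList<>();
        for (int q = start; q < Math.min(start + size, queueCount); q++) {
            result.add(q);
        }
        return result;
    }

    /** Queues claimed by both consumers at the same time. */
    static Set<Integer> overlap(List<Integer> a, List<Integer> b) {
        Set<Integer> both = new TreeSet<>(a);
        both.retainAll(b);
        return both;
    }
}
```

Take four queues. Consumer A has not yet heard about the new consumer B and still computes allocate(0, 1, 4), which is all four queues. B already sees both consumers and computes allocate(1, 2, 4), which is queues 2 and 3. Until A rebalances again, queues 2 and 3 are pulled by both, and their messages can be consumed twice. Once A catches up and computes allocate(0, 2, 4), the overlap disappears.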

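Guaranteeing a fully consistent assignment with no duplicate consumption would require stopping pulling and consuming during rebalance, trading availability for consistency.

The eventually consistent approach may briefly produce an inconsistent assignment, but making consumers idempotent is enough to handle it, and consumption stays available during rebalance instead of being paused.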
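
A minimal sketch of an idempotent handler, assuming each message carries a unique business key (for example an order ID). The in-memory set and the class name IdempotentHandlerSketch are assumptions made for illustration; a real deployment would more likely rely on something durable and shared, such as a unique constraint in a database, since duplicates can arrive at a different consumer instance.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative idempotent handler: the business logic runs at most once per business key. */
final class IdempotentHandlerSketch {
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final AtomicInteger appliedCount = new AtomicInteger();

    /** Returns true if the business logic ran, false if the key was already handled. */
    boolean handle(String businessKey) {
        if (!processedKeys.add(businessKey)) {
            return false; // duplicate delivery, for example after a rebalance
        }
        appliedCount.incrementAndGet(); // stands in for the real business logic
        return true;
    }

    /** How many times the business logic actually ran. */
    int appliedCount() {
        return appliedCount.get();
    }
}
```

Delivering the same key twice runs the business logic once, so the brief overlap during rebalance costs only a redundant lookup.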

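When many queues have to be added during a business peak to increase consumption capacity, rebalance is triggered and may reduce throughput and degrade performance.

To avoid this, create a new topic with more queues and temporarily add a "forwarding" consumer to the old consumer group that forwards messages to the new queues, which scales consumption out horizontally.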

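Summary

The rebalance mechanism load-balances queues across consumers and is a prerequisite for pulling and consuming messages.

Rebalance trades some consistency (frequent triggering can leave the assignment briefly inconsistent) for availability and relies on eventual consistency. Messages may be consumed more than once in the meantime, so consumption must be idempotent.

When a consumer triggers rebalance, it iterates over its subscribed Topics and rebalances each one: it gets all queues of the Topic, asks the Broker for the consumers in the same group, runs the allocation strategy to determine the current consumer's queues, and updates its ProcessQueues accordingly. If the ProcessQueues changed, it adjusts the per-queue flow control and sends a heartbeat to all Brokers.

On receiving a heartbeat, the Broker updates the consumer's channel and subscription. If something was added, it sends a rebalance notification to the consumers in the group.

Consumers going online or offline and queues being added or removed all trigger rebalance within the group, and each consumer's periodic task triggers it as well.

If several consumer groups subscribe to the same Topic (including the same tags), each group has its own consumers consuming the messages.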

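Finally (likes, bookmarks, and follows are much appreciated)

This article is part of the "Message Middleware" column; follow it if you are interested.

The notes and examples for this article are collected in Gitee-CaiCaiJava and Github-CaiCaiJava, along with more advanced Java material; a star is welcome if you find them useful.

Questions are welcome in the comments. If you found this article helpful, a like, follow, or bookmark is a great way to show support.

Follow 菜菜 for more technical content. Official account: 菜菜的后端私房菜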

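[Copyright notice] This article is original content by a Huawei Cloud community user. When reposting, the source (Huawei Cloud community), the article link, the author, and other basic information must be credited; otherwise the author and the community reserve the right to pursue liability. If you find content in this community suspected of plagiarism, please report it by email with supporting evidence. Once verified, the community will remove the infringing content immediately. Report email: cloudbbs@huaweicloud.com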