Rocket之消息消费

本文阅读 53 分钟
首页 代码,Java 正文

消息消费以组的模式开展,一个消费组可以包含多个消费者,每个消费组可以订阅多个主题,消费组之间有集群模式广播模式两种消费模式。

消费模式

Rocket目前支持集群模式广播消费模式,其中集群消费模式使用最为广泛。

  • 集群消费模式 集群模式是当前主题下的同一条消息只允许被同一消费组内的一个消费者消费。在集群模式下,消费组内的多个消费者如何对消息队列进行负载呢?消息队列负载机制遵循一个通用的思想:**一个消息队列同一时间只允许被同一消费组内的一个消费者消费,一个消费者可以消费多个消息队列。
  • 广播消费模式 广播模式是当前主题下的同一条消息将被消费组内的所有消费者都消费一次。广播消费比较适合各个消费者实例都需要通知的场景,例如刷新应用服务器的缓存。

消费进度反馈机制

RocketMQ客户端消费一批数据后,需要向Broker反馈消息的消费进度,Broker会记录消息消费进度,这样在客户端重启或者队列重平衡时会根据其消费进度重新向Broker拉取消息。 消息消费进度反馈机制核心要点如下:

  • 消费线程池在处理完一批消息后,会将消息消费进度存储在本地内存中。
  • 客户端会启动一个定时线程,每5s将存储在本地内存中的所有队列消息消费偏移量提交到Broker中。
  • Broker收到的消息消费进度会存储在内存中,每隔5s将消息消费偏移量持久化到磁盘文件中。
  • 在客户端向Broker拉取消息时也会将该队列的消息消费偏移量提交到Broker。 注意,由于Rocket支持并发消费,如果有多个线程同时在处理多个消息,那么线程池该如何提交偏移量呢?答案是提交线程池中偏移量最小的消息的偏移量,这样能避免消息丢失,但同样有消息重复消费的风险,这需要我们在业务逻辑中对接口进行幂等处理。 另外,如果消息消费采用集群模式,那么消息进度存储在Broker上,如果采用广播模式,那么消息消费进度存储在消费端

消息服务器与消费者之间的消息传送也有两种方式:推模式拉模式。RocketMQ对这两种方式都支持。

Pull消费流程

所谓的拉模式是消费端主动发起拉取消息的请求,拉取消息,自主管理位点,可以灵活地掌控消费进度和消费速度,适合流计算、消费特别耗时等特殊的消费场景。缺点也显而易见,需要从代码层面精准地控制消费,对开发人员有一定的要求,在Rocket中DefaultMQPullConsumer是默认的消费者实现类。

Push消费流程

所谓的推模式是消息到达消息服务器后,再推送给消息消费者。此模式代码接入非常简单,适合大部分业务场景。缺点是灵活度差,在了解其消费原理后,排查消费问题方可简单快捷。在Rocket中DefaultMQPushConsumer是默认的Push消费者实现类。 RocketMQ消息推模式基于拉模式实现,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。

消息拉取分为3个步骤:

  • 拉取客户端消息拉取请求并封装。
  • 消息服务器查找消息并返回。
  • 消息拉取客户端处理返回的消息。

下面我们详细分析这三个步骤:

1.消息拉取客户端封装消息拉取请求

消息拉取客户端封装消息拉取请求的主要步骤如下:

  • 从PullRequest中获取ProcessQueue,如果处理队列当前状态未被丢弃,则更新ProcessQueue的lastPulltimestamp为当前时间戳。如果当前消费者被挂起,则将拉取任务延迟1s再放入PullMessageService的拉取任务队列中,最后结束本次消息拉取。
  • 进行消息拉取流控。从消息消费数量消费间隔两个维度进行控制。
    - 消息处理总数,如果ProcessQueue当前处理的消息条数超过了pullThresholdForQueue=1000,将触发流控,放弃本次拉取任务,并且该队列的下次拉取任务将在50ms后才加入拉取任务队列。每触发1000次流控后输出提示语。 - ProcessQueue中队列最大偏移量与最小偏移量的间距不能超过consumeConcurrentlyMaxSpan,否则触发流控。每触发1000次流控后输出提示语。这里主要的考量是担心因为一条消息堵塞,使消息进度无法向前推进,可能会造成大量消息重复消费。
  • 拉取该主题的订阅信息,如果为空则结束本次消息拉取,关于该队列的下一次拉取任务将延迟3s执行。
  • 构建消息拉取系统标记。
  • 调用PullAPIWrapper.pullKernelImpl方法后与服务端交互,我们先了解一下pullKernelImpl方法中的参数含义:
    - MessageQueue mq:从哪个消息消费队列拉取消息。 - String subExpression:消息过滤表达式。 - String expressionType:消息表达式类型,分为TAG、SQL92. - long offset:消息拉取偏移量。 - int maxNums:本次拉取最大消息条数,默认32条。 - int sysFlag:拉取系统标记。 - long commitOffset:当前MessageQueue的消费进度(内存中)。 - long brokerSuspendMaxTimeMillis:消息拉取过程中允许Broker挂起的时间,默认15s。 - long timeoutMillis:消息拉取超时时间。 - CommunicationMode communicationMode:消息拉取模式,默认为异步拉取 - PullCallback pullCallback :从Broker拉取到消息后的回调方法。
  • 根据brokerName、brokerID从MQClientInstance中获取Broker地址,在整个Rocket Broker的部署结构中,相同名称的Broker构成主从结构,其BrokerId会不一样,在每次拉取消息后,会给出一个建议,下次是从主节点还是从节点拉取。
  • 如果消息过滤模式为类过滤,则需要根据主题名称、Broker地址找到注册在broker上的FilterServer地址,从FilterServer上拉取消息,否则从Broker上拉取消息。上述步骤完成后,RocketMQ通过pullMessageAsync方法异步向Broker拉取消息(默认拉取方式)。

2.消息服务端Broker组装消息

根据消息拉取命令RequestCode.PULL_MESSAGE,很容易找到Broker端处理消息拉取的入口:PullMessageProcessor#processRequest。

  • 根据订阅信息构建消息过滤器。
  • 调用MessageStore.getMessage查找消息,该方法参数如下:
    - String group:消费组名称 - String topic:主题名称 - int queueId:队列ID - long offset:待拉取偏移量 - MessageFilter messageFilter:消息过滤器。
  • 根据主题名称与队列标号获取消息消费队列
    - nextBeginOffset:待查找队列的偏移量。 - minOffset:当前消息队列的最小偏移量。 - maxOffset:当前消息队列的最大偏移量。 - maxOffsetPy:当前CommitLog文件的最大偏移量。
  • 消息偏移量异常情况校对下一次拉取偏移量。
    - maxOffset=0:表示当前消费队列中没有消息。如果当前节点为主节点,下次拉取偏移量为0.如果当前Broker节点为从节点并且offsetCheckInSlave为true,设置下次拉取偏移量为0。其他情况下次拉取时使用原偏移量。 - offset<minOffset:表示待拉取消息偏移量小于队列的起始偏移量。如果当前节点为主节点,下次拉取偏移量为队列的最小偏移量。如果当前Broker为从节点并且offsetCheckInSlave为true,下次拉取偏移量为队列的最小偏移量。其他情况下次拉取时使用原偏移量。 - offset==maxOffset:表示队列中没有最新的消息,则下次拉取偏移量依然为offset。 - offset>maxOffset:表示偏移量越界。此时需要考虑当前队列的偏移量是否为0,如果当前队列的最小偏移量为0,则使用最小偏移量纠正下次拉取偏移量。如果当前队列的最小偏移量不为0,则使用该队列的最大偏移量来纠正下次拉取偏移量。
  • 如果minOffset<offset<maxOffset,从当前offset处尝试拉取32条消息,在 [Rocket之Broker]()中我们会详细介绍根据消息队列偏移量从CommitLog文件中查找消息的过程。
  • 根据PullResult填充responseHeader的NextBeginOffset、MinOffset、MaxOffset。
  • 根据主从同步延迟,如果从节点数据包含下一次拉取的偏移量,则设置下一次拉取任务的brokerId。
  • GetMessageResult与Response状态编码转换。
  • 如果CommitLog标记为可用并且当前节点为主节点,则更新消息消费进度。 服务端消息拉取处理完毕,将返回结果拉取到消息调用方。

3.消息拉取客户端处理消息

  • 根据消息服务端响应结果解码成PullResultExt对象,此时只是从网络中读取消息列表中的byte[] messageBinary属性。
  • 调用pullAPIWrapper的processPullResult,将消息字节数组解码成消息列表并填充msgFoundList,对消息继续进行消息过滤(TAG模式)。
  • 更新PullRequest的下一次拉取偏移量,如果msgFoundList为空,则立即将PullRequest放入PullMessageService的pullRequestQueue,以便PullMessageService能及时唤醒并再次执行消息拉取。为什么msgFoundList会为空呢?因为RocketMQ根据TAG进行消息过滤时,在服务端只是验证了TAG的哈希码,所以客户端再次对消息进行过滤时,可能会出现msgFoundList为空的情况。
  • 首先将拉取到的消息存入ProcessQueue,然后将拉取到的消息提交到ConsumeMessageService中供消费者消费。该方法是一个异步方法,也就是PullCallBack将消息提交到ConsumeMessageService中就会立即返回,至于这些消息如何消费,PullCallBack不会关注。
  • 将消息提交给消费者线程后,PullCallBack将立即返回,可以说本次消息拉取顺利完成。

4.消息拉取长轮序机制分析

Rocket并没有真正实现推模式,而是消费者主动向消息服务器拉取消息,RocketMQ推模式是循环向消息服务端发送消息拉取请求,如果消息消费者向Rocket发送消息拉取时,消息并未到达消息队列,且未启用长轮询机制,则会在服务端等待shortPollingTimeMills时间后(挂起),再去判断消息是否已到达消息队列。如果消息未到达,则提示消息拉取客户端消息不存在,如果开启了长轮询模式,RocketMQ一方面会每5s轮询检查一次消息是否可达,同时一有新消息到达后,立即通知挂起线程再次验证新消息是否是自己感兴趣的,如果是则从CommitLog文件提取消息返回给消息拉取客户端,否则挂起超时,超时时间由消息拉取方在消息拉取时封装在请求参数中,推模式默认为15s,拉模式通过DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis进行设置,默认为20s。RocketMQ通过在Broker端配置longPollingEnable为true来开启长轮询模式。

默认支持挂起,根据是否开启长轮询决定挂起方式。如果开启长轮询,挂起超时时间来自请求参数,推模式默认为15s,拉模式通过DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis进行设置,默认为20s。然后创建拉取任务PullRequest并提交到PullRequestHoldService线程中。

RocketMQ轮询机制由两个线程共同完成

  • PullRequestHoldService:每隔5s重试一次。
  • DefaultMessageStore#ReputMessageService:每处理一次重新拉取,线程休眠1s,继续下一次检查。

PullRequestHoldService是如何工作的? 根据消息主题与消息队列ID构建key,从ConcurrentMap<String/ topic@queueId /,ManyPullRequest> pullRequestTable中获取该主题队列对应的ManyPullRequest,然后将PullRequest放入ManyPullRequest。ManyPullRequest对象内部持有一个PullRequest列表,表示同一主题队列的累积拉取消息任务。 如果开启长轮询,每5s判断一次新消息是否到达。如果未开启长轮询,则默认等待1s再次判断。 遍历拉取任务表,根据主题与队列(从任务表pullRequestTable中的key获取)获取消息消费队列的最大偏移量,如果该偏移量大于待拉取偏移量,说明有新的消息到达,调用notifyMessageArriving触发消息拉取。

我们知道,RoketMQ提供了两种消费模式,集群模式广播模式。广播模式中所有消费者会消费全部的队列,故没有所谓的消费队列负载问题,而集群模式下需要考虑同一个消费组内的多个消费者之间如何分配队列问题。RocketMQ中队列负载算法

  • AllocateMessageQueueAveragely:平均分配,推荐使用。 适用场景:如果发送方发送的消息在各个队列上分布均,则使用此负载算法。
  • AllocateMessageQueueAveragelyByCircle:平均轮询分配,推荐使用。 适用场景:如果不同Broker上的消息明显不同,那么适用此负载算法。
  • AllocateMessageQueueConsistentHash:一致性哈希。因为消息队列负载信息不容易跟踪,所以不推荐使用。
  • AllocateMessageQueueByConfig:根据配置,为每一个消费者配置固定的消息队列。
  • AllocateMessageQueueByMachineRoom:根据Broker部署机房名,对每个消费者负责不同的Broker上的队列。

如果没有特殊的要求,尽量使用AllocateMessageQueueAveragely、AllocateMessageQueueAveragelyByCircle,这是因为分配算法比较直观。消息队列分配原则为在同一个消费组内一个消费者可以分配多个消息队列,但同一个消息队列只会分配给一个消费者,故如果消费者个数大于消息队列数量,则有些消费者无法消费消息。

对比消息队列是否发生变化,主要思路是遍历当前负载队列集合,如果队列不在新分配队列的集合中,需要将该队列停止消费并保存消费进度;遍历已分配的队列,如果队列不在队列负载列表中(processQueueTable),则需要创建该队列拉取任务PullRequest,然后添加到PullMessageService线程的pullRequestQueue中,PullMessageService才会继续拉取任务。

在消费时间过程中可能会遇到消息消费队列增加或者减少、消息消费者增加或减少,这时就需要对消息消费队列进行重新平衡,即重新分配,这就是所谓的重平衡机制。在RocketMQ中,有专门的定时任务每隔20s根据当前队列数量、消费者数量重新进行队列的负载计算,如果计算出来的结果和当前不一样,则触发消息消费队列的重平衡。以集群模式为例讲解针对单个主题进行消息队列重平衡的大致过程如下:

  • 从主题订阅信息缓存表中获取主题的队列信息。发送请求从Broker中通过topic和consumerGroup名称获取该消息组内当前所有的消费者客户端ID,主题的队列可能分布在多个broker上,那么请求该发往哪个broker呢?RocketMQ从主题的路由信息表中随机选择一个broker。Broker为什么会存在消费组内所有消费者的信息呢?这是因为消费者在启动的时候会向MQClientInstance中注册消费者,然后MQClientInstance会向所有的Broker发送心跳包,心跳包中包含MQClientInstance的消费者信息。
  • 对消费者列表和消息队列列表进行排序。之所以这样做,是为了让同一个消费组内看到的视图保持一致,确保同一个消息队列不会被多个消费者分配。
  • ConcurrentMap<MessageQueue,ProcessQueue> processQueueTable是当前消费者负载的消息队列缓存表,如果缓存表中的MessageQueue不包含在mySet中,说明经过本次消息队列负载后,该mq被分配给其他的消费者,需要暂停该消息队列消息的消费。方式是把ProccessQueue的状态设置为droped=true,该ProccessQueue中的消息将不会再被消费,调用removeUnnecessaryMessageQueue方法判断是否将MessageQueue、ProcessQueue从缓存表中移除。removeUnnecessaryMessageQueue方法主要用于持久化待移除MessageQueue的消息消费进度在推模式下,如果是集群模式并且是顺序消费,还需要先解锁队列
  • 遍历本次负载分配到的队列集合,如果ProcessQueueTable中没有包含该消息队列,表明这是本次新增加的消息队列,首先内存中移除该消息的消费进度,然后从磁盘中读取该消息队列的消费进度,创建PullRequest对象。
  • 将PullRequest加入PullMessageService,以便唤醒PullMessageService线程。

有两个问题我们需要关注一下

问题一:PullRequest对象在什么时候创建并加入PullRequestQueue,可以唤醒PullMessageService线程? 答:RebalanceService线程每隔20s对消费者订阅的主题进行一次队列的重新分配,每次分配都会获取主题的所有队列,从broker服务器实时查询当前该主题该消费组内的消费者列表,对新分配的消息队列会创建PullRequest对象。在一个JVM进程中,同一个消费组同一个队列只会存在一个PullRequest对象问题二:集群内多个消费者是如何负载主题下多个消费队列的?如果有新的消费者加入,消息队列优惠如何重新分配? 答:每次进行队列重新负载时,会从Broker实时查询当前消费组内所有的消费者,并且对消息队列、消费者列表进行排序,这样新加入的消费者就会在队列重新分布时分配到消费队列,从而消费消息。

消息消费

PullMessageService负责对消息队列进行消息拉取,从远端服务器拉取消息后存入ProcessQueue消息处理队列中,然后调用ConsumeMessageConcurrentlyService#submitconsumeRequest方法进行消息消费。使用线程池消费消息,确保了消息拉取和消息消费的解耦。Rocket使用ConsumeMessageService来实现消息消费的处理逻辑。Rocket支持顺序消费并发消费。先重点介绍并发消费,然后补充顺序消费和并发消费的不同点。

消费者消息消费服务ConsumeMessageService的主要方法是submitconsumeRequest提交消费请求。大致步骤如下:

  • comsumeMessageBatchMaxSize表示消息批次,也就是一个消息消费任务ConsumeRequest中包含的消息条数,默认为1。msgs.size()默认最多为32条消息,受DefaultMQPushConsumer.pullBatchSize属性控制,如果msgs.size()小于comsumeMessageBatchMaxSize,则直接将拉取到的消息放入ConsumeRequest,然后将consumeRequest提交到消息消费者线程池中。如果提交过程中出现拒绝提交异常,则延迟5s再提交。这里其实是给出一种标准的拒绝提交实现方式,实际上,由于消费者线程池使用的任务队列LinkedBlocklingQueue为无界队列,故不会出现拒绝提交异常。
  • 如果拉取的消息条数大于comsumeMessageBatchMaxSize,这对拉取消息进行分页,每页comsumeMessageBatchMaxSize条消息,创建多个ConsumeRequest任务并发提交到消费线程池。
  • 进入具体的消息消费队列时,会先检查ProcessQueue的dropped,如果设置为true,则停止该队列的消费。在进行消息重新负载时,如果该消息队列被分配给消费组内的其他消费者,需要将droped设置为true,阻止消费者继续消费不属于自己的消息队列。
  • 执行消息消费钩子函数ConsumeMessageHook#consumeMessageBefore。通过consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(hook)方法消息消费执行钩子函数。
  • 恢复重试消息主题名。为什么呢?这是由消息重试机制决定的,RocketMQ将消息存入CommitLog文件时,如果发现消息的延迟级别delayTimeLevel大于0,会先将重试主题存入消息的属性,然后将主题名称设置为SCHEDULE_TOPIC_XXX,以便之后重新参与消息消费。
  • 执行具体的消息消费,调用应用程序消息监听器的consumeMessage方法,进入具体的消息消费业务逻辑,返回该批消息的消费结果。
  • 执行消息消费钩子函数ConsumeMessageHook#consumeMessageAfter。
  • 执行业务消息消费后,在处理结果前再次验证一次ProcessQueue的isDropped状态值。如果状态值为true,将不对结果进行任何处理。也就是说,在消息消费进入第四步时,如果因新的消费者加入或原先的消费者宕机,导致原先分配给消费者的队列在负载之后分配给了别的消费者,那么消息会被重复消费。
  • 根据消息监听器返回的结果计算ackIndex,如果返回Consume_SUCCESS,则将ackIndex设置为msgs.size()-1,如果返回RECONSUME_LATER,则将ackIndex设置为-1,这是为下文发送msg.back(ACK)消息做的准备。
  • 如果是广播模式,业务方会返回RECONSUME_LATER,消息并不会被重新消费,而是输出警告日志。如果是集群模式,消息消费成功,因为ackIndex=msgs.size()-1,所以i=ackindex+1等于msgs.size(),并不会执行sendMessageBack。只有在业务方返回RECONSUME_LATER时,该批消息都需要发送ACK消息,如果消息发送失败,则直接将本批ACK消费发送失败的消息再次封装为ConsumeRequest,然后延迟5s提交消费线程池进行重新消费。如果ACK消息发送成功,则该消息会延迟消费。
  • 最后从ProcessQueue中移除这批消息,这里返回的偏移量是移除该批消息后ProcessQueue中最小的偏移量。然后用该偏移量更新消息消费进度,以便消息者重启后能从上一次的消费进度开始消费,避免消息重复消费。 值得注意的是,当消息监听器返回RECONSUME_LATER时,消息消费进度也会向前推进,并用ProcessQueue中最小的队列偏移量调用消息消费进度存储器OffsetStore更新消费进度。这是因为当返回RECONSUME_LATER时,Rocket会创建一条与原消息属性相同的消息,拥有一个唯一的新msgId,并存储原消息ID,该消息会存入CommitLog文件,与原消息没有任何关联,所以该消息也会进入ConsumeQueue,并拥有一个全新的队列偏移量。

消息确认

如果消息监听器返回额消费结果为RECONSUME_LATER,则需要将这些消息发送给Broker以延迟消息。如果发送ACK消息失败,将延迟5s后提交线程池进行消费。Broker服务端处理ACK消息的逻辑如下:

  • 获取消费组订阅配置信息,如果配置信息为空,返回配置组信息不存在错误,如果重试队列数量小于1,则直接返回成功,说明该消费组不支持重试。
  • 创建重试主题,重试主题名称为%RETRY%+消费组名称,从重试队列中随机选择一个队列,并构建TopicConfig主题配置信息。
  • 根据消息物理偏移量从CommitLog文件中获取消息,同时将消息的主题存入属性。
  • 设置消息重试次数,如果消息重试次数已超过maxReconsumeTimes,再次改变newTopic主题为DLQ(“%DLQ%”也称为死信队列),该主题的权限为只写,说明消息一旦进入DLQ队列,RocketMQ将不负责再次调度消费了,需要人工干预。
  • 根据原先的消息创建一个新的消息对象,重试消息会拥有一个唯一消息ID(msgId)并存入CommitLog文件。这里不会更新原先的消息,而是会将原先的主题、消息ID存入消息属性,主题名称为重试主题,其他属性与原消息保持一致。
  • 将消息存入CommitLog文件。重点强调一下,消息重试机制的实现依托于定时任务。
  • 在存入CommitLog文件之前,如果消息的延迟级别delayTimeLevel大于0,将消息的主题与队列替换为定时任务主题“SCHEDULE_TOPIC_XXX”,队列ID为延迟级别减1。再次将消息主题、队列存入消息属性,键分别为PROPERTY_REAL_TOPIC、PROPERTY_REAL_QUEUE_ID。 ACK消息(也就是需要重试的消息)存入CommitLog文件后,将依托RocketMQ定时消息机制在延迟时间到期后,再次拉取消息,提交至消费线程池。ACK消息(也就是需要重试的消息)是同步发送的,如果在发送过程中出现错误,将记录所有发送ACK消息失败的消息,然后再次封装成ConsumeRequest,延迟5s执行。

消费进度管理

消息消费者在消费一些消息后,需要记录该批消息已经消费完毕,否则当消费者重新启动时,又要从消息消费队列最开始消费。从上文可知,一次消息消费后会从ProcessQueue消费队列中移除该批消息,返回ProcessQueue的最小偏移量,并存入消息进度表。那消息进度文件存储在哪里合适呢?

  • 广播模式:同一个消费组的所有消息消费者都需要消费主题下的所有消息,也就是同组内消费者的消息消费行为是对立的,互相不影响,故消息进度需要独立存储,最理想的存储地方应该是与消费者绑定。因此广播模式消息消费进度存储在消费者本地。
  • 集群模式:同一个消费组内的所有消息消费者共享消息主题下的所有消息,同一条消息(同一个消息消费队列)在同一时间只会被消费组内的一个消费者消费,并且随着消费队列的动态变化而重新负载,因此消费进度需要保存在每个消费者都能访问到的地方。集群模式消息消费进度文件存放在消息服务端。

我们知道,集群模式消息进度存储文件放在消息服务端。消息消费进度集群模式实现类RemoteBrokerOffsetStore。 集群模式下如果从内存中读取消费进度,则是从RemoteBrokerOffsetStore的ConcurrentMap<MessageQueue,Atomic-Long> offsetTable中根据消息消费队列获取其消息消费进度。如果从磁盘读取,则发送网络请求。持久化消息进度,更新ConsumerOffsetManager的ConcurrentMap<String / topic@group /,ConcurrentMap<Integer/ 消息队列ID /,Long/ 消息消费进度 />> offsetTable,Broker端默认10s持久化一次消息进度

结合并发消息消费的整个流程,思考一下并发消息消费关于消息进度更新的问题:

  • 消费者线程池每处理完一个消息消费任务(ConsumeRequest),会从ProcessQueue中移除本批消费的消息,并返回ProcessQueue中最小的偏移量,用该偏移量更新消息队列消费进度,也就是说更新消费进度与消费任务中的消息没有关系。这就是说,如果消费队列中有某个偏移量很小的消息发生了死锁,就会导致消息进度无法向前推进。为了避免这种情况,RocketMQ引入了一种消息拉取流控措施:消息处理队列ProcessQueue中最大消息偏移量和最小消息偏移量不能超过设置的值,如果超过该值,将触发流控,延迟该消息队列的消息拉取。
  • 在进行消息负载时,如果消息消费队列被分配给其他消费者,会将该ProcessQueue状态设置为droped=true,持久化该消息队列的消费进度,并从内存中将其移除。

定时消息是指消息发送到Broker后,不会立即被消费者消费,而是要等到特定的时间后才能被消费,Rocket并不支持任意的时间精度,因为如果要支持任意时间精度到的定时调度,则不可避免地需要在Broker层做消息排序,在加上持久化方面的考量,将不可避免地带来巨大的性能消耗,所以RocketMQ只支持特定级别的延迟消息。

消息延迟级别在Broker段通过messageDelaylevel进行配置,默认为“1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h”,delayLevel=1表示延迟1s,delayLevel=2表示延迟5s,依次类推。RocketMQ定时消息实现类为ScheduleMessageService。该类的实例在DefaultMessageStore中创建,通过在DefaultMessageStore中调用load()方法加载并调用start()方法进行启动。

start()方法根据延迟级别创建对应的定时任务,启动定时任务持久化存储延迟消息队列进度。

  • 根据延迟队列创建定时任务。遍历延迟级别,根据延迟级别从offsetTable中获取消息队列的消费进度,如果不存在,则使用0。也就是说每个延迟级别对应一个消息消费队列。然后创建定时任务,每个定时任务第一次启动时,默认延迟1s执行一次定时任务,从第二次调度开始,才使用相应的延迟时间执行定时任务。延迟级别与消息消费队列的映射关系为消息队列ID=延迟级别-1。 定时消息的第一个设计关键点是,定时消息单独一个主题:SCHEDULE_TOPIC_XXX,该主题下的队列数量等于MessageStoreConfig#messageDelayLevel配置的延迟级别,其对应关系为queueID等于延迟级别减1。ScheduleMessageService为每个延迟级别创建一个定时器,根据延迟级别对应的延迟时间进行延迟调度。在消息发送时,如果消息的延迟级别delayLevel大于 0,将消息的原主题名称、队列ID存入消息属性,然后改变消息的主题、队列和延迟主题所属队列,消息将最终转发到延迟队列的消费队列中。
  • 创建定时任务,每隔10s持久化一次延迟队列的消息消费进度,持久化频率可以通过flushDelayOffsetInterval进行配置。

定时调度逻辑

ScheduleMessageService的start()方法启动后,会为每一个延迟级别创建一个调度任务,每个延迟级别对应SCHEDULE_TOPIC_XXX主题下的一个消息消费队列。定时调度任务的实现类为DeliverDelayedMessageTimerTask,其核心实现为executeOnTimeup。

  • 根据队列ID和延迟主题查找消息消费队列,如果未找到,说明当前不存在该延时级别的消息,则忽略本次任务,根据延时级别创建下一次调度任务。
  • 根据offset从消息消费队列中获取当前队列中所有有效的消息。如果未找到,则更新延迟队列的定时拉取进度并创建定时任务,待下一次继续尝试。
  • 遍历ConsumeQueue文件,每个标准ConsumeQueue条目为20个字节。解析出消息的物理偏移量、消息长度、消息标志的哈希码,为从CommitLog文件加载具体的消息做准备。
  • 根据消息物理偏移量与消息大小从CommitLog文件中查找消息。如果未找到消息,则打印错误日志,根据延迟时间创建下一个定时器。
  • 根据消息属性重新构建新的消息对象,清除消息的延迟级别属性(delayLevel),恢复消息原先的消息主题与消息消费队列,消息的消费次数(reconsumeTimes)并不会丢失。
  • 将消息再次存入CommitLog文件,并转发到主题对应的消息队列上,供消费者再次消费。
  • 更新延迟队列的拉取进度。

定时任务设计额第二个关键点是消息存储时,如果消息的延迟级别属性delayLevel大于0,则会备份原主题、原队列到消息属性中,其键分别为PROPERTY_TOPIC、PROPERTY_REAL_QUEUE_ID,通过为不同的延迟级别创建不同的调度任务,到达延迟时间后执行调度任务。调度任务主要是根据延迟拉取消息消费进度从延迟队列中拉取消息,然后从CommitLog文件中加载完整消息,清除延迟级别属性并恢复原先的主题、队列,再次创建一条新的消息存入CommitLog文件并转发到消息消费队列中供消息消费者消费。

RocketMQ支持表达式过滤类过滤两种消息过滤机制。表达式模式分为TAG与SQL92模式,SQL93模式以消息属性过滤上下文,实现SQL条件过滤表达式,而TAG模式就是简单为消息定义标签,根据消息属性tag进行匹配。

RocketMQ消息过滤方式不同于其他消息中间件,是在订阅时进行过滤的。 消息发送者在消息发送时如果设置了消息的标志属性,便会存储在消息属性中,将其从CommitLog文件转发到消息消费队列中,消息消费队列会用8个字节存储消息标志的哈希码。之所以不直接存储字符串,是因为将ConsumeQueue设计为定长结构,以加快消息消费的加载性能。在Broker端拉取消息时,遍历ConsumeQueue,只对比消息标志的哈希码、如果匹配则返回,否则忽略该消息。消费端在收到消息后,同样需要先对消息进行过滤,只是此时比较的是消息标志的值而不是哈希码。 RocketMQ实现消息过滤的过程如下

  • 消费者订阅消息主题与消息过滤表达式。
  • 根据订阅消息属性构建消息属性拉取标记,设置subExpression、classFilter等与消息过滤相关参数。
  • 根据主题、消息过滤表达式构建订阅消息实体,如果不是TAG模式,构建过滤数据ConsumeFilterData。
  • 构建消息过滤对象,ExpressionForRetryMessageFilter支持对重试主题的过滤,ExpressMessageFilter表示不支持对重试主题的属性进行过滤,也就是如果是TAG模式,执行isMatchedByCommitLog方法将直接返回true。
  • 根据偏移量拉取消息后,首先根据ConsumeQueue条目进行消息过滤,如果不匹配则直接跳过该条消息,继续拉取下一条消息。
  • 如果消息根据ConsumeQueue条目进行过滤,则需要从CommitLog文件中加载整个消息体,然后根据属性进行过滤。当然如果过滤方式是TAG模式,该方法默认返回true。
  • Rocket会在消息接收端再次进行消息过滤。

RocketMQ支持局部消息顺序消费,可以确保同一个消息消费队列中的消息按顺序消费,如果需要做到全局顺序消费,则可以将主题配置成一个队列,适用于数据库Binlog等严格要求顺序消息消费的场景。顺序消息消费包含4个步骤:消费队列负载消费拉取消息消费消息消费进度存储

消息队列负载

RocketMQ首先需要通过RebalanceService线程实现消息队列的负载,集群模式下同一个消费者组内的消费者共同承担其订阅主题下消息队列的消费,同一个消息消费队列在同一时刻只会被消费组内的一个消费者消费,一个消费者同一时刻可以分配多个消费队列。 经过消息队列重新负载(分配)后,分配到新的消息队列时,首先需要尝试向Broker发起锁定该消息队列的请求,如果返回加锁成功,则创建消息队列的拉取任务,否则跳过,等待其他消费者释放该消息队列的锁,然后再一次队列重新负载时再尝试加锁。

顺序消息消费与并发消息消费的一个关键区别是,顺序消息在创建消息队列拉取任务时,需要在broker服务器锁定该消息队列

消息拉取

RocketMQ消息拉取由PullMessageService线程负责,根据消息拉取任务循环拉取消息。 如果消息处理队列未被锁定,则延迟3s后再将PullRequest对象放入拉取任务中,如果该处理队列是第一次拉取任务,则首先计算拉取偏移量,然后向消息服务端拉取消息。

消息消费

顺序消息消费实现类是ConsumeMessageOrderLyService。 如果消费模式为集群模式,启动定时任务,默认每隔20s锁定一次分配给自己的消息消费队列。通过lockInterval设置间隔,该值建议与一次消息负载频率相同。我们知道,集群模式下顺序消息消费在创建拉取任务时并未将ProcessQueue的locked状态设置为true,在未锁定消息队列时无法执行消息拉取任务,ConsumeMessageOrderLyService以20s的频率对分配给自己的消息队列进行自动加锁操作,从而消费加锁成功的消息消费队列。

  • ConcurrentMap<MessageQueue,ProcessQueue> processQueueTable表示将消息队列按照Broker组织成Map<String/ brokerName /,Set>。
  • 向broker(主节点)发送锁定消息队列,该方法会返回成功被当前消费者锁定的消息消费队列。
  • 将成功锁定的消息消费队列对应的处理队列设置为锁定状态,同时更新加锁时间。
  • 遍历当前处理队列中的消息消费队列,如果当前消费者不持有该消息队列的锁,则将处理队列锁的状态设置为true,暂停该消息消费队列的消息拉取与消息消费。

构建消费任务ConsumeRequest并提交到消费线程池中。顺序消息的ConsumeRequest消费任务不会直接消费本次拉取的消息,而是在消息消费时从处理队列中拉取消息,接下来详细分析ConsumeRequest的run()方法。

  • 如果消息处理队列为丢弃,这停止本次消费任务。
  • 根据消息队列获取一个对象。消费消息时申请独占objLock,顺序消息消费的并发度为消息队列,也就是一个消息消费队列同一时刻只会被一个消费线程池中的一个线程消费。
  • 如果是广播模式,则直接进入消费,无须锁定处理队列,因为相互之间无竞争。如果是集群模式,消息消费的前提条件是ProcessQueue被锁定并且锁未超时。思考这个问题:如果消息队列重新负载时,原先由自己处理的消息队列被另外一个消费者分配,还未来得及将ProcessQueue解除锁定,就被另外一个消费者添加进去,此时会不会出现多个消息消费者同时消费一个消息队列的情况?答案是不会的,因为当一个新的消费队列分配给消费者时,在添加其拉取任务之前必须先向Broker发送对该消息队列加锁的请求,只有加锁成功后,才能添加拉取消息,否则等到下一次负载后,只有消费队列被原先占有的消费者释放后,才能开始新的拉取任务。集群模式下,如果未锁定处理队列,则延迟该队列的消息消费
  • 顺序消息消费处理逻辑,每一个ConsumeRequest消费任务不是以消费消息条数来计算的,而是根据消费时间,默认当消费时长大MAX_TIME_CONSUME_CONTINUOUSLY后,结束本次消费任务,由消费组内其他线程继续消费。
  • 每次从处理队列中按顺序取出consumeBatchSize消息,如果未取到消息,则设置continueConsume为false,本次消费任务结束。消息顺序消费时,从ProcessQueue中取出的消息会临时存储在ProcessQueue的consumingMsgOrderlyTreeMap属性中。
  • 实行消息消费钩子函数。
  • 申请消息消费锁,如果消息队列被丢弃,则放弃消费该消息消费队列,然后执行消息消费监听器,调用业务方具体的消息监听器执行真正的消息消费处理逻辑,并通知RocketMQ消息消费结果。
  • 执行消息消费钩子函数,就算消息消费过程中应用程序抛出异常,钩子函数的后处理逻辑也会被调用。
  • 如果消息消费结果为ConsumeOrderLyStatus.SUCCESS,执行ProcessQueue的commit()方法,并返回带更新的消息消费进度。

提交就是将该批消息从ProcessQueue中移除,维护msgCount(消息处理队列中的消息条数)并获取消息消费的偏移量offset,然后将该批消息从msgTreeMapTemp中移除,并返回待保存的消息消费进度(offset+1)。可以看出,offset表示消息消费队列的逻辑偏移量,类似于数组的下标,代表第n个ConsumeQueue条目。

检查消息的重试次数。如果消息重试次数大于或等于允许的最大重试次数,将该消息发送到Broker段。该消息在消息服务端最终会进入DLQ(死信队列),也就是RocketMQ不会再次消费,需要人工干预。如果消息成功进入DLQ队列,checkReconsumeTimes返回false,将直接调用ProcessQueue#commit提交该批消息,表示消息消费成功,如果这批消息中有任意一条消息的重试次数小于允许的最大重试次数,将返回true,执行消息重试。

消息消费重试是先将该批消息重新放入ProcessQueue的msgTreeMap,然后清除consumingMsgOrderlyTreeMap,默认延迟1s再加入消费队列并结束此次消息消费。

可以通过DefaultMQPushConsumer#setSuspendCurrentQueueTimeMillis设置当前队列的重试挂起时间。执行消息重试时,如果消息消费进度并未向前推进,则将本次消费视为无效消费,将不更新消息消费进度。

注意:对于RocketMQ顺序消费,失败重试次数为Integer.MAX_VALUE,即一直重试,会阻止消息进度向前推进,故应用需要在超过重试次数时,引入人为干预机制。特别是要区分业务异常和系统异常,业务异常通常是因为不满足某项业务限制,重试注定无法成功,故一定要设置一定的规则,进行业务降级。

  • 存储消息消费进度

顺序消息消费的各个环节基本都是围绕消息消费队列(MessageQueue)与消息处理队列(ProcessQueue)展开的。拉取消息消费进度,要判断ProcessQueue的locked是否为true,为true的前提条件是消息消费者向Broker端发送锁定消息队列的请求并返回加锁成功。

学习更多有关RocketMQ的知识请参见 RocketMQ之简介

本文为互联网自动采集或经作者授权后发布,本文观点不代表立场,若侵权下架请联系我们删帖处理!文章出自:https://blog.csdn.net/qq_38571892/article/details/121492558
-- 展开阅读全文 --
Web安全—逻辑越权漏洞(BAC)
« 上一篇 03-13
Redis底层数据结构--简单动态字符串
下一篇 » 04-10

发表评论

成为第一个评论的人

热门文章

标签TAG

最近回复