Rocket之nameServer

本文阅读 12 分钟
首页 代码,Java 正文

消息中间件的设计思路一般都是基于主题的订阅发布机制,消息生产者发送某一主题的消息到消息服务器,消息服务器负责该消息的持久化存储,消息消费者订阅感兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送给消费者(推模式)或者消息消费者主动向消息服务器拉取消息(拉模式),从而实现消息生产者与消费者的解耦。为了避免因消息服务器的单点故障导致的整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。那么消息生产者如何知道消息要发往哪台消息服务器?如果某一台消息服务器宕机了,生产者如何在不重启服务的情况下感知呢?消费者又是如何知道从哪台消息服务器上获取消息呢?

NameServer就是为了解决上述问题而设计的。NameServer是专门针对RocketMQ开发的轻量级协调者,多个NameServer节点可以组成一个NameServer集群,帮助RocketMQ集群达到高可用。

NameServer的主要功能是临时保存、管理Topic路由信息,各个NameServer节点是无状态的,即每两个NameServer节点之间不通信,互相不知道彼此的存在。

NameServer中保存的数据被称为Topic路由信息,Topic路由决定了Topic消息发送到哪些Broker,消费者从哪些Broker消费消息

Broker消息服务器在启动时向所有的NameServer注册,消息生产者在发送消息之前先从NameServer获取Broker服务器的地址列表,然后根据负载算法从列表中选择一台消息服务器发送消息。NameServer与每台Broker服务器保持长连接,并间隔10s检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除,但是路由变化不会马上通知消息生产者。为什么要这么设计呢?这是为了降低NameServer实现的复杂性,因此需要在消息发送端提供容错机制来保证消息发送的高可用性。

上面说过,NameServer集群的节点是无状态的,各个节点之间互不通信。虽然NameServer服务器之间在某一时刻的数据并不完全相同,但对消息发送不会造成重大影响,无非就是短暂造成消息发送不均衡,这种设计主要是为了简化NameServer的复杂度,尽可能的提高性能,这也是NameServer设计最主要的思路。

消息客户端与NameServer、Broker的交互设计要点如下:

  • Brker每个30s向NameServer集群的每一台机器发送心跳包,包含自身创建的topic路由等信息。
  • 消息客户端每隔30s向NameServer更新对应topic的路由信息。
  • NameServer收到Broker发送的心跳包时会记录时间戳。
  • NameServer每隔10s会扫描一次brokerLiveTable(存放心跳包的时间戳信息),如果在120s内没有收到心跳包,则认为Broker失效,更新topic的路由信息,将失效的Broker信息移除。
  • 从角色角度,两者都是协调者
  • 从配置保存角度,NameServer将配置保存在内存;Zookeeper持久化到磁盘
  • 从是否支持选举角度,NameServer不支持选举;Zookeeper支持选举
  • 从数据一致性角度,NameServer是弱一致性,各个节点无状态,互不通信,依靠心跳保持数据一致;Zookeeper是强一致性
  • 从高可用角度,两者都是高可用设计
  • 从设计逻辑角度,NameServer仅仅是数据的CRUD;Zookeeper支持Raft选举,逻辑复杂难懂,排查问题较难

NameServer的主要作用是为消息生产者和消息消费者提供关于topic的路由信息,那么NameServer就需要存储路由的基础信息,并且能够管理Broker节点,包括路由注册、路由删除等功能。

NameServer路由元信息

那么路由信息都包含哪些数据呢?路由数据结构的实现代码都在RouteInfoManager类中,数据结构如下:

  • BROKER_CHANNEL_EXPIRED_TIME:broker存活的时间周日,默认为120s。
  • topicQueueTable:保存Topic和队列的信息,也是真正的路由信息,消息发送时根据路由表进行负载均衡。
  • brokerAddrTable:Broker基础信息,包括brokerName、所属集群名称、主备Broker地址。
  • clusterAddrTable:Broker集群信息,存储集群哄所有的Broker的名称。
  • brokerLiveTable:Broker状态信息,NameServer每次收到心跳包时会替换该信息。
  • filterServerTable:Broker上的FilterServer列表,用于类模式消息过滤。类模式过滤机制在4.4及以后版本被废弃

注意:RockertMQ基于订阅发布机制,一个topic拥有多个消息队列,一个Broker默认为每一个主题创建4个读队列和4个写队列。多个Broker组成一个集群,BrokerName由相同的多台Broker组成主从架构,brokerId=0代表主节点,brokerId>0表示从节点。BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。

NameServer路由注册

Rocket路由注册是通过Broker与NameServer的心跳功能实现的。Broker启动时向集群中所有的NameServer发送心跳语句,之后每隔30s向集群中所有的NameServer发送心跳包,NameServer收到Broker心跳包时会先更新brokerLiveTable缓存中的BrokerLiveInfo的lastUpdateTimestamp,然后每隔10s扫描一次brokerLiveTable,如果连续120s没有收到心跳包,NameServer将移除Broker的路由信息,同时关闭Socket链接。

主要流程: 1.Broker发送心跳包 Rocket的网络传输基于Netty。Broker发送心跳包就是把一些必要的信息发送给NameServer,主要信息如下:

  • brokerAddr:broker地址。
  • brokerId:brokerId=0表示主节点,brokerId>0表示从节点。
  • brokerName:broker名称。
  • clusterName:broker所在集群的名称。
  • haServerAddr:主节点地址,初次请求时该值为空,从节点向NameServer注册后返回。
  • requestBody
    - topicConfigWrapper:主题配置,topicConfigWrapper内部封装的是TopicConfigManager中的topicConfigTable,内部存储的是Broker启动时默认的一些topic。Broker中topic默认存储在${Rocket_Home}/store/config/topics.json中。 - filterServerList:消息过滤服务器列表。

2.NameServer处理心跳包

DefaultRequestProcessor是网络处理器解析请求类型,如果请求类型为RequestCode.REGISTER_BROKER,则请求最终转发到RouteInfoNanager#registerBroker。

  • 路由注册需要加写锁,防止并发修改RouteInfoManager中的路由表。首先判断Broker所属的集群是否存在,如果不存在,则创建集群,然后将broker名加入集群的Broker集合。
  • 维护BrokerData信息,首先从brokerAddrTable中根据broker名尝试获取Broker信息,如果不存在,则新建BrokerData并放入brokerAddrTable,registerFirst设置为true;如果存在直接替换原先的Broker信息,registerFirst设置为false,表示非第一次注册。
  • 如果Broker为主节点,并且Broker的topic配置信息发生变化或者是初次注册,则需要创建或者更新topic路由元数据,并填充topicQueueTable,其实就是为默认主题自动注册路由信息,其中包含MixAll.DEFAULT_TOPIC的路由信息。
  • 更新BrokerLiveInfo,存储状态正常的Broker信息表,BrokerLiveInfo是执行路由删除操作的重要依据。
  • 注册Broker的过滤器Server地址列表,一个Broker上会关联多个FilterServer消息过滤服务器。如果此Broker为从节点,则需要查找该Broker的主节点信息,并更新对应的masterAddr属性。

注意点:NameServer与Broker保持长连接,Broker的状态信息存储在brokerLiveTable中,NameServer每收到一个心跳包,将更新brokerLiveTable中关于Broker的状态信息以及路由表(topicQueueTable、brokerAddrTable、brokerLiveTable。filterServerTable)。更新上述路由表(HashTable)使用了锁粒度较小的读写锁,允许多个消息发送者并发读操作,保证消息发送时的高并发。同一时刻NameServer只处理一个Broker心跳包,多个心跳包请求串行执行,这也是读写锁经典的使用场景。

NameServer路由剔除

我们知道,Broker每隔30s向NameServer发送一个心跳包,心跳包中包含BrokerId、Broker地址、Broker名称、Broker所属集群名称。那么如果Broker宕机,NameServer无法收到心跳包,此时NameServer如何剔除失效的Broker呢?

NameServer会每隔10s扫描一次brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp时间戳距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker的链接,同时更新topicQueueTable、brokerAddrTable、brokerLiveTable。filterServerTable。

RocketMQ有两个触发点来触发移除Broker信息:

  • NameServer定时扫描brokerLiveTable,检测上次心跳包与当前系统时间的时间戳差值是否大于120s,如果大于,则移除该Broker信息。
  • Broker在正常关闭的情况下,会执行unregisterBroker指令。

路由删除的具体步骤:

  • 申请写锁。根据brokerAddress从brokerLiveTable、filterServerTable中删除Broker相关的信息。
  • 维护brokerAddrTable。遍历HashMap<brokerName,BrokerData>brokerAddrTable,从BrokerData的HashMap<brokerId,broker address>brokerAddrs中,找到具体的Broker,从BrokerData中将其删除。如果移除后再BrokerData中不再包含其他Broker,则在brokerAddrTable中移除该brokerName对应的条目。
  • 根据BrokerName,从clusterAddeTable中找到Broker并将其从集群中移除。如果移除后,集群中不包含任何Broker,则将该集群从clusterAddrTable中移除。
  • 根据brokerName,遍历所有主题的队列,如果队列中包含当前Broker的队列,则移除,如果topic只包含待移除Broker的队列,从路由表中删除该topic。
  • 释放锁,完成路由删除。

NameServer路由发现

Rocket路由发现是非实时的,当topic路由出现变化后,NameServer不主动推送给客户端,而是由客户端定时拉取主题最新的路由

学习更多有关RocketMQ的知识请参见 RocketMQ之简介

本文为互联网自动采集或经作者授权后发布,本文观点不代表立场,若侵权下架请联系我们删帖处理!文章出自:https://blog.csdn.net/qq_38571892/article/details/121439105
-- 展开阅读全文 --
Web安全—逻辑越权漏洞(BAC)
« 上一篇 03-13
Redis底层数据结构--简单动态字符串
下一篇 » 04-10

发表评论

成为第一个评论的人

热门文章

标签TAG

最近回复