Rocket之消息发送

本文阅读 18 分钟
首页 代码,Java 正文
  • 生产者组:一个逻辑概念,在使用生产者实例的时候需要指定一个组名。一个生产者组可以生产多个Topic的消息。
  • 生产者实例:一个生产者组部署了多个进程,每个进程都可以称为一个生产者实例。
  • Topic:主题名称,一个Topic由若干Queue组成。
  • 同步发送:发送者向RocketMQ执行发送消息API时,同步等待,直到消息服务器返回发送结果。
  • 异步发送:发送者向RocketMQ执行发送消息API时,指定消息发送成功后的回调函数,调用消息发送API后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行。
  • 单向发送:消息发送者向RocketMQ执行发送消息API时,直接返回,不等待消息服务器的结果,也不注册回调函数。简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上。
  • 普通消息 普通消息也称为并发消息,和传统的队列相比,并发消息没有顺序,但是生产消费都是并行进行的。
  • 顺序消息 与Kafka中的分区类似,RocketMQ中把一个topic分为多个队列(与kafka中的分区对应)保存和消费,在一个队列内的消息就是传统的队列,遵循FIFO(先进先出)原则。所以一个topic下的消息是局部有序的,如果想要某个topic的所有消息全局有序,只需将此topic对应的队列数设置为1即可,但这也会降低读写性能。
  • 延迟消息 消息发送后,消费者要在一定时间后,或者指定某个时间点才可以消费。在没有延迟消息时,基本的做法是基于定时计划任务调度,定时发送消息。在RocketMQ中只需要在发送消息时设置延迟级别既可以实现。
  • 事务消息 主要涉及分布式事务,即需要保证在多个操作同时成功或者同时失败时,消费者才能消费消息。RocketMQ通过发送Half消息、处理本地事务、提交消息或者回滚消息有呀的实现分布式事务。但是需要注意,事务消息只是针对消息生产者的,如果消费者消费消息失败并不会回滚消息生产者中的操作,同样会出现数据不一样的情况,后面会详细来讲。
  • 批量消息 严格来说,批量消息并不是一种消息类型,只是一种提高消息发送性能的手段。批量消息发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。当然,并不是在同一批次中发送的消息数据越多,性能越好,判断依据是单条消息的长度,如果单条消息内容比较长,则打包发送多条消息会影响其他线程发送消息的响应时间,并且单批次消息发送总长度不能超过配置的值。批量发送消息要解决的是如何将这些消息编码,以便服务端能够正确解码每条消息的内容,下面会详细讲解。

Rocket消息封装类是org.apache.rocketmq.common.message.Message。主要包含的属性:

  • topic:消息所属主题
  • flag:消息标记(Rocket不做处理)
  • properties:消息扩展属性。此属性为Map类型。
    - tags:消息tag,用户消息过滤。 - keys:消息索引键,用空格隔开,Rocket可以根据这些key快速检索消息。 - waitStoreMsgOK:消息发送时是否等消息存储完成后再返回。
  • byteArr:消息体。此属性为byte[]类型。

Topic路由机制

消息发送者向某一个topic发送消息时,需要查询topic的路由信息。初次发送时会根据topic的名称向NameServer集群查询topic的路由信息,然后将其存储在本地内存缓存中,并且每隔30s依次遍历缓存中的topic,向NameServer查询最新的路由信息。如果成功查询到路由信息,会将这些信息更新至本地缓存,实现topic路由信息的动态感知。

如果消息发送者向一个不存在的主题发送消息且配置了消息服务器可以自动创建主题,此时是可以发送消息成功的,但是在生产环境强烈不建议开启自动创建主题

注意:Rocket中的路由消息是持久化在Broker中的,NameServer中的路由信息来自Broker的心跳包并存储在内存中。

消息发送的高可用设计

我们知道,在消息发送时,Broker可能出现故障无法使用,且宕机的Broker并不能实时的被消息生产者感知,这就有可能导致生产者会将消息发送到已经宕机的Broker上。这种情况发生时,Rocket是如何保证消息发送的高可用呢?

Rocket为了实现消息发送的高可用,引入了两个非常重要的特性:消息发送重试机制故障规避机制

消息发送重试机制

Rocket支持同步、异步发送,无论哪种方式都可以配置失败后重试。例如,配置项retryTimesWhenSendFailed表示同步重试次数,默认为2次,加上正常发送的1次,总共3次机会。

故障规避机制

当消息第一次发送失败时,如果下一次消息还是发送到刚刚失败的Broker上,其消息发送大概率还是会失败,因此为了保证重试的可靠性,在重试时会尽量避开刚刚接收失败的Broker,而是选择其他Broker上的队列进行发送,从而提高消息发送的成功率。

消息发送端在自动发现主题的路由信息后,RocketMQ默认使用轮询算法进行路由的负载均衡。Rocket在消息发送时支持自定义的队列负载算法,需要特别注意的是,使用自定义的路由负载算法后,RocketMQ的重试机制将失效

消息长度验证

在消息发送之前,首先确保生产者处于运行状态,然后验证消息是否符合相应的规范。具体的规范要求是主题名称、消息体不能为空,消息长度不能等于0且默认不能超过允许发送消息的最大长度4MB。

查找主题路由信息

在消息发送之前,还要获取主题的路由信息,只有获取了这些信息我们才能知道消息具体要发送到哪个Broker节点上。

tryToFindPublishInfo是查找主题的路由信息的方法。如果生产者缓存了topic的路由信息,且该路由信息包含消息队列,则直接返回该路由信息。如果没有缓存或没有包含消息队列,则向NameServer查询该topic的路由信息。如果最终没有找到路由信息,则抛出异常,表示无法找到主题相关路由信息异常。

第一次发送消息时,本地没有缓存topic的路由信息,查询NameServer尝试获取路由信息,如果路由信息未找到,再次尝试用默认主题去查询。当然在使用默认主题查询之前,需要判断相关配置,如果允许使用默认主题,NameServer将返回路由信息;如果不允许使用默认主题,这抛出无法找到topic路由异常。

具体过程如下

  • 如果允许使用默认主题,则使用默认主题查询,如果查询到路由信息,则将路由信息中读写队列的个数替换为消息生产者默认的队列个数;如果不允许使用默认主题,则使用参数topic查询,如果未查询到路由信息,则返回false,表示路由信息未变化。
  • 如果找到路由信息,则与本地缓存中的路由信息进行对比,判断路由信息是否发生了改变,如果未发生变化,则直接返回false。
  • 更新MQClientInstance Broker地址缓存表。
  • 将topicRouteData中的List转换成topicPublishInfo的List列表,具体实现在topicRouteData2TopicPublishInfo中。然后更新该MQClientInstance管辖的所有消息,发送关于topic的路由信息。 循环遍历路由信息的QueueData信息,如果队列没有写权限,则继续遍历下一个QueueData。根据brokerName找到brokerData信息,如果找不到或没有找到主节点,则遍历下一个QueueData。根据写队列个数,topic+序号创建MessageQueue,填充topicPublishInfo的List,完成消息发送的路由查找。

选择消息队列

在找到路由信息后,就需要根据路由信息选择消息队列了,返回的消息队列按照broker序号和队列序号进行排序。那么RocketMQ如何选择消息队列呢?

首先消息发送端采用重试机制,有retryTimesWhenSendFailed指定同步方式重试次数,异步重试机制在收到消息发送结果执行回调之前进行重试,有retryTimesWhenSendAsyncFailed执行异常重试次数。接下来就是循环执行,选择消息队列、发送消息,发送成功则返回,收到异常则重试。选择队列有两种方式

  • sendLatencyFaultEnable=false,默认不启用Broker故障延迟机制。 如果sendLatencyFaultEnable=false,则表示不启用Broker故障延迟机制。 在消息发送过程中,可能会多次执行选择消息队列这个方法(selectOneMessageQueue),lastBrokerName属性就是上一次选择的执行发送消息失败的Broker。第一次执行消息队列选择时,lastBrokerName为null,此时直接用sendWhichQueue自增再获取值,与当前路由表中消息队列的个数取模,如果消息发送失败,下次进行消息队列选择时会规避上次MesageQueue所在的Broker,否则有可能再次失败。
  • sendLatencyFaultEnable=true,启用Broker故障延迟机制。 开启Broker故障延迟机制,就是在一定时间内不再向上次发送消息失败的Broker发送任何消息。超过这时间还是会尝试向其发送消息。

需要注意的是,开启不开启sendLatencyFaultEnable机制在消息发送时都能规避故障的Broker,那么这两种机制有何区别呢? 开启所谓的故障延迟机制,即设置sendLatencyFaultEnable为ture,其实是一种较为悲观的做法。当消息发送者遇到一次消息发送失败后,就会悲观的认为Broker不可用,在接下来的一段时间内就不再向其发送任何消息,直接避开该Broker。而不开启延迟规避机制,就只会在本次消息发送的重试过程中规避该Broker,下一次消息发送还是会继续尝试。

消息发送

消息发送的大致流程:

  • 根据MessageQueue获取Broker的网络地址。如果MQClientInstance的brokerAddrTable未缓存该Broker的信息,则从NameServer主动更新topic的路由信息。如果路由更新后还是找不到Broker信息,则抛出异常,提示Broker不存在。
  • 为消息分配全局唯一ID,如果消息体默认超过4KB,则对消息体采用zip压缩,并设置消息的系统标记为MessageSysFlag.COMPRESSED_FLAG。如果是事务Prepared消息,这设置消息的系统标记为MessageSysFlag.TRANSACTION_PREPARED_TYPE。
  • 如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑。通过DefaultMQProducerImpl#registerSendMessageHook注册钩子处理类,并且可以注册多个。
  • 构建消息发送请求包。 主要包含如下重要信息:生产者组、主题名称、默认创建主题key、该主题在单个Broker上的默认队列数、队列ID(队列需要)、消息系统标记(MessageSysFlag)、消息发送时间、消息标记(Rocket对消息中的标记不做任何处理,供应用程序使用)、消息扩展属性、消息重试次数、是否是批量消息等。
  • 根据消息发送方式(同步、异步、单向)进行网络传输。
  • 如果注册了消息发送钩子函数,则执行after逻辑。注意,就算消息发送过程中出现异常,该方法也会执行。

同步发送

具体步骤如下:

  • 检查消息发送是否合理。
    - 检查Broker是否有写权限。 - 检查topic是否可以进行消息发送。主要针对默认主题,默认主题不能发送消息,仅供路由查找。 - 在NameServer端存储主题的配置信息。 - 检查队列,如果队列不合法,则返回错误码。
  • 如果消息重试次数超过允许的最大重试次数,消息将进入死信队列。
  • 调用相应方法进行消息存储。

异步发送

异步发送是指消息生产者调用发送的API后,无需等待消息服务器返回本次消息发送的结果,只需要提供一个回调函数,供消息发送客户端在收到响应结果后回调。异步发送方式相比于同步发送方式,虽然消息发送端的发送性能会显著提高,但是为了降低消息服务器的负载压力,RocketMQ对消息发送的异步消息进行了并发控制,通过参数clientAsyncSemaphoreValue实现,默认为65535。异步消息发送虽然也可以控制消息的发送重试次数,但是重试的调用入口是在收到服务端响应包时进行的,如果出现网络异常、网络超时等情况将不会重试

单向发送

单向发送是指消息生产者调用消息发送的API后,无须等待消息服务器返回本次消息发送的结果,并且无须提供回调函数,这表示压根不关心本次消息发送是否成功,其实现原理与异步发送相同,只是消息发送客户端在收到响应结果后什么都不做了,并且没有重试机制。

学习更多有关RocketMQ的知识请参见 RocketMQ之简介

本文为互联网自动采集或经作者授权后发布,本文观点不代表立场,若侵权下架请联系我们删帖处理!文章出自:https://blog.csdn.net/qq_38571892/article/details/121442805
-- 展开阅读全文 --
Web安全—逻辑越权漏洞(BAC)
« 上一篇 03-13
Redis底层数据结构--简单动态字符串
下一篇 » 04-10

发表评论

成为第一个评论的人

热门文章

标签TAG

最近回复