Rocket之消息存储

本文阅读 26 分钟
首页 代码,Java 正文

RocketMQ存储的文件主要包括CommitLog文件、ConsumeQueue文件、Index文件。RocketMQ将所有主题的消息存储在同一个文件中,确保消息发送时按顺序写文件,尽最大的能力确保消息发送的高性能与高吞吐量。因为消息中间件一般是基于消息主题的订阅机制,所以给按照消息主题检索消息带来了极大的不便。为了提高消息消费的效率,RocketMQ引入了ConsumeQueue消息消费队列文件,每个消息主题包含多个消息消费队列,每一个消息队列有一个消息文件。Index索引文件的设计理念是为了加速消息的检索性能,根据消息的属性从CommitLog文件中快速索引消息。

RocketMQ存储路径为${ROCKET_HOME}/store,主要存储文件夹如下:

  • commitlog:消息存储目录
  • config:运行期间的一些配置信息,主要包括下列信息。
    - consumerFilter.json:主题消息过滤信息。 - consumerOffset.json:集群消费模式下的消息消费进度。 - delayOffset.json:延时消息队列拉取进度。 - subscriptionGroup.json:消息消费组的配置信息 - topics.json:topic配置属性
  • consumequeue:消息消费队列存储目录。
  • index:消息索引文件存储目录。
  • abort:如果存在abort文件,说明Broker非正常关闭,该文件默认在启动Broker时创建,在正常退出之前删除。
  • checkpoint:检测点文件,存储CommitLog文件最后一次刷盘时间戳、ConsumeQueue最后一次刷盘时间、index文件最后一次刷盘时间戳。

Commitlog文件

RocketMQ在消息写入过程中追求极致的磁盘顺序写,所有主题的消息全部下入一个文件,即CommitLog文件。所有消息按抵达顺序依次追加到CommitLog文件中,消息一旦写入,不支持修改。CommitLog文件特点是每一条消息长度不相同。

CommitLog文件的存储目录默认为${ROCKET_HOME}/store/commitlog,可以通过在broker配置文件中设置storePathRootDir属性改变默认路径。CommitLog文件默认大小为1GB,可通过在broker配置文件中设置mapedFileSizeCommitLog属性改变默认大小。

类似关系型数据库会为每条数据引入一个ID字段,基于文件编程也会为每条消息引入一个身份标志:消息物理偏移量,即消息存储在文件的起始位置。

正是有了物理偏移量的概念,CommitLog文件的命名方式也是极具技巧性,使用存储在该文件的第一条消息在整个CommitLog文件组中的偏移量来命名。这样做的好处是给出任意一个消息的物理偏移量,可以通过二分法进行查找,快速定位这个文件的位置,然后用消息物理偏移量减去所在文件的名称,得到的差值就是在该文件中的绝对地址。

另外,文件名长度为20位,左边补0,剩余为起始偏移量。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。

需要注意的是,CommitLog文件的设计理念是追求极致的消息写,但我们知道消息消费模型是基于主题订阅机制的,即一个消费组是消费特定主题的消息。根据主题从CommitLog文件中检索消息,这绝对不是一个好主意,这样只能从文件的第一条消息逐条检索,其性能就不容乐观了,为了解决基于topic的消息检索问题,RocketMQ引入了ConsumeQueue文件。

ConsumeQueue文件

ConsumeQueue文件是消息消费队列文件,是CommitLog文件基于topic的索引文件,消息到达CommitLog文件后,将异步转发到ConsumeQueue文件中,主要用于消费者根据topic消费消息,其组织方式为/topic/queue/file,具体存储路径为¥HOME/store/consumequeue/{topic}/{queueId}/{fileName}。

ConsumeQueue文件的设计也很有特点,每个条目长度固定20字节(8字节CommitLog物理偏移量、4字节消息长度、8字节tag哈希码),单个文件有30万个条目组成,每个ConsumeQueue文件大小为5.72MB。这里不是存储tag的原始字符串,而是存储哈希码,目的是确保每个条目的长度固定,可以使用访问类似数组下标的方式快速定位条目,极大地提高了ConsumeQueue文件的读取性能。单个ConsumeQueue文件可以看做一个ConsumeQueue条目的数组,其下标为ConsumeQueue的逻辑偏移量,消息消费进度存储的偏移量即逻辑偏移量。消息消费者根据topic、消息消费进度(ConsumeQueue逻辑偏移量)、即第几个ConsumeQueue条目,这样的消费进度去访问消息,通过逻辑偏移量logicOffset*20,即可找到该条目的起始偏移量(ConsumeQueue文件中的偏移量),然后读取该偏移量后20个字节即可得到一个条目,无须遍历ConsumeQueue文件。

ConsumeQueue文件的构建机制是当消息到达CommitLog文件后,由专门的线程产生消息转发任务,从而构建ConsumeQueue文件与下文提到额Index文件。

如何通过消息逻辑偏移量查找消息? 答:根据startIndex获取消息消费队列条目。通过startIndex*20得到在ConsumeQueue文件中的物理偏移量,如果该偏移量小于minLogicOffset,则返回null,说明该消息已经被删除,如果大于minLogicOffset,则根据偏移量定位到具体的物理文件。通过将该偏移量与物理文件的大小取模获取在该文件的偏移量,从偏移量开始连续读取20个字节即可。

如何通过消息存储时间查找消息? 答:暂略

RocketMQ与Kafka相比具有一个强大的优势,就是支持按消息属性检索消息,引入ConsumeQueue文件解决了基于topic查找消息的问题,但如果想基于消息的某一个属性进行查找,ConsumeQueue文件就无能为力了。故RocketMQ又引入了Index索引文件,实现基于文件的哈希索引。

Index文件

ConsumeQueue是RocketMQ专门为消息订阅构建的索引文件,目的是提高根据主题与消息队列检索消息的速度。而Index文件基于物理磁盘文件实现哈希索引。Index文件有40字节的文件头、500万个哈希槽、2000万个Index条目组成,每个哈希槽4字节、每个index条目含有20个字节,分别为4字节索引key的哈希码、8字节消息物理偏移量、4字节时间戳、4字节的前一个Index条目(哈希冲突的链表结构)。

哈希冲突链式解决方案的关键实现,哈希槽存储的是落在该哈希槽的哈希码最新的Index索引,新的index条目最后4个字节存储该哈希码上一条条目的index下标。如果哈希槽中存储的值为0或大于当前index文件最大条目数或小于-1,表示该哈希槽当前并没有与之对应的index条目。值得注意的是,Index文件条目中存储的不是消息索引key,而是消息属性key的哈希,在根据key查找时需要根据消息物理偏移量找到消息,进而验证消息key的值,之所以只存储哈希,而不存储具体的key,是为了将index条目设计为定长结构,才能方便地检索与定位条目。

checkpoint文件

checkpoint(检查点)文件的作用是记录CommitLog、ConsumeQueue、Index文件的刷盘时间点,文件固定长度为4KB,其中只用该文件耳朵前面24字节。

  • physicMsgTimestamp:CommitLog文件刷盘时间点,占用8字节。
  • logicsMsgTimestamp:ConsumeQueue文件刷盘时间点,占用8字节。
  • indexMsgTimestamp:Index文件刷盘时间点,占用8字节。

实时更新ConsumeQueue与Index文件

因为ConsumeQueue文件,Index文件都是基于CommitLog文件构建的,所以当消息生产者提交的消息存储到CommitLog文件中时,ConsumeQueue文件、Index文件需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。RocketMQ通过开启一个线程ReputMessageService来准实时转发CommitLog文件的更新事件,相应的任务处理器根据转发的消息及时更新ConsumeQueue文件、Index文件。

Broker服务器在启动时会启动ReputMessageService线程,并初始化一个非常关键的参数reputFromOffset,该参数的含义是ReputMessageService从哪个物理偏移量开始转发消息给ConsumeQueue和Index文件。如果允许重复转发,将reputFromOffset设置为CommitLog文件的提交指针。如果不允许重复转发,将reputFromOffset设置为CommitLog文件的内存中最大偏移量。

ReputMessageService线程每执行一次任务推送,休息1ms后继续尝试推送消息到ConsumeQueue和Index文件。

根据消息更新ConsumeQueue文件 大致步骤如下:

  • 根据消息主题和队列ID,先获得对应的ConsumeQueue问价,其逻辑比较简单,因为每一个消息主题对应一个ConsumeQueue目录,主题下每一个消息队列对应一个文件夹,所以取出该文件夹最后的ConsumeQueue文件即可。
  • 依次将消息偏移量、消息长度、tag哈希码写入ByteBuffer,并根据consumeQueueOffset计算ConsumeQueue中的物理地址,将内容追加到ConsumeQueue的内存映射文件中(本操作只追加,不刷盘),ConsumeQueue的刷盘方式固定为异步刷盘

根据消息更新Index文件 哈希索引文件转发任务实现类为CommitLogDispatchBuildIndex。当然,是否生成Index文件,可以通过messageIndexEnable进行控制,如果设置为true,则调用IndexService#buildIndex构建哈希索引,否则忽略本次转发任务。 大致步骤如下:

  • 获取或创建Index文件并获取所有文件最大的物理偏移量。如果该消息的物理偏移量小于Index文件中的物理偏移量,则说明是重复数据,忽略本次索引构建。
  • 如果消息的唯一键不为空,则添加到哈希索引中,以便加速根据唯一键检索消息。
  • 构建索引键,RocketMq支持为同一个消息建立多个索引,多个索引键用空格分开。

虽然基于磁盘的顺序写消息可以极大提高I/O的写效率,但如果基于文件的存储采用常规的Java文件操作API,例如FileOutputStream等,则性能提升会很有限,故RocketMQ又引入了内存映射,将磁盘文件映射到内存中,以操作内存的方式操作磁盘,将性能又提升了一个档次。

在Java中可通过FileChannel的map方法创建内存映射文件。在linux服务器中由该方法创建的文件使用的是操作系统的页缓存。Linux操作系统中内存使用策略会尽可能地利用机器的物理内存,并常住内存中,即页缓存。在操作系统的内存不够的情况下,采用缓存置换算法,例如LRU将不常用的页缓存回收,即操作系统会自动管理这部分内存。

如果RocketMQ Broker进程异常退出,存储在页缓存中的数据并不会丢失,操作系统会定时将页缓存中的数据持久化到磁盘,实现数据安全可靠。不过如果是机器断电等异常情况,存储在页缓存中的数据也有可能丢失。

有了顺序写和内存映射的加持,RocketMQ的写入性能得到了极大的保证,但凡事都有利弊,引入了内存映射和页缓存机制,消息会先写入页缓存,此时消息并没有真正持久化到磁盘。那么Broker收到客户端消息后,是存储到页缓存中就直接返回成功,还是要持久化到磁盘中返回成功呢?这需要在性能与消息可靠性方面的权衡。为此,RocketMQ提供了两种策略:同步刷盘异步刷盘

RocketMQ的存储和读写是基于JDK NIO的内存映射机制的,消息存储时首先将消息追加到内存中,再根据配置的刷盘策略在不同时间刷盘。通过在broker配置文件中配置flushDiskType来设定刷盘方式,可选值为ASYNC_FLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘),默认是异步刷盘。

ConsumeQueue文件、Index文件刷盘原理与CommitLog刷盘机制类似。值得注意的是,Index文件的刷盘并不是采取定时刷盘机制,而是每更新一次Index文件就会将上一次的改动写入磁盘。

Broker同步刷盘

同步刷盘是将消息追加到内存后,将同步调用MappedByteBuffer的force()方法。同步刷盘在RocketMQ的实现中称作组提交,即GroupCommitService从队列中拿出待刷盘的请求,然后执行刷盘动作,此时会将write指针与flush指针之间的所有数据刷写到磁盘中。

消费发送线程将消息追加到内存映射文件后,将同步任务GroupCommitRequest提交到GroupCommitService线程,然后调用阻塞等待刷盘结果,超时时间默认为5s。

GroupCommitService线程处理GroupCommitRequest对象后调用wakeupCustomer方法将消费发送线程唤醒,并将刷盘请求告知GroupCommitRequest。

客户端提交同步刷盘任务到GroupCommitService线程,如果该线程处于等待状态则将其唤醒。

为避免同步刷盘消费任务与其他消息生产者提交任务产生锁竞争,GroupCommitService提供读容器写容器,这两个容器每执行完一次任务后交互,继续消费任务。

GroupCommitService组提交线程,每处理一批刷盘请求后,如果后续有待刷盘的请求需要处理,组提交线程会马不停蹄地处理下一批;如果没有待处理的任务,则休息10ms,即每10ms空转一次。 具体步骤如下

  • 执行刷盘操作,即调用MappedByteBuffer#force方法。遍历同步刷盘任务列表,根据加入顺序逐一执行刷盘逻辑。如果已刷盘指针大于、等于提交的刷盘点,表示刷盘成功,每执行一次刷盘操作后,立即调用GroupCommitRequest#wakeupCustomer唤醒消息发送线程并通知刷盘结果。
  • 处理完所有同步刷盘任务后,更新刷盘检查点StoreCheckpoint中的physicMsgTimestamp,但并没有执行检测点的刷盘操作,刷盘检测点的刷盘操作将在刷写消息队列文件时触发

Broker异步刷盘

同步刷盘的优点是保证消息不丢失,即向客户端返回成功就代表这条消息已经持久化到磁盘,但这是以牺牲写入性能为代价的,不过因为RocketMQ的消息是先写入pagecache,所以消息丢失的可能较小,如果能容忍一定概率的消息丢失或者在丢失后能够低成本的快速重推,可以考虑使用异步刷盘策略。

异步刷盘值的是broker将消息存储在pagecache后就立即返回成功,然后开启一个异步线程定时执行FileChannel的force方法,将内存中的数据定时写入磁盘,默认间隔时间为500ms。

开启transientStorePoolEnable机制则启动异步刷盘方式,刷盘实现较同步刷盘有细微差别。如果transientStorePoolEnable为true,RocketMQ会单独申请一个与目标物理文件(CommitLog)同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到物理文件的内存映射中,再经flush操作到磁盘。如果transientStorePoolEnable为false,消息将追加到物理文件直接映射的内存中,然后写入磁盘。

异步刷盘流程

  • 将消息直接追加到ByteBuffer(堆外内存DirectByteBuffer),wrotePosition随着消息的不断追加向后移动。
  • CommitRealTimeService线程默认每200ms将ByteBuffer新追加的数据提交到FileChannel中。
  • FileChannel在通道中追加提交的内容,其wrotePosition指针向前后移动,然后返回。
  • commit操作成功返回,将commitedPosition向前后移动本次提交的内容长度,此时wrotePosition指针依然可以向前推进。
  • FlushRealTimeService线程默认每500ms将FileChannel中新追加的内存(wrotePosition减去上一次写入位置flushedPositiont),通过调用FileChannel#force()方法将数据写入磁盘。

因为RocketMQ操作CommitLog、ConsumeQueue文件是基于内存映射机制并在启动的时候会加载comitlog、consumeQueue目录下的所有文件,所以为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,这就需要引入一种机制来删除已过期的文件,RocketMQ顺序写CommitLog文件、ConsumeQueue文件,所有写操作全部落在最后一个CommitLog文件或ConsumeQueue文件上,之前的文件在下一个文件创建后将不会再被更新。RocketMQ清除过期文件的方法:如果非当前写文件在一定时间间隔内没有再次更新,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每隔文件的过期时间为72h,通过在broker配置文件中设置fileReservedTime来改变过期时间,单位为小时。

RocketMQ设计和实现过期文件删除机制: RocketMQ每隔10s调度一次cleanFilesPeriodically,检测是否需要清除过期文件。执行频率可以通过cleanResourceInterval进行设置,默认10s。

分别清除CommitLog文件和ConsumeQueue文件。CommitLog文件和ConsumeQueue文件公用一套过期文件删除机制。

  • 首先判断文件最新更新时间和当前时间的差值是否超过了设置的文件保留时间,如果超过了该值,则认为文件是过期文件
  • 在清除过期文件时,如果该文件被其他线程占用(引用次数大于0,比如读取消息),此时会阻止此此删除任务,同时在第一次试图删除该文件时记录当前时间戳,destroyMapedFileIntervalForcibly表示第一次拒绝删除之后能保留文件的最大时间,在此时间内,同样可以被拒绝删除,超过该时间后,会将引用次数设置为负数,文件将被强制删除。

RocketMQ满足如下任意一种情况将继续执行删除文件的操作

  • 指定删除文件的时间点,RocketMQ通过deleteWhen设置每天在固定时间执行一次删除过期文件操作,默认凌晨4点。
  • 检查磁盘空间是否充足,如果磁盘空间不充足,则返回true,表示应该触发过期文件删除操作。
  • 预留手工触发机制,可以通过调用excuteDeleteFilesManualy方法手工触发删除过期文件的操作,目前RocketMQ暂未封装手工触发删除的命令。

RocketMQ为了优化同步复制的性能,在RocketMQ4.7.0中正式对原先的同步复制做了重大改造,大大提高了同步复制的性能。

在RocketMQ4.7.0之前版本的同步复制:消息发送线程SendMessageThread在收到客户端请求时会调用SendMessageProcessor中的方法,将消息写入Broker。如果消息复制模式为同步复制,这需要将消息同步复制到从节点,本次消息发送才会返回,即SendMessageThread线程需要在收到从节点的同步结果后才能继续处理下一条消息。

在在RocketMQ4.7.0中又是如何优化的呢?因为同步复制的语义就是将消息同步到从节点,所以这个复制过程没有什么可优化的,但是我们可以减少SendMessageThread线程的等待时间,即在同步复制的过程中,SendMessageThread线程可以继续处理其他消息,只是收到从节点的同步结果后再想客户端返回结果。提高Broker的消息处理能力,重复利用Broker的资源。

大致过程如下

  • 消息首先进入pageCache,然后执行刷盘操作(submitFlushRequest),接着调用submitReplicaRequest方法将消息提交到HaService,进行数据复制,这里使用了ComplateFuture的thenCombine方法,将刷盘、复制当成一个联合任务执行,这里设置消息追加的最终状态
  • 向HaService提交GroupCommitRequest对象后,返回的并不是同步结果,而是一个CompletableFuture对象,该对象的thenApply方法是在上文提到的handlePutMessageResultFuture方法中定义的,而CompletableFuture的complete方法会在消息被复制到从节点后被调用。
  • 在消息成功复制和复制失败后,CompletableFuture的complete方法将被调用,从而CompletableFuture的thenApply方法被触发调用,通过该方法向客户端返回消息发送的最终结果,实现在broker端的异步编程,使同步复制的性能接近异步复制,大大提高消息的复制性能。

ConsumeQueue和Index文件恢复

暂略

学习更多有关RocketMQ的知识请参见 RocketMQ之简介

本文为互联网自动采集或经作者授权后发布,本文观点不代表立场,若侵权下架请联系我们删帖处理!文章出自:https://blog.csdn.net/qq_38571892/article/details/121768944
-- 展开阅读全文 --
Web安全—逻辑越权漏洞(BAC)
« 上一篇 03-13
Redis底层数据结构--简单动态字符串
下一篇 » 04-10

发表评论

成为第一个评论的人

热门文章

标签TAG

最近回复