gpt4 book ai didi

Rocketmq学习4——Broker消息持久化原理源码浅析

转载 作者:来者不拒 更新时间:2024-01-31 09:58:47 25 4
gpt4 key购买 nike

一丶前言

在《Rocketmq学习3——消息发送原理源码浅析》中,我们学习了消息发送的要点:

  • 本地缓存+rpc 请求namesever + 定时刷新,topic路由信息
  • 负载均衡的选择一个Broker进行发送,还支持【故障转移(即支持规避短时间内发送失败的broker)】
  • 基于netty实现的rpc进行消息发送

这一篇我们将学习,消息是如何持久化在broker上的 。

二丶概述

消息存储的流程如下:

  1. 发送消息: 生产者(Producer)发送消息到 Broker。
  2. 消息存储:Broker 接收到消息后,将消息存储在消息存储文件中,通常是 CommitLog 文件。 RocketMQ 使用了内存映射文件(MappedByteBuffer)来提高文件的读写速度,它可以将文件直接映射到虚拟内存,减少了文件 I/O 操作。
  3. 写入磁盘:RocketMQ 使用了顺序写的方式将消息写入到 CommitLog,这是因为顺序写磁盘的速度远快于随机写。
  4. 索引文件更新:为了提高查询效率,消息会被索引,索引信息存储在 ConsumerQueue 和 IndexFile 中。ConsumerQueue 存储了消息在 CommitLog 中的偏移量,而 IndexFile 存储了关键字到消息偏移量的映射。这一步和broker处理消息发送请求是异步的,由后台线程定时处理。
  5. 数据刷盘:RocketMQ 提供两种消息刷盘方式:
    1. 同步刷盘和异步刷盘。同步刷盘会在消息确实写入磁盘后再向生产者确认消息发送成功,
    2. 异步刷盘则在写入操作系统 PageCache 后就确认,依靠操作系统异步将数据刷写到磁盘。
  6. HA 机制:为了保证数据的高可用性,RocketMQ 还提供了主从同步机制,从服务器可以从主服务器上复制数据,确保在主服务器宕机时,从服务器可以接管消息服务。

三丶broker是接收消息发送请求

broker在启动的时候,会启动BrokerController,BrokerController会触发remotingServer的启动。remotingServer基于netty实现,其中关联了RequestCode(rocketmq协议中使用一个int表明请求类型)和对应的请求处理的processor.

image-20240127194137820

其中SEND_MESSAGE对应的processor——SendMessageProcessor.

在broker启动时,会触发基于netty的服务端启动,其中注册的NettyServerHandler实现了ChannelInboundHandler,在数据客户端数据到达的时候会先经由解码器ByteToMessageDecoder(rocketmq根据自己的协议实现了解码器——NettyDecoder),解码后将调用到如下的NettyServerHandler! 。

image-20240127194340690

其中会根据请求类型,获取到对应的Processor,消息发送一般最后由SendMessageProcessor处理 。

四丶rocketmq基于netty实现的远程服务处理请求的流程

image-20240127195406001

SendMessageProcessor接收到请求的时候,不是立马在当前线程进行处理,而是将封装成一个任务,提交到业务线程池.

在提交之前,还是会进行当前broker是否关闭中,是否拒绝请求的判断.

如下是处理请求的大致流程 。

image-20240127195943463

可看到绿色部分才是真正处理请求的部分,处理后将响应写到netty的channel中,实习响应! 。

五丶SendMessageProcessor 处理请求大致流程

image-20240127200425984

rocketmq留了一堆扩展的钩子,最终在sendMessage方法中进行一系列的校验,包装消息为MessageExtBrokerInner,然后进行消息存储流程,源码如下 。

image-20240127200917150

消息存储最后交给MessageStore,调用asyncPutMessage进行异步存储消息,也就是说业务处理线程并没有一直阻塞到消息存储完毕,而是提交后就释放了 。

看到这里你可能会疑问,那么同步消息发送者岂不是收不到响应,同步消息消费者还会block住么?

还是会的,因为 。