在《Rocketmq学习3——消息发送原理源码浅析》中,我们学习了消息发送的要点:
这一篇我们将学习,消息是如何持久化在broker上的 。
消息存储的流程如下:
broker在启动的时候,会启动BrokerController,BrokerController会触发remotingServer的启动。remotingServer基于netty实现,其中关联了RequestCode(rocketmq协议中使用一个int表明请求类型)和对应的请求处理的processor.
其中SEND_MESSAGE对应的processor——SendMessageProcessor.
在broker启动时,会触发基于netty的服务端启动,其中注册的NettyServerHandler实现了ChannelInboundHandler,在数据客户端数据到达的时候会先经由解码器ByteToMessageDecoder(rocketmq根据自己的协议实现了解码器——NettyDecoder),解码后将调用到如下的NettyServerHandler! 。
其中会根据请求类型,获取到对应的Processor,消息发送一般最后由SendMessageProcessor处理 。
SendMessageProcessor接收到请求的时候,不是立马在当前线程进行处理,而是将封装成一个任务,提交到业务线程池.
在提交之前,还是会进行当前broker是否关闭中,是否拒绝请求的判断.
如下是处理请求的大致流程 。
可看到绿色部分才是真正处理请求的部分,处理后将响应写到netty的channel中,实习响应! 。
rocketmq留了一堆扩展的钩子,最终在sendMessage方法中进行一系列的校验,包装消息为MessageExtBrokerInner,然后进行消息存储流程,源码如下 。
消息存储最后交给MessageStore,调用asyncPutMessage进行异步存储消息,也就是说业务处理线程并没有一直阻塞到消息存储完毕,而是提交后就释放了 。
看到这里你可能会疑问,那么同步消息发送者岂不是收不到响应,同步消息消费者还会block住么?
还是会的,因为 。