RocketMQ源码(五):RocketMQ消息存储流程

发布时间 2023-09-11 08:46:28作者: 无虑的小猪

  在RocketMQ源码(四):RocketMQ生产者发送消息流程中已经对生产者同步发送消息的源码做了分析,下面继续分析,消息发送到Broker做了哪些处理。

一、Broker处理请求消息的入口

  在RocketMQ源码(三):RocketMQ服务端启动流程中提到,在启动BrokerControler流程中,启动NRS(Netty Remoting Server)节点,向netty注册请求消息处理器NettyServerHandler,NettyServerHandler为NettyRemotingServer的内部类,NettyServerHandler详情如下:

 1 /**
 2  * 请求消息处理器
 3  */
 4 @ChannelHandler.Sharable
 5 class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
 6 
 7     @Override
 8     protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
 9         // broker接收到的消息
10         processMessageReceived(ctx, msg);
11     }
12 }

  broker接收消息处理,会根据请求类型判断是执行请求还是执行响应,NettyRemotingAbstract#processMessageReceived() 核心代码:

 1 public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
 2     final RemotingCommand cmd = msg;
 3     if (cmd != null) {
 4         switch (cmd.getType()) {
 5             // 请求
 6             case REQUEST_COMMAND:
 7                 processRequestCommand(ctx, cmd);
 8                 break;
 9             // 响应
10             case RESPONSE_COMMAND:
11                 processResponseCommand(ctx, cmd);
12                 break;
13             default:
14                 break;
15         }
16     }
17 }

  NettyRemotingAbstract#processRequestCommand() 代码段;

 

  通过请求中的RequestCode获取缓存表中请求处理器NettyRequestProcessor,执行请求处理器中的processRequest() 方法。

 0

  在RocketMQ中,每个RequestCode都有各自的请求处理器及处理线程池,Broker根据客户端发送的请求码调用注册的处理器,执行不同的逻辑。在同步发送消息MQClientAPIImpl的sendMessage方法中,完成对客户端请求码的赋值操作,对于发送消息,客户端设置的请求码为RequestCode.SEND_MESSAGE。

  0

  在初始化BrokerController时,请求码RequestCode.SEND_MESSAGE对应的处理器为SendMessageProcessor。

  0

  请求处理器中处理接收消息,SendMessageProcessor#processRequest() 核心代码:

 1 @Override
 2 public RemotingCommand processRequest(ChannelHandlerContext ctx,
 3                                       RemotingCommand request) throws RemotingCommandException {
 4     RemotingCommand response = null;
 5     try {
 6         // 1、处理请求;2、获取执行结果
 7         response = asyncProcessRequest(ctx, request).get();
 8     } catch (InterruptedException | ExecutionException e) {
 9         log.error("process SendMessage error, request : " + request.toString(), e);
10     }
11     return response;
12 }

  最终会执行到SendMessageProcessor的asyncProcessRequest方法。

二、消息存储流程图

 

  消息写入核心流程图:

三、消息写入流程分析

  SendMessageProcessor#asyncProcessRequest() 代码段:

  0

在SendMessageProcessor的asyncSendMessage方法中,主要完成以下几件事:

3.1、预处理

  SendMessageProcessor#asyncSendMessage() 代码段

  0

  SendMessageProcessor中的preSend方法

  0

  1、创建response对象;

  2、Topic合法性校验;Topic配置不存在,创建TopicConfig,并设置进缓存表中;

  3、消息队列合法性校验

3.2、构建储存到broker的消息对象

  SendMessageProcessor#asyncSendMessage() 代码段

  

  创建并设置写入到Broker中的消息对象。

3.3、消息写入

  SendMessageProcessor#asyncSendMessage() 核心代码

  0

  DefaultMessageStore#asyncPutMessage() 核心代码

  0

  CommitLog的aysncPutMessage方法时消息写入到broker服务端的核心方法,包含消息写入OS缓存页、刷盘机制、主从同步等消息存储的核心流程。 

  CommitLog#aysncPutMessage() 核心代码:

  1 public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
  2     // 设置消息存储时间
  3     msg.setStoreTimestamp(System.currentTimeMillis());
  4     // 加密消息体 (CRC加密方式)
  5     msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
  6     // 消息写入结果
  7     AppendMessageResult result = null;
  8     // 消息统计信息服务
  9     StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
 10     // 获取消息的topic、queueId
 11     String topic = msg.getTopic();
 12     int queueId = msg.getQueueId();
 13     // 获取类型
 14     final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
 15     if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
 16             || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
 17         // 步骤一:延迟消息的处理
 18         if (msg.getDelayTimeLevel() > 0) {
 19             // 消息的延迟级别超出RocketMQ内置的最大延迟级别,按内置最大延迟级别处理
 20             if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
 21                 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
 22             }
 23             // 获取延迟消息暂存的Topic、queueId
 24             topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
 25             queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
 26             // 备份目标topic、queueId,消息的原topic、queueId设置到propertiesString属性
 27             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
 28             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
 29             msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
 30             // 使用延迟消息主题、消息队列ID替换消息的原topic、queueId
 31             msg.setTopic(topic);
 32             msg.setQueueId(queueId);
 33         }
 34     }
 35 
 36     long elapsedTimeInLock = 0;
 37     MappedFile unlockMappedFile = null;
 38     // 获取可以写入的commitLog文件
 39     MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
 40 
 41     // 根据存储配置决定使用自旋锁还是排它锁
 42     putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
 43     try {
 44         // 开始加锁时间
 45         long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
 46         this.beginTimeInLock = beginLockTimestamp;
 47         // 设置开始存储时间,为了保证全局有序
 48         msg.setStoreTimestamp(beginLockTimestamp);
 49 
 50         // 步骤二:mappedFile为空或者文件已达最大容量,创建commitLog文件
 51         if (null == mappedFile || mappedFile.isFull()) {
 52             // 获取可用的CommitLog,没有则创建
 53             mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
 54         }
 55         // 创建失败,抛出CREATE_MAPEDFILE_FAILED异常
 56         if (null == mappedFile) {
 57             log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
 58             beginTimeInLock = 0;
 59             return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
 60         }
 61 
 62         // 步骤三:写入消息,将消息追加到MappedFile
 63         result = mappedFile.appendMessage(msg, this.appendMessageCallback);
 64         switch (result.getStatus()) {
 65             case PUT_OK:
 66                 break;
 67             case END_OF_FILE:
 68                 unlockMappedFile = mappedFile;
 69                 // commitlog中文件存储空间不足无法写入消息,创建新文件,重写消息
 70                 mappedFile = this.mappedFileQueue.getLastMappedFile(0);
 71                 if (null == mappedFile) {
 72                     // XXX: warn and notify me
 73                     log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
 74                     beginTimeInLock = 0;
 75                     return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
 76                 }
 77                 result = mappedFile.appendMessage(msg, this.appendMessageCallback);
 78                 break;
 79             case MESSAGE_SIZE_EXCEEDED:
 80             case PROPERTIES_SIZE_EXCEEDED:
 81                 beginTimeInLock = 0;
 82                 return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
 83             case UNKNOWN_ERROR:
 84                 beginTimeInLock = 0;
 85                 return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
 86             default:
 87                 beginTimeInLock = 0;
 88                 return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
 89         }
 90 
 91         elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
 92         beginTimeInLock = 0;
 93     } finally {
 94         // 释放锁
 95         putMessageLock.unlock();
 96     }
 97 
 98     if (elapsedTimeInLock > 500) {
 99         log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
100     }
101     if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
102         this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
103     }
104     PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
105     // Statistics
106     // 统计服务
107     storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
108     storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
109 
110     // 步骤四:刷盘入口,提交刷盘请求, 同步刷盘会阻塞等待刷盘结果
111     CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
112     // 步骤五:提交主从复制请求,同步slave请求
113     CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
114     // 并行执行刷盘和主从同步,并处理执行结果
115     return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
116         if (flushStatus != PutMessageStatus.PUT_OK) {
117             putMessageResult.setPutMessageStatus(flushStatus);
118         }
119         if (replicaStatus != PutMessageStatus.PUT_OK) {
120             putMessageResult.setPutMessageStatus(replicaStatus);
121             if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
122                 log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
123                         msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
124             }
125         }
126         return putMessageResult;
127     });
128 }

  消息的写入涉及零拷贝技术,表现在是MappedFileQueue对映射文件MappedFile的操作、MappedFile映射文件的写入等处理。

3.3.1、延迟消息的处理

 

  broker处理延迟消息,会替换消息原来的Topic与QueueId,将原有的主题与消息队列信息设置到消息的属性信息中。在延迟消息还未到投递时间,优先将延迟消息投递到主题SCHEDULE_TOPIC_XXXX中,消息队列ID根据延迟时间级别获取,此时消费者无法消费延迟消息。

 1 // 步骤一:延迟消息的处理
 2 if (msg.getDelayTimeLevel() > 0) {
 3     // 消息的延迟级别超出RocketMQ内置的最大延迟级别,按内置最大延迟级别处理
 4     if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
 5         msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
 6     }
 7 
 8     // 获取延迟消息暂存的Topic、queueId
 9     topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
10     queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
11 
12     // 备份消息原来的topic、queueId,消息的原topic、queueId设置到propertiesString属性
13     MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
14     MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
15     msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
16 
17     // 使用延迟消息主题、消息队列ID替换消息的原topic、queueId
18     msg.setTopic(topic);
19     msg.setQueueId(queueId);
20 }

3.3.2、获取可用的MappedFile对象

  获取可用的CommitLog文件,若首次写入消息,此时未生成commitLog文件,创建CommitLog映射文件对象MappedFile;最新的CommitLog文件写入空间不足,创建创建CommitLog映射文件对象MappedFile。CommitLog文件名称为文件偏移量信息。映射文件创建失败,返回PutMessageStatus.CREATE_MAPEDFILE_FAILED。

 1 // 获取可以写入的commitLog文件
 2 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
 3 // ...
 4 // 步骤二:mappedFile为空或者文件已达最大容量,创建commitLog文件
 5 if (null == mappedFile || mappedFile.isFull()) {
 6     // 获取可用的CommitLog,没有则创建
 7     mappedFile = this.mappedFileQueue.getLastMappedFile(0);
 8 }
 9 // 创建失败,抛出CREATE_MAPEDFILE_FAILED异常
10 if (null == mappedFile) {
11     log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
12     beginTimeInLock = 0;
13     return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
14 }

  获取commitLog文件夹下所有的映射文件对象MappedFile,没有文件,即首次写入消息,创建偏移量为0的映射文件MappedFile,并设置首文件标识,添加到MappedFileQueue中的mappedFiles集合中;有文件,创建的偏移量为文件偏移量与要创建的文件大小之和的映射文件MappedFile。

  MappedFileQueue#getLastMappedFile() 核心代码:
 1 /**
 2  * 创建起始偏移量为startOffset的MappedFile文件
 3  * @param startOffset
 4  * @return
 5  */
 6 public MappedFile getLastMappedFile(final long startOffset) {
 7     return getLastMappedFile(startOffset, true);
 8 }
 9 
10 /**
11  * 获取可用的映射文件MappedFile
12  */
13 public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
14     long createOffset = -1;
15     // 获取最新的MappedFile文件
16     MappedFile mappedFileLast = getLastMappedFile();
17     // commitlog文件夹下无可写入的文件,设置写入偏移量
18     if (mappedFileLast == null) {
19         createOffset = startOffset - (startOffset % this.mappedFileSize);
20     }
21     // commitlog文件夹下有可写入的MappedFile文件,设置写入偏移量
22     if (mappedFileLast != null && mappedFileLast.isFull()) {
23         createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
24     }
25 
26     // 创建MappedFile文件
27     if (createOffset != -1 && needCreate) {
28         // CommitLog文件的目标存储路径
29         String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
30         String nextNextFilePath = this.storePath + File.separator
31             + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
32         MappedFile mappedFile = null;
33 
34         // 创建MappedFile文件对象
35         if (this.allocateMappedFileService != null) {
36             mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
37                 nextNextFilePath, this.mappedFileSize);
38         } else {
39             try {
40                 mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
41             } catch (IOException e) {
42                 log.error("create mappedFile exception", e);
43             }
44         }
45 
46         if (mappedFile != null) {
47             // 为第一个CommitLog文件设置标识
48             if (this.mappedFiles.isEmpty()) {
49                 mappedFile.setFirstCreateInQueue(true);
50             }
51             // 映射文件加入映射文件集mappedFiles
52             this.mappedFiles.add(mappedFile);
53         }
54 
55         return mappedFile;
56     }
57 
58     return mappedFileLast;
59 }

  MappedFile映射文件,可通过线程服务AllocateMappedFileService创建,本质上也是通过MappedFile构造方法初始化MappedFile,AllocateMappedFileService内部封装了堆外内存池是否开启判断,通过此开关调用MappedFile不同的构造函数,同时判断是否对映射文件MappedFile进行预热处理。

预热处理开关warmMapedFileEnable,默认为false,方开关打开时,会提前为CommitLog文件填充1G的0值作为占位符,提前分配物理内存,防止消息写入时发生缺页异常。

3.3.3、消息写入内存

  将消息写入OS缓存页,写入消息大于commitLog文件可写入空间,返回状态END_OF_FILE,创建新的MappedFile文件,重写消息。

 1 // 步骤三:写入消息,将消息追加到MappedFile
 2 result = mappedFile.appendMessage(msg, this.appendMessageCallback);
 3 switch (result.getStatus()) {
 4     case PUT_OK:
 5         break;
 6     case END_OF_FILE:
 7         unlockMappedFile = mappedFile;
 8         // commitlog中文件存储空间不足无法写入消息,创建新的MappedFile文件,重写消息
 9         mappedFile = this.mappedFileQueue.getLastMappedFile(0);
10         if (null == mappedFile) {
11             log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
12             beginTimeInLock = 0;
13             return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
14         }
15         result = mappedFile.appendMessage(msg, this.appendMessageCallback);
16         break;
17     case MESSAGE_SIZE_EXCEEDED:
18     case PROPERTIES_SIZE_EXCEEDED:
19         beginTimeInLock = 0;
20         return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
21     case UNKNOWN_ERROR:
22         beginTimeInLock = 0;
23         return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
24     default:
25         beginTimeInLock = 0;
26         return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
27 }

  获取可用的CommitLog文件,若没有可用文件,则创建CommitLog文件,为消息写入做准备;消息写入到内存,若写入的消息大于CommitLog文件的可写空间,返回END_OF_FILE,创建新的CommitLog文件重写消息。

  MappedFile#appendMessagesInner() 核心代码:

 

   根据是否开启堆外内存池,使用不同的缓冲区。堆外内存池开启,先将消息写入writeBuffer缓冲区中,再将消息写到FileChannel通道,等待由刷盘线程将FileChannel中的数据刷写到磁盘;堆外内存池关闭,先将消息写入mappedByteBuffer缓冲区中,等待刷盘线程将内存中的数据刷写到磁盘中。

  消息写入到内存最终执行DefaultAppendMessageCallback的doAppend方法,主要完成以下几件事:

1、获取队列偏移量

  DefaultAppendMessageCallback#doAppend() 核心代码段

  

2、序列化消息

  DefaultAppendMessageCallback#doAppend() 核心代码段:

 

3、判断CommitLog文件是否有足够的写入空间

 

4、消息写到内存,等待刷盘线程持久化到磁盘

  DefaultAppendMessageCallback#doAppend() 核心代码段

 

 

5、构建消息写入结果对象

 

  在将消息写入到内存后,不会阻塞等待,直接返回写入OS缓存页的结果。

3.3.4、刷盘机制

  RocketMQ的消息写入不会直接将消息写入到磁盘,通过buffer缓冲区,先将消息写到内存,通过刷盘机制将内存中的页数据刷写到磁盘中。    MappedFileQueue#getLastMappedFile() 核心代码段:

// 步骤四:刷盘入口,提交刷盘请求, 同步刷盘会阻塞等待刷盘结果
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);

1、刷盘服务

  在CommitLog构造方法中,初始化了刷盘服务线程,GroupCommitService、FlushRealTimeService、CommitRealTimeService。刷盘策略为同步刷盘,创建同步刷盘服务GroupCommitService;刷盘策略为异步刷盘,创建异步刷盘服务FlushRealTimeService。若开启了堆外内存池,利用CommitRealTimeService服务线程,将writeBuffer内存页中的数据刷新到FileChannel,同时唤醒刷盘线程。

  CommitLog构造函数详情:

 1 public CommitLog(final DefaultMessageStore defaultMessageStore) {
 2     // 创建MappedFileQueue,用于管理MappedFile映射文件
 3     this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
 4         defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
 5     this.defaultMessageStore = defaultMessageStore;
 6 
 7     // 根据不同的刷盘策略,使用不同的刷盘服务
 8     if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
 9         // 同步刷盘服务
10         this.flushCommitLogService = new GroupCommitService();
11     } else {
12         // 异步刷盘服务,FlushRealTimeService线程默认每500ms将MappedByteBuffer中新追加的内存刷写到磁盘
13         this.flushCommitLogService = new FlushRealTimeService();
14     }
15     // 堆外内存缓冲,线程每隔200ms将ByteBuffer新追加内容提交到MappedByteBuffer中,异步刷盘时使用
16     this.commitLogService = new CommitRealTimeService();
17 
18     // appendMessageCallback将消息写入到内存,通过刷盘将内存中的数据持久化到CommitLog文件
19     this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
20     batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
21         @Override
22         protected MessageExtBatchEncoder initialValue() {
23             return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
24         }
25     };
26     // RocketMQ中,所有的消息都存储在CommitLog文件中,并发写入,存在线程安全问题。
27     // 根据消息存储配置useReentrantLockWhenPutMessage,判断是使用排它锁还是自旋锁
28     // useReentrantLockWhenPutMessage参数默认是false,使用自旋锁。异步刷盘建议使用自旋锁,同步刷盘建议使用排它锁
29     this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
30 
31 }

2、刷盘请求

  提交刷盘请求,CommitLog#submitFlushRequest() 核心代码:

 1 /**
 2  * 提交刷盘请求,若commitlog设置为同步刷盘,会阻塞等待异步线程刷盘结果
 3  */
 4 public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
 5     // 同步刷盘 唤醒同步刷盘服务(GroupCommitService),根据producer配置判断是否阻塞等待刷盘结果
 6     if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
 7         final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
 8         // 同步刷盘策略 并且 producer设置了msg落盘后再返回,阻塞等待刷盘结果再返回
 9         if (messageExt.isWaitStoreMsgOK()) {
10             GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
11                     this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
12             service.putRequest(request);
13             return request.future();
14         // 同步刷盘策略,若producer未设置msg落盘后再返回,直接唤醒刷盘线程并返回
15         } else {
16             service.wakeup();
17             return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
18         }
19     }
20     // 异步刷盘  根据是否使用堆外内存池,唤醒不同的线程。开启 => commitLogService;未开启 => flushRealTimeService
21     else {
22         // 唤醒后不用等待结果,直接返回
23         if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
24             flushCommitLogService.wakeup();
25         } else  {
26             commitLogService.wakeup();
27         }
28         return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
29     }
30 }

  RocketMQ中刷盘策略分为同步刷盘和异步刷盘。同步刷盘时,当Producer设置等消息落盘后再返回,唤醒刷盘服务GroupCommitService,消息存储流程阻塞等待刷盘结果;Producer未设置落盘后再返回,直接唤醒刷盘服务GroupCommitService并返回刷盘状态PutMessageStatus.PUT_OK。异步刷盘时,当开启了堆外内存池,唤醒刷盘服务flushRealTimeService并返回刷盘状态PutMessageStatus.PUT_OK;堆外内存池未开启,唤醒commitLogService服务,优先将writeBuffer内存数据刷新到file channel, 再唤醒刷盘服务flushRealTimeService并返回刷盘状态PutMessageStatus.PUT_OK。

  同步刷盘服务处理,GroupCommitService#run() 核心代码段:

  

   异步刷盘服务处理,FlushRealTimeService#run() 核心代码段: