96SEO 2026-02-20 09:14 0
ReadSocketService线程读取从Broker复制进度

为了提高消息消费的高可用性避免Broker发生单点故障引起存储在Broker上的消息无法及时消费
RocketMQ引入Broker主备机制即消息消费到达主服务器后需要将消息同步到消息从服务器如果主服务器Broker宕机后消息消费者可以从从服务器拉取消息。
下图所示是Broker的HA交互机制流程图及类图。
主从同步模式分为同步、异步。
step1主服务器启动并在特定端口上监听从服务器的连接step2从服务器主动连接主服务器主服务器接收客户端的连接并建立相关TCP连接step3从服务器主动向主服务器发送待拉取消息偏移量主服务器解析请求并返回消息给从服务器step4从服务器保存消息并继续发送新的消息同步请求。
1.
参考rocketmq-distribution项目的conf目录下有2主2从异步HA配置2m-2s-async、2主2从同步HA配置2m-2s-sync。
以下1主1从异步HA配置实例如下。
truenamesrvAddr192.168.1.55:9876;172.17.0.3:9876
0时则为从0时则为主brokerRole角色为SLAVE从刷盘类型为ASYNC_FLUSH异步刷盘。
truenamesrvAddr192.168.1.55:9876;172.17.0.3:9876
org.apache.rocketmq.store.DefaultMessageStore#start是Broker启动方法如下图所示是其调用链及相关HA部分代码。
BrokerController#startBasicService()*
(!messageStoreConfig.isEnableDLegerCommitLog()
!this.messageStoreConfig.isDuplicationEnable())
{this.haService.init(this);}......if
{this.haService.start();}......
org.apache.rocketmq.store.ha.DefaultHAService#init是HAService初始化方法如下代码所示。
注意从Broker的broker.conf配置的brokerRole为SLAVE才能创建HAClient从Broker注册到主Broker。
defaultMessageStore;this.acceptSocketService
DefaultAcceptSocketService(defaultMessageStore.getMessageStoreConfig());this.groupTransferService
(this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()
DefaultHAClient(this.defaultMessageStore);}this.haConnectionStateNotificationService
HAConnectionStateNotificationService(this,
org.apache.rocketmq.store.ha.DefaultHAService#start是HAService启动方法。
注意
org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService主Broker接收从Broker的连接事件org.apache.rocketmq.store.ha.GroupTransferService负责主Broker向从Broker发送同步数据org.apache.rocketmq.store.ha.HAClient从Broker向主Broker发送连接事件Broker启动时根据配置brokerRole配置ASYNC_MASTER、SYNC_MASTER、SLAVE判定Broker是主还是从。
若是Slave角色在broker配置文件中获取haMasterAddress并更新至masterAddress但是haMasterAddress配置为空则启动成功但是不会执行HA。
AcceptSocketService}接收从Broker的注册事件方法是{link
AcceptSocketService#beginAccept()}*
AcceptSocketService}线程监听从Broker发送心跳*
GroupTransferService}线程启动主Broker向从Broker发送数据*
主接收从Broker的连接事件SelectionKey.OP_ACCEPT连接事件this.acceptSocketService.beginAccept();//
启动主Broker线程this.acceptSocketService.start();//
主Broker同步数据线程启动this.groupTransferService.start();this.haConnectionStateNotificationService.start();//
org.apache.rocketmq.store.ha.DefaultHAClient是从Broker向主Broker的发送连接事件的核心类是个线程。
其主要属性如下代码所示。
反馈HA的复制进度从Broker的Commitlog文件的最大偏移量
ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
org.apache.rocketmq.store.ha.DefaultHAClient#run是HAClient启动执行任务其调用链和代码如下。
DefaultHAClient#connectMaster()从Broker连接到主Broker。
DefaultHAClient#transferFromMaster()向主发送HA进度处理从主Broker发送过来的消息并commit所有消息追加到Commitlog文件内存映射缓存中。
DefaultHAClient#connectMaster()}从Broker连接到主Broker*
DefaultHAClient#transferFromMaster()}向主发送HA进度处理从主Broker发送过来的消息并commit所有消息追加到Commitlog文件内存映射缓存中*/
{log.info(this.getServiceName()
started);this.flowMonitor.start();while
this.masterHaAddress.get());this.waitForRunning(1000
向主发送HA进度处理从主Broker发送过来的消息并commit所有消息追加到Commitlog文件内存映射缓存中if
没有可拉取消息时设置READY状态closeMasterAndWait();continue;}break;default:this.waitForRunning(1000
this.defaultMessageStore.getMessageStoreConfig().getHaHousekeepingInterval())
interval);this.closeMaster();log.warn(AutoRecoverHAClient,
{log.warn(this.getServiceName()
e);this.closeMasterAndWait();}}log.info(this.getServiceName()
注意一旦HAClient线程启动后在状态READY、TRANSFER来回变化READY状态下发送从Broker连接事件到主Broker开启Socket连接TRANSFER状态下主从发送相关数据信息如从向主发送HA复制进度currentReportedOffset即从Broker的Commitlog文件的最大偏移量主向从发送同步消息。
org.apache.rocketmq.store.ha.DefaultHAClient#connectMaster是从Broker连接到主Broker的核心方法其代码如下。
Broker启动时若是Slave角色从broker配置文件中获取haMasterAddress并更新至masterAddress*
若是Slave角色但是haMasterAddress配置为空则启动成功但是不会执行HA*
根据地址创建SocketAddress对象SocketAddress
RemotingUtil.string2SocketAddress(addr);//
获取SocketChannelthis.socketChannel
RemotingUtil.connect(socketAddress);if
SocketChannel注册OP_READ网络读事件this.socketChannel.register(this.selector,
SelectionKey.OP_READ);log.info(HAClient
addr);this.changeCurrentState(HAConnectionState.TRANSFER);}}//
获取Commitlog最大偏移量HA同步进度this.currentReportedOffset
this.defaultMessageStore.getMaxPhyOffset();this.lastReadTimestamp
System.currentTimeMillis();}return
org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService是主Broker接收从Broker连接事件的实现类是一个线程。
其主要属性如下代码所示。
org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#beginAccept方法定义了主Broker监听从Broker的连接事件。
ServerSocketChannel.open();this.selector
RemotingUtil.openSelector();this.serverSocketChannel.socket().setReuseAddress(true);
TCP可重复使用this.serverSocketChannel.socket().bind(this.socketAddressListen);
messageStoreConfig.getHaListenPort())
{messageStoreConfig.setHaListenPort(this.serverSocketChannel.socket().getLocalPort());log.info(OS
messageStoreConfig.getHaListenPort());}this.serverSocketChannel.configureBlocking(false);
非阻塞模式this.serverSocketChannel.register(this.selector,
org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#run监听到从Broker连接事件的任务处理为每个连接事件创建org.apache.rocketmq.store.ha.HAConnection对象并启动负责M-S的数据同步逻辑。
{log.info(this.getServiceName()
选择器每1s处理一次连接就绪事件this.selector.select(1000);SetSelectionKey
this.selector.selectedKeys();if
若是连接事件时创建SocketChannelSocketChannel
{DefaultHAService.log.info(HAService
sc.socket().getRemoteSocketAddress());try
每一个连接创建HAConnection并启动负责M-S的数据同步逻辑HAConnection
createConnection(sc);conn.start();DefaultHAService.this.addConnection(conn);}
k.readyOps());}}selected.clear();}}
{log.error(this.getServiceName()
e);}}log.info(this.getServiceName()
org.apache.rocketmq.store.ha.DefaultHAConnection创建并启动时启动读、写线程服务。
其关键属性如下代码所示。
writeSocketService主Broker向从Broker写数据服务类private
readSocketService主Broker读取从Broker数据服务类
org.apache.rocketmq.store.ha.DefaultHAClient#transferFromMaster是从Broker与主Broker传输数据的核心方法代码如下所示该方法有两大功能
从Broker向主Broker反馈HA复制进度即currentReportedOffset从Broker的Commitlog文件的最大偏移量方法org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset执行。
从Broker接收主BrokerHA同步消息内容方法org.apache.rocketmq.store.ha.DefaultHAClient#processReadEvent执行。
向主反馈HA复制进度处理从主Broker发送过来的消息并commit所有消息追加到Commitlog文件内存映射缓存中*
DefaultHAClient#reportSlaveMaxOffset(long)}向主反馈HA复制进度即currentReportedOffset从Broker的Commitlog文件的最大偏移量*
DefaultHAClient#processReadEvent()}处理从主Broker发送过来的消息并commit所有消息追加到Commitlog文件内存映射缓存中*/
this.reportSlaveMaxOffset(this.currentReportedOffset);if
false;}}this.selector.select(1000);//
org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset向主Broker反馈HA复制进度代码如下。
对于Master端本次请求拉取的偏移量也可以理解为同步ACK*
Buffer#hasRemaining()}判断缓存内容是否完全写入SocketChannel基于NIO模式的写范例*
偏移量写入ByteBufferthis.reportOffset.position(0);
写缓存位置this.reportOffset.limit(8);
写缓存字节长度this.reportOffset.putLong(maxOffset);
读模式this.reportOffset.position(0);this.reportOffset.limit(8);//
循环并判定ByteBuffer是否完全写入SocketChannelfor
this.reportOffset.hasRemaining();
{this.socketChannel.write(this.reportOffset);}
{log.error(this.getServiceName()
this.defaultMessageStore.getSystemClock().now();return
!this.reportOffset.hasRemaining();
ReadSocketService线程读取从Broker复制进度
org.apache.rocketmq.store.ha.DefaultHAConnection.ReadSocketService#processReadEvent是主Broker读取从Broker拉取消息的请求获取内容是HA复制进度。
其代码如下看出主Broker获取从Broker的HA复制进度后赋值给DefaultHAConnection#slaveRequestOffset属性后立即唤醒GroupTransferService线程执行消息同步。
step1判定byteBufferRead是否有剩余空间没有则{link
step2用剩余空间从SocketChannel读数据到缓存中读取到的内容是从Broker拉取消息的偏移量*
0;/*byteBufferRead没有剩余空间时则position
(!this.byteBufferRead.hasRemaining())
ByteBuffer重置处理this.processPosition
ByteBuffer有剩余空间循环至byteBufferRead没有剩余空间while
(this.byteBufferRead.hasRemaining())
this.socketChannel.read(this.byteBufferRead);if
DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();//
((this.byteBufferRead.position()
(this.byteBufferRead.position()
this.byteBufferRead.getLong(pos
从Broker反馈已完成的偏移量DefaultHAConnection.this.slaveAckOffset
(DefaultHAConnection.this.slaveRequestOffset
{DefaultHAConnection.this.slaveRequestOffset
DefaultHAConnection.this.clientAddress
通知等待同步HA复制结果的发送消息线程DefaultHAConnection.this.haService.notifyTransferSome(DefaultHAConnection.this.slaveAckOffset);}}
DefaultHAConnection.this.clientAddress
ReadSocketService线程读取从Broker发送的HA复制进度由org.apache.rocketmq.store.ha.DefaultHAConnection.WriteSocketService根据DefaultHAConnection#slaveRequestOffset获取主Broker还没有同步的所有消息进行HA同步。
其如下代码所示WriteSocketService#run方法是同步消息核心逻辑。
step1slaveRequestOffset为-1时说明主Broker没有收到从Broker的拉取请求忽略本次写事件*
step2nextTransferFromWhere为-1时说明初次传输计算nextTransferFromWhere待传输offset*
step4根据从Broker待拉取消息offset查找之后的所有可读消息*
{log.info(this.getServiceName()
slaveRequestOffset为-1时说明主Broker没有收到从Broker的拉取请求忽略本次写事件if
DefaultHAConnection.this.slaveRequestOffset)
{Thread.sleep(10);continue;}/*nextTransferFromWhere为-1时说明初次传输初次传输时计算nextTransferFromWhere待传输offset*///
DefaultHAConnection.this.slaveRequestOffset)
DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();masterOffset
DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMappedFileSizeCommitLog());if
DefaultHAConnection.this.slaveRequestOffset;}log.info(master
DefaultHAConnection.this.clientAddress
DefaultHAConnection.this.slaveRequestOffset);}/*判断上次是否传输完上次传输完当前时间
size消息长度0避免长连接由空闲而关闭上次传输没有完成继续传输忽略本次写事件*///
DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now()
DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval())
size消息长度0避免长连接由空闲而关闭this.byteBufferHeader.position(0);this.byteBufferHeader.limit(headerSize);this.byteBufferHeader.putLong(this.nextTransferFromWhere);this.byteBufferHeader.putInt(0);this.byteBufferHeader.flip();//
(!this.lastWriteOver)continue;}}//
lastWriteOver为false则上次传输没有完成则继续传输else
继续传输上次拉取请求还未完成则忽略本次写事件this.lastWriteOver
(!this.lastWriteOver)continue;}//
根据从Broker待拉取消息offset查找之后的所有可读消息SelectMappedBufferResult
DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);if
DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize())
DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();}int
flowMonitor.canTransferMaxByteNum();if
flowMonitor.maxTransferByteInSecond()
flowMonitor.getTransferredByteInSecond()
System.currentTimeMillis();}size
this.nextTransferFromWhere;this.nextTransferFromWhere
下一次写入的offsetselectResult.getByteBuffer().limit(size);this.selectMappedBufferResult
传输size大小的消息内容不一定是完整的消息this.byteBufferHeader.position(0);this.byteBufferHeader.limit(headerSize);this.byteBufferHeader.putLong(thisOffset);this.byteBufferHeader.putInt(size);this.byteBufferHeader.flip();//
{DefaultHAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);}}
{DefaultHAConnection.log.error(this.getServiceName()
e);break;}}DefaultHAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();if
{this.selectMappedBufferResult.release();}changeCurrentState(HAConnectionState.SHUTDOWN);this.makeStop();readSocketService.makeStop();haService.removeConnection(DefaultHAConnection.this);SelectionKey
this.socketChannel.keyFor(this.selector);if
{this.selector.close();this.socketChannel.close();}
{DefaultHAConnection.log.error(,
e);}DefaultHAConnection.log.info(this.getServiceName()
org.apache.rocketmq.store.ha.GroupTransferService该类负责将主从同步复制结束后通知阻塞的消息发送者线程。
同步主从Broker模式即消息刷磁盘后继续等待新消息被传输到从Broker等待传输结果并通知消息发送线程。
org.apache.rocketmq.store.CommitLog#asyncPutMessage是消息生产者发送消息到Broker时执行存储消息参考《RocketMQ5.0.0消息存储二_消息存储流程》该方法会根据同步或异步模式默认来执行org.apache.rocketmq.store.CommitLog#handleDiskFlushAndHA方法完成刷盘和HA复制方法调用链如下。
生产者把消息发送到Broker完成commit操作消息提交到文件内存映射中
随后根据同步/异步模式完成刷盘和HA。
HA操作时把消息提交请求添加到org.apache.rocketmq.store.ha.GroupTransferService.requestsWrite是主Broker待需要HA的的集合。
以下是org.apache.rocketmq.store.CommitLog#handleHA的代码。
CompletableFuturePutMessageStatus
CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}HAService
this.defaultMessageStore.getHaService();long
this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(),
needAckNums);haService.putRequest(request);haService.getWaitNotifyObject().wakeupAll();return
HAService启动时会启动GroupTransferService线程。
GroupTransferService#run执行任务如下代码所示。
{log.info(this.getServiceName()
间隔10sthis.waitForRunning(10);//
主从同步复制结束后通知阻塞的消息发送者线程this.doWaitTransfer();}
{log.warn(this.getServiceName()
e);}}log.info(this.getServiceName()
其中执行waitForRunning()方法时会去执行org.apache.rocketmq.store.ha.GroupTransferService#swapRequests方法使得requestsWrite与requestsRead两个集合对调
ListCommitLog.GroupCommitRequest
requestsWrite主Broker待需要HA的消息集合private
ListCommitLog.GroupCommitRequest
{ListCommitLog.GroupCommitRequest
this.requestsWrite;this.requestsWrite
this.requestsRead;this.requestsRead
org.apache.rocketmq.store.ha.GroupTransferService#doWaitTransfer方法是主从同步复制结束后通知阻塞的消息发送者线程如下代码所示。
step1遍历消息提交请求内存提交到Commitlog文件的内存映射*
commit请求即内存提交到Commitlog文件的内存映射for
MixAll.ALL_ACK_IN_SYNC_STATE_SET;for
等待1sthis.notifyTransferObject.waitForRunning(1000);}if
消息生产者发送消息后返回下一条消息的偏移量transferOK
haService.getPush2SlaveMaxOffset().get()
req.getNextOffset();continue;}if
autoSwitchHAService.getSyncStateSet();if
(syncStateSet.contains(autoSwitchHAConnection.getSlaveAddress())
autoSwitchHAConnection.getSlaveAckOffset()
从完成复制后唤醒消息发送者线程req.wakeupCustomer(transferOK
PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}this.requestsRead.clear();}}
RocketMQ读写分离与其他中间件的实现方式完全不同RocketMQ是消费者首先向主服务器发起拉取消息请求然后主服务器返回一批消息然后会根据主服务器负载压力与主从同步情况向从服务器建议下次消息拉取是从主服务器还是从从服务器拉取。
RocketMQ根据MessageQueu查找Broker地址的唯一依据是brokerName。
Broker组织中根据brokerName获取一组Broker服务器M-S它们的brokerName相同但brokerId不同主服务器的brokerId为0从服务器的brokerId大于0。
其方法是org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe
详细消费拉取消息时实现读写分离机制见后续章节参考《RocketMQ5.0.0消息消费一
【RocketMQ】学习RocketMQ必须要知道的主从同步原理_午睡的猫…的博客-CSDN博客_rocketmq主从同步原理
RocketMQ5.0.0消息存储二_消息存储流程_爱我所爱0505的博客-CSDN博客_rocketmq
消息写入流程RocketMQ5.0.0消息存储三_消息转发与恢复机制_爱我所爱0505的博客-CSDN博客
RocketMQ5.0.0消息存储四_刷盘机制_爱我所爱0505的博客-CSDN博客
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback