跳到主要内容

十五、RocketMQ源码分析消息PULL-长轮询模式


本节目录

1、 长轮询、短轮询概述;
2、 RocketMQ拉轮询拉取任务创建;
3、 源码分析PullRequestHoldService线程;

  • 3.1 PullRequestHoldService#suspendPullRequest
  • 3.2 run方法详解

4、源码分析DefaultMessageStore#ReputMessageService

  • 4.1 run方法
  • 4.2 doReput

1、长轮询、短轮询概述

消息拉取为了提高网络性能,在消息服务端根据拉取偏移量去物理文件查找消息时没有找到,并不立即返回消息未找到,而是会将该线程挂起一段时间,然后再次重试,直到重试。挂起分为长轮询或短轮询,在broker 端可以通过 longPollingEnable=true 来开启长轮询。

短轮询:longPollingEnable=false,第一次未拉取到消息后等待 shortPollingTimeMills时间后再试。shortPollingTimeMills默认为1S。

长轮询:longPollingEnable=true,会根据消费者端设置的挂起超时时间,受DefaultMQPullConsumer 的brokerSuspendMaxTimeMillis控制,默认20s,(brokerSuspendMaxTimeMillis),长轮询有两个线程来相互实现。

  • PullRequestHoldService:每隔5s重试一次。
  • DefaultMessageStore#ReputMessageService,每当有消息到达后,转发消息,然后调用PullRequestHoldService 线程中的拉取任务,尝试拉取,每处理一次,Thread.sleep(1), 继续下一次检查。

2、RocketMQ拉轮询拉取任务创建

org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest

首先看一下processRequest方法参数:

private RemotingCommand processRequest(
final Channel channel,
RemotingCommand request,
boolean brokerAllowSuspend)