跳到主要内容

三十二、RocketMQ一个新的消费组初次启动时从何处开始消费呢?


本文目录

  • 1、抛出问题
  • 1.1 环境准备
  • 1.2 消息发送者代码
  • 1.3 消费端验证代码

2、 探究CONSUME_FROM_MAX_OFFSET实现原理;

  • 2.1 CONSUME_FROM_LAST_OFFSET计算逻辑
  • 2.2 CONSUME_FROM_FIRST_OFFSET
  • 2.4 CONSUME_FROM_TIMESTAMP

3、 猜想与验证;

4、 解决方案;

1、抛出问题

一个新的消费组订阅一个已存在的Topic主题时,消费组是从该Topic的哪条消息开始消费呢?

首先翻阅DefaultMQPushConsumer的API时,setConsumeFromWhere(ConsumeFromWhere consumeFromWhere)API映入眼帘,从字面意思来看是设置消费者从哪里开始消费,正是解开该问题的”钥匙“。ConsumeFromWhere枚举类图如下:
 

  • CONSUME_FROM_MAX_OFFSET
    从消费队列最大的偏移量开始消费。
  • CONSUME_FROM_FIRST_OFFSET
    从消费队列最小偏移量开始消费。
  • CONSUME_FROM_TIMESTAMP
    从指定的时间戳开始消费,默认为消费者启动之前的30分钟处开始消费。可以通过DefaultMQPushConsumer#setConsumeTimestamp。

是不是点小激动,还不快试试。

需求:新的消费组启动时,从队列最后开始消费,即只消费启动后发送到消息服务器后的最新消息。

1.1 环境准备

本示例所用到的Topic路由信息如下:
 

Broker的配置如下(broker.conf)