跳到主要内容

七、RocketMQ源码分析之消息消费重试机制

主要关注业务方在消息消费失败后,返回 ConsumeConcurrentlyStatus.RECONSUME_LATER ,专业术语:业务方每条消息消费后要告诉 MQ 消费者一个结果(ack,message back),触发 MQ 消息消费重试机制,然后 MQ 消费者需要反馈给 MQ(Broker)。

备注:主要针对的还是非顺序消息,顺序消息在后续专题详细分析。

1、消息消费处理

代码入口:ConsumeMessageConcurrentlyService ConsumeRequest run方法

 

然后进入到结果处理:ConsumeMessageConcurrentlyService processConsumeResult

 

 

如果返回结果是 CONSUME_SUCCESS,此时 ackIndex = msg.size() – 1,再看发送 sendMessageBack 循环的条件,for (int i = ackIndex + 1; i < msg.size() ;;) 从这里可以看出如果消息成功,则无需发送sendMsgBack 给 broker。

如果返回结果是 RECONSUME_LATER, 此时 ackIndex = -1 ,则这批所有的消息都会发送消息给Broker,也就是这一批消息都得重新消费。如果发送 ack 失败,则会延迟5s后重新在消费端重新消费。

消费者向 Broker 发送 ACK 消息,如果发送成功,重试机制由 broker 处理,如果发送 ack 消息失败,则将该任务直接在消费者这边,再次在本地处理该批消息,默认演出5s后在消费者重新消费,其关键总结如下: