集群消费下,重试机制的本质是 RocketMQ 的延迟消息功能。
消费消息失败后,消费者实例会通过 CONSUMER_SEND_MSG_BACK 请求,将失败消息发回到 Broker 端。
Broker 端会为每个 topic 创建一个重试队列 ,队列名称是:%RETRY% + 消费者组名 ,达到重试时间后将消息投递到重试队列中进行消费重试(消费者组会自动订阅重试 Topic)。最多重试消费 16 次,重试的时间间隔逐渐变长,若达到最大重试次数后消息还没有成功被消费,则消息将被投递至死信队列。
|
第几次重试 |
与上次重试的间隔时间 |
第几次重试 |
与上次重试的间隔时间 |
|
1 |
10 秒 |
9 |
7 分钟 |
|
2 |
30 秒 |
10 |
8 分钟 |
|
3 |
1 分钟 |
11 |
9 分钟 |
|
4 |
2 分钟 |
12 |
10 分钟 |
|
5 |
3 分钟 |
13 |
20 分钟 |
|
6 |
4 分钟 |
14 |
30 分钟 |
|
7 |
5 分钟 |
15 |
1 小时 |
|
8 |
6 分钟 |
16 |
2 小时 |

开源 RocketMQ 4.X 支持延迟消息,默认支持18 个 level 的延迟消息,这是通过 broker 端的 messageDelayLevel 配置项确定的,如下:

Broker 在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟 level 的个数,创建对应数量的队列,也就是说18个 level 对应了18个队列。
我们先梳理下延迟消息的实现机制。
1、生产者发送延迟消息
brMessage msg = new Message();brmsg.setTopic("TopicA");brmsg.setTags("Tag");brmsg.setBody("this is a delay message".getBytes());br//设置延迟level为5,对应延迟1分钟brmsg.setDelayTimeLevel(5);brproducer.send(msg);
2、Broker端存储延迟消息
延迟消息在 RocketMQ Broker 端的流转如下图所示:

第一步:修改消息 Topic 名称和队列信息
Broker 端接收到生产者的写入消息请求后,第一都会将消息写到 commitlog 中。如果是正常非延迟消息,MessageStore 会根据消息中的 Topic 信息和队列信息,将其转发到目标 Topic 的指定队列 consumequeue 中。
但由于消息一旦存储到 consumequeue 中,消费者就能消费到,而延迟消息不能被立即消费,所以 RocketMQ 将 Topic 的名称修改为SCHEDULE_TOPIC_XXXX,并根据延迟级别确定要投递到哪个队列下。
同时,还会将消息原来要发送到的目标 Topic 和队列信息存储到消息的属性中。

第二步:构建 consumequeue 文件时,计算并存储投递时间


上图是 consumequeue 文件一条消息的格式,最后 8 个字节存储 Tag 的哈希值,此时存储消息的投递时间。
第三步:定时调度服务启动
ScheduleMessageService 类是一个定时调度服务,读取 SCHEDULE_TOPIC_XXXX 队列的消息,并将消息投递到目标 Topic 中。
定时调度服务启动时,创建一个定时调度线程池 ,并根据延迟级别的个数,启动对应数量的 HandlePutResultTask ,每个 HandlePutResultTask 负责一个延迟级别的消费与投递。

第四步:投递时间到了,将消息数据重新写入到 commitlog
消息到期后,需要投递到目标 Topic 。第一步已经记录了原来的 Topic 和队列信息,这里需要重新设置,再存储到 commitlog 中。
第五步:将消息投递到目标 Topic 中
Broker 端的后台服务线程会不停地分发请求并异步构建 consumequeue(消费文件)和 indexfile(索引文件)。因此消息会直接投递到目标 Topic 的 consumequeue 中,之后消费者就可以消费到这条消息。
回顾了延迟消息的机制,消费消息失败后,消费者实例会通过 CONSUMER_SEND_MSG_BACK 请求,将失败消息发回到 Broker 端。
Broker 端 SendMessageProcessor 处理器会调用 asyncConsumerSendMsgBack 方法。

第一判断消息的当前重试次数是否大于等于最大重试次数,如果达到最大重试次数,或者配置的重试级别小于0,则重新创建 Topic ,规则是 %DLQ% + consumerGroup,后续处理消息发送到死信队列。
正常的消息会进入 else 分支,对于首次重试的消息,默认的 delayLevel 是 0 ,RocketMQ 会将 delayLevel + 3,也就是加到 3 ,这就是说,如果没有显示的配置延时级别,消息消费重试首次,是延迟了第三个级别发起的重试,也就是距离首次发送 10s 后重试,其主题的默认规则是 %RETRY% + consumerGroup。
当延时级别设置完成,刷新消息的重试次数为当前次数加 1 ,Broker 端将该消息刷盘,逻辑如下:

延迟消息写入到 commitlog 里 ,这里实则和延迟消息机制的第一步类似,后面按照延迟消息机制的流程执行即可(第二步到第六步)。