xiaoming728

xiaoming728

RocketMQ-延时消息实现原理分析

2023-12-11
RocketMQ-延时消息实现原理分析

来源:CSDN-hosaos

链接:https://blog.csdn.net/hosaos/article/details/90577732

日期: 2019-05-26 19:34:18

在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消;还有在线商城完成订单后48小时不评价 ,自动5星好评。像这类在某事件触发后一段时间内执行的需求任务我们称之为 延时任务。

如何实现延迟任务

第一反应是利用cron方案来实现:

启动一个cron定时任务,每隔一段时间执行一次,比如30分钟,找到那些超时的数据,直接更新状态,或者拿出来执行一些操作。如果数据量比较大,需要分页查询,分页update,这将是一个for循环更新操作。

cron方案是很常见的一种方案,但是常见的不一定是最好的,主要有以下几个问题:

  • 当数据量大的时候轮询效率低;

  • 时效性不够好,如果每小时轮询一次,最差的情况时间误差会达到1小时;

  • 如果通过增加cron轮询频率来减少时间误差,则会出现轮询低效和重复计算的问题;

那么有没其他解决方案?关键有2点设计要求:

  1. 能够在指定时间间隔后触发某个业务操作

  2. 能够应对业务数据量特别大的特殊场景

RocketMQ延时消息能够完美的解决上述需求,正常的消息在投递后会立马被消费者所消费,而延时消息在投递时,需要设置指定的延时级别(不同延迟级别对应不同延迟时间),即等到特定的时间间隔后消息才会被消费者消费,这样就将数据库层面的压力转移到了MQ中,也不需要手写定时器,降低了业务复杂度,同时MQ自带削峰功能,能够很好的应对业务高峰。

延时消息

Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。

首先需要在RocketMQ配置文件中给出延时等级的定义,在broker.conf中指定以下配置:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

测试代码:
控制器

    /**
     * 定时消息发送
     */
    @RequestMapping("/send/delay")
    public void delay(){
        rocketMqProducer.sendDelay("test_delay","延时消息",3000L, 5);
    }

生产者

    /**
     * 发送延时消息
     */
    public void sendDelay(String topic, String msgBody, long timeout, Integer delayLevel) {
        SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), timeout, delayLevel);
        if (ObjectUtils.isNotEmpty(sendResult)) {
            //sendResult不空则表示消息发送成功
            log.info("send success , send msg = {}, messageId = {}", msgBody, sendResult.getMsgId());
        }
    }

消费者

/**
 * RocketMqProducer
 * @date: 2020/11/26
 * @author weirx
 * @version 3.0
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_delay", selectorExpression = "*", consumerGroup = "test_delay")
public class SimpleDelayMessageListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        byte[] body = messageExt.getBody();
        String msg = new String(body);
        log.info("receive async message:{}", msg);
    }
}

结果,对比上面的配置,延时等级5是一分钟,消息发送和消费正好一分钟。

2020-11-27 17:56:55.248  INFO 51704 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer      : send success , send msg = 延时消息, messageId = AC100208C9F818B4AAC289BF48250000
2020-11-27 17:57:55.280  INFO 51704 --- [MessageThread_1] c.c.b.m.r.c.SimpleDelayMessageListener   : receive async message:延时消息

RocketMQ延时消息实现思路

  1. producer端设置消息delayLevel延迟级别,消息属性DELAY中存储了对应了延时级别

  2. broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始topic,queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC),queueId改为延时级别-1

  3. mq服务端ScheduleMessageService中,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列

  4. 根据消费偏移量offset从commitLog中解析出对应消息

  5. 从消息tagsCode中解析出消息应当被投递的时间,与当前时间做比较,判断是否应该进行投递

  6. 若到达了投递时间,则构建一个新的消息,并从消息属性中恢复出原始的topic,queueId,并清除消息延迟属性,从新进行消息投递