RocketMQ-延时消息实现原理分析
编辑来源:CSDN-hosaos
链接:https://blog.csdn.net/hosaos/article/details/90577732
日期: 2019-05-26 19:34:18
在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消;还有在线商城完成订单后48小时不评价 ,自动5星好评。像这类在某事件触发后一段时间内执行的需求任务我们称之为 延时任务。
如何实现延迟任务
第一反应是利用cron方案来实现:
启动一个cron定时任务,每隔一段时间执行一次,比如30分钟,找到那些超时的数据,直接更新状态,或者拿出来执行一些操作。如果数据量比较大,需要分页查询,分页update,这将是一个for循环更新操作。
cron方案是很常见的一种方案,但是常见的不一定是最好的,主要有以下几个问题:
当数据量大的时候轮询效率低;
时效性不够好,如果每小时轮询一次,最差的情况时间误差会达到1小时;
如果通过增加cron轮询频率来减少时间误差,则会出现轮询低效和重复计算的问题;
那么有没其他解决方案?关键有2点设计要求:
能够在指定时间间隔后触发某个业务操作
能够应对业务数据量特别大的特殊场景
RocketMQ延时消息能够完美的解决上述需求,正常的消息在投递后会立马被消费者所消费,而延时消息在投递时,需要设置指定的延时级别(不同延迟级别对应不同延迟时间),即等到特定的时间间隔后消息才会被消费者消费,这样就将数据库层面的压力转移到了MQ中,也不需要手写定时器,降低了业务复杂度,同时MQ自带削峰功能,能够很好的应对业务高峰。
延时消息
Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
首先需要在RocketMQ配置文件中给出延时等级的定义,在broker.conf中指定以下配置:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
测试代码:
控制器
/**
* 定时消息发送
*/
@RequestMapping("/send/delay")
public void delay(){
rocketMqProducer.sendDelay("test_delay","延时消息",3000L, 5);
}
生产者
/**
* 发送延时消息
*/
public void sendDelay(String topic, String msgBody, long timeout, Integer delayLevel) {
SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), timeout, delayLevel);
if (ObjectUtils.isNotEmpty(sendResult)) {
//sendResult不空则表示消息发送成功
log.info("send success , send msg = {}, messageId = {}", msgBody, sendResult.getMsgId());
}
}
消费者
/**
* RocketMqProducer
* @date: 2020/11/26
* @author weirx
* @version 3.0
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_delay", selectorExpression = "*", consumerGroup = "test_delay")
public class SimpleDelayMessageListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("receive async message:{}", msg);
}
}
结果,对比上面的配置,延时等级5是一分钟,消息发送和消费正好一分钟。
2020-11-27 17:56:55.248 INFO 51704 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 延时消息, messageId = AC100208C9F818B4AAC289BF48250000
2020-11-27 17:57:55.280 INFO 51704 --- [MessageThread_1] c.c.b.m.r.c.SimpleDelayMessageListener : receive async message:延时消息
RocketMQ延时消息实现思路
producer端设置消息delayLevel延迟级别,消息属性DELAY中存储了对应了延时级别
broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始topic,queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC),queueId改为延时级别-1
mq服务端ScheduleMessageService中,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列
根据消费偏移量offset从commitLog中解析出对应消息
从消息tagsCode中解析出消息应当被投递的时间,与当前时间做比较,判断是否应该进行投递
若到达了投递时间,则构建一个新的消息,并从消息属性中恢复出原始的topic,queueId,并清除消息延迟属性,从新进行消息投递
- 0
- 0
-
赞助
赞赏 -
分享