延迟消息
在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立即扣减商品库存。例如电影院购票、高铁购票、淘宝秒杀,下单后就会锁定座位或者库存资源,其他人无法重复购买。
但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!
因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立即取消订单并释放占用的资源。
例如,订单支付超过时间为30分钟,则我们应该在用户下单后的第15分钟检查订单支付状态,如果发现未支付,应该立即取消订单,释放资源。
像这种在一段时间以后才执行的任务,称之为延迟任务,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息。
在RabbitMQ中实现延迟消息也有两种方案:
- 死信交换机+TTL
- 延迟消息插件
死信交换机和延迟消息
首先讲一下基于死信交换机的延迟消息方案
死信交换机
什么是死信?
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用
basic.reject
或basic.nack
声明消费失败,并且消息的requeue
参数设置为false - 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
如果一个队列中的消息已经成为死信,并且这个队列通过 dead-letter-exchange
属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时假如有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
死信交换机有什么作用?
- 收集因处理失败而被拒绝的消息
- 收集有队列满了而被拒绝的消息
- 收集因TTL(有效期)到期的消息
延迟消息
作用中前面两种场景可以看做是把死信交换机当做一种消息处理的最终兜底方案,与消费者重试时说的RepublishMessageRecoverer作用类似。
而最后一种场景,可以设计下面这样的场景:
如图,有一组绑定的交换机(ttl.fanout
)和队列(ttl.queue
)。但是 ttl.queue
没有消费者监听,而是设定了死信交换机 hmall.direct
,而队列 direct.queue1
则与死信交换机绑定,RoutingKey是blue。
假如我们现在发送一条消息到 ttl.fanout
,RoutingKey为blue,并设置消息的有效期为5000ms:
👀️ 注意:
尽管这里的ttl.fanout
不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct
才能正确路由消息。
消息肯定会被投递到 ttl.queue
之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信:
死信被再次投递到死信交换机 hmall.direct
,并沿用之前的RoutingKey,也就是 blue
:
由于 direct.queue1
与 hmall.direct
绑定的key是blue,因此最终消息被成功路由到 direct.queue1
,如果此时有消费者与 direct.queue1
绑定, 也就能成功消费消息了。但此时已经是5秒钟以后了:
也就是说,publisher发送了一条消息,但最终consumer在5秒后才收到消息,成功实现了延迟消息。
👀️ 注意:
RabbitMQ的消息过期是基于追溯方式实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此要设置的TTL时间不一定准确。
DelayExchange插件
基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此在RabbitMQ3.5.7及以上的版本提供了一个延迟消息插件(rabbitmq-delayed-message-exchange)来实现相同的效果。
官方文档说明:
url: https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
title: "Scheduling Messages with RabbitMQ | RabbitMQ"
description: "For a while people have looked for ways of implementing delayed"
host: blog.rabbitmq.com
favicon: https://blog.rabbitmq.com/img/rabbitmq-logo.svg
image: https://www.rabbitmq.com/img/rabbitmq-social-media-card.svg
下载
插件下载地址:
url: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
title: "GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ"
description: "Delayed Messaging for RabbitMQ. Contribute to rabbitmq/rabbitmq-delayed-message-exchange development by creating an account on GitHub."
host: github.com
favicon: https://github.githubassets.com/favicons/favicon.svg
image: https://opengraph.githubassets.com/43d1786218c7aef1600964d66a3dac449b410ff8578faf2f85b2f48ea76cb4dc/rabbitmq/rabbitmq-delayed-message-exchange
安装
基于docker安装,先查看RabbitMQ的插件目录对应的数据卷。
docker volume inspect mq-plugins
结果如下:
[
{
"CreatedAt": "2024-06-19T09:22:59+08:00",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
"Name": "mq-plugins",
"Options": null,
"Scope": "local"
}
]
插件目录被挂载到了 /var/lib/docker/volumes/mq-plugins/_data
这个目录,我们上传插件到该目录下。
接下来执行命令,安装插件:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
运行结果如下:
声明延迟交换机
基于注解方式:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
基于@Bean的方式:
@Slf4j
@Configuration
public class DelayExchangeConfig {
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}
@Bean
public Queue delayedQueue(){
return new Queue("delay.queue");
}
@Bean
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}
发送延迟消息
发送消息时,必须通过x-delay属性设定延迟时间:
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
👀️注意:
RabbitMQ的消息过期是基于追溯方式实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此要设置的TTL时间不一定准确。
- 感谢你赐予我前进的力量