七、延时队列
创始人
2024-05-30 22:46:22
0

1、延时队列的概念

队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素希望在指定时间到了以后被取出处理

延时队列就是用来存放需要在指定时间被处理的元素的队列

2、延时队列使用的场景

订单在十分钟之内未支付则自动取消
在这里插入图片描述
在这里插入图片描述

3、RabbitMQ 中的 TTL

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间

如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用

3.1 消息设置TTL

发送消息时指定消息的过期属性:
在这里插入图片描述

3.2 队列设置TTL属性

在创建队列的时候设置队列的“x-message-ttl”属性
在这里插入图片描述

3.3 两者之间的区别

  • 如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中)
  • 如果设置了消息的TTL属性,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;

前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息

4、整合SpringBoot

1)添加依赖

org.springframework.bootspring-boot-starter-amqporg.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-testtestcom.alibabafastjson1.2.47org.projectlomboklombok

2) 添加最基本的 RabbitMQ 的配置

spring:rabbitmq:host: 192.168.126.10port: 5672username: adminpassword: 123

5、队列TTL实现延时队列

5.1 代码架构图

在这里插入图片描述
队列QA : 队列TTL属性设置为10s
队列QB:队列TTL属性设置为40s
正常交换机 X
死信交换机 Y
死信队列 QD

5.2 配置文件

创建交换机、队列、绑定关系等

/*** @author houChen* @date 2022/11/13 22:58* @Description: 配置交换机*/
@Configuration
public class TtlQueueConfig {private static final String X_EXCHANGE = "X";private static final String QA_QUEUE = "QA";private static final String QB_QUEUE = "QB";//死信交换机private static final String DEAD_EXCHANGE = "Y";private static final String DEAD_QUEUE = "QD";//创建普通交换机@Beanpublic DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}//创建死信交换机@Beanpublic DirectExchange yExchange() {return new DirectExchange(DEAD_EXCHANGE);}//创建队列QA, ttl为10秒,绑定到对应的死信交互机@Beanpublic Queue queueA() {Map args = new HashMap<>();//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", DEAD_EXCHANGE);//声明当前队列的死信路由键args.put("x-dead-letter-routing-key", "YD");//声明队列的TTLargs.put("x-message-ttl", 10000);return QueueBuilder.durable(QA_QUEUE).withArguments(args).build();}//队列QA绑定 X交换机@Beanpublic Binding queueaBindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//创建队列QB, ttl为40秒,绑定到对应的死信交互机@Beanpublic Queue queueB() {Map args = new HashMap<>();//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", DEAD_EXCHANGE);//声明当前队列的死信路由键args.put("x-dead-letter-routing-key", "YD");//声明队列的TTLargs.put("x-message-ttl", 40000);return QueueBuilder.durable(QB_QUEUE).withArguments(args).build();}//队列QB绑定 X交换机@Beanpublic Binding queuebBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//声明死信队列 QD@Beanpublic Queue queueD() {return QueueBuilder.durable(DEAD_QUEUE).build();}//死信队列和死信交换机进行绑定@Beanpublic Binding queuedBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange) {return BindingBuilder.bind(queueD).to(yExchange).with("YD");}}

5.3 消息生产者

/*** @author houChen* @date 2022/11/14 22:39* @Description: 消息生产者代码*/
@RestController
@RequestMapping("/ttl")
@Slf4j
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMessage/{message}")public void sendMsg(@PathVariable String message) {log.info("当前时间:{},发送一条消息给两个TTL队列:{}", new Date(), message);rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列:" + message);rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列:" + message);}}

5.4 消费者代码

/*** @author houChen* @date 2022/11/14 23:34* @Description: 消费者代码*/
@Slf4j
@Component
public class DelayQueueConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws UnsupportedEncodingException {String msg = new String(message.getBody(), "utf-8");log.info("当前时间:{},收到死信队列消息{}", new Date(), msg);}
}

5.5 测试

发送请求 :http://localhost:8080/ttl/sendMessage/aaaa
在这里插入图片描述
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了

缺点
使用队列的 x-message-ttl 属性的话,每增加一个新的时间需求,就需要新增一个队列,不太好扩展

6、延时队列优化

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...