队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素希望在指定时间到了以后被取出处理
延时队列就是用来存放需要在指定时间被处理的元素的队列
订单在十分钟之内未支付则自动取消
TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间
如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用
发送消息时指定消息的过期属性:
在创建队列的时候设置队列的“x-message-ttl”属性
前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息
1)添加依赖
org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test com.alibaba fastjson 1.2.47 org.projectlombok lombok
2) 添加最基本的 RabbitMQ 的配置
spring:rabbitmq:host: 192.168.126.10port: 5672username: adminpassword: 123
队列QA : 队列TTL属性设置为10s
队列QB:队列TTL属性设置为40s
正常交换机 X
死信交换机 Y
死信队列 QD
创建交换机、队列、绑定关系等
/*** @author houChen* @date 2022/11/13 22:58* @Description: 配置交换机*/
@Configuration
public class TtlQueueConfig {private static final String X_EXCHANGE = "X";private static final String QA_QUEUE = "QA";private static final String QB_QUEUE = "QB";//死信交换机private static final String DEAD_EXCHANGE = "Y";private static final String DEAD_QUEUE = "QD";//创建普通交换机@Beanpublic DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}//创建死信交换机@Beanpublic DirectExchange yExchange() {return new DirectExchange(DEAD_EXCHANGE);}//创建队列QA, ttl为10秒,绑定到对应的死信交互机@Beanpublic Queue queueA() {Map args = new HashMap<>();//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", DEAD_EXCHANGE);//声明当前队列的死信路由键args.put("x-dead-letter-routing-key", "YD");//声明队列的TTLargs.put("x-message-ttl", 10000);return QueueBuilder.durable(QA_QUEUE).withArguments(args).build();}//队列QA绑定 X交换机@Beanpublic Binding queueaBindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//创建队列QB, ttl为40秒,绑定到对应的死信交互机@Beanpublic Queue queueB() {Map args = new HashMap<>();//声明当前队列绑定的死信交换机args.put("x-dead-letter-exchange", DEAD_EXCHANGE);//声明当前队列的死信路由键args.put("x-dead-letter-routing-key", "YD");//声明队列的TTLargs.put("x-message-ttl", 40000);return QueueBuilder.durable(QB_QUEUE).withArguments(args).build();}//队列QB绑定 X交换机@Beanpublic Binding queuebBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//声明死信队列 QD@Beanpublic Queue queueD() {return QueueBuilder.durable(DEAD_QUEUE).build();}//死信队列和死信交换机进行绑定@Beanpublic Binding queuedBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange) {return BindingBuilder.bind(queueD).to(yExchange).with("YD");}}
/*** @author houChen* @date 2022/11/14 22:39* @Description: 消息生产者代码*/
@RestController
@RequestMapping("/ttl")
@Slf4j
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMessage/{message}")public void sendMsg(@PathVariable String message) {log.info("当前时间:{},发送一条消息给两个TTL队列:{}", new Date(), message);rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列:" + message);rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列:" + message);}}
/*** @author houChen* @date 2022/11/14 23:34* @Description: 消费者代码*/
@Slf4j
@Component
public class DelayQueueConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws UnsupportedEncodingException {String msg = new String(message.getBody(), "utf-8");log.info("当前时间:{},收到死信队列消息{}", new Date(), msg);}
}
发送请求 :http://localhost:8080/ttl/sendMessage/aaaa
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了
缺点
使用队列的 x-message-ttl 属性的话,每增加一个新的时间需求,就需要新增一个队列,不太好扩展
上一篇:追踪malloc/free的使用
下一篇:KEIL5中头文件路劲包含问题