RabbitMQ------延迟队列(整合SpringBoot以及使用延迟插件实现真正延时)(七)
创始人
2024-04-10 15:48:54
0

RabbitMQ------延迟队列(七)

延迟队列

延迟队列,内部是有序的,特点:延时属性。
简单讲:延时队列是用来存放需要在指定时间被处理的元素队列。
是基于死信队列的消息过期场景。

适用场景

1.订单在十分钟之内未支付则自动取消。
2.用户注册后,三天内没有登陆,则短信提醒。
特点:需要在某个事件发生之后或者之前的特定事件点完成莫一项任务。

整合SpringBoot

导入依赖

    org.springframework.bootspring-boot-starterorg.springframework.bootspring-boot-starter-testtestorg.springframework.bootspring-boot-starter-amqporg.springframework.bootspring-boot-starter-webcom.alibabafastjson1.2.34org.projectlomboklombokio.springfoxspringfox-swagger22.6.1io.springfoxspringfox-swagger-ui2.6.1org.springframework.amqpspring-rabbit-test2.4.7test

在配置文件application.properties中写明rabbitmq的IP、端口、用户名以及密码

spring.rabbitmq.host=192.168.200.139
spring.rabbitmq.prot=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123

架构图如图所示
在这里插入图片描述
队列1设置的过期时间为10s,队列2设置的过期时间为40s。
代码分三部分:生产者、消费者、以及交换机队列整体作为一部分。
生产者和消费者都不在进行交换机以及队列的声明。
交换机以及队列配置类的书写:

/*** TTL队列  配置文件类代码*/
@Configuration
public class TTLQueueConfig {//普通交换机public static final String X_EXCHANGE = "X";//死信交换机public static final String Y_EXCHANGE = "Y";//普通队列1 过期时间10spublic static final String QA_QUEUE = "QA";//普通队列2 过期时间40spublic static final String QB_QUEUE = "QB";//死信队列public static final String QD_QUEUE = "QD";//声明X交换机@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明X交换机@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_EXCHANGE);}//声明普通队列1@Bean("queueA")public Queue queueA(){Map arguments = new HashMap<>();//设置死信队列arguments.put("x-dead-letter-exchange",QD_QUEUE);//设置routingkeyarguments.put("x-dead-letter-routing-key","YD");//设置ttlarguments.put("x-message-ttl",10000);return QueueBuilder.durable(QA_QUEUE).withArguments(arguments).build();}//声明普通队列2@Bean("queueB")public Queue queueB(){Map arguments = new HashMap<>();//设置死信队列arguments.put("x-dead-letter-exchange",QD_QUEUE);//设置routingkeyarguments.put("x-dead-letter-routing-key","YD");//设置ttlarguments.put("x-message-ttl",40000);return QueueBuilder.durable(QB_QUEUE).withArguments(arguments).build();}//声明死信队列@Bean("queueD")public Queue queueD(){return QueueBuilder.durable(QD_QUEUE).build();}//绑定//通过容器名字进行捆绑,绑定普通队列A和交换价X@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//通过容器名字进行捆绑,绑定普通队列B和交换价X@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XA");}//通过容器名字进行捆绑,绑定死信队列D和交换价Y@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}

生产者代码示例:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;/*** 发送延迟* 生产者*/
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;//开始发消息@GetMapping("/send/{message}")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条消息给两个ttl队列:{}",new Date().toString(),message);/*** 交换机* routingkey* message*/rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10秒的队列:"+message);rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40秒的队列:"+message);}
}

消费者代码示例:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/*** 队列TTL 消费者*/
@Slf4j
@Component
public class DeadLetterConsumer {//接收消息@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws Exception {String string = new String(message.getBody(),"UTF-8");log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),string);}
}

注意导包,不要导错了。

结果:第一条消息在10s后变成死信消息,然后被消费者消费掉,第二条消息在40s后变成死信消息,然后被消费者消费掉,这样就达成了延迟队列的目的。

局限性:每增加一个延时需求,都需要新增一个普通队列。这样是不合理的。
优化:只有一个延时队列,由生产者指定需要延时多久

延时队列优化,由生产者指定延时时间

增加一个队列QC,QC不设置过期时间,过期时间由生产者指定。
配置类代码新增QC,不设置存活时间,由生产者发送

    //设置普通队列public static final String QC_QUEUE = "QC";//设置普通队列@Bean("queueC")public  Queue queueC(){HashMap arguments = new HashMap<>();//设置死信队列arguments.put("x-dead-letter-exchange",QD_QUEUE);//设置routingkeyarguments.put("x-dead-letter-routing-key","YD");return QueueBuilder.durable(QC_QUEUE).withArguments(arguments).build();}//绑定//通过容器名字进行捆绑,绑定普通队列A和交换价X@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}

生产者新增代码

//开始发消息@GetMapping("/sendExpirationMessage/{message}/{ttlTime}")public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){log.info("当前时间:{},发送一条消息给QC,ttl队列:{},过期时间为:{}ms",new Date().toString(),message,ttlTime);/*** 交换机* routingkey* message* MessagePostProcessor,可以设置存活时间*///ttlTime设置置过期时间rabbitTemplate.convertAndSend("X","XC","消息来自ttl的队列:"+message,msg->{//发送消息时,设置存活时间msg.getMessageProperties().setExpiration(ttlTime);return msg;   });}

使用这种方式,消息并不会按时死亡。因为RabbitMQ只会检测第一个消息是否过期,如果过期,会被放入死信队列。

经过测试发现,第一个发送20s过期的消息,第二个发送2s过期的消息,结果依然是20s后,20s消息被消费,之后,2s消息才会被消费。 说明延时队列是按顺序执行。如果第一个消息延时很久,后续消息也会延时,并不会优先执行。

此现象只能通过,基于插件的RabbitMQ进行弥补,自身无法弥补这个缺陷。

RabbitMQ插件实现延时队列

安装插件

在官网上下载

https://www.rabbitmq.com/community-plugins.html

下载rabbitmq_delayed_message_exchange插件。解压放在RabbitMQ的插件目录。
进入RabbitMQ的安装目录下的plgins目录,执行以下命令让该插件生效,然后重启RabbitMQ。

--  3.8.8代表rabbitmq版本
-- 目录如下
cd   /usr/lib/rabbitmq/rabbitmq_server_3.8.8/plugins
-- 安装命令,不用写插件版本号
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
-- 重启rabbitmq
systemctl restart  rabbitmq-server(安装时的服务名)

重启好后打开rabbitmq的管理端页面,可以在Exchanges目录下,Add a new exchange,Type 中,会增加一个x-delayed-message的选项。
使用插件,结构更加简单
代表由交换机进行延迟,而不是队列了。
在这里插入图片描述
配置类书写
当Bean中不指定名称时,名称默认方法名
自定义交换机时,需要指定交换机类型,而之前未自定交换机,直接创建的DirectExchange交换机

/*** 延迟交换机*/
@Configuration
public class DelayedQueueConfig {//延迟public static final String DELAYED_EXCHANGE = "delayed.exchange";//延迟队列public static final String DELAYED_QUEUE = "delayed.queue";//routingkeypublic static final String DELAYED_ROUTING_KEY = "delayed.routingkey";//声明  自定义交换机,基于插件@Beanpublic CustomExchange delayedExchange(){//String name, String type, boolean durable, boolean autoDelete, Map arguments/*** 交换机名字* 类型* 是否持久化* 是否自动删除* 自定义参数*/HashMap arguments = new HashMap();arguments.put("x-delayed-type","direct");return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",false,false,arguments);}//声明延迟队列@Beanpublic Queue queueDe(){return new Queue(DELAYED_QUEUE);}//绑定  当Bean中不指定名称时,名称默认方法名@Beanpublic Binding queueBindingExchange(@Qualifier("queueDe") Queue queue,@Qualifier("delayedExchange") CustomExchange customExchange){return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

生产者以及消费者代码与之前相同。
结论:可以实现根据过期时间,消费消息。

延时队列也有很多其他的选择,比如Java的DelayQueue,利用Redis的Zset,利用Quartz或者利用kafka的时间轮,各有特点,需要合适的场景。

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...
苏州离哪个飞机场近(苏州离哪个... 本篇文章极速百科小编给大家谈谈苏州离哪个飞机场近,以及苏州离哪个飞机场近点对应的知识点,希望对各位有...