Springboot + RabbitMq 消息队列
创始人
2024-05-24 14:51:14
0

前言

一、RabbitMq简介

1、RabbitMq场景应用,RabbitMq特点

场景应用
以订单系统为例,用户下单之后的业务逻辑可能包括:生成订单、扣减库存、使用优惠券、增加积分、通知商家用户下单、发短信通知等等。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成订单)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独消费线程拉取MQ的消息(或者由 MQ 推送消息。
特点
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

  • AMQP :Advanced Message
    Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

  • 可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

  • 灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange
    来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个
    Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

  • 消息集群(Clustering)
    多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

  • 高可用(Highly Available Queues)
    队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

  • 多种协议(Multi-protocol)
    RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等多种消息中间件协议。

  • 多语言客户端(Many Clients)
    RabbitMQ 几乎支持所有常用语言,比如Java、Python 、Ruby 、PHP 、C# 、JavaScript 等。

  • 管理界面(Management UI)
    RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

  • 跟踪机制(Tracing)
    如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

  • 插件机制(Plugin System)
    RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

2、RabbitMq工作流程

  • publisher连接到rabbitMQ Broker,创建connection,开启channel。声明交换机类型、名称、是否持久化等。

  • 消息可靠投递(开启confirmCallback returnCallback),并指定消息是否持久化等属性和routing key。

  • exchange收到消息之后,根据routing key路由到跟当前交换机绑定的相匹配的(存)队列里面。

  • consumer监听接收到消息之后开始业务处理,然后发送一个ack确认告知消息已经被消费。
    rabbitMQ Broker收到ack之后将对应的消息从队列里面删除掉。
    如下图:
    在这里插入图片描述

  • Message
    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

  • Publisher
    消息的生产者,也是一个向交换器发布消息的客户端应用程序。

  • Exchange
    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

  • Binding
    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

  • Queue
    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

  • Connection
    网络连接,比如一个TCP连接。

  • Channel
    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

  • Consumer
    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

  • Virtual Host
    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

  • Broker
    表示消息队列服务器实体。

二 引用RabbitMq

将RabbitMq配置成插件使用
1.创建工程 pom.xml 引用jar
在这里插入图片描述

		org.springframework.cloudspring-cloud-starter-bus-amqporg.springframework.cloudspring-cloud-commons
  1. 创建RabbitMQConfig

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {public static final String DELAYED_SWITCH_OFF_QUEUE_NAME = "delay.queue.switchOff.delay.queue";public static final String DELAYED_SWITCH_OFF_EXCHANGE_NAME = "delay.queue.switchOff.delay.exchange";public static final String DELAYED_SWITCH_OFF_ROUTING_KEY = "delay.queue.switchOff.delay.routingkey";/*** @desciption 控制设备开关延时关闭* @author zhangbl* @Date : 2022-08-08 16:29**/@Beanpublic Queue immediateSwitchOffQueue() {return new Queue(DELAYED_SWITCH_OFF_QUEUE_NAME,true);}@Beanpublic CustomExchange customSwitchOffExchange() {Map args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_SWITCH_OFF_EXCHANGE_NAME, "x-delayed-message", true, false, args);}@Beanpublic Binding bindingSwitchOffNotify() {return BindingBuilder.bind(immediateSwitchOffQueue()).to(customSwitchOffExchange()).with(DELAYED_SWITCH_OFF_ROUTING_KEY).noargs();}
}
  1. 创建RabbitMq注解类 EnableRabbitMq
import com.common.rabbitmq.config.RabbitMQConfig;
import org.springframework.context.annotation.Import;import java.lang.annotation.*;/*** 

* 开启支持RabbitMQ*/ @Target({ ElementType.TYPE }) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Import({ RabbitMQConfig.class }) public @interface EnableRabbitMq {}

  1. 服务引用Mq pom.xml添加mq插件jar包
com.mqcommon-rabbitmq

  1. 启动类引用 @EnableRabbitMq 注解
@EnableRabbitMq
@SpringBootApplication
public class TestApplication{public static void main(String[] args) {SpringApplication.run(TestApplication.class, args);}
}
  1. 创建消费者实现类
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class SwitchOffListener {@Resourceprivate RedisTemplate strRedisTemplate;@Resourceprivate RabbitTemplate rabbitTemplate;@RabbitListener(queues = RabbitMQConfig.DELAYED_SWITCH_OFF_QUEUE_NAME)public void onExpired(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());//获取锁,设置有效期,防止程序异常没有释放锁导致死锁String str = UUID.randomUUID().toString();String lockKey = RabbitMQConfig.DELAYED_SWITCH_OFF_QUEUE_NAME + msg;Boolean lock = strRedisTemplate.opsForValue().setIfAbsent(lockKey, str, Duration.ofSeconds(5));try {if(lock){log.info("========进入开关延时关闭处理==========");//具体实现}log.info("开关延时关闭成功!!!");} catch (Exception e) {e.printStackTrace();} finally {//释放锁if (str.equals(strRedisTemplate.opsForValue().get(lockKey))){strRedisTemplate.delete(lockKey);}}channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
  1. 业务调用
 @Autowiredprivate RabbitTemplate rabbitTemplate;public void machDeviceCloseTime(String msg) {if(StringUtils.isNotBlank(msg){try {rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_SWITCH_OFF_EXCHANGE_NAME,RabbitMQConfig.DELAYED_SWITCH_OFF_ROUTING_KEY, message, listener ->{listener.getMessageProperties().setDelay(1000);//1毫秒后发送mq消息return listener;});}} catch (JsonProcessingException e) {throw new RuntimeException(e);}}}
  1. application.yml 添加配置
spring:rabbitmq:# mq单机模式配置host: 127.0.0.1port: 5672# mq集群模式配置方式addresses: 127.0.0.1:5672,127.0.0.2:5672,127.0.0.3:5672username: adminpassword: 123456virtual-host: /test#这个配置是保证提供者确保消息推送到交换机中,不管成不成功,都会回调publisher-confirm-type: correlated#保证交换机能把消息推送到队列中publisher-returns: true#这个配置是保证消费者会消费消息,手动确认listener:simple:acknowledge-mode: manualtemplate:mandatory: true

配置Rabbitmq 集群参照 https://blog.csdn.net/zhangbinlong/article/details/127067044?spm=1001.2014.3001.5502

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...