RabbitMQ初步到精通-第四章-RabbitMQ工作模式-Routing
创始人
2024-01-30 15:42:18
0

第四章-RabbitMQ工作模式-Routing

1.模式介绍

1.1 模式

路由模式-继续还是和Exchange打交道,上节提到的Exchange类型为Fanout,此次声明的类型为direct 与默认Exchange一致。但还有最核心的一点,上节未使用Routing key,此次模式中,会依赖于此Key。

如下图:

消费发送的时候,会指定routing key ,例如指定 black,那么消费发送至 X -则会根据routing key路由消息至队列Q2,被消费者C2进行消费。

若此时有个key ,比如blue 同时指向了 Q1 和 Q2,那消息也会同时传送到Q1与Q2,这与广播模式很像了。

1.2 场景

业务中需要根据不同的条件对所产生的消息进行不同的独立消费,可以使用此模式。

例如系统中产生不同level的日志,根据level的不同上送到不同的queue,进行相应的消费。

info类型指定 info的queue,debug类型指定debug的queue,error类型的指定error的queue

实际运用不多,有实际生产运用的小伙伴可以留言,学习下。

1.3 模拟

三个重点 1.声明direct类型的 交换机 2. 绑定好交换机与queue 3. 发消息指定路由键

 

2.代码验证

梦总会醒的,小明觉得复制水,不现实,搞个现实的开关吧。因为有天小丽对他说,洗水澡很没意思,想洗牛奶澡,要小明一定帮她实现。小明自己想了想觉得可以试试,但自己也想搞个新奇的,不喜欢牛奶,喜欢红酒,洗个红酒澡吧,一边洗,一边喝。醉洗 -- o(* ̄︶ ̄*)o。

2.1 生产者


/*** @author rabbit* @version 1.0.0* @Description 一个生产者,一个交换机,两个队列,两个消费者* 声明一个DIRECT类型的exchange,并且根据Routingkey绑定指定的队列* 生产者在创建DIRECT类型的exchange后绑定相应的队列,并且指定Routingkey。在发送消息是也要指定消息的Routingkey* 

* 另:TOPIC模式* 生产者创建Topic的exchange并且并且指定队列,这次绑定可以通过*和#匹配关键字,对指定RoutingKey内容进行匹配。* *(星号)可以代替一个单词。* #(哈希)可以替代零个或多个单词。* channel.queueBind("topics-queue-1", "topics-exchange", "zhang.*");* channel.basicPublish("topics-exchange", "zhang.sna", null, "张三".getBytes());* @createTime 2022/07/27 19:34:00*/ public class WaterProducer {public static final String PUBSUB_QUEUE_1 = "SolarWaterHeater-RedWine";public static final String PUBSUB_QUEUE_2 = "SolarWaterHeater-Milk";//生产者public static void main(String[] args) throws Exception {//1、获取connectionConnection connection = RabbitCommonConfig.getConnection();//2、创建channelChannel channel = connection.createChannel();for (int i = 1; i <= 10; i++) {sendMsg(channel, i);Thread.sleep(1000);}//4、关闭管道和连接channel.close();connection.close();}private static void sendMsg(Channel channel, int k) throws IOException {//3、创建exchange并且指定类型channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);//4、绑定队列 routing-queue-error routing-queue-infochannel.queueBind(PUBSUB_QUEUE_2, "routing-exchange", "MILK");channel.queueBind(PUBSUB_QUEUE_1, "routing-exchange", "REDWINE");/*** 参数1:指定exchange,使用“”。默认的exchange* 参数2:指定路由的规则,使用具体的队列名称。exchange为""时,消息直接发送到队列中* 参数3:指定传递的消息携带的properties* 参数4:指定传递的消息,byte[]类型*///5、发送消息并且指定接收的队列的routingkeychannel.basicPublish("routing-exchange", "MILK", null, ("牛奶-" + k+"升").getBytes());channel.basicPublish("routing-exchange", "REDWINE", null, ("红酒-" + k+"升").getBytes());System.out.println("水龙头分别放牛奶和红酒成功!" + k + "升");}}

2.2 消费者

小明:


/*** @author rabbit* @version 1.0.0* @Description Routing 模式 一个生产者,一个交换机,两个队列,两个消费者* @createTime 2022/07/27 19:36:00*/
public class XMShowerConsumer {public static final String PUBSUB_QUEUE_1 = "SolarWaterHeater-RedWine";//消费者public static void consumer() throws Exception {//1、获取连对象、Connection connection = RabbitCommonConfig.getConnection();//2、创建channelChannel channel = connection.createChannel();channel.basicQos(1);//3、创建队列-helloworld/*** 参数1:queue 指定队列名称* 参数2:durable 是否开启持久化(true)* 参数3:exclusive 是否排外(conn.close()-》当前对列自动删除,当前队列只能被一个 消费者消费)* 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除* 参数5:arguments 指定队列携带的信息*/channel.queueDeclare(PUBSUB_QUEUE_1, true, false, false, null);//4.开启监听QueueDefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("小明洗澡用: " + new String(body, "UTF-8"));try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}//手动ACK(接收信息,指定是否批量操作)channel.basicAck(envelope.getDeliveryTag(), false);}};/*** 参数1:queue 指定消费哪个队列* 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)* 参数1:cancelCallback 指定消费回调**///3.关闭自动ACKchannel.basicConsume(PUBSUB_QUEUE_1, false, consumer);System.out.println("小明开始洗红酒澡......");//5、键盘录入,让程序不结束!System.in.read();//6、释放资源channel.close();connection.close();}public static void main(String[] args) throws Exception {XMShowerConsumer.consumer();}}

小丽:


/*** @author rabbit* @version 1.0.0* @Description Routing 模式 一个生产者,一个交换机,两个队列,两个消费者* @createTime 2022/07/27 19:36:00*/
public class XLShowerConsumer {public static final String PUBSUB_QUEUE_2 = "SolarWaterHeater-Milk";//消费者public static void consumer() throws Exception {//1、获取连对象、Connection connection = RabbitCommonConfig.getConnection();//2、创建channelChannel channel = connection.createChannel();channel.basicQos(1);//3、创建队列-helloworld/*** 参数1:queue 指定队列名称* 参数2:durable 是否开启持久化(true)* 参数3:exclusive 是否排外(conn.close()-》当前对列自动删除,当前队列只能被一个 消费者消费)* 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除* 参数5:arguments 指定队列携带的信息*/channel.queueDeclare(PUBSUB_QUEUE_2, true, false, false, null);//4.开启监听QueueDefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("小丽洗澡用: " + new String(body, "UTF-8"));try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}//手动ACK(接收信息,指定是否批量操作)channel.basicAck(envelope.getDeliveryTag(), false);}};/*** 参数1:queue 指定消费哪个队列* 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)* 参数1:cancelCallback 指定消费回调**///3.关闭自动ACKchannel.basicConsume(PUBSUB_QUEUE_2, false, consumer);System.out.println("小丽开始洗牛奶澡......");//5、键盘录入,让程序不结束!System.in.read();//6、释放资源channel.close();connection.close();}public static void main(String[] args) throws Exception {XLShowerConsumer.consumer();}}

2.3 结论验证

生产者:

水龙头分别放牛奶和红酒成功!1升
水龙头分别放牛奶和红酒成功!2升
水龙头分别放牛奶和红酒成功!3升
水龙头分别放牛奶和红酒成功!4升
水龙头分别放牛奶和红酒成功!5升
水龙头分别放牛奶和红酒成功!6升
水龙头分别放牛奶和红酒成功!7升
水龙头分别放牛奶和红酒成功!8升
水龙头分别放牛奶和红酒成功!9升
水龙头分别放牛奶和红酒成功!10升

小明:

小明洗澡用: 红酒-1升
小明洗澡用: 红酒-2升
小明洗澡用: 红酒-3升
小明洗澡用: 红酒-4升
小明洗澡用: 红酒-5升
小明洗澡用: 红酒-6升
小明洗澡用: 红酒-7升
小明洗澡用: 红酒-8升
小明洗澡用: 红酒-9升
小明洗澡用: 红酒-10升

小丽

小丽洗澡用: 牛奶-1升
小丽洗澡用: 牛奶-2升
小丽洗澡用: 牛奶-3升
小丽洗澡用: 牛奶-4升
小丽洗澡用: 牛奶-5升
小丽洗澡用: 牛奶-6升
小丽洗澡用: 牛奶-7升
小丽洗澡用: 牛奶-8升
小丽洗澡用: 牛奶-9升
小丽洗澡用: 牛奶-10升

通过开关控制,分别给热水器里注入红酒和牛奶,红酒和牛奶存储到各自独立的水槽中,供两个人洗澡使用。

3.总结

核心点:

3.1 声明Direct 类型的Exchange

3.2 绑定好对应的 Queue

3.3 发送消息指定 Routing key

核心代码:

channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
channel.queueBind(PUBSUB_QUEUE_2, "routing-exchange", "MILK");
channel.basicPublish("routing-exchange", "MILK", null, ("牛奶-" + k+"升").getBytes());

另:TOPIC模式

topic模式不再单独展开

topic模式最核心的内容是引入了通配符 * ,# 

* *(星号)可以代替一个单词。 
* #(哈希)可以替代零个或多个单词。

另外 声明的 Exchage 类型为topic 。

看下模拟情况:

1.首先声明Topic类型的 交换机

2.使用带通配符的 routing key 做好绑定关系

3. 发消息时,指定好对应的routing key ,按预期会将消息流转到对应的queue中。

 

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...