rabbitMQ:绑定Exchange发送和接收消息(topic)
创始人
2024-01-20 06:45:14
0

topic交换机和fanout交换机类似,也是广播机制,但是topic需要绑定RoutingKey,绑定RoutingKey时可以使用通配符(*,#)代替。

*:只能一个单词

#:0个或多个单词

编写topic消息发送类

1.编写Receive1类

package com.it.rabbitmq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Receive1 {public static void main(String[] args)  {//创建链接工厂对象ConnectionFactory factory=new ConnectionFactory();factory.setUsername("root");factory.setPassword("123456");factory.setHost("192.168.174.129");factory.setPort(5672);Connection connection=null;//定义链接对象Channel channel=null;//定义通道对象try {connection=factory.newConnection();//实例化链接对象channel=connection.createChannel();//实例化通道对象channel.queueDeclare("topicQueue1",true,false,false,null);channel.exchangeDeclare("topicExchange", "topic", true);channel.queueBind("topicQueue1","topicExchange","aa");channel.basicConsume("topicQueue1",true, "",new DefaultConsumer(channel) {public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//获取消息数据String bodyStr = new String(body);System.out.println("1消费者aa:"+bodyStr);}});} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}

编写Receive2类

package com.it.rabbitmq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Receive2 {public static void main(String[] args)  {//创建链接工厂对象ConnectionFactory factory=new ConnectionFactory();factory.setUsername("root");factory.setPassword("123456");factory.setHost("192.168.174.129");factory.setPort(5672);Connection connection=null;//定义链接对象Channel channel=null;//定义通道对象try {connection=factory.newConnection();//实例化链接对象channel=connection.createChannel();//实例化通道对象channel.queueDeclare("topicQueue2",true,false,false,null);channel.exchangeDeclare("topicExchange", "topic", true);channel.queueBind("topicQueue2","topicExchange","aa.*");channel.basicConsume("topicQueue2",true, "",new DefaultConsumer(channel) {public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//获取消息数据String bodyStr = new String(body);System.out.println("2消费者aa.*:"+bodyStr);}});} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}

编写Receive3类

package com.it.rabbitmq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Receive3 {public static void main(String[] args)  {//创建链接工厂对象ConnectionFactory factory=new ConnectionFactory();factory.setUsername("root");factory.setPassword("123456");factory.setHost("192.168.174.129");factory.setPort(5672);Connection connection=null;//定义链接对象Channel channel=null;//定义通道对象try {connection=factory.newConnection();//实例化链接对象channel=connection.createChannel();//实例化通道对象channel.queueDeclare("topicQueue3",true,false,false,null);channel.exchangeDeclare("topicExchange", "topic", true);channel.queueBind("topicQueue3","topicExchange","aa.#");channel.basicConsume("topicQueue3",true, "",new DefaultConsumer(channel) {public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//获取消息数据String bodyStr = new String(body);System.out.println("3消费者aa.#:"+bodyStr);}});} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}

2.分别启动三个接收类

 

 

 

注意:

1、在topic模式中必须要指定Routingkey,并且可以同时指定多层的RoutingKey,每个层次之间使用 点分隔即可 例如 test.myRoutingKey

编写topic的消息接收类

1.编写发送类

package com.it.rabbitmq.exchange.topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static void main(String[] args) {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//配置rabbitMQ的连接信息factory.setHost("192.168.174.129");factory.setPort(5672);factory.setUsername("root");factory.setPassword("123456");//定义连接Connection connection = null;//定义通道Channel channel = null;try {connection = factory.newConnection();channel = connection.createChannel();channel.exchangeDeclare("topicExchange", "topic", true);String message = "topic的测试消息!";channel.basicPublish("topicExchange", "aa", null, message.getBytes("utf-8"));System.out.println("消息发送成功,direct");} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} finally {if (channel != null) {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}if (connection != null) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}

2.测试发送类

1)当RoutingKey为aa时

 接收类1(aa),接收类3(aa.#)可以接收到消息

 

 

2)当RoutingKey为aa.bb时

接收类2(aa.*)和接收类3(aa.#)可以接收消息

 

 

3)当RoutingKey为aa.bb.cc时

接收类3(aa.#)可以接收消息

 

注意:

1、Topic模式的消息接收时必须要指定RoutingKey并且可以使用# 和 *来做统配符号,#表示通配任意一个单词 *表示通配任意多个单词,例如消费者的RoutingKey为test.#或#.myRoutingKey都可以获取RoutingKey为test.myRoutingKey发送者发送的消息

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...