topic交换机和fanout交换机类似,也是广播机制,但是topic需要绑定RoutingKey,绑定RoutingKey时可以使用通配符(*,#)代替。
*:只能一个单词
#:0个或多个单词
1.编写Receive1类
package com.it.rabbitmq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Receive1 {public static void main(String[] args) {//创建链接工厂对象ConnectionFactory factory=new ConnectionFactory();factory.setUsername("root");factory.setPassword("123456");factory.setHost("192.168.174.129");factory.setPort(5672);Connection connection=null;//定义链接对象Channel channel=null;//定义通道对象try {connection=factory.newConnection();//实例化链接对象channel=connection.createChannel();//实例化通道对象channel.queueDeclare("topicQueue1",true,false,false,null);channel.exchangeDeclare("topicExchange", "topic", true);channel.queueBind("topicQueue1","topicExchange","aa");channel.basicConsume("topicQueue1",true, "",new DefaultConsumer(channel) {public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//获取消息数据String bodyStr = new String(body);System.out.println("1消费者aa:"+bodyStr);}});} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}
编写Receive2类
package com.it.rabbitmq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Receive2 {public static void main(String[] args) {//创建链接工厂对象ConnectionFactory factory=new ConnectionFactory();factory.setUsername("root");factory.setPassword("123456");factory.setHost("192.168.174.129");factory.setPort(5672);Connection connection=null;//定义链接对象Channel channel=null;//定义通道对象try {connection=factory.newConnection();//实例化链接对象channel=connection.createChannel();//实例化通道对象channel.queueDeclare("topicQueue2",true,false,false,null);channel.exchangeDeclare("topicExchange", "topic", true);channel.queueBind("topicQueue2","topicExchange","aa.*");channel.basicConsume("topicQueue2",true, "",new DefaultConsumer(channel) {public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//获取消息数据String bodyStr = new String(body);System.out.println("2消费者aa.*:"+bodyStr);}});} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}
编写Receive3类
package com.it.rabbitmq.topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Receive3 {public static void main(String[] args) {//创建链接工厂对象ConnectionFactory factory=new ConnectionFactory();factory.setUsername("root");factory.setPassword("123456");factory.setHost("192.168.174.129");factory.setPort(5672);Connection connection=null;//定义链接对象Channel channel=null;//定义通道对象try {connection=factory.newConnection();//实例化链接对象channel=connection.createChannel();//实例化通道对象channel.queueDeclare("topicQueue3",true,false,false,null);channel.exchangeDeclare("topicExchange", "topic", true);channel.queueBind("topicQueue3","topicExchange","aa.#");channel.basicConsume("topicQueue3",true, "",new DefaultConsumer(channel) {public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//获取消息数据String bodyStr = new String(body);System.out.println("3消费者aa.#:"+bodyStr);}});} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}
}
2.分别启动三个接收类
注意:
1、在topic模式中必须要指定Routingkey,并且可以同时指定多层的RoutingKey,每个层次之间使用 点分隔即可 例如 test.myRoutingKey
1.编写发送类
package com.it.rabbitmq.exchange.topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static void main(String[] args) {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//配置rabbitMQ的连接信息factory.setHost("192.168.174.129");factory.setPort(5672);factory.setUsername("root");factory.setPassword("123456");//定义连接Connection connection = null;//定义通道Channel channel = null;try {connection = factory.newConnection();channel = connection.createChannel();channel.exchangeDeclare("topicExchange", "topic", true);String message = "topic的测试消息!";channel.basicPublish("topicExchange", "aa", null, message.getBytes("utf-8"));System.out.println("消息发送成功,direct");} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} finally {if (channel != null) {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}if (connection != null) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}
2.测试发送类
1)当RoutingKey为aa时
接收类1(aa),接收类3(aa.#)可以接收到消息
2)当RoutingKey为aa.bb时
接收类2(aa.*)和接收类3(aa.#)可以接收消息
3)当RoutingKey为aa.bb.cc时
接收类3(aa.#)可以接收消息
注意:
1、Topic模式的消息接收时必须要指定RoutingKey并且可以使用# 和 *来做统配符号,#表示通配任意一个单词 *表示通配任意多个单词,例如消费者的RoutingKey为test.#或#.myRoutingKey都可以获取RoutingKey为test.myRoutingKey发送者发送的消息
上一篇:【微信小程序】flex布局