这是最简单的一个模式了,一般在实际的生产环境中,大家应该都不会使用一个消费者。只做入门的介绍。
一个生产者,一个默认的交换机【图中没体现】,一个队列,一个消费者。
生产者产生消费发送至交换机,交换机路由至队列,队列再投递给消费者进行消费。
在进行代码开发前,我们先用模拟器进行模拟看下:
消息是直接发到了默认的Exchange,【Exchange不存储消息,只路由】,路由到对应的MQ中,接着进行了消费。
我们还采用小明洗澡的方式,进行模拟。水龙头模拟生产者,热水器模拟mq,小明洗澡模拟消费者。
首次执行代码的时候,我们会发现缺失对应的Queue,可以采用两种方式进行创建。
1.去mq的控制台,手动创建queue
2.执行queue创建的代码,这里我们放到了消费者-采用此方式可以先执行消费者
/*** @author rabbit* @version 1.0.0* @Description MQ 简单模式:一个生产者,一个默认的交换机,一个队列,一个消费者* 1.创建生产者,创建一个channel,发送消息到默认的exchange* 2.打开控制台观察* 3.打开WireShark观察* 4.启动消费者* 5.打开控制台观察* 6.打开WireShark观察* @createTime 2022/07/27 19:34:00*/
public class WaterProducer {public static final String QUEUE_NAME = "SolarWaterHeater";//生产者public static void main(String[] args) throws Exception {//1、获取connectionConnection connection = RabbitCommonConfig.getConnection();//2、创建channelChannel channel = connection.createChannel();for (int i = 1; i <= 1; i++) {sendMsg(channel, i);Thread.sleep(1000);}//4、关闭管道和连接channel.close();connection.close();}private static void sendMsg(Channel channel, int k) throws IOException {//3、发送消息到exchangeString msg = k + "升";/*** 参数1:指定exchange,使用“”。默认的exchange* 参数2:指定路由的规则,使用具体的队列名称。exchange为""时,消息直接发送到队列中* 参数3:指定传递的消息携带的properties* 参数4:指定传递的消息,byte[]类型*/channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());System.out.println("水龙头放水成功!" + k + "升");}}
/*** @author rabbit* @version 1.0.0* @Description MQ 简单模式:一个生产者,一个默认的交换机,一个队列,一个消费者* 创建一个消费者,创建一个channel,创建一个队列* @createTime 2022/07/27 19:36:00*/
public class XMShowerConsumer {public static final String QUEUE_NAME = "SolarWaterHeater";//消费者public static void main(String[] args) throws Exception {//1、获取连对象、Connection connection = RabbitCommonConfig.getConnection();//2、创建channelChannel channel = connection.createChannel();//3、创建队列/*** 参数1:queue 指定队列名称* 参数2:durable 是否开启持久化(true)* 参数3:exclusive 是否排外(当前队列只能被一个消费者消费)* 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除* 参数5:arguments 指定队列携带的信息*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);//4.开启监听QueueDefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("小明洗澡用水: " + new String(body, "UTF-8"));}};/*** 参数1:queue 指定消费哪个队列* 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)* 参数1:cancelCallback 指定消费回调*/channel.basicConsume(QUEUE_NAME, true, consumer);System.out.println("小明开始洗澡......");//5、键盘录入,让程序不结束!System.in.read();//6、释放资源channel.close();connection.close();}}
生产者:
水龙头放水成功!1升
mq控制台:
消费者:
小明开始洗澡......
小明洗澡用水: 1升
我们抓取所有关于5672端口的包,5672即mq的服务端端口,56639为本机端口。
1、首先看建立连接的部分。
前三条TCP连接有没有很熟悉,56639希望与5672建立连接,三次握手的形式建立好连接,为后续的AMQP协议的传输做好准备
2. 发送数据包部分
第四条为56639 发送给5672 一个数据,协议头 是 0-9-1 这就是我们的AMQP的版本。要准备AMQP交互了
3. AMQP协议交互
我们将TCP的包过滤掉,只看AMQP的,这里就完整呈现了,mq消息发送的整个过程。
Connection 创建、打开通道Tunne、打开虚机、打开信道Channel、发送消息、信道关闭、连接关闭。一目了然。
TCP连接不再赘述,看下消费的AMQP协议传输
基本与生产者一致,区别在于,我们在消费者中有创建Queue的操作,数据包体现在-Queue.Declare.
另外 消费者是需要消费的 Basic.Consume 即把消费者注册到了队列上。
Basic.Consume-OK 将消息推送过来,进行了消费。
我们看到Basic.Consume-OK 消息内容中有 x 即 exchange 。默认为空。
还有rk 即 routing key 路由键- 默认的为队列的名称