【MQ工作队列模式】
创始人
2024-01-28 22:06:30
0
1、模式介绍 ⚫Work Queues:与入门程序的简单模式相比,多了一个或一些消费端, 多个消费端共同消费同一个队列中的消息。 ⚫ 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处 理的速度。 小结: 1、在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关 系是竞争的关系 2、Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任 务处理的速度。例如:短信服务部署多个, 只需要有一个节点成功发送即可。

 

2、代码实现 Work Queues 与入门程序的简单模式的代码几乎是一样的。可以完全复 制,并多复制一个消费者进行多 个消费者同时对消费消息的测试。 1、生产者 生产者代码Producer_WorkQueues:
package com.dxw.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者:发送消息
*/
public class Producer_WorkQueues {
public static void main(String[] args) throws
IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new
ConnectionFactory();
//2、设置参数
factory.setHost("localhost");//ip 默认localhost
factory.setPort(5672);//端口 默认5672
factory.setVirtualHost("/dxw");//虚拟机 默认/
factory.setUsername("dxw");//用户名 默认guest
factory.setPassword("1234");//密码 默认guest
//3、创建连接
Connection connection = factory.newConnection();
//4、创建Channel
Channel channel = connection.createChannel();
//5、创建队列
/*
* 参数解释:
* queueDeclare(String queue,
* boolean durable,
* boolean exclusive,
* boolean autoDelete,
* Map arguments)
* 1. queue:队列名称
* 如果没有一个名字叫hello_world的队列,则会创建该队
列,如果有则不会创建
* 2. durable:是否持久化,当mq重启之后,队列中消息还在
* 3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
* 4. autoDelete:是否自动删除。当没有Consumer时,自动
删除掉
* 5. arguments:参数。
*/
channel.queueDeclare("work_queues",true,false,false,null)
;
//6、发送消息
/*
* 参数解释:
* basicPublish(String exchange,
* String routingKey,
* BasicProperties props,
* byte[] body)
* 1. exchange:交换机名称。简单模式下交换机会使用默认的
""
* 2. routingKey:路由名称
* 3. props:配置信息
* 4. body:发送消息数据
启动生产者观察控制台
2、消费者
第一个消费者代码Consumer_WorkQueues1:
*/
for(int i=1;i<=10;i++){
String body = i+"hello rabbitmq~~~";
channel.basicPublish("","work_queues",null,body.getBytes(
));
}
//7、释放资源
//channel.close();
//connection.close();
}
}
启动生产者观察控制台

 

2、消费者 第一个消费者代码Consumer_WorkQueues1:
package com.dxw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:接收消息
*/
public class Consumer_WorkQueues1 {
public static void main(String[] args) throws
IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new
ConnectionFactory();
//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/dxw");//虚拟机 默认/
factory.setUsername("dxw");//用户名 默认guest
factory.setPassword("1234");//密码 默认guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5、创建队列
/*
* 参数解释:
* queueDeclare(String queue,
* boolean durable,
* boolean exclusive,
* boolean autoDelete,
* Map arguments)
* 1. queue:队列名称
* 如果没有一个名字叫hello_world的队列,则会创建该
队列,如果有则不会创建
* 2. durable:是否持久化,当mq重启之后,队列中消息还在
* 3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
* 4. autoDelete:是否自动删除。当没有Consumer时,自
动删除掉
* 5. arguments:参数。
*/
channel.queueDeclare("work_queues",true,false,false,null)
;
//6、接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag,
Envelope envelope, AMQP.BasicProperties properties, byte[]
body) throws IOException {
/*System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey(
));
System.out.println("properties:"+properties);*/
System.out.println("body:"+new
String(body));
}
};
/*
* 参数解释:
* basicConsume(String queue, boolean autoAck,
Consumer callback)
* 1. queue:队列名称
* 2. autoAck:是否自动确认
* 3. callback:回调对象
*/
channel.basicConsume("work_queues",true,consumer);
//关闭资源?不要
}
}
第二个消费者代码Consumer_WorkQueues2:
package com.dxw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:接收消息
*/
public class Consumer_WorkQueues2 {
public static void main(String[] args) throws
IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new
ConnectionFactory();
//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/dxw");//虚拟机 默认/
factory.setUsername("dxw");//用户名 默认guest
factory.setPassword("1234");//密码 默认guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5、创建队列
/*
* 参数解释:
* queueDeclare(String queue,
* boolean durable,
* boolean exclusive,
* boolean autoDelete,
* Map arguments)
* 1. queue:队列名称
* 如果没有一个名字叫hello_world的队列,则会创建该
队列,如果有则不会创建
* 2. durable:是否持久化,当mq重启之后,队列中消息还在
* 3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
* 4. autoDelete:是否自动删除。当没有Consumer时,自
动删除掉
* 5. arguments:参数。
*/
channel.queueDeclare("work_queues",true,false,false,null)
;
//6、接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag,
Envelope envelope, AMQP.BasicProperties properties, byte[]
body) throws IOException {
/*System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey(
));
System.out.println("properties:"+properties);*/
System.out.println("body:"+new
String(body));
}
};
/*
* 参数解释:
* basicConsume(String queue, boolean autoAck,
Consumer callback)
* 1. queue:队列名称
* 2. autoAck:是否自动确认
* 3. callback:回调对象
注意:先启动两个消费者,然后再启动生产者,然后观察两个生产者控制台输出
3、Pub/Sub订阅模式
1、模式介绍
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
⚫ P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是
发给X(交换机)
⚫ C:消费者,消息的接收者,会一直等待消息到来
⚫ Queue:消息队列,接收消息、缓存消息
⚫ Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方
面,知道如何处理消息,例如递交给某个特别队 列、递交给所有队列、
*/
channel.basicConsume("work_queues",true,consumer);
//关闭资源?不要
}
}
注意:先启动两个消费者,然后再启动生产者,然后观察两个生产者控制台输出

 

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...