SpringBoot整合RabbitMQ
创始人
2024-01-31 11:54:26
0

RabbitMQ安装部署详情可见:RabbitMQ简介及在Linux中安装部署(yum)

 一、导入pom.xml依赖

com.rabbitmqamqp-client5.8.0

org.slf4jslf4j-nop1.7.29

二、入门测试案例

  发送消息类:Send.java

public class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//设置RabbitMQ地址factory.setHost("192.168.119.129");factory.setUsername("admin");factory.setPassword("password");//建立连接Connection connection = factory.newConnection();//获得信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//发布消息String message = "你好,老6";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println("发送了消息:" + message);//关闭连接channel.close();connection.close();}
}

  接收消息类:Recv.java

public class Recv {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//设置RabbitMQ地址factory.setHost("192.168.119.129");factory.setUsername("admin");factory.setPassword("password");//建立连接Connection connection = factory.newConnection();//获得信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//接收消息并消费channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("收到消息:" + message);}});}
}

运行结果:            ​​​​​

 三、多任务消息队列案例

发送多任务消息类NewTask.java

public class NewTask {private final static String TASK_QUEUE_NAME = "task_queue";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//设置RabbitMQ地址factory.setHost("192.168.119.129");factory.setUsername("admin");factory.setPassword("password");//建立连接Connection connection = factory.newConnection();//获得信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);for (int i = 0; i < 10; i++) {String message;if (i % 2 == 0) {message = i + "...";} else {message = String.valueOf(i);}channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println("发送了消息:" + message);}channel.close();connection.close();}
}

 接收多任务消息类Worker.java

public class Worker {private final static String TASK_QUEUE_NAME = "task_queue";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//设置RabbitMQ地址factory.setHost("192.168.119.129");factory.setUsername("admin");factory.setPassword("password");//建立连接Connection connection = factory.newConnection();//获得信道final Channel channel = connection.createChannel();//声明队列channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println("开始接收消息");channel.basicQos(1);channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("收到了消息:" + message);try {doWork(message);} finally {System.out.println("消息处理完成");channel.basicAck(envelope.getDeliveryTag(), false);}}});}private static void doWork(String task) {char[] chars = task.toCharArray();for (char ch : chars) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}
}

运行结果:

四、接收指定消息案例

发送消息类:EmitLogDirect.java

public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.119.129");factory.setUsername("admin");factory.setPassword("password");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String message = "info:Hello World!";channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"));System.out.println("发送了消息," + "等级为info,消息内容:" + message);message = "warning:Hello World!";channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));System.out.println("发送了消息," + "等级为warning,消息内容:" + message);message = "error:Hello World!";channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("UTF-8"));System.out.println("发送了消息," + "等级为error,消息内容:" + message);channel.close();connection.close();}
}

接收所有消息类:ReceiveLogsDirect1.java

public class ReceiveLogsDirect1 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.119.129");factory.setUsername("admin");factory.setPassword("password");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//生成一个随机的临时的queueString queueName = channel.queueDeclare().getQueue();//一个交换机同时绑定3个queuechannel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "warning");channel.queueBind(queueName, EXCHANGE_NAME, "error");System.out.println("开始接收消息");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("收到消息:" + message);}};channel.basicConsume(queueName, true, consumer);}
}

仅仅接收error消息类:ReceiveLogsDirect2.java

public class ReceiveLogsDirect2 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.119.129");factory.setUsername("admin");factory.setPassword("password");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//生成一个随机的临时的queueString queueName = channel.queueDeclare().getQueue();//一个交换机绑定1个queuechannel.queueBind(queueName, EXCHANGE_NAME, "error");System.out.println("开始接收消息");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("收到消息:" + message);}};channel.basicConsume(queueName, true, consumer);}
}

运行结果:

 

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...