1.1案例实现
需求:使用简单模式完成消息传递
maven依赖如下
com.rabbitmq amqp-client 5.3.0 com.google.code.gson gson 2.8.5
生产者如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//设置主机信息connectionFactory.setHost("81.71.14.7");connectionFactory.setPort(5672);connectionFactory.setUsername("user");connectionFactory.setPassword("password");connectionFactory.setVirtualHost("/vhost");//获取TCP长连接Connection connection = connectionFactory.newConnection();//创建通信“通道” 相当于TCP的虚拟连接Channel channel = connection.createChannel();//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列//第一个参数:队列名称ID//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用//第四个参数:是否自动删除,false 代表连接停掉后不自动删除这个队列//其他额外参数,nullchannel.queueDeclare("helloWord",false,false,false,null);String message = "你好RabbitMQ";//第一个参数 exchange 交换机,暂时不用,进行发布订阅的时候才用//第二个参数:队列名称//第三个参数:额外设置属性//第四个参数:消息字节数组channel.basicPublish("","helloWord",null,message.getBytes());channel.close();connection.close();System.out.println("===发送成功===");}
}
消费者如下:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//设置主机信息connectionFactory.setHost("81.71.14.7");connectionFactory.setPort(5672);connectionFactory.setUsername("user");connectionFactory.setPassword("password");connectionFactory.setVirtualHost("/vhost");//获取TCP长连接Connection connection = connectionFactory.newConnection();//创建通信“通道” 相当于TCP的虚拟连接final Channel channel = connection.createChannel();//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列//第一个参数:队列名称ID//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用//第四个参数:是否自动删除,false 代表连接停掉后不自动删除这个队列//其他额外参数,nullchannel.queueDeclare("helloWord",false,false,false,null);//第一个参数:队列名//第二个参数:参数代表是否自动确认收到消息,false 代表手动确认消息,是MQ推荐做法//第三个参数:传入DefaultConsumer 的实现类。channel.basicConsume("helloWord",false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String (body);System.out.println("消费者接收到的消息:"+message);long tagId = envelope.getDeliveryTag();//第二个参数:只确认签收当前消息,设置true 代表签收该消息者所有未签收的消息channel.basicAck(tagId,false);}});}
}
上面案例使用的是简单模式如下图
上图概念如下:
Work Queues与简单模式的代码几乎是一样的。可以复制,并多复制一个消费者进行多个消费者同时对消息消费的测试。
连接工具类:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConnectionUtil {public static Connection getConnection() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//设置主机信息connectionFactory.setHost("81.71.14.7");connectionFactory.setPort(5672);connectionFactory.setUsername("user");connectionFactory.setPassword("password");connectionFactory.setVirtualHost("/vhost");//获取TCP长连接return connectionFactory.newConnection();}
}
消息发生产类:
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import rabbitmq.ConnectionUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class OrderSystem {public static void main(String[] args) throws IOException, TimeoutException {String queueStr = "sm";Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(queueStr,false,false,false,null);for(int i=0;i<100;i++){SMS sms = new SMS("乘客"+i,"15600000000","你的车票已经预定成功");String jsonSms = new Gson().toJson(sms);channel.basicPublish("",queueStr,null,jsonSms.getBytes());}System.out.println("====发送数据成功===");channel.close();connection.close();}// 短信封装类static class SMS{private String name;private String mobile;private String content;public SMS(String name,String mobile,String content){this.name = name;this.mobile = mobile;this.content = content;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getMobile() {return mobile;}public void setMobile(String mobile) {this.mobile = mobile;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}}
}
消息消费类:
import com.rabbitmq.client.*;
import rabbitmq.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class SMSSender1 {public static void main(String[] args) throws IOException, TimeoutException {String queueStr = "sm";Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(queueStr,false,false,false,null);//如果不写basicQos(1) 则MQ自动将所有请求平均发送给所有的消费者//basicQos(1) ,MQ 不在对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的channel.basicQos(1);//处理完一个取一个。channel.basicConsume(queueStr,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String jsonSMS = new String(body);System.out.println("SMSSender1===短信发送成功:"+jsonSMS);channel.basicAck(envelope.getDeliveryTag(),false);}});}
}
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange绑定,或者没有 符合路由规则的队列,那么消息会丢失!
订阅模式和之前的Work Queues工作队列模式相比多了一个Exchange交换机的新的概念,之前生产者直接发送到队列,现在直接发送到交换机。消费者还是直接从队列中获取消息,但是需要消费者创建队列并且把队列和交换机绑定。
注意:代码中注释的地方,一般是之前没出现过的,或者使用新的参数了,Exchange是需要通过管理界面创建的类型为Fanout 。
生产者代码:
获取Connection工具类省略了。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import rabbitmq.ConnectionUtil;import java.util.Scanner;public class WeatherBureau {public static void main(String[] args) throws Exception{String exchangeStr = "exchange-weather";Connection connection = ConnectionUtil.getConnection();//从控制台输入发布内容String input = new Scanner(System.in).next();Channel channel = connection.createChannel();//第一个参数:交换机名字channel.basicPublish(exchangeStr,"",null,input.getBytes());channel.close();connection.close();}
}
消费者代码:
import com.rabbitmq.client.*;
import rabbitmq.ConnectionUtil;
import java.io.IOException;public class BaiduConsumer {public static void main(String[] args) throws Exception {String exchangeStr = "exchange-weather";String baiduQueue = "baidu-queue";Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(baiduQueue,false,false,false,null);//queueBing 用于将队列和交换机绑定//参数1:队列名,参数2:交换机名,参数三:路由key 在路由模式用的channel.queueBind(baiduQueue,exchangeStr,"");channel.basicQos(1);channel.basicConsume(baiduQueue,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String weatherStr = new String(body);System.out.println("SMSSender1===短信发送成功:"+weatherStr);channel.basicAck(envelope.getDeliveryTag(),false);}});}
}
发布订阅模式比WorkQueue 工作模式多了一个交换机的概念,并且生产者发布消息不是直接到队列Queue,而是发给交换机,消费者需要创建队列,在通过Bing把交换机和队列绑定。
上图解释如下:
注意:获取Connection工具类省略了,Exchange需要在管理界面创建且类型为Direct
生产者代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import rabbitmq.ConnectionUtil;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;public class WeatherBureau {public static void main(String[] args) throws Exception{String exchangeStr = "exchange_weather_routing";Map area = new HashMap();area.put("china.beijing.20221128","北京20221128号天气晴朗!");area.put("china.zhengzhou.20221128","郑州20221128号天气小雪!");area.put("us.NewYork.20221129","纽约20221129号天气晴朗!");area.put("us.Washington.20221129","华盛顿20221129号天气小雪!");Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();Iterator> itr = area.entrySet().iterator();while (itr.hasNext()){Map.Entry m = itr.next();//第一个参数:交换机名字,第二个参数:消息的Routing keychannel.basicPublish(exchangeStr,m.getKey(),null,m.getValue().getBytes());}channel.close();connection.close();}
}
消费者代码:
import com.rabbitmq.client.*;
import rabbitmq.ConnectionUtil;
import java.io.IOException;public class BaiduConsumer {public static void main(String[] args) throws Exception {String exchangeStr = "exchange_weather_routing";String baiduQueue = "baidu-queue";Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(baiduQueue,false,false,false,null);//queueBing 用于将队列和交换机绑定//参数1:队列名,参数2:交换机名,参数三:路由key 在路由模式用的channel.queueBind(baiduQueue,exchangeStr,"china.beijing.20221128");channel.queueBind(baiduQueue,exchangeStr,"china.zhengzhou.20221128");channel.queueBind(baiduQueue,exchangeStr,"us.NewYork.20221129");channel.basicQos(1);channel.basicConsume(baiduQueue,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String weatherStr = new String(body);System.out.println("SMSSender1===短信发送成功:"+weatherStr);channel.basicAck(envelope.getDeliveryTag(),false);}});}
}
注意:生产者发送了4个路由ke,但消费者只接受了3个,剩余的一个会退回生产者,因为没有队列存储。
Routing 模式要求队列在绑定交换机时要指定routing key ,消息转发到符合routing key的队列。不符合的不转发,此模式比较麻烦,使用比较少。
代码就不写和路由模式是一样的,消费者代码不需要每一个绑定路由,只需要写一个通配符就可以。
上一篇:域名的命名规则有哪些?
下一篇:RKMEDIA使用简介