public class RabbitMqTest {//消息队列名称private final static String QUEUE_NAME = "hello";@Testpublic void send() throws java.io.IOException, TimeoutException {//创建连接工程ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("admin");//创建连接Connection connection = factory.newConnection();//创建消息通道Channel channel = connection.createChannel();//生成一个消息队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i = 0; i < 10; i++) {String message = "Hello World RabbitMQ count: " + i;//发布消息,第一个参数表示路由(Exchange名称),为""则表示使用默认消息路由channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}//关闭消息通道和连接channel.close();connection.close();}@Testpublic void consumer() throws java.io.IOException, TimeoutException {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("admin");//创建连接Connection connection = factory.newConnection();//创建消息信道final Channel channel = connection.createChannel();//消息队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println("[*] Waiting for message. To exist press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}
执行send()后控制台输出:
[x] Sent 'Hello World RabbitMQ count: 0'
[x] Sent 'Hello World RabbitMQ count: 1'
[x] Sent 'Hello World RabbitMQ count: 2'
[x] Sent 'Hello World RabbitMQ count: 3'
[x] Sent 'Hello World RabbitMQ count: 4'
[x] Sent 'Hello World RabbitMQ count: 5'
[x] Sent 'Hello World RabbitMQ count: 6'
[x] Sent 'Hello World RabbitMQ count: 7'
[x] Sent 'Hello World RabbitMQ count: 8'
[x] Sent 'Hello World RabbitMQ count: 9'
public class RabbitUtil {public static ConnectionFactory getConnectionFactory() {//创建连接工程,下面给出的是默认的caseConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("admin");factory.setVirtualHost("/");return factory;}
}
/**
 * Starts one consumer per queue on the "direct.exchange" exchange, pairing each queue
 * with its own routing key.
 */
public class DirectConsumer {

    /** Exchange that every consumer started by this class is attached to. */
    private static final String EXCHANGE_NAME = "direct.exchange";

    /**
     * Delegates to {@code MsgConsumer.consumerMsg} for the given queue and routing key.
     * Failures are printed to standard error and otherwise ignored.
     *
     * @param queueName  queue name passed through to {@code MsgConsumer.consumerMsg}
     * @param routingKey routing key passed through to {@code MsgConsumer.consumerMsg}
     */
    public void msgConsumer(String queueName, String routingKey) {
        try {
            MsgConsumer.consumerMsg(EXCHANGE_NAME, queueName, routingKey);
        } catch (IOException | TimeoutException failure) {
            failure.printStackTrace();
        }
    }

    /**
     * Starts the consumers for qa/qb/qc (routing keys aaa/bbb/ccc) and then keeps the
     * main thread asleep for 100 minutes.
     *
     * @param args unused
     * @throws InterruptedException if the sleeping main thread is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        final String[] queues = {"qa", "qb", "qc"};
        final String[] keys = {"aaa", "bbb", "ccc"};

        DirectConsumer directConsumer = new DirectConsumer();
        for (int idx = 0; idx < queues.length; idx++) {
            directConsumer.msgConsumer(queues[idx], keys[idx]);
        }

        // 1000 ms * 60 * 100 = 100 minutes.
        final long keepAliveMillis = 1000 * 60 * 100;
        Thread.sleep(keepAliveMillis);
    }
}
执行后的输出:
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 0
[x] Done
[x] Received 'hello >>> 3
[x] Done
[x] Received 'hello >>> 6
[x] Done
[x] Received 'hello >>> 9
[x] Done
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 1
[x] Done
[x] Received 'hello >>> 4
[x] Done
[x] Received 'hello >>> 7
[x] Done
[*] Waiting for message. To exist press CTRL+C
[x] Received 'hello >>> 2
[x] Done
[x] Received 'hello >>> 5
[x] Done
[x] Received 'hello >>> 8
[x] Done
可以看到,三个消费者分别从队列qa、qb、qc中消费了路由键为aaa、bbb、ccc的消息。
5.2.2 问题探讨
有个疑问:这个队列的名称qa、qb和qc是RabbitMQ自动生成的么,我们可以指定队列名称么?
我做了个简单的实验,我把消费者代码修改了一下:
/**
 * Variant of the consumer entry point used for the queue-name experiment: identical to the
 * original except that the third queue is named "qc1" instead of "qc".
 *
 * @param args unused
 * @throws InterruptedException if the sleeping main thread is interrupted
 */
public static void main(String[] args) throws InterruptedException {
    DirectConsumer consumer = new DirectConsumer();
    String[] routingKey = new String[]{"aaa", "bbb", "ccc"};
    // Changed "qc" to "qc1". The note sits on its own line so it cannot comment out
    // the loop that follows.
    String[] queueNames = new String[]{"qa", "qb", "qc1"};
    for (int i = 0; i < queueNames.length; i++) {
        consumer.msgConsumer(queueNames[i], routingKey[i]);
    }
    // Keep the main thread asleep for 100 minutes (1000 ms * 60 * 100).
    Thread.sleep(1000 * 60 * 100);
}
/**
 * Declare an exchange.
 * @see com.rabbitmq.client.AMQP.Exchange.Declare
 * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
 * @param exchange the name of the exchange
 * @param type the exchange type
 * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
 * @param autoDelete true if the server should delete the exchange when it is no longer in use
 * @param arguments other properties (construction arguments) for the exchange
 * @return a declaration-confirm method to indicate the exchange was successfully declared
 * @throws java.io.IOException if an error is encountered
 */
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete,
        Map<String, Object> arguments) throws IOException;

/**
 * Declare a queue
 * @see com.rabbitmq.client.AMQP.Queue.Declare
 * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
 * @param queue the name of the queue
 * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
 * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
 * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
 * @param arguments other properties (construction arguments) for the queue
 * @return a declaration-confirm method to indicate the queue was successfully declared
 * @throws java.io.IOException if an error is encountered
 */
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
        Map<String, Object> arguments) throws IOException;