RabbitMQ安装部署详情可见:RabbitMQ简介及在Linux中安装部署(yum)
Maven依赖:com.rabbitmq:amqp-client:5.8.0
Maven依赖:org.slf4j:slf4j-nop:1.7.29
发送消息类:Send.java
public class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//设置RabbitMQ地址factory.setHost("192.168.119.129");factory.setUsername("admin");factory.setPassword("password");//建立连接Connection connection = factory.newConnection();//获得信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//发布消息String message = "你好,老6";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println("发送了消息:" + message);//关闭连接channel.close();connection.close();}
}
接收消息类:Recv.java
public class Recv {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//设置RabbitMQ地址factory.setHost("192.168.119.129");factory.setUsername("admin");factory.setPassword("password");//建立连接Connection connection = factory.newConnection();//获得信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//接收消息并消费channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("收到消息:" + message);}});}
}
运行结果:
发送多任务消息类:NewTask.java
public class NewTask {private final static String TASK_QUEUE_NAME = "task_queue";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//设置RabbitMQ地址factory.setHost("192.168.119.129");factory.setUsername("admin");factory.setPassword("password");//建立连接Connection connection = factory.newConnection();//获得信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);for (int i = 0; i < 10; i++) {String message;if (i % 2 == 0) {message = i + "...";} else {message = String.valueOf(i);}channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println("发送了消息:" + message);}channel.close();connection.close();}
}
接收多任务消息类:Worker.java
public class Worker {private final static String TASK_QUEUE_NAME = "task_queue";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//设置RabbitMQ地址factory.setHost("192.168.119.129");factory.setUsername("admin");factory.setPassword("password");//建立连接Connection connection = factory.newConnection();//获得信道final Channel channel = connection.createChannel();//声明队列channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println("开始接收消息");channel.basicQos(1);channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("收到了消息:" + message);try {doWork(message);} finally {System.out.println("消息处理完成");channel.basicAck(envelope.getDeliveryTag(), false);}}});}private static void doWork(String task) {char[] chars = task.toCharArray();for (char ch : chars) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}
}
运行结果:
发送消息类:EmitLogDirect.java
/**
 * Direct-exchange producer: publishes one log message for each of the
 * severities {@code info}, {@code warning} and {@code error} to the
 * {@code direct_logs} exchange, using the severity as the routing key.
 */
public class EmitLogDirect {

    /** Direct exchange the log messages are published to. */
    private static final String EXCHANGE_NAME = "direct_logs";

    /**
     * Connects to the broker, declares the exchange, publishes the three
     * messages and releases the channel and connection.
     *
     * @param args unused
     * @throws IOException      if declaring, publishing or closing fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Broker address and credentials.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.119.129");
        factory.setUsername("admin");
        factory.setPassword("password");

        // try-with-resources closes the channel first and then the connection,
        // even when a publish throws.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            // One message per severity; the severity doubles as the routing key.
            for (String severity : new String[] {"info", "warning", "error"}) {
                String message = severity + ":Hello World!";
                channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
                System.out.println("发送了消息," + "等级为" + severity + ",消息内容:" + message);
            }
        }
    }
}
接收所有消息类:ReceiveLogsDirect1.java
public class ReceiveLogsDirect1 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.119.129");factory.setUsername("admin");factory.setPassword("password");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//生成一个随机的临时的queueString queueName = channel.queueDeclare().getQueue();//一个交换机同时绑定3个queuechannel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "warning");channel.queueBind(queueName, EXCHANGE_NAME, "error");System.out.println("开始接收消息");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("收到消息:" + message);}};channel.basicConsume(queueName, true, consumer);}
}
仅仅接收error消息类:ReceiveLogsDirect2.java
public class ReceiveLogsDirect2 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.119.129");factory.setUsername("admin");factory.setPassword("password");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//生成一个随机的临时的queueString queueName = channel.queueDeclare().getQueue();//一个交换机绑定1个queuechannel.queueBind(queueName, EXCHANGE_NAME, "error");System.out.println("开始接收消息");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("收到消息:" + message);}};channel.basicConsume(queueName, true, consumer);}
}
运行结果: