目录
1. 直连交换机(Direct实战)
provider生产者(publisher)
consumer消费者
2. 主题交换机(Topic实战)
provider生产者(publisher)
consumer消费者
3. 扇形交换机(Fanout实战)
provider生产者(publisher)
consumer消费者
想学习RabbitMQ基础的请阅读下边博文链接
RabbitMQ【基本使用】_JoneClassMate的博客-CSDN博客
DirectConfig
package com.jmh.provider.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 蒋明辉* @data 2022/11/25 19:02*/
@Configuration
@SuppressWarnings("all")
public class DirectConfig {/*** 创建队列*/@Beanpublic Queue directQueueA(){return new Queue("directQueueA",true);}@Beanpublic Queue directQueueB(){return new Queue("directQueueB",true);}@Beanpublic Queue directQueueC(){return new Queue("directQueueC",true);}/*** 创建交换机*/@Beanpublic DirectExchange directExchange(){return new DirectExchange("directExchange");}/*** 设置队列和交换机的绑定*/@Beanpublic Binding bindingA(){return BindingBuilder.bind(directQueueA()).to(directExchange()).with("AA");}@Beanpublic Binding bindingB(){return BindingBuilder.bind(directQueueB()).to(directExchange()).with("BB");}@Beanpublic Binding bindingC(){return BindingBuilder.bind(directQueueC()).to(directExchange()).with("CC");}}
package com.jmh.provider.controller;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 蒋明辉* @data 2022/11/25 19:08*/
@RestController
@SuppressWarnings("all")
public class ProviderController {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 直连交换机* @param key* @return*/@RequestMapping("/directSend")public String directSend(String key){rabbitTemplate.convertAndSend("directExchange",key,"Hello World");return "yes";}}
DirectReceiverA
package com.jmh.consumer.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author 蒋明辉* @data 2022/11/25 19:12*/
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "directQueueA")
@Slf4j
public class DirectReceiverA {@RabbitHandlerpublic void info(String msg){log.info("A接收到了"+msg);}}
package com.jmh.consumer.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author 蒋明辉* @data 2022/11/25 19:12*/
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "directQueueB")
@Slf4j
public class DirectReceiverB {@RabbitHandlerpublic void info(String msg){log.info("B接收到了"+msg);}
}
package com.jmh.consumer.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author 蒋明辉* @data 2022/11/25 19:12*/
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "directQueueC")
@Slf4j
public class DirectReceiverC {@RabbitHandlerpublic void info(String msg){log.info("C接收到了"+msg);}
}
TopicConfig
package com.jmh.provider.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 蒋明辉* @data 2022/11/25 19:02*/
@Configuration
@SuppressWarnings("all")
public class TopicConfig {private static final String KEY_A="*.a.*";private static final String KEY_B="*.*.a";private static final String KEY_C="a.#";/*** 创建队列*/@Beanpublic Queue topicQueueA(){return new Queue("topicQueueA",true);}@Beanpublic Queue topicQueueB(){return new Queue("topicQueueB",true);}@Beanpublic Queue topicQueueC(){return new Queue("topicQueueC",true);}/*** 创建交换机*/@Beanpublic TopicExchange topicExchange(){return new TopicExchange("topicExchange");}/*** 设置队列和交换机的绑定*/@Beanpublic Binding topicBindingA(){return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(KEY_A);}@Beanpublic Binding topicBindingB(){return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(KEY_B);}@Beanpublic Binding topicBindingC(){return BindingBuilder.bind(topicQueueC()).to(topicExchange()).with(KEY_C);}}
package com.jmh.provider.controller;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 蒋明辉* @data 2022/11/25 19:08*/
@RestController
@SuppressWarnings("all")
public class ProviderController {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 主题交换机* @param key* @return*/@RequestMapping("/topicSend")public String topicSend(String key){rabbitTemplate.convertAndSend("topicExchange",key,"Hello World");return "yes";}}
TopicReceiverA
package com.jmh.consumer.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author 蒋明辉* @data 2022/11/25 19:12*/
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "topicQueueA")
@Slf4j
public class TopicReceiverA {@RabbitHandlerpublic void info(String msg){log.info("A接收到了"+msg);}}
package com.jmh.consumer.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author 蒋明辉* @data 2022/11/25 19:12*/
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "topicQueueB")
@Slf4j
public class TopicReceiverB {@RabbitHandlerpublic void info(String msg){log.info("B接收到了"+msg);}}
package com.jmh.consumer.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author 蒋明辉* @data 2022/11/25 19:12*/
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "topicQueueC")
@Slf4j
public class TopicReceiverC {@RabbitHandlerpublic void info(String msg){log.info("C接收到了"+msg);}}
FanoutConfig
package com.jmh.provider.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 蒋明辉* @data 2022/11/25 19:02*/
@Configuration
@SuppressWarnings("all")
public class FanoutConfig {/*** 创建队列*/@Beanpublic Queue fanoutQueueA(){return new Queue("fanoutQueueA",true);}@Beanpublic Queue fanoutQueueB(){return new Queue("fanoutQueueB",true);}@Beanpublic Queue fanoutQueueC(){return new Queue("fanoutQueueC",true);}/*** 创建交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanoutExchange");}/*** 设置队列和交换机的绑定*/@Beanpublic Binding fanoutBindingA(){return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());}@Beanpublic Binding fanoutBindingB(){return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());}@Beanpublic Binding fanoutBindingC(){return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());}}
package com.jmh.provider.controller;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 蒋明辉* @data 2022/11/25 19:08*/
@RestController
@SuppressWarnings("all")
public class ProviderController {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 扇形交换机* @param key* @return*/@RequestMapping("/fanoutSend")public String fanoutSend(){rabbitTemplate.convertAndSend("fanoutExchange",null,"Hello World");return "yes";}}
FanoutConfig
package com.jmh.provider.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author 蒋明辉* @data 2022/11/25 19:02*/
@Configuration
@SuppressWarnings("all")
public class FanoutConfig {/*** 创建队列*/@Beanpublic Queue fanoutQueueA(){return new Queue("fanoutQueueA",true);}@Beanpublic Queue fanoutQueueB(){return new Queue("fanoutQueueB",true);}@Beanpublic Queue fanoutQueueC(){return new Queue("fanoutQueueC",true);}/*** 创建交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanoutExchange");}/*** 设置队列和交换机的绑定*/@Beanpublic Binding fanoutBindingA(){return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());}@Beanpublic Binding fanoutBindingB(){return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());}@Beanpublic Binding fanoutBindingC(){return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());}}