Springboot RocketMQ整合—官方原版
创始人
2025-06-01 07:35:41
0

一、添加maven依赖:

org.apache.rocketmqrocketmq-spring-boot-starter${RELEASE.VERSION}

二、发送消息

1、修改application.properties

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
注意:
请将上述示例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端口

2、编写代码

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{@Resourceprivate RocketMQTemplate rocketMQTemplate;public static void main(String[] args){SpringApplication.run(ProducerApplication.class, args);}public void run(String... args) throws Exception {//send message synchronouslyrocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");//send spring messagerocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());//send messgae asynchronouslyrocketMQTemplate.asyncSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {@Overridepublic void onSuccess(SendResult var1) {System.out.printf("async onSucess SendResult=%s %n", var1);}@Overridepublic void onException(Throwable var1) {System.out.printf("async onException Throwable=%s %n", var1);}});//Send messages orderlyrocketMQTemplate.syncSendOrderly("orderly_topic",MessageBuilder.withPayload("Hello, World").build(),"hashkey")//rocketMQTemplate.destroy(); // notes:  once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate}@Data@AllArgsConstructorpublic class OrderPaidEvent implements Serializable{private String orderId;private BigDecimal paidMoney;}
}

三、接收消息

1、Push模式

修改application.properties

## application.properties
rocketmq.name-server=127.0.0.1:9876
注意:
请将上述示例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端口
@SpringBootApplication
public class ConsumerApplication{public static void main(String[] args){SpringApplication.run(ConsumerApplication.class, args);}@Slf4j@Service@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")public class MyConsumer1 implements RocketMQListener{public void onMessage(String message) {log.info("received message: {}", message);}}@Slf4j@Service@RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")public class MyConsumer2 implements RocketMQListener{public void onMessage(OrderPaidEvent orderPaidEvent) {log.info("received orderPaidEvent: {}", orderPaidEvent);}}
}

2、Pull模式

从RocketMQ Spring 2.2.0开始,RocketMQ Srping支持Pull模式消费

修改application.properties

## application.properties
rocketmq.name-server=127.0.0.1:9876
# When set rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic, rocketmqTemplate will start lite pull consumer
# If you do not want to use lite pull consumer, please do not set rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic
rocketmq.pull-consumer.group=my-group1
rocketmq.pull-consumer.topic=test
注意之前lite pull consumer的生效配置为rocketmq.consumer.group和rocketmq.consumer.topic,但由于非常容易与push-consumer混淆,因此在2.2.3版本之后修改为rocketmq.pull-consumer.group和rocketmq.pull-consumer.topic.

编写代码

@SpringBootApplication
public class ConsumerApplication implements CommandLineRunner {@Resourceprivate RocketMQTemplate rocketMQTemplate;@Resource(name = "extRocketMQTemplate")private RocketMQTemplate extRocketMQTemplate;public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}@Overridepublic void run(String... args) throws Exception {//This is an example of pull consumer using rocketMQTemplate.List messages = rocketMQTemplate.receive(String.class);System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);//This is an example of pull consumer using extRocketMQTemplate.messages = extRocketMQTemplate.receive(String.class);System.out.printf("receive from extRocketMQTemplate, messages=%s %n", messages);}
}

四、事务消息

修改application.properties

## application.propertiesrocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
注意:
请将上述示例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端口
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{@Resourceprivate RocketMQTemplate rocketMQTemplate;public static void main(String[] args){SpringApplication.run(ProducerApplication.class, args);}public void run(String... args) throws Exception {try {// Build a SpringMessage for sending in transactionMessage msg = MessageBuilder.withPayload(..)...;// In sendMessageInTransaction(), the first parameter transaction name ("test")// must be same with the @RocketMQTransactionListener's member field 'transName'rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);} catch (MQClientException e) {e.printStackTrace(System.out);}}// Define transaction listener with the annotation @RocketMQTransactionListener@RocketMQTransactionListenerclass TransactionListenerImpl implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// ... local transaction process, return bollback, commit or unknownreturn RocketMQLocalTransactionState.UNKNOWN;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// ... check transaction status and return bollback, commit or unknownreturn RocketMQLocalTransactionState.COMMIT;}}
}

五、消息轨迹

Producer 端要想使用消息轨迹,需要多配置两个配置项:

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-grouprocketmq.producer.enable-msg-trace=true
rocketmq.producer.customized-trace-topic=my-trace-topic

Consumer 端消息轨迹的功能需要在 @RocketMQMessageListener 中进行配置对应的属性:

@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1",enableMsgTrace = true,customizedTraceTopic = "my-trace-topic"
)
public class MyConsumer implements RocketMQListener {...
}
注意:
默认情况下 Producer 和 Consumer 的消息轨迹功能是开启的且 trace-topic 为 RMQ_SYS_TRACE_TOPIC Consumer 端的消息轨迹 trace-topic 可以在配置文件中配置 rocketmq.consumer.customized-trace-topic 配置项,不需要为在每个 @RocketMQMessageListener 配置。
若需使用阿里云消息轨迹,则需要在@RocketMQMessageListener中将accessChannel配置为CLOUD。

六、ACL功能

Producer 端要想使用 ACL 功能,需要多配置两个配置项:

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-grouprocketmq.producer.access-key=AK
rocketmq.producer.secret-key=SK

Consumer 端 ACL 功能需要在 @RocketMQMessageListener 中进行配置

@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1",accessKey = "AK",secretKey = "SK"
)
public class MyConsumer implements RocketMQListener {...
}
注意:
可以不用为每个 @RocketMQMessageListener 注解配置 AK/SK,在配置文件中配置 rocketmq.consumer.access-key 和 rocketmq.consumer.secret-key 配置项,这两个配置项的值就是默认值

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...