<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${RELEASE.VERSION}</version>
</dependency>
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
Note:
Replace 127.0.0.1:9876 in the sample configuration above with the address and port of your actual RocketMQ NameServer.
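Since the rocketmq.name-server value has to be edited by hand, a small sanity check can catch a malformed entry before the application starts. The following is a minimal sketch, not part of RocketMQ Spring: the class NameServerAddress and its parse method are hypothetical names, and the sketch assumes the value is either a single host:port pair or several pairs separated by semicolons.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not a RocketMQ Spring class.
final class NameServerAddress {
    final String host;
    final int port;

    NameServerAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Parses "host:port" or "host1:port1;host2:port2" into address objects.
    // Throws IllegalArgumentException when an entry is not a valid host:port pair.
    static List<NameServerAddress> parse(String value) {
        List<NameServerAddress> result = new ArrayList<>();
        for (String part : value.split(";")) {
            String trimmed = part.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got: " + trimmed);
            }
            // NumberFormatException is a subclass of IllegalArgumentException
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("port out of range: " + port);
            }
            result.add(new NameServerAddress(trimmed.substring(0, colon), port));
        }
        return result;
    }
}
```

For example, NameServerAddress.parse("127.0.0.1:9876") yields one entry with host 127.0.0.1 and port 9876, while a value without a port is rejected.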
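import java.io.Serializable;
import java.math.BigDecimal;
import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Send a message synchronously
        rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
        // Send a Spring message
        rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
        // Send a message asynchronously
        rocketMQTemplate.asyncSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                System.out.printf("async onSuccess SendResult=%s %n", var1);
            }

            @Override
            public void onException(Throwable var1) {
                System.out.printf("async onException Throwable=%s %n", var1);
            }
        });
        // Send messages in order, using "hashkey" as the hash key
        rocketMQTemplate.syncSendOrderly("orderly_topic", MessageBuilder.withPayload("Hello, World").build(), "hashkey");

        // rocketMQTemplate.destroy(); // Note: once destroyed, this rocketMQTemplate can no longer send messages
    }

    // Static so the payload type can be instantiated without an enclosing instance
    @Data
    @AllArgsConstructor
    public static class OrderPaidEvent implements Serializable {
        private String orderId;
        private BigDecimal paidMoney;
    }
}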
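The orderly send above passes a hash key ("hashkey"). The idea behind such a key is that messages sharing it are routed to the same queue, which keeps their relative order. The sketch below illustrates that routing with a plain hash-modulo rule. HashKeyQueueSelector is a hypothetical class written for this note, and the selection rule is an assumption for illustration, not a quote of the strategy RocketMQ Spring uses internally.

```java
import java.util.List;

// Hypothetical illustration of hash-key routing; not a RocketMQ class.
final class HashKeyQueueSelector {
    // Picks one queue for the given key. The same key always maps to the
    // same queue as long as the queue list does not change.
    static <T> T select(List<T> queues, String hashKey) {
        if (queues.isEmpty()) {
            throw new IllegalArgumentException("no queues available");
        }
        // floorMod keeps the index non-negative even for negative hash codes
        int index = Math.floorMod(hashKey.hashCode(), queues.size());
        return queues.get(index);
    }
}
```

A practical consequence of this kind of routing is that ordering holds per key, not across the whole topic, so the key should identify the unit that needs ordering (for example an order ID).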
Modify application.properties
## application.properties
rocketmq.name-server=127.0.0.1:9876
Note:
Replace 127.0.0.1:9876 in the sample configuration above with the address and port of your actual RocketMQ NameServer.
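import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Service;

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    // Nested listeners are static so that component scanning can register them as beans
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    public static class MyConsumer1 implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            log.info("received message: {}", message);
        }
    }

    // OrderPaidEvent is the payload class defined in the producer example
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
    public static class MyConsumer2 implements RocketMQListener<OrderPaidEvent> {
        @Override
        public void onMessage(OrderPaidEvent orderPaidEvent) {
            log.info("received orderPaidEvent: {}", orderPaidEvent);
        }
    }
}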
Starting with RocketMQ Spring 2.2.0, RocketMQ Spring supports pull-mode consumption.
Modify application.properties
## application.properties
rocketmq.name-server=127.0.0.1:9876
# Setting rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic makes rocketMQTemplate start a lite pull consumer
# To keep the lite pull consumer disabled, leave both properties unset
rocketmq.pull-consumer.group=my-group1
rocketmq.pull-consumer.topic=test
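Note: the lite pull consumer was previously enabled through rocketmq.consumer.group and rocketmq.consumer.topic. Because these were easily confused with the push consumer settings, they were renamed to rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic after version 2.2.3.
Write the code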
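import java.util.List;
import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerApplication implements CommandLineRunner {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    // Requires a bean named "extRocketMQTemplate" to be defined elsewhere in the application
    @Resource(name = "extRocketMQTemplate")
    private RocketMQTemplate extRocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Pull messages using rocketMQTemplate
        List<String> messages = rocketMQTemplate.receive(String.class);
        System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);
        // Pull messages using extRocketMQTemplate
        messages = extRocketMQTemplate.receive(String.class);
        System.out.printf("receive from extRocketMQTemplate, messages=%s %n", messages);
    }
}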
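The example above calls receive once per template. In pull mode the application decides when to fetch, so real consumers usually poll repeatedly. The sketch below shows one way to structure a bounded polling loop. PullLoopSketch and drain are hypothetical names, and the Supplier stands in for a call such as rocketMQTemplate.receive(String.class) so that the example runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper for illustration only; not part of RocketMQ Spring.
final class PullLoopSketch {
    // Calls receive up to maxPolls times and collects everything returned.
    // A null or empty batch is skipped; it does not end the loop early.
    static <T> List<T> drain(Supplier<List<T>> receive, int maxPolls) {
        List<T> collected = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            List<T> batch = receive.get();
            if (batch != null) {
                collected.addAll(batch);
            }
        }
        return collected;
    }
}
```

Inside the run method above, this could be used as PullLoopSketch.drain(() -> rocketMQTemplate.receive(String.class), 10). A long-running consumer would more likely loop until shutdown and process each batch as it arrives instead of accumulating them.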
Modify application.properties
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
Note:
Replace 127.0.0.1:9876 in the sample configuration above with the address and port of your actual RocketMQ NameServer.
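import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        try {
            // Build a Spring message to send in a transaction
            Message msg = MessageBuilder.withPayload(..)...;
            // Arguments: destination topic, message, and an optional argument
            // that is handed to executeLocalTransaction
            rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
        } catch (MessagingException e) {
            e.printStackTrace(System.out);
        }
    }

    // Define the transaction listener with the @RocketMQTransactionListener annotation
    @RocketMQTransactionListener
    static class TransactionListenerImpl implements RocketMQLocalTransactionListener {
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // ... run the local transaction, then return rollback, commit, or unknown
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // ... check the transaction status and return rollback, commit, or unknown
            return RocketMQLocalTransactionState.COMMIT;
        }
    }
}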
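In the listener above, executeLocalTransaction returns UNKNOWN and checkLocalTransaction returns COMMIT, so the outcome of the message is only decided when the status is checked back later. The sketch below models that two-step resolution in plain Java. TransactionFlowSketch, its State enum, and the maxChecks limit are hypothetical names introduced for illustration; the real check-back is driven by the broker and its own settings.

```java
import java.util.function.Supplier;

// Hypothetical model of the transaction outcome; not a RocketMQ class.
final class TransactionFlowSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Starts from the result of the local transaction. While the state is
    // UNKNOWN, asks the check-back callback again, up to maxChecks times.
    static State resolve(State localResult, Supplier<State> checkBack, int maxChecks) {
        State state = localResult;
        int checks = 0;
        while (state == State.UNKNOWN && checks < maxChecks) {
            state = checkBack.get();
            checks++;
        }
        return state;
    }
}
```

With the listener shown above, the equivalent call would be resolve(State.UNKNOWN, () -> State.COMMIT, 1), which ends in COMMIT. If the local transaction had already returned COMMIT or ROLLBACK, the check-back would never be consulted.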
To use message tracing on the producer side, two additional configuration items are required:
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
rocketmq.producer.enable-msg-trace=true
rocketmq.producer.customized-trace-topic=my-trace-topic
On the consumer side, message tracing is configured through the corresponding attributes of @RocketMQMessageListener:
@Service
@RocketMQMessageListener(
    topic = "test-topic-1",
    consumerGroup = "my-consumer_test-topic-1",
    enableMsgTrace = true,
    customizedTraceTopic = "my-trace-topic"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}
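Note:
By default, message tracing is enabled for both the producer and the consumer, and the trace topic is RMQ_SYS_TRACE_TOPIC. The consumer-side trace topic can be set once through the rocketmq.consumer.customized-trace-topic item in the configuration file, so it does not have to be configured on every @RocketMQMessageListener.
To use Alibaba Cloud message tracing, set accessChannel to CLOUD in @RocketMQMessageListener.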
To use ACL on the producer side, two additional configuration items are required:
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
rocketmq.producer.access-key=AK
rocketmq.producer.secret-key=SK
On the consumer side, ACL is configured in @RocketMQMessageListener:
@Service
@RocketMQMessageListener(
    topic = "test-topic-1",
    consumerGroup = "my-consumer_test-topic-1",
    accessKey = "AK",
    secretKey = "SK"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}
Note:
AK/SK do not have to be configured on every @RocketMQMessageListener annotation. Set rocketmq.consumer.access-key and rocketmq.consumer.secret-key in the configuration file instead, and their values are used as the defaults.
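The defaulting rule described in the note can be pictured as a simple precedence: a value set on the annotation wins, otherwise the property from the configuration file applies. The sketch below expresses that rule in plain Java. ListenerSettingResolver is a hypothetical class for illustration, and it is an assumption about the observable behavior only, not a description of how RocketMQ Spring resolves the values internally.

```java
import java.util.Properties;

// Hypothetical illustration of the precedence rule; not a RocketMQ Spring class.
final class ListenerSettingResolver {
    // Returns the annotation value when it is set, otherwise the value of
    // propertyKey from the configuration, or an empty string when neither exists.
    static String resolve(String annotationValue, Properties props, String propertyKey) {
        if (annotationValue != null && !annotationValue.isEmpty()) {
            return annotationValue;
        }
        return props.getProperty(propertyKey, "");
    }
}
```

With rocketmq.consumer.access-key=AK in the configuration, a listener that leaves accessKey empty would end up with AK, while a listener that sets accessKey itself keeps its own value.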