此篇为发布确认的知识补充,对在springboot实现发布确认,以及发布失败如何进行消息的重新发送的介绍
1 环境准备
1.1 依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.5.3</version> </dependency>
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency>
|
1.2 yaml配置
1 2 3 4 5 6
| spring: rabbitmq: host: 192.168.85.130 port: 5672 username: guest password: guest
|
1.3 定义交换机与队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| @Slf4j @Configuration public class ConfirmConfig {
@Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange("confirm_exchange"); }
@Bean public Queue confirmQueue() { return QueueBuilder .nonDurable("confirm_queue") .build(); }
@Bean public Binding queueToExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange, @Qualifier("confirmQueue") Queue confirmQueue) { return BindingBuilder .bind(confirmQueue) .to(confirmExchange) .with("confirm"); } }
|
1.4 生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Slf4j @RestController @RequestMapping("/confirm") public class ProducerController {
@Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage") public String sendMessage(String message) { rabbitTemplate.convertAndSend("confirm_exchange", "confirm", message); log.info("生产者 --- 发送消息为:{}", message); return "消息发送成功!"; } }
|
1.5 消费者
1 2 3 4 5 6 7 8 9
| @Slf4j @Component public class Consumer {
@RabbitListener(queues = "confirm_queue") public void receiveMessage(Message message) { log.info("消费者 --- 接收消息为:{}", new String(message.getBody())); } }
|
2 发布确认
确认生产者是否成功发布消息到交换机
2.1 配置
- (1)yaml配置
1 2 3 4
| spring: rabbitmq: publisher-confirm-type: correlated
|
- (2)自定义发布确认函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Slf4j @Component public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {
@Autowired private RabbitTemplate rabbitTemplate;
@PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); }
@Override public void confirm(CorrelationData data, boolean ack, String cause) { String id = data == null ? "" : data.getId(); if (ack) { log.info("交换机 --- 消息接收成功,消息id为:{}", id); } else { log.info("交换机 --- 消息接收失败,消息id为:{},失败原因为:{}", id, cause); } } }
|
- (3)消息发送添加具体参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Slf4j @RestController @RequestMapping("/confirm") public class ProducerController {
@GetMapping("/sendMessage") public String sendMessage(String message) { CorrelationData correlationData = new CorrelationData(); correlationData.setId(String.valueOf(System.currentTimeMillis()));
rabbitTemplate.convertAndSend("confirm_exchange", "confirm", message, correlationData); log.info("生产者 --- 发送消息为:{}", message); return "消息发送成功!"; } }
|
2.2 测试
- 项目启动,浏览器输入地址:
http://localhost:8080/confirm/sendMessage?message=交换机消息发布确认
3 消息回退
上面的发布确认能保证消息发送到交换机没有问题,但不能保证消息从交换机发送到队列没有问题,而消息回退在消息发送到队列出现问题时,会调用消息回退函数,进行处理
3.1 配置
- (1)yaml配置
1 2 3 4
| spring: rabbitmq: publisher-returns: true
|
- (2)自定义消息回退函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Slf4j @Component public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback {
@Autowired private RabbitTemplate rabbitTemplate;
@PostConstruct public void init() { rabbitTemplate.setReturnsCallback(this); }
@Override public void returnedMessage(ReturnedMessage returnedMessage) { log.info("被回退消息:{},原因为:{}", new String(returnedMessage.getMessage().getBody()), returnedMessage.getReplyText()); } }
|
3.2 测试
- 在发送消息时,修改发送消息的routingkey,来故意让消息发送不到队列中,来检测消息回退是否触发
4 消息手动应答
- SpringBoot中,消费者消息默认是自动应答,自动应答不适合生产实际业务,应改成手动
1 2 3 4 5 6
| spring: rabbitmq: listener: simple: acknowledge-mode: manual
|
1 2 3 4 5 6 7 8 9 10 11
| @RabbitListener(queues = {"queue"}) public void receiveMessage(Message message, String body, Channel channel) { try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); System.out.println("消息ack出现异常"); } }
|
5 备份交换机
当发送消息给队列无法接收时,除了可以设置消息回退外,还可以设置备份交换机。交换机会自动将无法接收的消息,转发到备份交换机,交给备份交换机处理。【备份交换机优先级大于消息回退】
5.1 配置
- (1)配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| @Slf4j @Configuration public class ConfirmConfig {
@Bean("confirmExchange") public DirectExchange confirmExchange() {
return ExchangeBuilder .directExchange("confirm_exchange") .alternate("backup_exchange") .build(); }
@Bean("backupExchange") public FanoutExchange backupExchange() { return new FanoutExchange("backup_exchange"); }
@Bean("backupQueue") public Queue backupQueue() { return QueueBuilder .nonDurable("backup_queue") .build(); }
@Bean public Binding backupBinding(@Qualifier("backupExchange") FanoutExchange backupExchange, @Qualifier("backupQueue") Queue backupQueue) { return BindingBuilder .bind(backupQueue) .to(backupExchange); } }
|
- (2)消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Slf4j @Component public class Consumer {
@RabbitListener(queues = "backup_queue") public void receiveBackupMsg(Message message) { log.info("消费者(备份) --- 接收消息为:{}", new String(message.getBody())); } }
|
5.2 测试
- 还是和前面的消息回退一样,故意填写错routingKey,测试交换机是否将消息转发到备份交换机