0%

【RabbitMQ】发布确认高级

此篇为发布确认的知识补充,对在springboot实现发布确认,以及发布失败如何进行消息的重新发送的介绍


1 环境准备

1.1 依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!--web启动器-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!--rabbitmq springboot启动器-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.5.3</version>
</dependency>

<!--lombok注解-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

1.2 yaml配置

1
2
3
4
5
6
spring:
rabbitmq:
host: 192.168.85.130
port: 5672
username: guest
password: guest

1.3 定义交换机与队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Slf4j
@Configuration
public class ConfirmConfig {

/**
* 创建交换机
* @return DirectExchange
*/
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange("confirm_exchange");
}

/**
* 创建队列
* @return Queue
*/
@Bean
public Queue confirmQueue() {
return QueueBuilder
.nonDurable("confirm_queue")
.build();
}

/**
* 绑定交换机和队列
* @param confirmExchange 交换机
* @param confirmQueue 队列
* @return Binding
*/
@Bean
public Binding queueToExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,
@Qualifier("confirmQueue") Queue confirmQueue) {
return BindingBuilder
.bind(confirmQueue)
.to(confirmExchange)
.with("confirm");
}
}

1.4 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("/sendMessage")
public String sendMessage(String message) {
rabbitTemplate.convertAndSend("confirm_exchange", "confirm", message);
log.info("生产者 --- 发送消息为:{}", message);
return "消息发送成功!";
}
}

1.5 消费者

1
2
3
4
5
6
7
8
9
@Slf4j
@Component
public class Consumer {

@RabbitListener(queues = "confirm_queue")
public void receiveMessage(Message message) {
log.info("消费者 --- 接收消息为:{}", new String(message.getBody()));
}
}

2 发布确认

确认生产者是否成功发布消息到交换机

2.1 配置

  • (1)yaml配置
    1
    2
    3
    4
    spring:
    rabbitmq:
    # ......
    publisher-confirm-type: correlated # 开启发布确认,消息成功发送到交换机后触发回调函数
  • (2)自定义发布确认函数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    @Slf4j
    @Component
    public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
    * 配置确认回调函数
    */
    @PostConstruct
    public void init() {
    rabbitTemplate.setConfirmCallback(this);
    }

    /**
    * 发布确认回调函数
    * @param data 发送消息相关信息
    * @param ack ack应答
    * @param cause 失败原因
    */
    @Override
    public void confirm(CorrelationData data, boolean ack, String cause) {
    String id = data == null ? "" : data.getId();
    if (ack) {
    log.info("交换机 --- 消息接收成功,消息id为:{}", id);
    } else {
    log.info("交换机 --- 消息接收失败,消息id为:{},失败原因为:{}", id, cause);
    }
    }
    }
  • (3)消息发送添加具体参数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    @Slf4j
    @RestController
    @RequestMapping("/confirm")
    public class ProducerController {

    // ......

    @GetMapping("/sendMessage")
    public String sendMessage(String message) {
    // 发布确认回调需要的信息
    CorrelationData correlationData = new CorrelationData();
    correlationData.setId(String.valueOf(System.currentTimeMillis()));

    rabbitTemplate.convertAndSend("confirm_exchange", "confirm", message, correlationData);
    log.info("生产者 --- 发送消息为:{}", message);
    return "消息发送成功!";
    }
    }

2.2 测试

  • 项目启动,浏览器输入地址:http://localhost:8080/confirm/sendMessage?message=交换机消息发布确认

3 消息回退

上面的发布确认能保证消息发送到交换机没有问题,但不能保证消息从交换机发送到队列没有问题,而消息回退在消息发送到队列出现问题时,会调用消息回退函数,进行处理

3.1 配置

  • (1)yaml配置
    1
    2
    3
    4
    spring:
    rabbitmq:
    # ......
    publisher-returns: true # 开启消息回退
  • (2)自定义消息回退函数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    @Slf4j
    @Component
    public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
    * 消息回退注入
    */
    @PostConstruct
    public void init() {
    rabbitTemplate.setReturnsCallback(this);
    }

    /**
    * 消息回退方法
    * @param returnedMessage 回退信息
    */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
    log.info("被回退消息:{},原因为:{}", new String(returnedMessage.getMessage().getBody()), returnedMessage.getReplyText());
    }
    }

3.2 测试

  • 在发送消息时,修改发送消息的routingkey,来故意让消息发送不到队列中,来检测消息回退是否触发

4 消息手动应答

  • SpringBoot中,消费者消息默认是自动应答,自动应答不适合生产实际业务,应改成手动
1
2
3
4
5
6
# yaml配置
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
1
2
3
4
5
6
7
8
9
10
11
@RabbitListener(queues = {"queue"})
public void receiveMessage(Message message, String body, Channel channel) {
// 业务逻辑...
try {
// ack应答(消息唯一ID,是否批量应答)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
System.out.println("消息ack出现异常");
}
}

5 备份交换机

当发送消息给队列无法接收时,除了可以设置消息回退外,还可以设置备份交换机。交换机会自动将无法接收的消息,转发到备份交换机,交给备份交换机处理。【备份交换机优先级大于消息回退】

5.1 配置

  • (1)配置类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    @Slf4j
    @Configuration
    public class ConfirmConfig {

    // ......

    /**
    * 创建交换机,并关联备份交换机
    * @return DirectExchange
    */
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
    // return new DirectExchange("confirm_exchange");
    return ExchangeBuilder
    .directExchange("confirm_exchange") // 交换机类型,以及交换机名
    .alternate("backup_exchange") // 备份交换机名
    .build();
    }

    /**
    * 创建备份交换机
    * @return DirectExchange
    */
    @Bean("backupExchange")
    public FanoutExchange backupExchange() {
    return new FanoutExchange("backup_exchange");
    }

    /**
    * 创建备份队列
    * @return Queue
    */
    @Bean("backupQueue")
    public Queue backupQueue() {
    return QueueBuilder
    .nonDurable("backup_queue")
    .build();
    }

    /**
    * 绑定备份交换机和备份队列
    * @param backupExchange 备份交换机
    * @param backupQueue 备份队列
    * @return Binding
    */
    @Bean
    public Binding backupBinding(@Qualifier("backupExchange") FanoutExchange backupExchange,
    @Qualifier("backupQueue") Queue backupQueue) {
    return BindingBuilder
    .bind(backupQueue)
    .to(backupExchange);
    }
    }
  • (2)消费者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Slf4j
    @Component
    public class Consumer {

    // ......

    /**
    * 接收备份队列的消息
    * @param message 消息
    */
    @RabbitListener(queues = "backup_queue")
    public void receiveBackupMsg(Message message) {
    log.info("消费者(备份) --- 接收消息为:{}", new String(message.getBody()));
    }
    }

5.2 测试

  • 还是和前面的消息回退一样,故意填写错routingKey,测试交换机是否将消息转发到备份交换机