0%

【RabbitMQ】SpringBoot整合RabbitMQ

(1)交换机、队列的创建;(2)发送消息,接收消息方法;(3)消息JSON化;


1 环境准备

1.1 依赖

1
2
3
4
5
6
7
<!--rabbitmq springboot启动器-->
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.5.3</version>
</dependency>

1.2 yaml配置

1
2
3
4
5
6
7
spring:
# rabbitmq配置
rabbitmq:
host: 192.168.85.130 # ip地址
port: 5672 # 端口号
username: guest # 用户名
password: guest # 密码

2 声明交换机和队列

2.1 AmqpAdmin

  • 使用AmqpAdmin来构建,利用@PostConstruct在构造方法之后自动执行代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class MyRabbitMQConfig {

@Autowired
private AmqpAdmin amqpAdmin;

/**
* 初始化交换机和队列
*/
@PostConstruct
public void initExchangeAndQueue() {
// 定义交换机(交换机名,是否持久化,是否自动删除,其他参数)
amqpAdmin.declareExchange(new DirectExchange("exchange", true, false, null));
// 定义队列(队列名,是否持久化,是否排他,是否自动删除,其他参数)
amqpAdmin.declareQueue(new Queue("queue", true, false, false, null));
// 绑定交换机和队列(绑定对象名,绑定对象类型,交换机名,routingKey,其他参数)
amqpAdmin.declareBinding(new Binding("queue", Binding.DestinationType.QUEUE, "exchange", "routingKey", null));
};
}

2.2 @Bean

  • 用Builder构造交换机,队列,并注入IOC容器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Configuration
public class MyRabbitMQConfig {

@Bean
public Exchange myExchange() {
return ExchangeBuilder
.directExchange("exchange") // 交换机类型,以及交换机名
.durable(true) // 持久化
.build();
}

@Bean
public Queue myQueue() {
return QueueBuilder
.durable("queue") // 队列名,并持久化
.build();
}

@Bean
public Binding binding(@Qualifier("myExchange") Exchange myExchange, @Qualifier("myQueue") Queue myQueue) {
return BindingBuilder
.bind(myQueue)
.to(myExchange)
.with("routingKey")
.noargs();
}
}

3 消息发送和接收

3.1 消息发送

  • RabbitMQ提供一个模板类rabbitTemplate来封装消息的发送等操作
1
2
3
4
5
6
7
8
9
@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("/sendMessage")
public String sendMessage(String message) {
// 转换并发送(交换机名,routingKey,信息(Object类))
rabbitTemplate.convertAndSend("exchange", "routingKey", message);
return "消息发送成功";
}

3.2 接收消息

  • SpringBoot中消费者的消息接收为一个监听器,通过注解@RabbitListener来表示
  • 方法上添加@RabbitListener(queues = {#队列名}),来实现消费者(该方法所在类要在IOC容器中)
1
2
3
4
5
6
7
8
9
/**
* Message类:包含MQ消息的详细信息
* <T> body:body是一个泛型,是生产者发送消息的类型
* Channel类:MQ信道的信息
*/
@RabbitListener(queues = {"queue"})
public void receiveQA(Message message, String body, Channel channel) {
System.out.println("接收到的消息为:" + body);
}

3.3 接收消息2

  • 接收消息可以@RabbitListener@RabbitHandler结合使用,可以在监听同一队列,不同消息类型的数据进行自动区分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
@Component
@RabbitListener(queues = {"queue"})
public class Consumer {
// 自动区分String类型,和Interger类型的数据
@RabbitHandler
public void receiveMessage(Message message, String body, Channel channel) {
log.info("消费者 --- 接收消息为:{}", body);
}

@RabbitHandler
public void receiveMessage(Message message, Integer body, Channel channel) {
log.info("消费者 --- 接收消息为:{}", body);
}
}

4 消息JSON化

  • 生产者发送消息给RabbitMQ默认是使用JDK序列化,建议改成JSON形式
1
2
3
4
5
6
7
8
9
@Configuration
public class MyRabbitMQConfig {

@Bean
public MessageConverter messageConverter() {
// 将json消息转化器注入IOC中,替换默认消息转换器
return new JsonbMessageConverter();
}
}