(1)交换机、队列的创建;(2)发送消息,接收消息方法;(3)消息JSON化;
1 环境准备
1.1 依赖
1 2 3 4 5 6 7
|
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.5.3</version> </dependency>
|
1.2 yaml配置
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.85.130 port: 5672 username: guest password: guest
|
2 声明交换机和队列
2.1 AmqpAdmin
- 使用AmqpAdmin来构建,利用
@PostConstruct
在构造方法之后自动执行代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Configuration public class MyRabbitMQConfig {
@Autowired private AmqpAdmin amqpAdmin;
@PostConstruct public void initExchangeAndQueue() { amqpAdmin.declareExchange(new DirectExchange("exchange", true, false, null)); amqpAdmin.declareQueue(new Queue("queue", true, false, false, null)); amqpAdmin.declareBinding(new Binding("queue", Binding.DestinationType.QUEUE, "exchange", "routingKey", null)); }; }
|
2.2 @Bean
- 用Builder构造交换机,队列,并注入IOC容器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Configuration public class MyRabbitMQConfig {
@Bean public Exchange myExchange() { return ExchangeBuilder .directExchange("exchange") .durable(true) .build(); }
@Bean public Queue myQueue() { return QueueBuilder .durable("queue") .build(); }
@Bean public Binding binding(@Qualifier("myExchange") Exchange myExchange, @Qualifier("myQueue") Queue myQueue) { return BindingBuilder .bind(myQueue) .to(myExchange) .with("routingKey") .noargs(); } }
|
3 消息发送和接收
3.1 消息发送
- RabbitMQ提供一个模板类
rabbitTemplate
来封装消息的发送等操作
1 2 3 4 5 6 7 8 9
| @Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage") public String sendMessage(String message) { rabbitTemplate.convertAndSend("exchange", "routingKey", message); return "消息发送成功"; }
|
3.2 接收消息
- SpringBoot中消费者的消息接收为一个监听器,通过注解
@RabbitListener
来表示
- 方法上添加
@RabbitListener(queues = {#队列名})
,来实现消费者(该方法所在类要在IOC容器中)
1 2 3 4 5 6 7 8 9
|
@RabbitListener(queues = {"queue"}) public void receiveQA(Message message, String body, Channel channel) { System.out.println("接收到的消息为:" + body); }
|
3.3 接收消息2
- 接收消息可以
@RabbitListener
和@RabbitHandler
结合使用,可以在监听同一队列,不同消息类型的数据进行自动区分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Slf4j @Component @RabbitListener(queues = {"queue"}) public class Consumer { @RabbitHandler public void receiveMessage(Message message, String body, Channel channel) { log.info("消费者 --- 接收消息为:{}", body); }
@RabbitHandler public void receiveMessage(Message message, Integer body, Channel channel) { log.info("消费者 --- 接收消息为:{}", body); } }
|
4 消息JSON化
- 生产者发送消息给RabbitMQ默认是使用JDK序列化,建议改成JSON形式
1 2 3 4 5 6 7 8 9
| @Configuration public class MyRabbitMQConfig {
@Bean public MessageConverter messageConverter() { return new JsonbMessageConverter(); } }
|