0%

【RabbitMQ】其他知识点

关于RabbitMQ比较杂碎的知识点的补充


1 幂等性

1.1 概念

  • 例:用户购买商品,下单支付,点击支付后,钱扣了,但界面显示网络异常,用户刷新界面,重新点击付钱。此时再次扣钱,也形参两条流水线(以前单应用系统可以通过数据库事控制)

1.2 消息重复消费

  • 消息重复消费: MQ把消息发送给消费者,在消费者发送ack应答时,网络中断了,故MQ未收到消息,该条消息重新发送到消费者,可消费者本质上是已经成功消费了消息
  • 解决思路: 使用全局ID(唯一标识的时间戳,UUID,MQ自身的id)来进行判断,判断是否已经消费过该消息

1.3 幂等性主流解决

  • (1)唯一ID + 指纹码机制
  • 指纹码为一些规则或时间戳加别的服务给到的唯一信息码,保证唯一性。利用数据库查询此id是否存在数据库中
  • 优点:方式简单,就一个简单的拼接
  • 缺点:在高并发的情况下,单个数据库读写性能会出现瓶颈
  • (2)Redis原子性
  • 利用redis执行setnx命令,天然具有幂等性,从而不重复消费

2 优先级队列

2.1 介绍

  • 顾名思义,优先级队列中消息存在优先级,优先级高的消息,更早从MQ中发送到消费者
  • 使用场景: 订单催付,如果下订单的是大客户,优先发送消息叫客户支付,避免客户反悔取消订单。

2.2 代码

  • (1)yaml
    1
    2
    3
    4
    5
    6
    spring:
    rabbitmq:
    host: 192.168.85.130
    port: 5672
    username: guest
    password: guest
  • (2)配置类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    @Configuration
    public class PriorityQueueConfig {

    /**
    * 构建优先级交换机
    * @return DirectExchange
    */
    @Bean
    public DirectExchange priorityExchange() {
    return ExchangeBuilder
    .directExchange("priority_exchange")
    .build();
    }

    /**
    * 构建优先级队列
    * @return Queue
    */
    @Bean
    public Queue priorityQueue() {
    return QueueBuilder
    .nonDurable("priority_queue")
    .maxPriority(10) // 设置最大优先级
    .build();
    }

    /**
    * 绑定优先级交换机和队列
    * @param priorityExchange 优先级交换机
    * @param priorityQueue 优先级队列
    * @return Binding
    */
    @Bean
    public Binding priorityBinding(@Qualifier("priorityExchange") DirectExchange priorityExchange,
    @Qualifier("priorityQueue") Queue priorityQueue) {
    return BindingBuilder
    .bind(priorityQueue)
    .to(priorityExchange)
    .with("priority");
    }
    }
  • (3)生产者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    @RestController
    public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg")
    public String sendMsg() {
    // map存储消息,key=消息,value=优先级
    Map<String, Integer> map = new HashMap<>();
    map.put("消息-1", 5);
    map.put("消息-2", 7);
    map.put("消息-3", 0);

    for (Map.Entry<String, Integer> entry : map.entrySet()) {
    rabbitTemplate.convertAndSend(
    "priority_exchange",
    "priority",
    entry.getKey(),
    message -> {
    // 设置消息优先级
    message.getMessageProperties().setPriority(entry.getValue());
    return message;
    });
    }

    return "消息发送成功";
    }
    }
  • (4)消费者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Slf4j
    @Component
    public class Consumer {

    /**
    * 监听优先级队列,并接收消息
    * @param message 消息
    */
    @RabbitListener(queues = "priority_queue")
    public void receivePriorityMsg(Message message) {
    log.info("消费者 --- 接收消息:{}", new String(message.getBody()));
    }
    }

2.3 测试

  • 先把消费者部分的代码注释掉,启动程序,调用接口http://localhost:8080/sendMsg发送信息
  • 然后再将消费者的注释去掉,重启程序,查看消费者接收消息的顺序

3 惰性队列

3.1 概念

  • 惰性队列: 将消息存放在磁盘上,可以减少内存的消耗,但相对的会降低消息的处理速度

3.2 代码

  • (1)配置类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Configuration
    public class LazyQueueConfig {

    /**
    * 构建惰性队列
    * @return Queue
    */
    @Bean
    public Queue lazyQueue() {
    return QueueBuilder
    .nonDurable("lazy_queue")
    .lazy() // 堕性队列
    .build();
    }
    }