关于RabbitMQ比较杂碎的知识点的补充
1 幂等性
1.1 概念
- 例:用户购买商品,下单支付,点击支付后,钱扣了,但界面显示网络异常,用户刷新界面,重新点击付钱。此时再次扣钱,也形参两条流水线(以前单应用系统可以通过数据库事控制)
1.2 消息重复消费
- 消息重复消费: MQ把消息发送给消费者,在消费者发送ack应答时,网络中断了,故MQ未收到消息,该条消息重新发送到消费者,可消费者本质上是已经成功消费了消息
- 解决思路: 使用全局ID(唯一标识的时间戳,UUID,MQ自身的id)来进行判断,判断是否已经消费过该消息
1.3 幂等性主流解决
- (1)唯一ID + 指纹码机制
- 指纹码为一些规则或时间戳加别的服务给到的唯一信息码,保证唯一性。利用数据库查询此id是否存在数据库中
- 优点:方式简单,就一个简单的拼接
- 缺点:在高并发的情况下,单个数据库读写性能会出现瓶颈
- (2)Redis原子性
- 利用redis执行setnx命令,天然具有幂等性,从而不重复消费
2 优先级队列
2.1 介绍
- 顾名思义,优先级队列中消息存在优先级,优先级高的消息,更早从MQ中发送到消费者
- 使用场景: 订单催付,如果下订单的是大客户,优先发送消息叫客户支付,避免客户反悔取消订单。
2.2 代码
- (1)yaml
1
2
3
4
5
6spring:
rabbitmq:
host: 192.168.85.130
port: 5672
username: guest
password: guest
- (2)配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class PriorityQueueConfig {
/**
* 构建优先级交换机
* @return DirectExchange
*/
public DirectExchange priorityExchange() {
return ExchangeBuilder
.directExchange("priority_exchange")
.build();
}
/**
* 构建优先级队列
* @return Queue
*/
public Queue priorityQueue() {
return QueueBuilder
.nonDurable("priority_queue")
.maxPriority(10) // 设置最大优先级
.build();
}
/**
* 绑定优先级交换机和队列
* @param priorityExchange 优先级交换机
* @param priorityQueue 优先级队列
* @return Binding
*/
public Binding priorityBinding( DirectExchange priorityExchange,
Queue priorityQueue){
return BindingBuilder
.bind(priorityQueue)
.to(priorityExchange)
.with("priority");
}
}
- (3)生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ProducerController {
private RabbitTemplate rabbitTemplate;
public String sendMsg() {
// map存储消息,key=消息,value=优先级
Map<String, Integer> map = new HashMap<>();
map.put("消息-1", 5);
map.put("消息-2", 7);
map.put("消息-3", 0);
for (Map.Entry<String, Integer> entry : map.entrySet()) {
rabbitTemplate.convertAndSend(
"priority_exchange",
"priority",
entry.getKey(),
message -> {
// 设置消息优先级
message.getMessageProperties().setPriority(entry.getValue());
return message;
});
}
return "消息发送成功";
}
}
- (4)消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
public class Consumer {
/**
* 监听优先级队列,并接收消息
* @param message 消息
*/
public void receivePriorityMsg(Message message) {
log.info("消费者 --- 接收消息:{}", new String(message.getBody()));
}
}
2.3 测试
- 先把消费者部分的代码注释掉,启动程序,调用接口
http://localhost:8080/sendMsg
发送信息 - 然后再将消费者的注释去掉,重启程序,查看消费者接收消息的顺序
3 惰性队列
3.1 概念
- 惰性队列: 将消息存放在磁盘上,可以减少内存的消耗,但相对的会降低消息的处理速度
3.2 代码
- (1)配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class LazyQueueConfig {
/**
* 构建惰性队列
* @return Queue
*/
public Queue lazyQueue() {
return QueueBuilder
.nonDurable("lazy_queue")
.lazy() // 堕性队列
.build();
}
}