死信队列的介绍以及处理
1 死信队列
1.1 概念
- 死信: 即无法被消费的信息,由于特殊原因导致队列中的消息无法被消费,此些消息无法进行后续处理,变成死信,例如消费者宕机,无人处理消息
1.2 产生原因
- (1)消息存活时间(TTL:Time To Live)过期
- (2)队列已满,无法将消息添加进队列中
- (3)消息被拒绝(
basic.reject
或basic.nack
)并且不再放回队列requeue=false
2 死信演示
2.1 代码架构图
2.2 TTL过期
- (1)生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24/**
* 生产者 - 发送两条消息到普通交换机上
* @author letere
* @create 2021-08-05 11:10
*/
public class Provider {
public static void main(String[] args) throws Exception{
// (工具类,具体看之间文章)获取连接
Connection connection = RabbitMQUtil.getConnection();
// 创建信道
Channel channel = connection.createChannel();
// 定义消息过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder().expiration(String.valueOf(5 * 1000)).build();
// 连续发送10条消息
for (int i=0; i<10; i++) {
channel.basicPublish("normal_exchange", "456", properties, String.valueOf(i).getBytes());
}
System.out.println("消息发送完毕!");
}
}
- (2)消费者01
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57/**
* 消费者01 - 负责处理普通队列的消息,并构建整个RabbitMQ的关系
* @author letere
* @create 2021-08-05 11:10
*/
public class Consumer01 {
public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 创建信道
Channel channel = connection.createChannel();
/*
由于关系比较复杂,为了方便启动,将所有交换机和队列关系都在此定义
然后第一个启动此程序就构建好所有关系
*/
Consumer01 consumer01 = new Consumer01();
consumer01.buildRelation(channel);
// 接受消息
System.out.println("普通队列消费者等待接收消息...");
channel.basicConsume(
"normal_queue",
true,
(consumerTag, message) -> {
System.out.println("接收消息:" + new String(message.getBody()));
},
consumerTag -> {
System.out.println("消息接收中止!");
}
);
}
/**
* 构建交换机和队列之间关系
* @param channel 信道
*/
public void buildRelation(Channel channel) throws Exception{
// 定义死信交换机
channel.exchangeDeclare("dead_exchange", "direct");
// 定义死信队列
channel.queueDeclare("dead_queue", false, false, false, null);
// 交换机与队列进行关联
channel.queueBind("dead_queue", "dead_exchange", "123");
// 定义普通交换机
channel.exchangeDeclare("normal_exchange", "direct");
// 定义普通队列
Map<String, Object> argument = new HashMap<>();
argument.put("x-dead-letter-exchange", "dead_exchange"); // 关联死信交换机(该队列出现死信后,自动发送到该交换机中)
argument.put("x-dead-letter-routing-key", "123");
channel.queueDeclare("normal_queue", false, false, false, argument);
// 交换机与队列进行关联
channel.queueBind("normal_queue", "normal_exchange", "456");
}
}
- (3)消费者02
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26/**
* 消费者02 - 负责处理死信队列的
* @author letere
* @create 2021-08-05 11:10
*/
public class Consumer02 {
public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 创建信道
Channel channel = connection.createChannel();
// 接收消息
System.out.println("死信队列消费者等待接收消息...");
channel.basicConsume(
"dead_queue",
true,
(consumerTag, message) -> {
System.out.println("接收消息:" + new String(message.getBody()));
},
consumerTag -> {
System.out.println("消息接收中止!");
}
);
}
}
- (4)测试
- 先启动消费者01,来构建交换机和队列之间的关系,然后关闭消费者01
- 然后启动消费者02,生产者
- 因为消费者01没有启动,队列中的消息无法处理,达到过期时间,自动将消息发送到死信交换机
2.3 超出队列最大长度
- (1)生产者
- 连续发送10条消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20/**
* 生产者 - 发送两条消息到普通交换机上
* @author letere
* @create 2021-08-05 11:10
*/
public class Provider {
public static void main(String[] args) throws Exception{
// (工具类,具体看之前文章)获取连接
Connection connection = RabbitMQUtil.getConnection();
// 创建信道
Channel channel = connection.createChannel();
// 连续发送10条消息
for (int i=0; i<10; i++) {
channel.basicPublish("normal_exchange", "456", null, String.valueOf(i).getBytes());
}
System.out.println("消息发送完毕!");
}
}
- (2)消费者01
- 修改队列最大长度为5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58/**
* 消费者01 - 负责处理普通队列的消息,并构建整个RabbitMQ的关系
* @author letere
* @create 2021-08-05 11:10
*/
public class Consumer01 {
public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 创建信道
Channel channel = connection.createChannel();
/*
由于关系比较复杂,为了方便启动,将所有交换机和队列关系都在此定义
然后第一个启动此程序就构建好所有关系
*/
Consumer01 consumer01 = new Consumer01();
consumer01.buildRelation(channel);
// 接受消息
System.out.println("普通队列消费者等待接收消息...");
channel.basicConsume(
"normal_queue",
true,
(consumerTag, message) -> {
System.out.println("接收消息:" + new String(message.getBody()));
},
consumerTag -> {
System.out.println("消息接收中止!");
}
);
}
/**
* 构建交换机和队列之间关系
* @param channel 信道
*/
public void buildRelation(Channel channel) throws Exception{
// 定义死信交换机
channel.exchangeDeclare("dead_exchange", "direct");
// 定义死信队列
channel.queueDeclare("dead_queue", false, false, false, null);
// 交换机与队列进行关联
channel.queueBind("dead_queue", "dead_exchange", "123");
// 定义普通交换机
channel.exchangeDeclare("normal_exchange", "direct");
// 定义普通队列
Map<String, Object> argument = new HashMap<>();
argument.put("x-dead-letter-exchange", "dead_exchange"); // 关联死信交换机(该队列出现死信后,自动发送到该交换机中)
argument.put("x-dead-letter-routing-key", "123");
argument.put("x-max-length", 5); // 修改最大队列长度为5
channel.queueDeclare("normal_queue", false, false, false, argument);
// 交换机与队列进行关联
channel.queueBind("normal_queue", "normal_exchange", "456");
}
}
- (3)消费者02
- 跟前面一样,不多重复
- (4)测试
- 因为
normal_queue
参数发生变化,需要事前删除该队列,然后启动consumer01,再关闭consumer01,导致消息堆积 - 依次启动consumer02,生产者
2.4 消息被拒
- (1)生产者
- 跟前面的没有发生变化,不多展示
- (2)消费者01
- 将消息改为手动应答,并对消息
5
进行拒绝应答1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63/**
* 消费者01 - 负责处理普通队列的消息,并构建整个RabbitMQ的关系
* @author letere
* @create 2021-08-05 11:10
*/
public class Consumer01 {
public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 创建信道
Channel channel = connection.createChannel();
/*
由于关系比较复杂,为了方便启动,将所有交换机和队列关系都在此定义
然后第一个启动此程序就构建好所有关系
*/
Consumer01 consumer01 = new Consumer01();
consumer01.buildRelation(channel);
// 接受消息(开启手动应答)
System.out.println("普通队列消费者等待接收消息...");
channel.basicConsume(
"normal_queue",
false,
(consumerTag, message) -> {
if (new String(message.getBody()).equals("5")) {
System.out.println("此消息:" + new String(message.getBody()) + ",已被拒绝,成为死信");
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println("接收消息:" + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
},
consumerTag -> {
System.out.println("消息接收中止!");
}
);
}
/**
* 构建交换机和队列之间关系
* @param channel 信道
*/
public void buildRelation(Channel channel) throws Exception{
// 定义死信交换机
channel.exchangeDeclare("dead_exchange", "direct");
// 定义死信队列
channel.queueDeclare("dead_queue", false, false, false, null);
// 交换机与队列进行关联
channel.queueBind("dead_queue", "dead_exchange", "123");
// 定义普通交换机
channel.exchangeDeclare("normal_exchange", "direct");
// 定义普通队列
Map<String, Object> argument = new HashMap<>();
argument.put("x-dead-letter-exchange", "dead_exchange"); // 关联死信交换机(该队列出现死信后,自动发送到该交换机中)
argument.put("x-dead-letter-routing-key", "123");
channel.queueDeclare("normal_queue", false, false, false, argument);
// 交换机与队列进行关联
channel.queueBind("normal_queue", "normal_exchange", "456");
}
}
- (3)消费者02
- 跟前面一样,不多展示
- (4)测试
- 因为
normal_queue
参数发生变化,需要事先删除,再启动consumer01 - 依次启动consumer02,生产者