0%

【RabbitMQ】死信队列

死信队列的介绍以及处理


1 死信队列

1.1 概念

  • 死信: 即无法被消费的信息,由于特殊原因导致队列中的消息无法被消费,此些消息无法进行后续处理,变成死信,例如消费者宕机,无人处理消息

1.2 产生原因

  • (1)消息存活时间(TTL:Time To Live)过期
  • (2)队列已满,无法将消息添加进队列中
  • (3)消息被拒绝(basic.rejectbasic.nack)并且不再放回队列requeue=false

2 死信演示

2.1 代码架构图

2.2 TTL过期

  • (1)生产者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    /**
    * 生产者 - 发送两条消息到普通交换机上
    * @author letere
    * @create 2021-08-05 11:10
    */
    public class Provider {

    public static void main(String[] args) throws Exception{
    // (工具类,具体看之间文章)获取连接
    Connection connection = RabbitMQUtil.getConnection();
    // 创建信道
    Channel channel = connection.createChannel();

    // 定义消息过期时间
    AMQP.BasicProperties properties = new AMQP.BasicProperties()
    .builder().expiration(String.valueOf(5 * 1000)).build();
    // 连续发送10条消息
    for (int i=0; i<10; i++) {
    channel.basicPublish("normal_exchange", "456", properties, String.valueOf(i).getBytes());
    }
    System.out.println("消息发送完毕!");
    }
    }

  • (2)消费者01
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    /**
    * 消费者01 - 负责处理普通队列的消息,并构建整个RabbitMQ的关系
    * @author letere
    * @create 2021-08-05 11:10
    */
    public class Consumer01 {

    public static void main(String[] args) throws Exception{
    // 获取连接
    Connection connection = RabbitMQUtil.getConnection();
    // 创建信道
    Channel channel = connection.createChannel();

    /*
    由于关系比较复杂,为了方便启动,将所有交换机和队列关系都在此定义
    然后第一个启动此程序就构建好所有关系
    */
    Consumer01 consumer01 = new Consumer01();
    consumer01.buildRelation(channel);

    // 接受消息
    System.out.println("普通队列消费者等待接收消息...");
    channel.basicConsume(
    "normal_queue",
    true,
    (consumerTag, message) -> {
    System.out.println("接收消息:" + new String(message.getBody()));
    },
    consumerTag -> {
    System.out.println("消息接收中止!");
    }
    );
    }

    /**
    * 构建交换机和队列之间关系
    * @param channel 信道
    */
    public void buildRelation(Channel channel) throws Exception{
    // 定义死信交换机
    channel.exchangeDeclare("dead_exchange", "direct");
    // 定义死信队列
    channel.queueDeclare("dead_queue", false, false, false, null);
    // 交换机与队列进行关联
    channel.queueBind("dead_queue", "dead_exchange", "123");

    // 定义普通交换机
    channel.exchangeDeclare("normal_exchange", "direct");
    // 定义普通队列
    Map<String, Object> argument = new HashMap<>();
    argument.put("x-dead-letter-exchange", "dead_exchange"); // 关联死信交换机(该队列出现死信后,自动发送到该交换机中)
    argument.put("x-dead-letter-routing-key", "123");
    channel.queueDeclare("normal_queue", false, false, false, argument);
    // 交换机与队列进行关联
    channel.queueBind("normal_queue", "normal_exchange", "456");
    }
    }
  • (3)消费者02
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    /**
    * 消费者02 - 负责处理死信队列的
    * @author letere
    * @create 2021-08-05 11:10
    */
    public class Consumer02 {

    public static void main(String[] args) throws Exception{
    // 获取连接
    Connection connection = RabbitMQUtil.getConnection();
    // 创建信道
    Channel channel = connection.createChannel();
    // 接收消息
    System.out.println("死信队列消费者等待接收消息...");
    channel.basicConsume(
    "dead_queue",
    true,
    (consumerTag, message) -> {
    System.out.println("接收消息:" + new String(message.getBody()));
    },
    consumerTag -> {
    System.out.println("消息接收中止!");
    }
    );
    }
    }
  • (4)测试
  • 先启动消费者01,来构建交换机和队列之间的关系,然后关闭消费者01
  • 然后启动消费者02,生产者
  • 因为消费者01没有启动,队列中的消息无法处理,达到过期时间,自动将消息发送到死信交换机

2.3 超出队列最大长度

  • (1)生产者
  • 连续发送10条消息
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    /**
    * 生产者 - 发送两条消息到普通交换机上
    * @author letere
    * @create 2021-08-05 11:10
    */
    public class Provider {

    public static void main(String[] args) throws Exception{
    // (工具类,具体看之前文章)获取连接
    Connection connection = RabbitMQUtil.getConnection();
    // 创建信道
    Channel channel = connection.createChannel();

    // 连续发送10条消息
    for (int i=0; i<10; i++) {
    channel.basicPublish("normal_exchange", "456", null, String.valueOf(i).getBytes());
    }
    System.out.println("消息发送完毕!");
    }
    }
  • (2)消费者01
  • 修改队列最大长度为5
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    /**
    * 消费者01 - 负责处理普通队列的消息,并构建整个RabbitMQ的关系
    * @author letere
    * @create 2021-08-05 11:10
    */
    public class Consumer01 {

    public static void main(String[] args) throws Exception{
    // 获取连接
    Connection connection = RabbitMQUtil.getConnection();
    // 创建信道
    Channel channel = connection.createChannel();

    /*
    由于关系比较复杂,为了方便启动,将所有交换机和队列关系都在此定义
    然后第一个启动此程序就构建好所有关系
    */
    Consumer01 consumer01 = new Consumer01();
    consumer01.buildRelation(channel);

    // 接受消息
    System.out.println("普通队列消费者等待接收消息...");
    channel.basicConsume(
    "normal_queue",
    true,
    (consumerTag, message) -> {
    System.out.println("接收消息:" + new String(message.getBody()));
    },
    consumerTag -> {
    System.out.println("消息接收中止!");
    }
    );
    }

    /**
    * 构建交换机和队列之间关系
    * @param channel 信道
    */
    public void buildRelation(Channel channel) throws Exception{
    // 定义死信交换机
    channel.exchangeDeclare("dead_exchange", "direct");
    // 定义死信队列
    channel.queueDeclare("dead_queue", false, false, false, null);
    // 交换机与队列进行关联
    channel.queueBind("dead_queue", "dead_exchange", "123");

    // 定义普通交换机
    channel.exchangeDeclare("normal_exchange", "direct");
    // 定义普通队列
    Map<String, Object> argument = new HashMap<>();
    argument.put("x-dead-letter-exchange", "dead_exchange"); // 关联死信交换机(该队列出现死信后,自动发送到该交换机中)
    argument.put("x-dead-letter-routing-key", "123");
    argument.put("x-max-length", 5); // 修改最大队列长度为5
    channel.queueDeclare("normal_queue", false, false, false, argument);
    // 交换机与队列进行关联
    channel.queueBind("normal_queue", "normal_exchange", "456");
    }
    }
  • (3)消费者02
  • 跟前面一样,不多重复
  • (4)测试
  • 因为normal_queue参数发生变化,需要事前删除该队列,然后启动consumer01,再关闭consumer01,导致消息堆积
  • 依次启动consumer02,生产者

2.4 消息被拒

  • (1)生产者
  • 跟前面的没有发生变化,不多展示
  • (2)消费者01
  • 将消息改为手动应答,并对消息5进行拒绝应答
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    /**
    * 消费者01 - 负责处理普通队列的消息,并构建整个RabbitMQ的关系
    * @author letere
    * @create 2021-08-05 11:10
    */
    public class Consumer01 {

    public static void main(String[] args) throws Exception{
    // 获取连接
    Connection connection = RabbitMQUtil.getConnection();
    // 创建信道
    Channel channel = connection.createChannel();

    /*
    由于关系比较复杂,为了方便启动,将所有交换机和队列关系都在此定义
    然后第一个启动此程序就构建好所有关系
    */
    Consumer01 consumer01 = new Consumer01();
    consumer01.buildRelation(channel);

    // 接受消息(开启手动应答)
    System.out.println("普通队列消费者等待接收消息...");
    channel.basicConsume(
    "normal_queue",
    false,
    (consumerTag, message) -> {
    if (new String(message.getBody()).equals("5")) {
    System.out.println("此消息:" + new String(message.getBody()) + ",已被拒绝,成为死信");
    channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
    } else {
    System.out.println("接收消息:" + new String(message.getBody()));
    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
    }
    },
    consumerTag -> {
    System.out.println("消息接收中止!");
    }
    );
    }

    /**
    * 构建交换机和队列之间关系
    * @param channel 信道
    */
    public void buildRelation(Channel channel) throws Exception{
    // 定义死信交换机
    channel.exchangeDeclare("dead_exchange", "direct");
    // 定义死信队列
    channel.queueDeclare("dead_queue", false, false, false, null);
    // 交换机与队列进行关联
    channel.queueBind("dead_queue", "dead_exchange", "123");

    // 定义普通交换机
    channel.exchangeDeclare("normal_exchange", "direct");
    // 定义普通队列
    Map<String, Object> argument = new HashMap<>();
    argument.put("x-dead-letter-exchange", "dead_exchange"); // 关联死信交换机(该队列出现死信后,自动发送到该交换机中)
    argument.put("x-dead-letter-routing-key", "123");
    channel.queueDeclare("normal_queue", false, false, false, argument);
    // 交换机与队列进行关联
    channel.queueBind("normal_queue", "normal_exchange", "456");
    }
    }
  • (3)消费者02
  • 跟前面一样,不多展示
  • (4)测试
  • 因为normal_queue参数发生变化,需要事先删除,再启动consumer01
  • 依次启动consumer02,生产者