0%

【RabbitMQ】工作队列

关于RabbitMQ中的工作队列的介绍


1 消息分发

1.1 介绍

  • 当一个生产者发送大量消息的时候,只有一个工作线程(消费者)进行消息处理的话,速度慢。此时需要多个工作线程来对消息进行处理,RabbitMQ会按照轮询法则(默认)对消息分发到多个工作线程中

1.2 轮询发送

  • (1)工具类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    /**
    * RabbitMQ工具类
    * @author letere
    * @create 2021-07-31 16:39
    */
    public class RabbitMQUtil {

    /**
    * 获取连接
    * @return Connection
    */
    public static Connection getConnection() throws Exception{
    // rabbitmq连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // ip地址
    factory.setHost("192.168.85.130");
    // 端口号
    factory.setPort(5672);
    // 用户名
    factory.setUsername("guest");
    // 密码
    factory.setPassword("guest");

    // 创建连接
    return factory.newConnection();
    }
    }
  • (2)生产者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    /**
    * 生产者
    * @author letere
    * @create 2021-07-31 17:08
    */
    public class Producer {

    public static void main(String[] args) throws Exception{
    // 获取连接
    Connection connection = RabbitMQUtil.getConnection();
    // 创建信道
    Channel channel = connection.createChannel();
    // 生成队列
    channel.queueDeclare("worker_queue", false, false, false, null);
    // 循环发送消息
    for (int i=1; i <= 10; i++) {
    channel.basicPublish("", "worker_queue", null, String.valueOf(i).getBytes());
    }
    System.out.println("消息发送完毕!");
    }
    }
  • (3)工作线程(消费者)
  • 同理创建两个
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    /**
    * 工作线程1
    * @author letere
    * @create 2021-07-31 16:50
    */
    public class Worker01 {

    public static void main(String[] args) throws Exception{
    // 获取连接
    Connection connection = RabbitMQUtil.getConnection();
    // 产生信道
    Channel channel = connection.createChannel();
    // 消费消息
    System.out.println("worker01:等待接受消息...");
    channel.basicConsume(
    "worker_queue",
    true,
    ((consumerTag, message) -> {
    System.out.println("消费消息:" + new String(message.getBody()));
    }),
    (consumerTag -> {
    System.out.println("消息:" + consumerTag + ",消费取消!");
    })
    );
    }
    }
  • (4)测试
  • 先把工作线程启动好(可能会报404错误,因为队列还没有创建,可以事先创建队列)
  • 再启动生产者发送消息

1.3 不公平分发

  • 不公平分发: 会根据消费者对消息的处理速度进行分发,处理速度快的消费则分布多点,处理慢的分发少点(能者多劳)
  • 消费者方进行设置,信道的basicQos方法,将参数改为1,意思为:不公平分发,信道信息堆积条数为1
  • 可以改成其他数值,来定义信道堆积信息的条数,此值被称为预取值
    1
    void basicQos(int prefetchCount) throws IOException;

2 消息应答

2.1 概念

  • 当某个工作线程的工作时间过于长,一旦该工作线程发生了宕机,则此消息的处理就会发生丢失,为解决此情况,rabbitmq引入消息应答机制
  • 消息应答:消费者在接收消息,并处理完消息之后,告诉rabbitmq消息已经处理完毕,此时rabbitmq才把队列中的消息删除,确保消息不会丢失

2.2 自动应答

  • 特点:消息发送后立即就认为已经传送成功了,虽然提高了消息的吞吐量,但也会造成消息的丢失。此模式仅适用于消费者可以高效并以某种速率处理消息的情况下使用

2.3 手动应答

  • 手动应答具体分为 信道(channel) 中的三个方法
    1
    2
    3
    4
    5
    6
    // 肯定确认:确认消息成功处理,将消息删除
    void basicAck(long deliveryTag, boolean multiple) throws IOException;

    // 否定确认:直接将消息删除,不进行确认是否完成
    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
    void basicReject(long deliveryTag, boolean requeue) throws IOException;
  • multiple - 是否批量应答
  • true:当一个消息处理完需要应答时,若信道中还有其他消息,则顺便将这些消息都进行应答
  • false:只应答当前处理完的消息

2.4 消息重新入队列

  • 如果真的出现消费者宕机导致消息未处理的情况,rabbitmq会将消息重新进入队列

2.5 代码实现

  • (1)生产者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class Provider {

    public static void main(String[] args) throws Exception{
    // 获取连接
    Connection connection = RabbitMQUtil.getConnection();
    // 创建信道
    Channel channel = connection.createChannel();
    // 定义队列
    channel.queueDeclare("ack_queue", false, false, false, null);
    // 发送消息
    channel.basicPublish("", "ack_queue", null, "ack应答1".getBytes());
    channel.basicPublish("", "ack_queue", null, "ack应答2".getBytes());
    System.out.println("消息发送完毕!");
    }
    }
  • (2)消费者
  • 创建两个消费者,一个沉睡1秒,一个沉睡30秒
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class Consumer01 {

    public static void main(String[] args) throws Exception{
    // 获取连接
    Connection connection = RabbitMQUtil.getConnection();
    // 创建信道
    Channel channel = connection.createChannel();
    // 接受消息,并处理
    channel.basicConsume(
    "ack_queue",
    false,
    (consumerTag, message) -> {
    // 沉睡1秒
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("接收到的消息为:" + new String(message.getBody()));
    // 手动应答
    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
    },
    consumerTag -> {
    System.out.println("消息消费取消/终止!");
    }
    );
    }
    }
  • (3)测试
  • 事先将队列创建好,然后启动消费者,再发送消息
  • 将沉睡30秒的消费者进行关闭,去另外一个消费查看,查看消息是否重发到此消费者中

3 RabbitMQ持久化

3.1 概念

  • 有时可能会出现rabbitmq出现宕机的情况,若不对rabbitmq的队列和消息进行保存,则重启rabbitmq就会造成数据的丢失。所有需要实现rabbitmq数据的持久化

3.2 队列持久化

  • 在声明队列的方法queueDeclare中,第二个参数durable即为持久化,改为true就可持久化
    1
    2
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
    Map<String, Object> arguments) throws IOException;
  • 持久化前,需要将原同名队列进行删除,否则报错
  • 运行程序,队列持久化

3.3 消息持久化

  • 在发布消息的方法basicPublish中,第三个参数props设置为MessageProperties.PERSISTENT_TEXT_PLAIN

    1
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
  • 注意: 此消息持久化不能百分百保证消息不丢失,有可能在将消息写入磁盘的一瞬间,rabbitmq宕机了,还没将消息写入磁盘,导致消息丢失。更强有力的持久化在后面的《发布确认》进行介绍