0%

【RabbitMQ】RabbitMQ快速开始

关于rabbitmq的快速开始(Hello World)


1 快速开始

以Java来实现生产者 - 队列 - 消费者之间关系

1.1 生产者

  • (1)准备依赖
    1
    2
    3
    4
    5
    6
    7
    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <!-- RabbitMQ Java客户端 -->
    <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.13.0</version>
    </dependency>
  • (2)生产者构建
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    /**
    * 消息生产者
    * @author letere
    * @create 2021-07-31 14:55
    */
    public class Producer {

    public static void main(String[] args) throws Exception{
    Producer producer = new Producer();

    // 连接rabbitmq
    Connection connection = producer.connectRabbitMQ();
    // 创建信道
    Channel channel = connection.createChannel();
    /*
    生成队列
    参数1:队列名
    参数2:是否将消息持久化到磁盘(默认false)
    参数3:是否值提供一个消费者进行消费
    参数4:是否自动删除队列(当没有消费者进行连接的时候)
    参数5:其他参数配置
    */
    channel.queueDeclare("my_queue", false, false, false, null);
    /*
    发布消息
    参数1:交换机名
    参数2:路由key值
    参数3:其他参数配置
    参数4:发布消息的内容(二进制数组)
    */
    channel.basicPublish("", "my_queue", null, "Hello World".getBytes());
    System.out.println("消息发送完毕!");
    }

    /**
    * 连接rabbitmq
    * @return Connection
    * @throws Exception
    */
    private Connection connectRabbitMQ() throws Exception{
    // rabbitmq连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // ip地址
    factory.setHost("192.168.85.130");
    // 端口号
    factory.setPort(5672);
    // 用户名
    factory.setUsername("guest");
    // 密码
    factory.setPassword("guest");

    // 创建连接
    return factory.newConnection();
    }
    }
  • (3)运行并查看
  • 运行程序后,进行rabbitmq-web界面进行查看

1.2 消费者

  • (1)消费者构建
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    /**
    * 消息消费者
    * @author letere
    * @create 2021-07-31 15:46
    */
    public class Consumer {

    public static void main(String[] args) throws Exception{
    Consumer consumer = new Consumer();

    // 连接rabbitmq
    Connection connection = consumer.connectRabbitMQ();
    // 创建信道
    Channel channel = connection.createChannel();
    /*
    消费消息
    参数1:队列名
    参数2:是否自动应答
    参数3:消费消息回调函数
    参数4:消费取消回调函数
    */
    channel.basicConsume(
    "my_queue",
    true,
    ((consumerTag, message) -> {
    System.out.println(consumerTag);
    System.out.println(new String(message.getBody()));
    }),
    ((consumerTag) -> {
    System.out.println("消息:" + consumerTag + ",消费取消/中断!");
    })
    );
    }

    /**
    * 连接rabbitmq
    * @return Connection
    * @throws Exception
    */
    private Connection connectRabbitMQ() throws Exception{
    // rabbitmq连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // ip地址
    factory.setHost("192.168.85.130");
    // 端口号
    factory.setPort(5672);
    // 用户名
    factory.setUsername("guest");
    // 密码
    factory.setPassword("guest");

    // 创建连接
    return factory.newConnection();
    }
    }
  • (2)运行测试