0%

【RabbitMQ】发布确认

关于生产者发布消息的相关知识


1 发布确认

1.1 概念

  • 发布确认: 即为在消息持久化的过程中,rabbitmq接收到消息并保存完在磁盘上后,发消息通知生产者消息发送成功,否则生产者会重新发送消息,确保消息持久化不丢失数据

1.2 发布确认开启

  • 生产者中的信道方法confirmSelect,即为开启发布确认
    1
    Confirm.SelectOk confirmSelect() throws IOException;

1.3 单个发布确认

  • 一种同步确认发布方式,即生产者发布一条信息,就得确认一条信息,信息确认完后才往后发其他信息
  • 优点:保证了信息持久化
  • 缺点:发布速度特别慢
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 单个发布确认:发一条确认一条
* @throws Exception
*/
public void singleConfirm() throws Exception{
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 创建信道
Channel channel = connection.createChannel();
// 定义队列
channel.queueDeclare("publish_queue1", false, false, false, null);
// 开启发布确认
channel.confirmSelect();

long begin = System.currentTimeMillis();
// 发1000条数据
for (int i=0; i<1000; i++) {
channel.basicPublish("", "publish_queue1", null, String.valueOf(i).getBytes());
// 确认消息(确认失败重发)
if (!channel.waitForConfirms()) {
i--;
}
}
long end = System.currentTimeMillis();
System.out.println("单个发布确认共耗时:" + (end-begin) + "毫秒!");
}
1
2
3
4
5
6
7
// 运行测试时间(602ms)
public static void main(String[] args) throws Exception{
Provider provider = new Provider();

// 单个发布确认(602ms)
provider.singleConfirm();
}

1.4 批量发布确认

  • 跟单个发布确认相反,批量发布确认速度快,但一旦出现异常,就不清楚哪个信息出现异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 批量发布确认:暂定发布100条确认一次
* @throws Exception
*/
public void multiConfirm() throws Exception {
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 创建信道
Channel channel = connection.createChannel();
// 定义队列
channel.queueDeclare("publish_queue2", false, false, false, null);
// 开启发布确认
channel.confirmSelect();

long begin = System.currentTimeMillis();
// 发1000条数据
for (int i=0; i<1000; i++) {
channel.basicPublish("", "publish_queue2", null, String.valueOf(i).getBytes());

if (i+1 % 100 == 0) {
// 发布确认(确认失败重发100条数据)
if (!channel.waitForConfirms()) {
i -= 100;
}
}
}
long end = System.currentTimeMillis();
System.out.println("批量发布确认共耗时:" + (end-begin) + "毫秒!");
}
1
2
3
4
5
6
7
// 运行测试时间(64ms)
public static void main(String[] args) throws Exception{
Provider provider = new Provider();

// 批量发布确认(64ms)
provider.multiConfirm();
}

1.5 异步发布确认

  • 即发消息和确认消息这两个步骤是异步的,生产者只需要一直发消息即可,哪些消息是确认失败的会稍后通知
  • 需要准备一个多线程用的Map,来记录kv值,方便查找未确认的消息并重新发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 异步发布确认
* @throws Exception
*/
public void asyncPublic() throws Exception {
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 创建信道
Channel channel = connection.createChannel();
// 定义队列
channel.queueDeclare("publish_queue3", false, false, false, null);
// 开启发布确认
channel.confirmSelect();

// 存储发送消息
ConcurrentHashMap<Long, String> map = new ConcurrentHashMap<>();

/*
消息监听器,监听哪些消息成功,哪些失败
参数1:发布确认成功回调函数
参数2:发布确认失败回调函数
*/
channel.addConfirmListener(
(deliveryTag, multiple) -> {
// 发布成功逻辑代码
},
(deliveryTag, multiple) -> {
// 重新发送消息
channel.basicPublish("", "publish_queue3", null, map.get(deliveryTag).getBytes());
}
);

long begin = System.currentTimeMillis();
// 发1000条数据
for (int i=0; i<1000; i++) {
// 将发送的消息存储到map中
map.put(channel.getNextPublishSeqNo(), String.valueOf(i));
channel.basicPublish("", "publish_queue3", null, String.valueOf(i).getBytes());
}
long end = System.currentTimeMillis();
System.out.println("异步发布确认共耗时:" + (end-begin) + "毫秒!");
}
1
2
3
4
5
6
7
// 运行测试时间(45ms)
public static void main(String[] args) throws Exception{
Provider provider = new Provider();

// 异步发布确认(45ms)
provider.asyncPublic();
}