0%

【RabbitMQ】交换机

关于RabbitMQ中交换机的使用


1 交换机

1.1 使用场景

  • 当我们需要一个消息被多个消费者共享的时候,此时就需要用到交换机,一个队列中消费者之间是竞争关系,无法实现消息共享,所以需要创建多一个队列,让交换机发送消息到这些队列中,从而达到一个消息被多个消费者共享

1.2 概念

  • 生产者发送消息并不是直接发送到队列中,而是发送到交换机中。而交换机负责将消息放入到队列中,具体放到特定队列、多个队列、或丢弃消息,用交换机具体的类型决定
  • 交换机类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)

1.3 交换机与队列

  • 交换机与队列之间的关系通过routingKey进行关联,routingKey相当于队列的唯一标识,交换机通过routingKey来寻找对应的队列,并发送消息给此队列

2 fanout交换机

  • fanout类型交换机,功能类似广播一样,将消息发送到所有它绑定的队列中
  • fanout类型交换机中,队列的routingKey都为空,不需要队列有唯一标识

2.1 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 生产者 - 发送消息给交换机
* @author letere
* @create 2021-08-04 14:53
*/
public class Provider {

public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 创建信道
Channel channel = connection.createChannel();
/*
定义交换机
参数1:交换机名
参数2:交换机类型
*/
channel.exchangeDeclare("fanoutExchange", "fanout");

// 发送消息(控制台输入)
Scanner scanner = new Scanner(System.in);
while (true) {
String message = scanner.next();
channel.basicPublish("fanoutExchange", "", null, message.getBytes());
}
}
}

2.2 消费者

  • 同理创建两个一样的消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 消费者01 - 队列绑定交换机,从队列中接收消息
* @author letere
* @create 2021-08-04 14:57
*/
public class Consumer01 {

public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明默认队列,并获取队列名
String queueName = channel.queueDeclare().getQueue();
/*
与交换机绑定
参数1:队列名
参数2:交换机名
参数3:routingKey
*/
channel.queueBind(queueName, "fanoutExchange", "");

System.out.println("消费者01等待接受消息...");
channel.basicConsume(
queueName,
(consumerTag, message) -> {
System.out.println("接收消息:" + new String(message.getBody()));
},
consumerTag -> {
System.out.println("消息接收异常!");
}
);
}
}

2.3 测试

  • 先启动生产者来创建交换机,再启动两个消费者,在生产者控制台输入要发送过的消息

3 direct交换机

  • direct大部分跟fanout交换机一样,只是direct交换机绑定的routingKey有对应的值,可以发送消息到特定的队列上

3.1 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 生产者 - 发送消息给交换机
* @author letere
* @create 2021-08-04 14:53
*/
public class Provider {

public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 创建信道
Channel channel = connection.createChannel();
/*
定义交换机
参数1:交换机名
参数2:交换机类型
*/
channel.exchangeDeclare("directExchange", "direct");

// 发送消息(控制台输入)
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print("请输入发送信息:");
String message = scanner.next();
System.out.print("请输入routingKey:");
String routingKey = scanner.next();
channel.basicPublish("directExchange", routingKey, null, message.getBytes());
System.out.println("---------------------------");
}
}
}

3.2 消费者

  • 同理创建两个一样的消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 消费者01 - 队列绑定交换机,从队列中接收消息
* @author letere
* @create 2021-08-04 14:57
*/
public class Consumer01 {

public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明默认队列,并获取队列名
String queueName = channel.queueDeclare().getQueue();
/*
与交换机绑定
参数1:队列名
参数2:交换机名
参数3:routingKey
*/
channel.queueBind(queueName, "directExchange", "consumer01");

System.out.println("消费者01等待接受消息...");
channel.basicConsume(
queueName,
(consumerTag, message) -> {
System.out.println("接收消息:" + new String(message.getBody()));
},
consumerTag -> {
System.out.println("消息接收异常!");
}
);
}
}

3.3 测试

  • 先启动生产者来创建交换机,其次再启动两个消费者,用控制台发送消息

4 topic交换机

  • topic交换机是fanout和direct交换机的一个升级,可以同时向两个队列发送消息,也可以向特定的队列发送消息

4.1 routingKey命名

  • topic交换机中,routingKey有固定规范的命名方式
  • (1)命名方式用xxx.xxx.xxx的格式进行命名,其中单词的长度不限制,但总长度不能超过255
  • (2)存在通用字符匹配,*代表任意一个单词,#代表任意零个或多个单词

4.2 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 生产者 - 发送消息给交换机
* @author letere
* @create 2021-08-04 20:37
*/
public class Provider {

public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 创建信道
Channel channel = connection.createChannel();
/*
定义交换机
参数1:交换机名
参数2:交换机类型
*/
channel.exchangeDeclare("topicExchange", "topic");

// 发送消息(控制台输入)
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print("请输入发送信息:");
String message = scanner.next();
System.out.print("请输入routingKey:");
String routingKey = scanner.next();
channel.basicPublish("topicExchange", routingKey, null, message.getBytes());
System.out.println("---------------------------");
}
}
}

4.3 消费者

  • 同理创建两个消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 消费者01 - 队列绑定交换机,从队列中接收消息
* @author letere
* @create 2021-08-04 20:38
*/
public class Consumer01 {

public static void main(String[] args) throws Exception{
// 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明默认队列,并获取队列名
String queueName = channel.queueDeclare().getQueue();
/*
与交换机绑定
参数1:队列名
参数2:交换机名
参数3:routingKey
*/
channel.queueBind(queueName, "topicExchange", "#.consumer01.#");

System.out.println("消费者01等待消息接收...");
channel.basicConsume(
queueName,
(consumerTag, message) -> {
System.out.println("接收消息:" + new String(message.getBody()));
},
consumerTag -> {
System.out.println("消息接收失败!");
}
);
}
}

4.4 测试

  • 先启动生产者创建交换机,再启动消费者,通过控制台发送消息