Stream用于解除各种消息中间件的使用差异,将他们整合为统一接口使用
1 Stream介绍
1.1 简介
- 诞生作用:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
- 官网:https://spring.io/projects/spring-cloud-stream#learn
- 中文文档:https://m.wang1314.com/doc/webapp/topic/20971999.html
- 应用程序通过inputs或者outputs来与spring cloud stream中的binder对象交互
- 通过我们配置来绑定,而spring cloud stream 的
binder对象负责与消息中间件交互
- 所以,我们只需要搞清楚如何与spring cloud stream交互就可以方便使用消息驱动的方式
- 目前仅支持RabbitMQ、Kafka
1.2 设计思想
- 生产者/消费者之间靠消息媒介传递信息内容(Message)
- 消息必须走特定的通道(Message Channel)
- 由(MessageHander)来进行消息接收和转发处理
- 通过定义绑定器作为中间层,完美实现了
应用程序和消息中间件之间的隔离
- 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现
- Stream中的消息通信方式遵循了发布-订阅模式
1.3 标准流程套路
- Binder:方便的连接中间件,屏蔽差异
- Channel:通道,是队列Queue的一种抽象,在消息通讯系统汇总就是实现存储和转发的媒介,通过Channel对队列进行配置
- Source和Sink:简单的可理解为参照对象时Spring Cloud Stream自身,从Stream发布消息就是输入,接受消息就是输入
1.4 API和常用注解
组成 | 说明 |
---|---|
Middleware | 中间件,目前只支持RabbitMQ和Kafka |
Binder | Binder是应用于消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间,可以动态的改变消息类,这些可以通过配置文件来实现 |
@Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的消息接收 |
@EnableBinding | 指定信道channel和exchange绑定在一起 |
2 消息发送者
2.1 搭建
- (1)创建项目
cloud-stream-rabbit-pub8801
- (2)引入依赖
1
2
3
4
5
6
7
8
9
10<!-- web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- stream-rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
- (3)修改yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23server:
port: 8801
spring:
application:
name: cloud-stream-rabbit-pub
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称,OUTPUT表示这是消息的发送方
destination: testExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红没有关系)
- (4)主启动类
1
2
3
4
5
6
public class StreamPubMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamPubMain8801.class, args);
}
}
- (5)service层
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21//接口
public interface MessageProvider {
String send();
}
//-----------------------------------------------------------------------------------
//实现类
//定义消息的推送管道
public class MessageProviderImp implements MessageProvider{
private MessageChannel output; //消息发送管道
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build()); //将消息发送到'发送管道上'
System.out.println("serial:" + serial);
return null;
}
}
- (6)controller层
1
2
3
4
5
6
7
8
9
10
11
public class SendMessageController {
private MessageProvider messageProvider;
public String sendMessage() {
return messageProvider.send();
}
}
2.2 测试
- (1)启动
cloud-stream-rabbit-pub8801
项目,进入rabbitmq页面管理
- (2)查看rabbitmq交换机是否注册成功
- (3)测试接口:
http://localhost:8801/sendMessage
,进入rabbitmq页面是否接受到消息
3 消息接受者
3.1 搭建
- (1)创建项目
cloud-stream-rabbit-sub8802
- (2)引入依赖
1
2
3
4
5
6
7
8
9
10<!-- web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- stream-rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
- (3)修改yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23server:
port: 8802
spring:
application:
name: cloud-stream-rabbit-sub
cloud:
stream:
binders: #配置要绑定的rabbitmq的服务信息
defaultRabbit: #(自定义名称),用于与binding整合
type: rabbit #消息中间件类型
environment: #环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
input: #
destination: testExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型
binder: defaultRabbit #要绑定的中间件服务
- (4)主启动类
1
2
3
4
5
6
public class StreamSubMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamSubMain8802.class, args);
}
}
- (5)Controller
1
2
3
4
5
6
7
8
9
10
11
public class ReceiveMessageController {
private String serverPort;
//接受消息,并自动执行下面方法
public void input(Message<String> message) {
System.out.println("订阅者:" + serverPort + " --- " + message.getPayload());
}
}
3.2 测试
- (1)先启动
cloud-stream-rabbit-pub8801
,再启动cloud-stream-rabbit-sub8802
- (2)测试8801接受,向中间件发送消息,http://localhost:8801/sendMessage
- (3)观察8802后台,查看是否监听到消息的发送,并自动执行方法
4 分组消费和持久化
4.1 重复消费
- 我们先复制一份
cloud-stream-rabbit-pub8803
,项目内容跟8802一模一样,做一个简单集群
- 然后分别运行三个项目,测试发送消息接口,发现8802,8803重复接受到了消息,这不是想要的结果,而是希望轮询接受数据
- 生产实际例子:订单系统我们做了集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取,那么就会造成数据错误,我们需要避免这种情况,我们就可以使用Stream中的消息分组来解决
4.2 分组解决重复消费
- 解决原理:当两个服务的消息中间件分组不一样,就会出现重复消费,共同获取信息。但是如果分组一样,两个服务之间就是竞争关系
修改订阅者的yaml,将两个组名改为一样
1
2
3
4
5
6
7
8
9
10
11spring:
application:
name: cloud-stream-rabbit-sub
cloud:
stream:
bindings:
input: #
destination: testExchange
content-type: application/json
binder: defaultRabbit
group: groupA #设置分组名测试访问发送消息接口两次,查看8802、8803后台,发现只有一个消息,实现了轮询效果
4.3 持久化
- 如果我们没有手动对yaml配置
group
属性,若消息发送者在消息接受者未启动时发送消息,则待消息接受者启动后,会出现消息丢失的情况,没有从消息中间件中获取消息。
- 如果我们设置了分组,则在启动时会正常获取之前发送的消息