0%

【SpringCloud】消息驱动:Stream

Stream用于解除各种消息中间件的使用差异,将他们整合为统一接口使用


1 Stream介绍

1.1 简介

  • 诞生作用:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
  • 应用程序通过inputs或者outputs来与spring cloud stream中的binder对象交互
  • 通过我们配置来绑定,而spring cloud stream 的 binder对象负责与消息中间件交互
  • 所以,我们只需要搞清楚如何与spring cloud stream交互就可以方便使用消息驱动的方式
  • 目前仅支持RabbitMQ、Kafka

1.2 设计思想

  • 生产者/消费者之间靠消息媒介传递信息内容(Message)
  • 消息必须走特定的通道(Message Channel)
  • 由(MessageHander)来进行消息接收和转发处理
  • 通过定义绑定器作为中间层,完美实现了应用程序和消息中间件之间的隔离
  • 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现
  • Stream中的消息通信方式遵循了发布-订阅模式

1.3 标准流程套路

  • Binder:方便的连接中间件,屏蔽差异
  • Channel:通道,是队列Queue的一种抽象,在消息通讯系统汇总就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象时Spring Cloud Stream自身,从Stream发布消息就是输入,接受消息就是输入

1.4 API和常用注解

组成 说明
Middleware 中间件,目前只支持RabbitMQ和Kafka
Binder Binder是应用于消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间,可以动态的改变消息类,这些可以通过配置文件来实现
@Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指定信道channel和exchange绑定在一起

2 消息发送者

2.1 搭建

  • (1)创建项目cloud-stream-rabbit-pub8801
  • (2)引入依赖
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    <!-- web -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- stream-rabbit -->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
  • (3)修改yaml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    server:
    port: 8801

    spring:
    application:
    name: cloud-stream-rabbit-pub
    cloud:
    stream:
    binders: # 在此处配置要绑定的rabbitmq的服务信息;
    defaultRabbit: # 表示定义的名称,用于于binding整合
    type: rabbit # 消息组件类型
    environment: # 设置rabbitmq的相关的环境配置
    spring:
    rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    bindings: # 服务的整合处理
    output: # 这个名字是一个通道的名称,OUTPUT表示这是消息的发送方
    destination: testExchange # 表示要使用的Exchange名称定义
    content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
    binder: defaultRabbit # 设置要绑定的消息服务的具体设置(爆红没有关系)
  • (4)主启动类
    1
    2
    3
    4
    5
    6
    @SpringBootApplication
    public class StreamPubMain8801 {
    public static void main(String[] args) {
    SpringApplication.run(StreamPubMain8801.class, args);
    }
    }
  • (5)service层
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    //接口
    public interface MessageProvider {
    String send();
    }

    //-----------------------------------------------------------------------------------
    //实现类
    @EnableBinding(Source.class) //定义消息的推送管道
    public class MessageProviderImp implements MessageProvider{

    @Resource
    private MessageChannel output; //消息发送管道

    @Override
    public String send() {
    String serial = UUID.randomUUID().toString();
    output.send(MessageBuilder.withPayload(serial).build()); //将消息发送到'发送管道上'
    System.out.println("serial:" + serial);
    return null;
    }
    }
  • (6)controller层
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @RestController
    public class SendMessageController {

    @Resource
    private MessageProvider messageProvider;

    @GetMapping("/sendMessage")
    public String sendMessage() {
    return messageProvider.send();
    }
    }

2.2 测试

  • (1)启动cloud-stream-rabbit-pub8801项目,进入rabbitmq页面管理
  • (2)查看rabbitmq交换机是否注册成功
  • (3)测试接口:http://localhost:8801/sendMessage,进入rabbitmq页面是否接受到消息

3 消息接受者

3.1 搭建

  • (1)创建项目cloud-stream-rabbit-sub8802
  • (2)引入依赖
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    <!-- web -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- stream-rabbit -->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
  • (3)修改yaml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    server:
    port: 8802

    spring:
    application:
    name: cloud-stream-rabbit-sub
    cloud:
    stream:
    binders: #配置要绑定的rabbitmq的服务信息
    defaultRabbit: #(自定义名称),用于与binding整合
    type: rabbit #消息中间件类型
    environment: #环境配置
    spring:
    rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    bindings:
    input: #
    destination: testExchange # 表示要使用的Exchange名称定义
    content-type: application/json # 设置消息类型
    binder: defaultRabbit #要绑定的中间件服务
  • (4)主启动类
    1
    2
    3
    4
    5
    6
    @SpringBootApplication
    public class StreamSubMain8802 {
    public static void main(String[] args) {
    SpringApplication.run(StreamSubMain8802.class, args);
    }
    }
  • (5)Controller
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @EnableBinding(Sink.class)
    public class ReceiveMessageController {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT) //接受消息,并自动执行下面方法
    public void input(Message<String> message) {
    System.out.println("订阅者:" + serverPort + " --- " + message.getPayload());
    }
    }

3.2 测试

  • (1)先启动cloud-stream-rabbit-pub8801,再启动cloud-stream-rabbit-sub8802
  • (3)观察8802后台,查看是否监听到消息的发送,并自动执行方法

4 分组消费和持久化

4.1 重复消费

  • 我们先复制一份cloud-stream-rabbit-pub8803,项目内容跟8802一模一样,做一个简单集群
  • 然后分别运行三个项目,测试发送消息接口,发现8802,8803重复接受到了消息,这不是想要的结果,而是希望轮询接受数据
  • 生产实际例子:订单系统我们做了集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取,那么就会造成数据错误,我们需要避免这种情况,我们就可以使用Stream中的消息分组来解决

4.2 分组解决重复消费

  • 解决原理:当两个服务的消息中间件分组不一样,就会出现重复消费,共同获取信息。但是如果分组一样,两个服务之间就是竞争关系
  • 修改订阅者的yaml,将两个组名改为一样

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    spring:
    application:
    name: cloud-stream-rabbit-sub
    cloud:
    stream:
    bindings:
    input: #
    destination: testExchange
    content-type: application/json
    binder: defaultRabbit
    group: groupA #设置分组名
  • 测试访问发送消息接口两次,查看8802、8803后台,发现只有一个消息,实现了轮询效果

4.3 持久化

  • 如果我们没有手动对yaml配置group属性,若消息发送者在消息接受者未启动时发送消息,则待消息接受者启动后,会出现消息丢失的情况,没有从消息中间件中获取消息。
  • 如果我们设置了分组,则在启动时会正常获取之前发送的消息