Stream
本专栏学习内容来自尚硅谷周阳老师的视频
有兴趣的小伙伴可以点击视频地址观看
SpringCloud Stream是SpringCloud的消息驱动,之前的微服务学的好好的,为什么会突然冒出一个这么个东西来增加我们的学习量呢?
一听到消息,那肯定就想到了MQ、Kafka,在日常工作中可能不止用到一种MQ,这时候需要对所有的MQ进行系统的学习,当然也不是所有人都有经历去学习。这时候Stream就应运而生,可以理解为Stream对RabbitMQ、Kafka进行的封装,使用者只需要了解Stream即可。
概念
官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。
通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。
所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ、Kafka
Stream中的消息通信方式遵循了发布-订阅模式,使用了Topic主题进行广播
编码API和常用注解
- Middleware:中间件,目前只支持RabbitMQ和Kafka
- Binder:应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的该表消息类型(Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
- @Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序
- @Output:注解标识输出通道,发布的消息将通过该通道离开应用程序
- @StreamListener:监听队列,用于消费者的队列的消息接收
- @EnableBinding:信道channel和exchange绑定在一起
前期准备
在学习这章节之前,至少RabbitMQ环境已经搭配好,并且小黄在这里建议学习之前最好去了解以下RabbitMQ,便于后面的理解。小黄学RabbitMQ
另外提前说明一下,在下面的案例中需要三个模块
- cloud-stream-rabbitmq-provider8801, 作为生产者进行发消息模块
- cloud-stream-rabbitmq-consumer8802,作为消息接收模块
- cloud-stream-rabbitmq-consumer8803,作为消息接收模块
消息驱动之生产者
创建cloud-stream-rabbitmq-provider8801服务
引入相关依赖
<!--stream-rabbitmq-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka-client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
编写配置文件
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 标识定义的名称,用于binding整合
type: rabbit # 消息组件类型
bindings: # 服务的整合处理
output: # 这个名字是一个通道名称
destination: studyExchange # 标识要使用的交换机名称
content-type: application/json #设置消息类型
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
rabbitmq:
host: 124.220.80.180
port: 5672
username: xxx
password: xxx
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://eureka7001.com:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8801.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
创建发送者接口及其实现类
public interface IMessageProvider {
public String send();
}
@Slf4j
@EnableBinding(Source.class) // 可以理解为是一个消息的发送管道的定义
public class MessageProviderImpl implements IMessageProvider {
//消息的发送管道
@Resource
private MessageChannel output;
@Override
public String send() {
String msg = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(msg).build());
log.info("******发送消息:{}",msg);
return null;
}
}
编写调用代码
@RestController
public class MessageProviderController {
@Resource
IMessageProvider messageProvider;
@GetMapping(value = "/sendMessage")
public String sendMessage()
{
return messageProvider.send();
}
}
测试
发送三次请求,发现RabbitMQ上确收有收到消息 http://localhost:8801/sendMessage
消息驱动之消费者
创建cloud-stream-rabbitmq-consumer8802服务
引入相关依赖
<!--stream-rabbitmq-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka-client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
编写配置文件
与生产者基本一致,只不过把output改成input
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 标识定义的名称,用于binding整合
type: rabbit # 消息组件类型
bindings: # 服务的整合处理
input: # 这个名字是一个通道名称 #****************
destination: studyExchange # 标识要使用的交换机名称
content-type: application/json #设置消息类型
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
rabbitmq:
host: 124.220.80.180
port: 5672
username: xxx
password: xxx
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://eureka7001.com:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8802.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
创建消息消费者
@Slf4j
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message message)
{
log.info("消费者1号,------->接收到的消息:{} \t port: {}",message.getPayload(),serverPort);
}
}
测试
通过测试发现消费者可以正常的接受到生产者的信息
分组消费与持久化
为了演示问题场景,根据cloud-stream-rabbitmq-consumer8802克隆一个cloud-stream-rabbitmq-consumer8803服务
重复消费
启动8801、8802、8803服务,通过8801发送3条信息,发现8802、8803都消费了这三条信息,在微服务中,这种重复消费的情况是非常可怕的,可能会导致用户重复付费,所以需要避免
原因
这里大家最好还是先了解一下RabbitMQ
首先我们要探究为什么会出现重复消费的情况。上面讲过Stream是使用topic的形式分发消息,也就是说发到一个交换机里的消息会被所有的队列接受并消费。
在默认情况下,Stream会为每一个Input创建一个队列,所以8802、8803创建了两个队列,同时收到消息也非常合理
原理
微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。
不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。
解决方案
根据原理,我们的解决方案是将两个Input放入同一个组中
修改配置文件
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
bindings:
input:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
group: myGroup # 为两个服务的分组
当分组完成后,再次启动服务发现交换机上只绑定了myGroup队列,再次测试发现不会出现重复消费的情况
默认采用的是轮询,即会将消息轮流发给多个消费者
消息丢失
MQ对消息的持久化是存放在队列中,如果这时候服务宕机了,8801发送的消息还是存储在原来的队列中,在默认没有分组的情况下,每次重启服务生成的队列名是随机的,是无法获取到在宕机时发送到MQ上的消息,就造成了消息丢失。
解决方案
同样也是使用分组来解决,定义了一个group也就是定义了一个队列,重启服务时会重新去监听那个队列,消息就不会丢失了。
友情提示:生产环境中group名称一旦确认了,谨慎修改!!!