简介
Spring Cloud Stream是一个用于构建基于事件驱动的微服务应用程序的框架,其核心目标是简化开发过程,降低消息通信的复杂性,从而使开发人员能够专注于编写业务逻辑。Spring Cloud Stream通过提供Binder抽象,将应用程序与消息中间件解耦,让开发人员无需关心底层通信细节。同时,它还提供了一套丰富的API和特性,如消息分组、分区和错误处理,使得构建强大、可扩展的事件驱动应用程序变得更加简单。
应用程序可以通过inputs或者 outputs 来与Spring Cloud Stream中binder对象交互。
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
目前仅支持RabbitMQ、 Kafka。
一句话:屏蔽消息中间件之间的区别,提供统一API接口。类似JDBC
处理架构
Binder:绑定器对应了两端,其中INPUT对应于消费者、OUTPUT对应于生产者
相关注解
@Input 注解标识输入通道,通过该输乎通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道channel和exchange绑定在一起
生产者
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
application.yml:
spring:
application:
name: cloud-stream-service
datasource:
type: com.alibaba.druid.pool.DruidDataSource # 当前数据源操作类型
driver-class-name: com.mysql.cj.jdbc.Driver # mysql驱动包
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true
username: root
password: 123456
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: yi
password: 123456
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
service层:向消息队列发送消息
public interface IMessageProvider {
public String send();
}
//通过rabbitMQ发送消息
@EnableBinding(Source.class) //定义消息的推送管道
public class IMessageProviderImp implements IMessageProvider {
@Resource
private MessageChannel output;//消息发送管道
@Override
public String send() {
UUID uuid = UUID.randomUUID();
output.send(MessageBuilder.withPayload(uuid.toString()).build());
System.out.println("***8****发送的消息为:"+uuid.toString());
return null;
}
}
controller层:访问/sendMessage时会自动向消息队列发送UUID消息。
@RestController
public class SendMessageController {
@Resource
private IMessageProvider iMessageProvider;
@GetMapping("/sendMessage")
public String sendMessage(){
return iMessageProvider.send();
}
}
消费者
pom.xml与生产者一样
application.yml:把生产者的input改成output。
controller层:
@RestController
@EnableBinding(Sink.class)
public class ReceiverMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("消费者1号接收到的消息:"+message.getPayload()+" port:"+serverPort);
}
}
消费组配置
同一个消费组中的成员只会有一个去进行消息的处理。
只需要在yml文件里配置group名。
配置消费组后,消费者会含有持久化属性,宕机重启后会重新获得未被消费的消息。