目录
概念
快速搭建SCS环境
一秒切换MQ
组件
1. Binder
2. Binding
3. Message
分组消费
概念
Spring Cloud Stream(SCS) 的主要目标是一套代码,兼容所有MQ, 降低MQ的学习成本,提供一致性的编程模型,让开发者能更容易地集成/切换消息组件(如 Apache Kafka、RabbitMQ、RocketMQ)
官网地址:Spring Cloud Stream
快速搭建SCS环境
1. 引入pom依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>2021.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.9.1</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.3.4.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR6</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
2. 配置文件application.properties
# mq地址
spring.cloud.stream.rocketmq.binder.name-server=192.168.6.128:9876
spring.cloud.stream.bindings.output.destination=scstreamExchange
spring.cloud.stream.bindings.input.destination=scstreamExchange
spring.cloud.stream.bindings.input.group=stream
spring.cloud.stream.bindings.input.content-type=text/plain
3. 生产者和消费者代码
@RestController
public class SendMessageController {
@Autowired
private Source source;
@GetMapping("/send")
public Object send(String message) {
MessageBuilder<String> messageBuilder =
MessageBuilder.withPayload(message);
source.output().send(messageBuilder.build());
return "message sended : "+message;
}
}
@Component
public class MessageConsumer {
@StreamListener(Sink.INPUT)
public void process(Object message) {
System.out.println("received message : " + message);
}
}
4. 验证生产消息,消费消息
一秒切换MQ
修改pom文件, 改成目标MQ依赖
<!--Kafka依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<!--RocketMq依赖-->
<!--<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>2021.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.9.1</version>
</dependency>-->
<!--RabbitMQ依赖-->
<!--<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>-->
组件
1. Binder
SCS通过Binder定义一个外部消息服务器。默认情况下,SCS会使用对应的 SpringBoot插件来构建Binder。
例如RabbitMQ默认值配置
spring.rabbitmq.host=local
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
在SCS中,支持配置多个Binder访问不同的外部消息服务器。这些配置是通过spring.cloud.stream.binders. [bindername].environment.[props]=[value]的方式进行配置。另外,如果配置了多个Binder,也可以通过spring.cloud.stream.default-binder属性指定默认的 Binder。
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.host=loca
lhost
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.username=
guest
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.password=
guest
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.virtualhost=/
# 指定默认binder
spring.cloud.stream.default-binder=testbinder
2. Binding
Binding是SCS中实际进行消息交互的桥梁。在SCS中,通过Binding和 Binder建立绑定关系,客户端就通过Binding实现的消息收发。 在SCS框架中,配置Binding首先对他进行声明。声明Binding的方式,是在启动类中引入@EnableBinding注解。应用会向Spring容器中注入一个Binding接口的实例对象。在SCS中,默认提供了 Source、Sink、Process三个接口对象,分别代表消息的生产者、消费者和中间处理者。
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
public interface Processor extends Source, Sink {
}
binding配置项
# 队列名
spring.cloud.stream.bindings.output.destination=scstreamExchange
# 指定binder。
spring.cloud.stream.bindings.output.binder=testbinder
spring.cloud.stream.bindings.input.destination=scstreamExchange
# 消费群组
spring.cloud.stream.bindings.input.group=stream
spring.cloud.stream.bindings.input.content-type=text/plain
# 指定binder。
spring.cloud.stream.bindings.input.binder=testbinder
# 最大重试次数
spring.cloud.stream.bindings.input.consumer.max-attempts=3
3. Message
在不同的MQ产品中,对于消息的定义其实也是不相同,SCS框架就需要对这些消息类型进行统一。消息结构包括请求头和消息体。
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
Payload就是消息体,在SCS中定义成了一个泛型,可以直接传递对象。MessageHeaders是消息的头部属性,也可以说是消息的补充属性。不同的MQ产品下,就可以通过不同的MessageHeaders属性来代表各自的消息差异,消息内容可以通过Payload统一。
例如,RabbitMQ中有一个非常重要的概念routingKey。通过routingKey可以定制Exchange与Queue之间的路由关系。这个routingKey就可以通过在Headers当中指定一个routingkey属性来实现。
MessageBuilder<String> messageBuilder =
MessageBuilder.withPayload(message).setHeader("routingkey","info");
分组消费
分组消费机制:是在生产者实例和消费者实例之间建立一种对应关系,生产者实例发出的消息只会被对应的消费者消费
1. 设置分区规则, 提前设置好分区ID, 用ID匹配
# 生产者配置
spring.cloud.stream.bindings.output.destination=scstreamExchange
# 指定参与消息分区的消费端节点数量
spring.cloud.stream.bindings.output.producer.partition-count=2
# 只有消费端分区ID为1的消费端能接收到消息
spring.cloud.stream.bindings.output.producer.partition-key-expression=1
# 消费者配置
spring.cloud.stream.bindings.input.destination=scstreamExchange
spring.cloud.stream.bindings.input.group=myinput
# 启动消费分区 新版本这个属性已经取消,改为由分区表达式自动判断
spring.cloud.stream.bindings.input.consumer.partitioned=true
# 参与分区的消费端节点个数
spring.cloud.stream.bindings.input.consumer.instance-count=2
# 设置该实例的消费端分区ID
spring.cloud.stream.bindings.input.consumer.instance-index=1
2. 设置分区规则2, 根据请求头属性匹配
分区提取器
// 增加分区提取器-提取匹配键值
public class MyPartitionKeyExtractor implements PartitionKeyExtractorStrategy {
public static final String PARTITION_PROP="partition";
@Override
public Object extractKey(Message<?> message) {
return message.getHeaders().get(MyPartitionKeyExtractor.PARTITION_PROP);
}
}
分区匹配器
public class MyPartitionSelectorStrategy implements PartitionSelectorStrategy {
@Override
public int selectPartition(Object key, int partitionCount) {
return Integer.parseInt(key.toString()) % partitionCount;
}
}
分区配置文件
# 添加生产者的分区配置
spring.cloud.stream.bindings.output.destination=scstreamExchange
spring.cloud.stream.bindings.output.binder=testbinder
# 指定参与消息分区的消费端节点数量
spring.cloud.stream.bindings.output.producer.partition-count=2
#只有消费端分区ID为1的消费端能接收到消息
spring.cloud.stream.bindings.output.producer.partition-key-expression=1
# 动态生成分区键
spring.cloud.stream.bindings.output.producer.partition-key-extractorname=myPartitionKeyExtractor
spring.cloud.stream.bindings.output.producer.partition-selectorname=myPartitionSelector
发送消息
@GetMapping("/send2")
public Object send2(String message) {
// 发送4条消息, 设置请求头0 1 2 3
for (int i = 0; i < 4; i++) {
MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message)
.setHeader(MyPartitionKeyExtractor.PARTITION_PROP, i);
source.output().send(messageBuilder.build());
}
return "message sended : "+message;
}