spring-cloud-stream中,@EnableBinding@从3.1开始就被弃用,取而代之的是函数式编程模型
同期被废弃的注解还有下面这些注解
@Input @Output @EnableBinding @StreamListener
官方例子:GitHub - spring-cloud/spring-cloud-stream-samples: Samples for Spring Cloud Stream
官方给的例子是kafka,rabbit,没有rocketmq的例子,轻度使用的情况下用rabbit的例子适配rocketmq也是可以的。
网上的文章要么介绍旧代码,要么介绍新代码,没有介绍旧代码改造到新代码的过程。这里介绍旧版代码改造过程。
一、生产者旧版代码改造
1.yml配置变更
spring:
cloud:
stream:
bindings:
output:
destination: stream-test-topic
改为
spring:
cloud:
stream:
bindings:
## 新版本固定格式 函数名-{out/in}-{index}
demoChannel-out-0:
destination: stream-test-topic
2.注解变更
去掉启动类上的@EnableBinding(Source.class)注解
3.代码变更
@Autowired
private Source source;
@GetMapping("/test-stream")
private String sendDemo(){
source.output().send(
MessageBuilder
.withPayload("消息体")
.build()
);
return "success";
}
改为
@Autowired
private StreamBridge streamBridge;
@GetMapping("/test-stream")
public String testStream() {
streamBridge.send("demoChannel-out-0",
MessageBuilder
.withPayload("消息体")
.build()
);
return "success";
}
注意,这里的send多了一个参数,跟配置里的bingdings下的out对应。
二、消费者代码改造
1.yml配置变更
spring:
cloud:
stream:
bindings:
input:
destination: stream-test-topic
group: binder-group
改为
spring:
cloud:
stream:
bindings:
## 新版本固定格式 函数名字-{out/in}-{index}
demoChannel-in-0:
destination: stream-test-topic
2.注解变更
去掉启动类上的@EnableBinding(Sink.class)注解
3.代码变更
@Service
@Slf4j
public class TestStreamConsumer {
@StreamListener(Sink.INPUT)
public void receive(String messageBody){
log.info("通过stream收到了消息:messageBody={}",messageBody);
}
}
改为
@Slf4j
@Configuration
public class TestStreamConsumer {
@Bean
public Consumer<String> demoChannel() {
/*return new Consumer<String>() {
@Override
public void accept(String msgData) {
log.info("demoChannel接到消息:{}",msgData);
}
};*/
return message -> {
log.info("demoChannel接到消息:{}", message);
};
}
}
注意:这里的函数名未demoChannel,与yml中bingdings下的in前面一段对应。注释掉的代码和下面的匿名函数lamda写法等价。
关于函数名相关配置,参考官方文档:Preface
Functional binding names这一节