您好,我是码农飞哥(wei158556),感谢您阅读本文,欢迎一键三连哦。
💪🏻 1. Python基础专栏,基础知识一网打尽,9.9元买不了吃亏,买不了上当。 Python从入门到精通
😁 2. 毕业设计专栏,毕业季咱们不慌忙,几百款毕业设计等你选。
❤️ 3. Python爬虫专栏,系统性的学习爬虫的知识点。9.9元买不了吃亏,买不了上当 。python爬虫入门进阶
❤️ 4. Ceph实战,从原理到实战应有尽有。 Ceph实战
❤️ 5. Java高并发编程入门,打卡学习Java高并发。 Java高并发编程入门
文章目录
- 1. Spring Cloud Stream是什么?
- 2. Spring Cloud Stream的执行流程
- 3. 注解代码实现
- 在 my-springcloud-rocketmq-producer 上的操作
- 3.1. 引入依赖
- 3.2 . 属性文件配置
- 3.3. 定义生产者
- 在 my-springcloud-rocketmq-consumer上的操作
- 3.4. 引入依赖同生产者
- 3.5. 配置文件修改
- 3.6. 定义消费者
1. Spring Cloud Stream是什么?
Spring Cloud Stream是一个框架,用于构建与共享消息系统连接的高度可扩展的事件驱动微服务。
官网:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/
官网概述:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-preface-notable-deprecations
该框架提供了一个灵活的编程模型,该模型基于已经建立和熟悉的Spring习惯用法和最佳实践,包括对持久pub/sub 语义、消费者组和有状态分区的支持。
简单的理解就是Spring Cloud Stream 通过在上层定义统一消息的编程模型,屏蔽了底层消息中间件的差异,降低了使用成本。下图展示了Spring Cloud Stream的处理架构
Spring Cloud Stream的核心构建块(编程模型)是:
- Destination Binders: 负责提供与外部消息传递系统集成的组件。Binders 可以生成Bindings。
- **Bindings: ** 外部消息系统和应用程序之间的桥梁,提供消息的生产者和消费者(由目标绑定器创建)。即用来绑定消息生产者和消息消费者。它有两种类型,INPUT和OUTPUT,INPUT对应消费者,OUTPUT对应生产者。
- Message: 生产者和消费者用于与目标绑定器(以及通过外部消息系统与其他应用程序)通信的规范的数据结构。
2. Spring Cloud Stream的执行流程
3. 注解代码实现
首先创建一个生产者项目 my-springcloud-rocketmq-producer 和一个消费者项目 my-springcloud-rocketmq-consumer。
本demo使用的 版本号是 cloud 2021.0.5.0 +springboot 2.6.13
在 my-springcloud-rocketmq-producer 上的操作
3.1. 引入依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
3.2 . 属性文件配置
spring:
cloud:
stream:
bindings:
output:
destination: my-springcloud-stream-topic
rocketmq:
binder:
name-server: 172.31.184.89:9876
3.3. 定义生产者
在MySpringcloudRocketmqProducerApplication 添加 @EnableBinding(Source.class) 注解。然后创建生产者。
@Component
public class MyProducer {
@Resource
private Source source;
public void sendMessage(String msg) {
// 封装消息头
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_TAGS, "tagA");
// 创建消息对象
Message<String> message = MessageBuilder.createMessage(msg, new MessageHeaders(headers));
// 发送消息
source.output().send(message);
}
}
在 my-springcloud-rocketmq-consumer上的操作
3.4. 引入依赖同生产者
3.5. 配置文件修改
spring.cloud.stream.rocketmq.binder.name-server=172.31.184.89:9876
spring.cloud.stream.bindings.input.destination=my-springcloud-stream-topic
spring.cloud.stream.bindings.input.group=my-springcloud-stream-consume-group
3.6. 定义消费者
在MySpringcloudRocketmqConsumerApplication 类上添加 @EnableBinding(Sink.class)注解。
@Component
public class MyConsumer {
@StreamListener(Sink.INPUT)
public void processMessage(String message) {
System.out.println("收到的消息=" + message);
}
}