从浅入深 学习 SpringCloud 微服务架构(十五)
一、SpringCloudStream 的概述
-
在实际的企业开发中,消息中间件是至关重要的组件之一。消息中间件主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
-
不同的中间件其实现方式,内部结构是不一样的。如常见的 RabbitMQ 和 Kafka,由于这两个消息中间件的架构上的不同,像 RabbitMQ 有 exchange,kafka 有 Topic,partitions 分区。
-
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候 springcloud stream给我们提供了一种解耦合的方式。
-
Spring Cloud Stream 由一个中间件中立的核组成。应用通过 Spring Cloud Stream 插入的 input (相当于消费者 consumer,它是从队列中接收消息的)和 output (相当于生产者 producer,它是从队列中发送消息的。)通道与外界交流。通道通过指定中间件的 Binder 实现与外部代理连接。业务开发者不再关注具体消息中间件,只需关注 Binder 对应用程序提供的抽象概念来使用消息中间件实现业务即可。
二、SpringCloudStream 的实现思想,如下图:
说明:最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消息消费者,顶层可以向绑定层生产消息和和获取消息消费。
三、SpringCloudStream 的核心概念
1、核心概念 :绑定器
-
Binder 绑定器是 Spring Cloud Stream 中一个非常重要的概念。在没有绑定器这个概念的情况下,我们的 Spring Boot 应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,这使得我们实现的消息交互逻辑就会非常笨重,因为对具体的中间件实现细节有太重的依赖,当中间件有较大的变动升级、或是更换中间件的时候,我们就需要付出非常大的代价来实施。
-
通过定义绑定器作为中间层,实现了应用程序与消息中间件(Middleware)细节之间的隔离。通过向应用程序暴露统一的 Channel 通过,使得应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。甚至可以任意的改变中间件的类型而不需要修改一行代码。
-
通过配置把应用和 spring cloud stream 的 binder 绑定在一起,之后我们只需要修改 binder 的配置来达到动态修改 topic、exchange、type 等一系列信息而不需要修改一行代码。
2、Spring Cloud Stream 支持各种 binder 实现,下表包含 GitHub 项目的链接。
- RabbitMQ
- Apache Kafka
- Amazon Kinesis
- Google PubSub (partner maintained)
- Solace PubSub+(partner maintained)
- Azure Event Hubs (partner maintained)
3、发布/订阅模型
在 Spring Cloud Stream 中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件之后,它会通过共享的 Topic 主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。这里所提到的 Topic 主题是 Spring Cloud Stream 中的一个抽象概念,用来代表发布共享消息给消费者的地方。在不同的消息中间件中,Topic 可能对应着不同的概念,比如:在 RabbitMO 中的它对应了 Exchange、而在 Kakfa 中则对应了 Kafka 中的 Topic。
四、消息生产者的入门案例:上
1、下载安装语言环境 Eralng 和 消息中间件 rabbitmq-server 服务。
1.1 下载安装语言环境 Eralng (otp_win64_20.2.exe)
1.2 配置语言环境 Eralng 环境变量:
- 新建系统变量名为:ERLANG_HOME 变量值:erlang 安装地址(如:D:\Program Files\erl9.2)
- 将 系统变量 ERLANG_HOME 添加到 Path 目录下。
1.3 验证语言环境 erlang 是否安装成功:
WIN + R 打开【运行】,输入【cmd】,打开命令提示符窗口,输入:erl 显示 erlang 版本号,说明安装成功。
1.4 下载安装消息中间件 rabbitmq-server-3.7.4.exe
1.5 进入 rabbitMQ 安装目录的 sbin 目录,安装管理界面 rabbitmq_management(插件)
rabbitmq-plugins enable rabbitmq_management
1.6 进入 rabbitMQ 安装目录的 sbin 目录,启动 rabbitmq-server.bat 服务:
rabbitmq-server.bat
1.7 验证消息中间件 rabbitmq 是否安装成功:
WIN + R 打开【运行】,输入【cmd】,打开命令提示符窗口,输入:rabbitmqctl status
出现如下界面,表示安装成功。
1.8 打开浏览器,地址栏输入 http://127.0.0.1:15672 ,进入消息中间件 rabbitmq 管理界面
默认用户名、密码都是:guest
2、SpringCloudStream 入门案例:
2.1 SpringCloudStream :入门案例–搭建环境,准备数据库数据表。
— 创建数据库:
create database shop;
— 使用数据库:
use shop;
— 创建数据表:
CREATE TABLE `tb_product` (
`id` int NOT NULL AUTO_INCREMENT,
`product_name` varchar(40) DEFAULT NULL COMMENT '名称',
`status` int DEFAULT NULL COMMENT '状态',
`price` decimal(10,2) DEFAULT NULL COMMENT '单价',
`product_desc` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '描述',
`caption` varchar(255) DEFAULT NULL COMMENT '标题',
`inventory` int DEFAULT NULL COMMENT '库存',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb3
— 插入数据:
insert into `tb_product` (`id`, `product_name`, `status`, `price`, `product_desc`, `caption`, `inventory`) values('1','iPhone 15 Pro','1','7999.00','iPhone 15 Pro 6.7 英寸或 6.1 英寸, 超视网膜 XDR 显示屏,ProMotion 自适应刷新率技术,钛金属搭配亚光质感玻璃背板, 灵动岛功能, A17 Pro 芯片,配备 6 核图形处理器, Pro 级摄像头系统,主摄 4800 万像素 | 超广角 | 长焦, 10 倍, 支持 USB 3, 视频播放最长可达 29 小时。 ','iPhone 15 Pro 巅峰之作','99');
2.2 SpringCloudStream :入门案例–搭建环境–打开 idea 创建父工程
创建 artifactId 名为 spring_cloud_demo 的 maven 工程。
--> idea --> File
--> New --> Project
--> Maven
Project SDK: ( 1.8(java version "1.8.0_131" )
--> Next
--> Groupld : ( djh.it )
Artifactld : ( spring_cloud_demo )
Version : 1.0-SNAPSHOT
--> Name: ( spring_cloud_demo )
Location: ( ...\spring_cloud_demo\ )
--> Finish
2.3 SpringCloudStream :入门案例–搭建环境–在父工程 spring_cloud_demo 的 pom.xml 文件中导入依赖坐标。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>djh.it</groupId>
<artifactId>spring_cloud_demo</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>product_service</module>
<module>order_service</module>
<module>eureka_service</module>
<module>import_eurekaserver_test</module>
<module>api_zuul_service</module>
<module>api_gateway_service</module>
<module>stream_product</module>
<module>stream_consumer</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- springcloudgateway 的内部是通过 netty + webflux 实现。
webflux 实现和 springmvc 存在冲突,需要注销掉父工程中的 web 依赖,在各子模块中导入 web 依赖。
-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-web</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
<scope>provided</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Greenwich.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>http://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>http://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>http://repo.spring.io/libs-release-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>http://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>http://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
<!-- spring_cloud_demo\pom.xml -->
3、SpringCloudStream:入门案例–搭建环境–在父工程 spring_cloud_demo 下,创建子工程 stream_product (子模块)。
3.1 创建 stream_product 子工程(子模块)
--> 右键 spring_cloud_demo 父工程
--> Modules
--> Maven
--> Groupld : ( djh.it )
Artifactld : ( stream_product )
Version : 1.0-SNAPSHOT
--> Next
--> Module name: ( stream_product )
Content root : ( \spring_cloud_demo\stream_product )
Module file location: ( \spring_cloud_demo\stream_product )
--> Finish
3.2 在子工程 stream_product (子模块)中的 pom.xml 中导入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring_cloud_demo</artifactId>
<groupId>djh.it</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>stream_product</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
</project>
<!-- spring_cloud_demo\stream_product\pom.xml -->
3.3 在子工程 stream_product (子模块)中,创建配置文件 application.yml
## spring_cloud_demo\stream_product\src\main\resources\application.yml
server:
port: 7001 #服务端口
spring:
application:
nmae: stream_product #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
output: #管道交互
destination: djh-default #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。
binders: #配置绑定器
defaultRabbit:
type: rabbit
3.4 在子工程 stream_product (子模块)中,创建启动类 ProducerApplication.java
/**
* spring_cloud_demo\stream_product\src\main\java\djh\it\stream\ProducerApplication.java
*
* 2024-5-9 SpringCloudStream 入门案例:启动类 ProducerApplication.java
* 1)引入依赖。
* 2)配置 application.yml 配置文件。
* 3)发送消息的话,定义一个通道接口,通过接口中内置的 messagechannel,(sprngcloudtream 中内置接口 Source)
* 4)@EnableBinding 注解 :绑定对应通道。
* 5)发送消息的话,通过 MessageChannel 发送消息,如果需要 MessageChannel --> 通过绑定内置接口获取。
*/
package djh.it.stream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
@EnableBinding(Source.class)
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
@Autowired
private MessageChannel output;
@Override
public void run(String... args) throws Exception {
//发送消息,使用 工具类 MessageBuilder 创建消息。
output.send(MessageBuilder.withPayload("hello SpringCloudStrem").build());
}
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class);
}
}
五、SpringCloudStream:入门案例–消息生产者的入门案例:下
1、进入 rabbitMQ 安装目录的 sbin 目录,启动 rabbitmq-server.bat 服务:
rabbitmq-server.bat
2、打开浏览器,地址栏输入 http://127.0.0.1:15672 ,进入消息中间件 rabbitmq 管理界面
默认用户名、密码都是:guest
发现已经创建出我们自定义的交换机名称 djh-default 。
六、SpringCloudStream:入门案例—消息消费者子工程(子模块):
1、SpringCloudStream:入门案例–在父工程 spring_cloud_demo 下,创建子工程 stream_consumer (子模块)。
1、创建 stream_consumer 子工程(子模块)
--> 右键 spring_cloud_demo 父工程
--> Modules
--> Maven
--> Groupld : ( djh.it )
Artifactld : ( stream_consumer )
Version : 1.0-SNAPSHOT
--> Next
--> Module name: ( stream_consumer )
Content root : ( \spring_cloud_demo\stream_consumer )
Module file location: ( \spring_cloud_demo\stream_consumer )
--> Finish
2、在子工程 stream_consumer (子模块)中的 pom.xml 中导入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring_cloud_demo</artifactId>
<groupId>djh.it</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>stream_consumer</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
</project>
<!-- spring_cloud_demo\stream_consumer\pom.xml -->
3、在子工程 stream_consumer (子模块)中,创建配置文件 application.yml
## spring_cloud_demo\stream_consumer\src\main\resources\application.yml
server:
port: 7002 #服务端口
spring:
application:
nmae: stream_consumer #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: guest
password: guest
cloud:
stream:
bindings:
input: #管道交互,内置的获取消息的通道,从 djh-default 中获取消息。
destination: djh-default #指定消息发送的目的地,在 rabbitmq 中,发送到一个 djh-default 的交换机 exchange。
binders: #配置绑定器
defaultRabbit:
type: rabbit
4、在子工程 stream_consumer (子模块)中,创建启动类 ConsumerApplication.java
/**
* spring_cloud_demo\stream_consumer\src\main\java\djh\it\stream\ConsumerApplication.java
*
* 2024-5-9 SpringCloudStream 入门案例:启动类 ConsumerApplication.java
* 1)引入依赖。
* 2)配置 application.yml 配置文件。
* 3)定义一个通道接口,通过内置获取消息的接口:Sink
* 4)绑定对应通道。
* 5)配置一个监听方法 :当程序从中间件获取数据之后,执行的业务逻辑方法,需要在监听方法上配置 @StreamListener 注解。
*/
package djh.it.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding(Sink.class)
@SpringBootApplication
public class ConsumerApplication {
@StreamListener(Sink.INPUT)
public void input(String message) {
System.out.println("获取到的消息: " + message);
}
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class);
}
}
5、运行 ProducerApplication 和 ConsumerApplication 两个启动类,在 idea Run Dashboard 控制面板,会输出 “获取到的消息: hello SpringCloudStrem”
七、SpringCloudStream:基于入门案例的代码优化
1、在子工程 stream_product (子模块)中,抽取一个消息发送的工具类 MessageSender.java
/**
* spring_cloud_demo\stream_product\src\main\java\djh\it\stream\producer\MessageSender.java
*
* 2024-5-10 抽取一个消息发送的工具类 MessageSender.java
*/
package djh.it.stream.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Source.class)
public class MessageSender {
@Autowired
private MessageChannel output;
//发送消息
public void send(Object obj){
output.send(MessageBuilder.withPayload((obj)).build());
}
}
2、在子工程 stream_product (子模块)中,修改 启动类 ProducerApplication.java
/**
* spring_cloud_demo\stream_product\src\main\java\djh\it\stream\ProducerApplication.java
*
* 2024-5-9 SpringCloudStream 入门案例:启动类 ProducerApplication.java
* 1)引入依赖。
* 2)配置 application.yml 配置文件。
* 3)发送消息的话,定义一个通道接口,通过接口中内置的 messagechannel,(sprngcloudtream 中内置接口 Source)
* 4)@EnableBinding 注解 :绑定对应通道。
* 5)发送消息的话,通过 MessageChannel 发送消息,如果需要 MessageChannel --> 通过绑定内置接口获取。
*/
package djh.it.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class);
}
}
3、在子工程 stream_product (子模块)中,创建一个测试类 ProducterTest.java
/**
* spring_cloud_demo\stream_product\src\test\java\djh\it\stream\ProducterTest.java
*
* 2024-5-10 创建一个测试类 ProducterTest.java
*/
package djh.it.stream;
import djh.it.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class ProducterTest {
@Autowired
private MessageSender messageSender;
@Test
public void testSend(){
messageSender.send("hello 测试 工具类");
}
}
4、在子工程 stream_consumer (子模块)中,创建一个获取消息工具类 MessageListener.java.java
/**
* spring_cloud_demo\stream_consumer\src\main\java\djh\it\stream\consumer\MessageListener.java
*
* 2024-5-10 创建一个获取消息工具类 MessageListener.java.java
*/
package djh.it.stream.consumer;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class MessageListener {
//监听 binding 中的消息
@StreamListener(Sink.INPUT)
public void input(String message) {
System.out.println("获取到的消息: " + message);
}
}
5、在子工程 stream_consumer (子模块)中,修改 启动类 ConsumerApplication.java
/**
* spring_cloud_demo\stream_consumer\src\main\java\djh\it\stream\ConsumerApplication.java
*
* 2024-5-9 SpringCloudStream 入门案例:启动类 ConsumerApplication.java
* 1)引入依赖。
* 2)配置 application.yml 配置文件。
* 3)定义一个通道接口,通过内置获取消息的接口:Sink
* 4)绑定对应通道。
* 5)配置一个监听方法 :当程序从中间件获取数据之后,执行的业务逻辑方法,需要在监听方法上配置 @StreamListener 注解。
*/
package djh.it.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class);
}
}
6、运行 测试类 ProducterTest 和 ConsumerApplication 启动类,在 idea Run Dashboard 控制面板,会输出 “获取到的消息: hello 测试 工具类”
上一节关联链接请点击:
# 从浅入深 学习 SpringCloud 微服务架构(十四)微服务链路追踪