文章目录
- 前言
- 实战要点
- 技术积累
- Spring Cloud Stream简介
- 集成kafka要点
- 集成rabbitmq要点
- 实战演示
- Maven依赖版本号选择
- Spring及MQ主要配置
- 基础信道
- 绑定信道消息发送
- 集成兼容多mq演示
- Rabbitmq演示
- Kafka演示
- 写在最后
前言
前面的博文我们介绍并实战演示了Spring Cloud Stream整合rabbitmq,其中主要介绍了如何使用和配置完成消息中间件的集成。但是,在实际的生产环境中可能会用到多个消息中间件,又或者是由于业务改变需要更换消息中间件,在这些情况下我们的Spring Cloud Stream框架可以完全兼容多个消息中间件和多种消息中间件的替换。今天,我们就在一个项目中用Spring Cloud Stream 集成两个消息中间件kafka和rabbitmq。
实战要点
1、完美集成并兼容kafka和rabbitmq
2、增加消费组概念,直接保证消息唯一消费
3、增加重试机制,重试条件满足后自动加入死信
4、增加死信消费者,可以直接移植生产
5、消费者手动ack、offset
6、rabbitmq、kafka配置,保证消息不丢失
技术积累
Spring Cloud Stream简介
Spring Cloud Stream是用于构建微服务具有消息驱动能力的框架,应用程序通过inputs、outputs通道与binder进行交互,binder与消息中间件进行通信。
binder的作用是将消息中间件进行粘合,相当于对第三方中间件进行封装整合,让开发人员不用关心底层消息中间件如何运行。
inputs是消息输入通道,类似于消息中间件的consumer消费者;outputs是消息输出通道,类似于消息中间件的producer生产者。应用程序收发消息不再直接调用消息中间件的接口或者逻辑代码,直接使用Spring Cloud Stream 的OUTPUT与INPUT通道进行处理。
可以通过binder绑定选用各种消息中间件,用binding进行中间件的相关参数配置,让应用程序达到灵活配置和切换消息中间件的目的。
集成kafka要点
1、修改server.properties文件,将#listeners=PLAINTEXT://:9092这一句注释放开,改为listeners=PLAINTEXT://kafka服务器ip:9092
如果此处不改SpringBoot在启动时会报错:
Error connecting to node devops-01:9092 (id: 0 rack: null)
2、kafka 2.8版本开始自带zk,建议使用2.8版本以上的版本不用安装zk
3、spring-boot-starter-paren与spring-cloud-starter-stream-kafk版本号一定要对应上,特别是springboot2之后的版本。如果没有特殊要求,需严格按照本文的版本号进行配置和实战
4、kafka本身、生产者、消费者保证消息不丢失,注意必须使用kafka HA配合修改配置
集成rabbitmq要点
1、rabbitmq比kafka的限制条件就少很多,基本上不用考虑spring版本号兼容
2、rabbimq本身、生产者、消费者保证消息不丢失,注意必须使用rabbitmq HA
实战演示
本次实战直接采用从0到1的策略进行演示,适合小白直接入手,可直接接入生产
本次实战MQ组件全部采用单机进行测试,生产环境请更换为HA
本次实战提供:
1、Kafka、Rabbitmq消息中间件信道注册
2、Kafka、Rabbitm消息中间件消息发送、接收消息监听、死信消息监听
Maven依赖版本号选择
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.12.RELEASE</version>
<relativePath/>
</parent>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR10</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>3.0.3.RELEASE</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
Spring及MQ主要配置
server:
port: 9999
spring:
rabbitmq:
host: 10.10.22.187
port: 5672
username: admin
password: admin
virtual-host: /
kafka:
bootstrap-servers: 10.10.22.174:9092
cloud:
stream:
default-binder: myRabbit #默认绑定的mq
binders: #stream框架粘接的mq
myRabbit: #自定义个人mq名称
type: rabbit
environment:
spring: ${spring.rabbitmq}
myKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka: ${spring.cloud.stream.kafka.binder}
bindings: #stream绑定信道
output_channel: #自定义发送信道名称
destination: assExchange #目的地 交换机/主题
content-type: application/json
binder: myRabbit #粘接到的mq
input_channel: #自定义接收信道
destination: assExchange #目的地 交换机/主题
content-type: application/json
binder: myRabbit #粘接到的mq
group: assGroup
consumer:
maxAttempts: 3 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3
backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2
backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s
output_kafka_channel: #自定义发送信道名称
destination: assTopic #目的地 交换机/主题
content-type: text/plain
binder: myKafka #粘接到的mq
producer:
partition-count: 2 #分区数目
input_kafka_channel: #自定义接收信道
destination: assTopic #目的地 交换机/主题
content-type: text/plain
binder: myKafka #粘接到的mq
group: assGroup
consumer:
maxAttempts: 3 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3
backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2
backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s
rabbit: #stream mq配置
bindings:
out_channel:
producer:
delivery-mode: persistent #消息持久化 non-persistent
useConfirmHeader: true #Future<Confirm>获取异常投递,与confirmAckChannel互斥
input_channel:
consumer:
concurrency: 1 #消费者数量
max-concurrency: 5 #最大消费者数量
durable-subscription: true #持久化队列
recovery-interval: 3000 #3s 重连
acknowledge-mode: MANUAL #手动
requeue-rejected: false #是否重新放入队列
auto-bind-dlq: true #开启死信队列
requeueRejected: true #异常放入死信
kafka:
binder:
brokers: ${spring.kafka.bootstrap-servers}
auto-add-partitions: true #自动分区
auto-create-topics: true #自动创建主题
replication-factor: 1 #两个副本
min-partition-count: 1 #最小分区
bindings:
out_kafka_channel:
producer:
# 无限制重发不产生消息丢失
retries: Integer.MAX_VALUE
#acks =0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,可靠性最低
#acks =1:producer等待broker的ack,partition的leader刷盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据,可靠性中
#acks = all 、 -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,可靠性高,但延迟时间长
#可以设置的值为:all, -1, 0, 1
acks: all
min:
insync:
replicas: 1 #感知副本数
input_kafka_channel:
consumer:
concurrency: 1 #消费者数量
max-concurrency: 5 #最大消费者数量
recovery-interval: 3000 #3s 重连
auto-rebalance-enabled: true #主题分区消费者组成员自动平衡
auto-commit-offset: false #手动提交偏移量
enable-dlq: true # 开启 dlq队列
dlq-name: assTopic.dlq
deserializationExceptionHandler: sendToDlq #异常加入死信
基础信道
/**
* MqChannel
* @author senfel
* @version 1.0
* @date 2023/6/2 15:46
*/
public interface MqChannel {
/**
* 消息目的地
* RabbitMQ中为交换机名称
* kafka topic
*/
String DESTINATION = "assExchange";
String DESTINATIONBYGROUP = "assGroup";
String DESTINATIONBYTOPIC = "assTopic";
/**
* 输出信道
*/
String OUTPUT_CHANNEL = "output_channel";
String OUTPUT_KAFKA_CHANNEL = "output_kafka_channel";
/**
* 输入信道
*/
String INPUT_CHANNEL = "input_channel";
String INPUT_KAFKA_CHANNEL = "input_kafka_channel";
String INPUT_KAFKA_CHANNEL_ERROR = "assTopic.dlq";
/**
* 死信队列
*/
String INPUT_CHANNEL_DLQ = "assExchange.assGroup.dlq";
@Output(MqChannel.OUTPUT_CHANNEL)
MessageChannel output();
@Output(MqChannel.OUTPUT_KAFKA_CHANNEL)
MessageChannel outputByKafka();
@Input(MqChannel.INPUT_CHANNEL)
SubscribableChannel input();
@Input(MqChannel.INPUT_KAFKA_CHANNEL)
SubscribableChannel inputByKafka();
@Input(MqChannel.INPUT_KAFKA_CHANNEL_ERROR)
SubscribableChannel inputByKafkaError();
}
绑定信道消息发送
提供绑定信道,增加rabbitmq、kafka发消息逻辑
1、启动类增加绑定mq注解@EnableBinding(MqChannel.class)
@SpringBootApplication
@EnableBinding(MqChannel.class)
public class TestDemoApplication {
public static void main(String[] args) {
SpringApplication.run(TestDemoApplication.class, args);
}
}
2、增加发送消息接口
/**
* TestMQService
* @author senfel
* @version 1.0
* @date 2023/6/2 15:47
*/
public interface TestMQService {
/**
* rabbitmq发送消息
*/
void send(String str);
/**
* kafka发送消息
*/
void sendByKafka(String str);
}
3、实现发送消息接口
/**
* TestMQServiceImpl
* @author senfel
* @version 1.0
* @date 2023/6/2 15:49
*/
@Service
@Slf4j
public class TestMQServiceImpl implements TestMQService {
@Resource
private MqChannel mqChannel;
@Override
public void send(String str) {
mqChannel.output().send(MessageBuilder.withPayload("rabbitmq测试:"+str).build());
}
@Override
public void sendByKafka(String str) {
mqChannel.outputByKafka().send(MessageBuilder.withPayload("kafka测试:"+str).build());
}
}
4、提供接口层
/**
* @author senfel
* @version 1.0
* @date 2023/6/2 17:27
*/
@RestController
public class TestController{
@Resource
private TestMQService testMQService;
/**
* testRabbitmq
* @param str
* @author senfel
* @date 2023/6/8 11:27
* @return java.lang.String
*/
@GetMapping("/test")
public String testMq(String str){
testMQService.send(str);
return str;
}
/**
* testKafka
* @param str
* @author senfel
* @date 2023/6/8 11:27
* @return java.lang.String
*/
@GetMapping("/testKafka")
public String testKafka(String str){
testMQService.sendByKafka(str);
return str;
}
}
集成兼容多mq演示
Rabbitmq演示
1、TestMQServiceImpl增加mq消息监听和私信监听
/**
* 接收消息监听
* @param message 消息体
* @param channel 信道
* @param tag 标签
* @author senfel
* @date 2023/6/5 9:25
* @return void
*/
@StreamListener(MqChannel.INPUT_CHANNEL)
public void process(String message,
@Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
log.info("message : "+message);
if(message.contains("9")){
// 参数1为消息的tag 参数2为是否多条处理 参数3为是否重发
//channel.basicNack(tag,false,false);
System.err.println("--------------rabbitmq消费者消费异常--------------------------------------");
System.err.println(message);
throw new RuntimeException("消费异常");
}else{
System.err.println("--------------rabbitmq消费者--------------------------------------");
System.err.println(message);
channel.basicAck(tag,false);
}
}
/**
* 死信监听
* @param message 消息体
* @param channel 信道
* @param tag 标签
* @author senfel
* @date 2023/6/5 14:30
* @return void
*/
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(MqChannel.INPUT_CHANNEL_DLQ)
, exchange = @Exchange(MqChannel.DESTINATION)
),
concurrency = "1-5"
)
public void processByDlq(String message,
@Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
log.info("message : "+message);
System.err.println("---------------rabbitmq死信消费者------------------------------------");
System.err.println(message);
}
2、测试正常消息投递
--------------rabbitmq消费者--------------------------------------
rabbitmq测试:777777777777777
3、测试异常消息投递,投递规则3次消费失败直接进入死信
--------------rabbitmq消费者消费异常--------------------------------------
rabbitmq测试:7777777777777779
--------------rabbitmq消费者消费异常--------------------------------------
rabbitmq测试:7777777777777779
--------------rabbitmq消费者消费异常--------------------------------------
rabbitmq测试:7777777777777779
---------------rabbitmq死信消费者------------------------------------
rabbitmq测试:7777777777777779
Kafka演示
1、TestMQServiceImpl增加mq消息监听和私信监听
/**
* kafka消费者
* @param message 消息体
* @param acknowledgment ack
* @param receivedTopic topic
* @param groupId 消费者group
* @author senfel
* @date 2023/6/7 15:59
* @return void
*/
@StreamListener(MqChannel.INPUT_KAFKA_CHANNEL)
public void processByKafka(String message,
@Header(value = KafkaHeaders.ACKNOWLEDGMENT,required = false) Acknowledgment acknowledgment,
@Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic,
@Header(value = KafkaHeaders.GROUP_ID,required = false) String groupId,
@Header(value = KafkaHeaders.PARTITION_ID,required = false) String partitionId) throws Exception {
System.err.println("-------进入kafka消费者---------------");
System.err.println(message);
System.err.println(receivedTopic);
if(message.contains("9")){
log.error("kafka消费异常:{}",message);
System.err.println("kafka1消费异常"+message);
throw new RuntimeException("kafka消费异常");
}
System.err.println("kafka接受的数据为"+message);
acknowledgment.acknowledge();
}
/**
* kafka死信消费
* @param message 消息体
* @param receivedTopic topic
* @author senfel
* @date 2023/6/7 15:58
* @return void
*/
@KafkaListener(topics = {MqChannel.INPUT_KAFKA_CHANNEL_ERROR},
groupId = MqChannel.DESTINATIONBYGROUP)
public void processByKafkaError(String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) throws Exception {
System.err.println("-------进入死信消费者---------------");
System.err.println(message);
System.err.println(receivedTopic);
System.err.println("kafka死信接受的数据为"+message);
System.err.println(message);
}
2、测试正常消息投递
-------进入kafka消费者---------------
kafka测试:7777777777777777
assTopic
kafka接受的数据为kafka测试:7777777777777777
3、测试异常消息投递,投递规则3次消费失败直接进入死信
-------进入kafka消费者---------------
kafka测试:7777777777777779
assTopic
kafka1消费异常kafka测试:7777777777777779
-------进入kafka消费者---------------
kafka测试:7777777777777779
assTopic
kafka1消费异常kafka测试:7777777777777779
-------进入kafka消费者---------------
kafka测试:7777777777777779
assTopic
kafka1消费异常kafka测试:7777777777777779
-------进入死信消费者---------------
kafka测试:7777777777777779
assTopic.dlq
kafka死信接受的数据为kafka测试:7777777777777779
kafka测试:7777777777777779
写在最后
Spring Cloud Stream集成多消息中间件kafka、rabbitmq较为简单,直接省去了原生中间的的操作与处理,开发人员可以直接任意切换和混用多种消息中间件,大大增加架构的可用性与可移植性。本实战案例提供重试、私信、手动ack、消费者分组和负载等高可用方案,直接可接入生产使用。
⭐️路漫漫其修远兮,吾将上下而求索 🔍