【实战】Spring Cloud Stream 3.1+整合Kafka

news2024/11/29 4:53:49

文章目录

    • 前言
    • 新版版本优势
    • 实战演示
      • 增加maven依赖
      • 增加applicaiton.yaml配置
      • 新增Kafka通道消费者
      • 新增发送消息的接口
    • 实战测试
      • postman发送一个正常的消息
      • postman发送异常消息

前言

之前我们已经整合过Spring Cloud Stream 3.0版本与Kafka、RabbitMQ中间件,简直不要太好,直接让我们不用再关心底层MQ如何集与消息收发。但是从Spring Cloud 2020版本开始,Spring Cloud Stream的版本升级至3.1.0以上版本,自此版本开始@StreamListener上面就增加@Deprecated注解,不赞成使用,有可能接下来的版本会删除掉。传说是有利于使用Project Reactor提供的事件流抽象(如Flux和Mono),命令函数在每个单独的事件上触发,而reactive函数只触发一次。故今天我们分享一期Spring Cloud Stream 3.1+整合Kafka,各位看官敬请鉴赏。

在这里插入图片描述

新版版本优势

新版提倡用函数式进行发送和消费信息

定义返回类型为Supplier, Function or Consumer的bean提供消息发送和消费的bean 看看绑定名称命名规则
input - + -in- +
output - + -out- +

在配置文件中指定spring.cloud.function.definition/spring.cloud.stream.function.definition的名称后会把这个bean绑定到对应的消费者和提供者上。

比如 inputChannel bean绑定了inputChannel-in-0通道,outputChannel bean绑定了outputChannel-out-0通道:

spring:
  kafka:
    bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092
  cloud:
    stream:
      kafka:
        binder:
          brokers: ${spring.kafka.bootstrap-servers}
      binders:
        kafkahub:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka: ${spring.cloud.stream.kafka.binder}
      default-binder: kafkahub 
      function:
          definition: inputChannel,outputChannel
      bindings:
        inputChannel-in-0:
          binder: kafkahub
          destination: test-kafka-topic
          group: test-kafka-group
          content-type: text/plain
        outputChannel-out-0:
          binder: kafkahub
          destination: test-kafka-topic
          content-type: text/plain
          producer:
            partition-count: 3 #分区数目
  

此时消息生产者为:

@Resource
private StreamBridge streamBridge;

@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){
    boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());
    return send;
}

此时消息消费者为:

@Configuration
public class KafkaChannel {

    @Resource
    private StreamBridge streamBridge;
    /**
     * inputChannel 消费者
     * @author senfel
     * @date 2024/6/18 15:26
     * @return java.util.function.Consumer<java.lang.String>
     */
    @Bean
    public Consumer<Message<String>> inputChannel(){
        return message -> {
            System.out.println("接收到消息Payloa:" + message.getPayload());
            System.out.println("接收到消息Header:" + message.getHeaders());
        };
    }

}

实战演示

我们简单进行一下演示即可,kafka环境可以看我之前的博文搭建。
主要演示功能:

正常情况下生产者发送消息到kafka,消费者监听消息并消费成功
异常情况下消费者消费失败,立即将异常消息投递到另一个topic上,兜底topic消费者消费

本次全部采用自动ack模式,如果需要手动ack参照之前的博文配置即可,注意在消费者端加上手动ack逻辑。

增加maven依赖

 <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.12.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>cce-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>seata-demo-order</name>
<description>Demo project for Spring Boot</description>
<properties>
    <java.version>8</java.version>
    <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies>
       <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        <version>3.2.4</version>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

增加applicaiton.yaml配置

spring:
  #kafka
  kafka:
    bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092
  cloud:
    stream:
      kafka:  # kafka配置
        binder:
          brokers: ${spring.kafka.bootstrap-servers}
          auto-add-partitions: true #自动分区
          auto-create-topics: true #自动创建主题
          replication-factor: 3 #副本
          min-partition-count: 3 #最小分区
        bindings:
          outputChannel-out-0:
            producer:
              # 无限制重发不产生消息丢失
              retries: Integer.MAX_VALUE
              #acks =0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,可靠性最低
              #acks =1:producer等待broker的ack,partition的leader刷盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据,可靠性中
              #acks = all 、 -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,可靠性高,但延迟时间长
              #可以设置的值为:all, -1, 0, 1
              acks: all
              min:
                insync:
                  replicas: 3 #感知副本数
          inputChannel-in-0:
            consumer:
              concurrency: 1 #消费者数量
              max-concurrency: 5 #最大消费者数量
              recovery-interval: 3000  #3s 重连
              auto-rebalance-enabled: true  #主题分区消费者组成员自动平衡
              auto-commit-offset: false   #手动提交偏移量
              enable-dlq: true  # 开启 dlq队列
              dlq-name: test-kafka-topic.dlq
              deserializationExceptionHandler: sendToDlq #异常加入死信
      binders: # 与外部mq组件绑定
        kafkahub:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka: ${spring.cloud.stream.kafka.binder}

      default-binder: kafkahub #默认绑定
      function: # 定义channel名字,每个channel又可以作为生产者(in)与消费者(out)
        definition: inputChannel;outputChannel;dlqChannel
      bindings: # 通道绑定
        inputChannel-in-0:
          binder: kafkahub
          destination: test-kafka-topic
          group: test-kafka-group
          content-type: text/plain
          consumer:
            maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3
            backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
            backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2
            backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s
        outputChannel-out-0:
          binder: kafkahub
          destination: test-kafka-topic
          content-type: text/plain
          producer:
            partition-count: 3 #分区数目
        dlqChannel-in-0:
          binder: kafkahub
          destination: test-kafka-topic.dlq
          group: test-kafka-group
          content-type: text/plain
          consumer:
            maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3
            backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
            backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2
            backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s
        dlqChannel-out-0:
          binder: kafkahub
          destination: test-kafka-topic.dlq
          content-type: text/plain
          producer:
            partition-count: 3 #分区数目

新增Kafka通道消费者

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.function.Consumer;

/**
 * KafkaCustomer
 * @author senfel
 * @version 1.0
 * @date 2024/6/18 15:22
 */
@Configuration
public class KafkaChannel {

    @Resource
    private StreamBridge streamBridge;
    /**
     * inputChannel 消费者
     * @author senfel
     * @date 2024/6/18 15:26
     * @return java.util.function.Consumer<java.lang.String>
     */
    @Bean
    public Consumer<Message<String>> inputChannel(){
        return message -> {
            System.out.println("接收到消息:" + message.getPayload());
            System.out.println("接收到消息:" + message.getHeaders());
            if(message.getPayload().contains("9")){
                boolean send = streamBridge.send("dlqChannel-out-0", MessageBuilder.withPayload("kafka异常消息发送到dlq测试:"+message).build());
                System.err.println("向dlqChannel发送消息:"+send);
            }
        };
    }

    /**
     * dlqChannel 死信消费者
     * @author senfel
     * @date 2024/6/18 15:26
     * @return java.util.function.Consumer<java.lang.String>
     */
    @Bean
    public Consumer<Message<String>> dlqChannel(){
        return message -> {
            System.out.println("死信dlqChannel接收到消息:" + message.getPayload());
            System.out.println("死信dlqChannel接收到消息:" + message.getHeaders());
        };
    }
}

新增发送消息的接口

@Resource
private StreamBridge streamBridge;

@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){
    boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());
    return send;
}

实战测试

postman发送一个正常的消息

在这里插入图片描述

postman发送异常消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1851162.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【深度学习】实现基于MNIST数据集的TensorFlow/Keras深度学习案例

基于TensorFlow/Keras的深度学习案例 实现基于MNIST数据集的TensorFlow/Keras深度学习案例0. 什么是深度学习&#xff1f;1. TensorFlow简介2. Keras简介3. 安装TensorFlow前的注意事项4. 安装Anaconda3及搭建TensorFlow环境1&#xff09; 下载安装Anaconda Navigator2&#xf…

使用ESP32和Flask框架实现温湿度数据监测系统

项目概述 在这个项目中&#xff0c;我们将使用ESP32微控制器读取温湿度传感器的数据&#xff0c;并将这些数据通过HTTP请求传输到基于Flask框架的服务器。Flask是一个轻量级的Python Web框架&#xff0c;非常适合快速开发和部署Web应用。通过这个项目&#xff0c;我们不仅可以了…

36 - 按分类统计薪水(高频 SQL 50 题基础版)

36 - 按分类统计薪水 -- 方法一 selectLow Salary category,sum(income <20000) accounts_count fromAccounts union selectAverage Salary category,sum(income between 20000 and 50000) accounts_count fromAccounts union selectHigh Salary category,sum(in…

Linux htop命令使用

文章目录 简介界面介绍第一行第二行第三行第四行 如何使用 简介 htop 是一个类似于 top 的命令&#xff0c;但具有更丰富的功能和更友好的界面。它可以实时显示系统中各个进程的资源占用情况&#xff0c;如 CPU 使用率、内存使用率等。以下是对 htop 命令的完全解析&#xff1…

CANoe CAPL如何模拟发送CAN错误帧?

目录 canOutputErrorFrame介绍代码output(errorframe)代码总结canOutputErrorFrame 介绍 代码 canOutputErrorFrame(errorFrame, 12, 0); //output Error Frame with 12 dominant bits on CAN1 canOutputErrorFrame(CAN2.errorFrame, 6,

物理层(一)

第2章 物理层 2.1 通信基础 2.1.1 基本概念 1、数据、信号与码元 通信的目的是传输信息&#xff0c;如文字、图像和视频等。数据是指传送信息的实体。信号则是数据的电气或电磁表现&#xff0c;是数据在传输过程中的存在形式。数据和信号都有模拟或数字之分:①模拟数据(或模…

一个电商创业者眼中的618:平台大变局

战役结束了&#xff0c;战斗还在继续。 一位朋友去年5月创业&#xff0c;网上卖咖啡&#xff0c;这个赛道很拥挤&#xff0c;时机也不好&#xff0c;今年是他参加第一个618。朋友说&#xff0c;今年的目标是锤炼团队&#xff0c;总结方法&#xff0c;以及最重要的——活下去。…

设计模式——设计模式原则

设计模式 设计模式示例代码库地址&#xff1a; https://gitee.com/Jasonpupil/designPatterns 设计模式原则 单一职责原则&#xff08;SPS&#xff09;&#xff1a; 又称单一功能原则&#xff0c;面向对象五个基本原则&#xff08;SOLID&#xff09;之一 原则定义&#xf…

PHP环境搭建之使用PhpStudy

文章目录 1 PhpStudy1.1 简介1.2 下载&安装1.3 修改配置1.3.1 Apache配置1.3.2 MySQL配置1.3.3 MySQL启动问题 1.4 Composer1.4.1 简介1.4.2 下载安装1.4.3 修改配置1.4.4 使用命令 1 PhpStudy 1.1 简介 phpstudy是一个php运行环境的集成包&#xff0c;用户不需要去配置运…

2024/06/21--代码随想录算法10-12/17| 子序列问题

300.最长递增子序列 力扣链接 动规五部曲 dp的定义 dp[i]表示子序列答案以nums[i]结尾的最长递增子序列的长度 为什么一定表示 “以nums[i]结尾的最长递增子序” &#xff0c;因为我们在 做 递增比较的时候&#xff0c;如果比较 nums[j] 和 nums[i] 的大小&#xff0c;那么两…

怎么采集阿里巴巴1688的商品或商家数据?

怎么使用简数采集器批量采集阿里巴巴1688的商品或商家相关信息呢&#xff1f; 简数采集器暂时不支持采集阿里巴巴1688的相关数据&#xff0c;谢谢。 简数采集器采集网络网页数据非常简单高效&#xff1a;输入要采集的网址&#xff0c;简数智能算法会自动提取出网页上的关键信…

windows端口被占用问题,杀死进程

描述&#xff1a;端口被占用 在使用IntelliJ IDEA运行程序时&#xff0c;可能会遇到端口占用的情况&#xff0c;这通常由以下几个原因引起&#xff1a; 1、同一程序多次启动&#xff1a;如果你没有正确关闭之前运行的程序实例&#xff0c;再次尝试运行相同的程序时&#xff0c;…

前端实现对本地文件的IO操作

前言 在网页中&#xff0c;前端已经可以读取本地文件系统&#xff0c;对本地的文件进行IO读写&#xff0c;甚至可以制作一个简单的VScode编辑器。这篇文章以渐进式方式实现此功能&#xff0c;文末附上所有代码。 首先看整体功能演示 功能概述 我们将实现一个简单的 Web 应…

全面国产化信创适配改造方案说明

一、概叙 系统的全面国产化适配改造需要从多个方面进行考虑&#xff0c;改造前需要进行充分的论证&#xff0c;在满足具体业务场景的前提下&#xff0c;以确保系统的稳定性和安全性&#xff0c;同时还要考虑技术的发展&#xff0c;不断优化和更新。因此全面国产化适配改造也面临…

【React】富文本编辑器react-quill

安装 react-quill 富文本编辑器 npm i react-quill2.0.0-beta.2报错解决&#xff1a; npm i react-quill2.0.0-beta.2 --legacy-peer-deps导入编辑器组件和配套样式文件 import ReactQuill from react-quill // 1 import react-quill/dist/quill.snow.css // 2const Publi…

C++:STL容器-map

C:STL容器-map 1. map构造和赋值2. map大小和交换3. map插入和删除4. map查找和统计5. map容器排序 map中所有元素都是pair&#xff08;对组&#xff09; pair中第一个元素为key&#xff08;键&#xff09;&#xff0c;起到索引作用&#xff0c;第二个元素为value&#xff08;实…

开发指南033-数据库兼容

元芳&#xff0c;你怎么看&#xff1f; 单一数据库自身就有一些不同处理之处&#xff0c;如果一个平台要兼容所有数据库&#xff0c;就是难上加难&#xff0c;像isnull函数各数据库就不同。 对于这类问题&#xff0c;平台采用统一自定义函数解决&#xff0c;例如上面的round函…

Go 与 Java 字符编码选择:UTF-8 与 UTF-16 的较量

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

Nature Climate Change | 气候变暖会造成未来全球干旱区面积扩张?

在气候变暖的情况下&#xff0c;旱地通常被预测将在全球范围内扩大&#xff0c;旱地包括以水资源有限、植被稀疏为特征的土地区域。然而&#xff0c;这种预测依赖于旱地的大气代用物&#xff0c;即干旱指数。最近的研究表明&#xff0c;干旱指数对陆地水循环的各种组成部分的预…

前端vite+vue3——利用环境变量和路由区分h5、pc模块打包(从0到1)

⭐前言 大家好&#xff0c;我是yma16&#xff0c;本文分享 前端vitevue3——利用环境变量和路由对前端区分h5和pc模块打包&#xff08;从0到1&#xff09;。 背景&#xff1a; 前端本地开发pc和h5的项目&#xff0c;发布时需要区分开h5和pc的页面 vite Vite 通过在一开始将应…