kafka--技术文档--spring-boot集成基础简单使用

news2025/1/19 2:54:03

阿丹:

        查阅了很多资料了解到,使用了spring-boot中整合的kafka的使用是被封装好的。也就是说这些使用其实和在linux中的使用kafka代码的使用其实没有太大关系。但是逻辑是一样的。这点要注意!

使用spring-boot整合kafka

1、导入依赖

核心配置为:

  <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

如果在下面规定了spring-boot的版本那么就不需要再使用版本号,如果没有的话就需要规定版本号。 

      <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <!--配置文件报错问题-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

2、写入配置

#服务端口号
server:
  port: 8025

spring:
  main:
    allow-circular-references: true
  application:
    name: producer
  kafka:
    bootstrap-servers: kafka的ip地址:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 1
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
    consumer:
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false

3、生产者

将发送封装为一个工具类

    public void send(Object obj){
        String obj2String = JSON.toJSONString(obj);
        log.info("准备发送消息为:{}",obj2String);

        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj2String);
        //回调
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                //发送失败的处理
                log.info(TOPIC_TEST + " - 生产者 发送消息失败:" + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                //成功的处理
                log.info(TOPIC_TEST + " - 生产者 发送消息成功:" + result.toString());
            }
        });

4、消费者

 如果需要使用多线程来监听的话使用这个策略。

@KafkaListener(topics = "Hello-Kafka", groupId = "group1")
public void onMessage1(ConsumerRecord<?, ?> record) {
    // 消息处理逻辑
}

@KafkaListener(topics = "Hello-Kafka", groupId = "group2")
public void onMessage2(ConsumerRecord<?, ?> record) {
    // 消息处理逻辑
}

以上就可以简单实现一个kafka的监听消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/926010.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【DEVOPS】Jenkins使用问题 - 控制台输出乱码

0. 目录 1. 问题描述2. 解决方案3. 最终效果4. 总结 1. 问题描述 部门内部对于Jenkins的使用采取的是Master Slave Work Node的方式&#xff0c;即作为Master节点的Jenkins只负责任务调度&#xff0c;具体的操作由对应的Slave Work Node去执行。 最近团队成员反馈一个问题&a…

高忆管理股票分析:1年期LPR下调10个基点 融资成本稳中有降

8月21日&#xff0c;中国人民银行授权全国银行间同业拆借中心发布&#xff0c;最新借款商场报价利率(LPR)为&#xff1a;1年期种类报3.45%&#xff0c;较上一期下降10个基点&#xff1b;5年期以上种类报4.20%&#xff0c;与前一期相等。 上海高忆私募基金&#xff08;百度搜索高…

2023深圳智博会,正运动助力智能装备“更快更准”更智能!

■展会名称&#xff1a; 2023 深圳国际智能装备产业博览会暨深圳国际电子装备产业博览会&#xff08;以下简称“EeIE 智博会”&#xff09; ■展会日期 2023年8月29日-31日 ■展馆地点 深圳国际会展中心(宝安新馆) ■展位号 3B030 正运动技术&#xff0c;作为国内领先的…

C++动态规划DP Dynamic Programming实现B3635 硬币问题B3636 文字工作

DP动态规划的基本手段及如何解决问题 1. 那带一个问题&#xff0c;只要解决几个对应的小一点规模的问题就能得到问题本身的解 2. 设计一张表格&#xff0c;每一个格子都是一个问题的解 3. 一步步完成这张表格&#xff0c;根据一个数据&#xff0c;往表格前面的数据查找 4. …

APT80DQ40BG-ASEMI低功耗半导体APT80DQ40BG

编辑&#xff1a;ll APT80DQ40BG-ASEMI低功耗半导体APT80DQ40BG 型号&#xff1a;APT80DQ40BG 品牌&#xff1a;ASEMI 封装&#xff1a;TO-3P 恢复时间&#xff1a;&#xff1e;50ns 正向电流&#xff1a;80A 反向耐压&#xff1a;400V 芯片个数&#xff1a;2 引脚数量…

java八股文面试[JVM]——类加载器

一、类加载器的概念 类加载器是Java虚拟机用于加载类文件的一种机制。在Java中&#xff0c;每个类都由类加载器加载&#xff0c;并在运行时被创建为一个Class对象。类加载器负责从文件系统、网络或其他来源中加载类的字节码&#xff0c;并将其转换为可执行的Java对象。类加载器…

Kaniko在containerd中无特权快速构建并推送容器镜像

目录 一、kaniko是什么 二、kaniko工作原理 三、kanijo工作在Containerd上 基于serverless的考虑&#xff0c;我们选择了kaniko作为镜像打包工具&#xff0c;它是google提供了一种不需要特权就可以构建的docker镜像构建工具。 一、kaniko是什么 kaniko 是一种在容器或 Kube…

机器学习基础11-算法比较(基于印第安糖尿病Pima Indians 数据集)

比较不同算法的准确度&#xff0c;选择合适的算法&#xff0c;在处理机器学习的问题时是非常重要的。本节将介绍一种模式&#xff0c;在scikit-learn中可以利用它比较不同的算法&#xff0c;并选择合适的算法。你可以将这种模式作为自己的模板&#xff0c;来处理机器学习的问题…

如何备份系统?很简单,2个方法教会你!

在计算机使用过程中&#xff0c;系统故障、病毒攻击、意外损坏等问题可能导致数据丢失和系统无法正常运行。为了保障数据安全和系统稳定&#xff0c;如何备份系统是至关重要的。本文将介绍备份系统的2个方法&#xff0c;帮助用户轻松备份系统&#xff0c;确保数据的安全和系统的…

什么是网络中的服务质量 (QoS),其相关技术和关键指标有哪些?

QoS&#xff08;Quality of Service&#xff0c;服务质量&#xff09;指一个网络能够利用各种基础技术&#xff0c;为指定的网络通信提供更好的服务能力&#xff0c;是网络的一种安全机制&#xff0c;是用来解决网络延迟和阻塞等问题的一种技术。QoS的保证对于容量有限的网络来…

MES管理系统解决方案,助力汽配企业打造透明化管理

随着汽车行业的不断发展&#xff0c;汽配行业面临着越来越严格的质量要求和生产效率提升挑战。为了满足这些需求&#xff0c;汽配企业需要实现生产过程的透明化和精细化。MES管理系统解决方案作为生产过程的核心管理系统&#xff0c;可以为汽配企业提供全面的解决方案&#xff…

ubuntu22.04安装搜狗输入法后始终无法输入中文

这次真的整我很久很久&#xff0c;我都不想用搜狗输入法了&#xff0c;结果无意间还是被我解决了。 ubuntu22.04安装搜狗输入法的步骤参考官网给的文档就行&#xff0c;这里我只说我的为啥输入不了中文 点击Fcitx配置 把搜狗输入法个人版放在第一位就行(我的系统语言是中文&am…

思维导图的作用有哪些?了解一下这几个作用

思维导图的作用有哪些&#xff1f;思维导图是一种以图形和颜色为主要表现形式的思维工具&#xff0c;它可以帮助人们更好地组织和表达思想。它的作用有很多&#xff0c;下面就给大家简单介绍一下。 1、帮助记忆 思维导图可以将大量信息整合到一个图形中&#xff0c;这有助于人…

一次由摔碎手机屏幕导致的急速搬家

点击文末“阅读原文”即可参与节目互动 剪辑、音频 / 卷圈 运营 / SandLiu 卷圈 监制 / 姝琦 封面 / 姝琦midjourney 产品统筹 / bobo 想问问大家&#xff0c;都在什么情况下搬过家&#xff1f; 有的时候搬家是迫不得已&#xff0c;房东突然发难&#xff1b; 有的时候搬…

MyBatis分页插件PageHelper的使用及MyBatis的特殊符号---详细介绍

一&#xff0c;分页的概念 分页是一种将大量数据或内容分割成多个页面以便逐页显示的方式。在分页中&#xff0c;数据被分割成一定数量的页&#xff0c;每页显示一部分数据或内容&#xff0c;用户可以通过翻页或跳分页是一种将大量数据或内容分割成多个页面以便逐页显示的方式。…

深入浅出 RPC框架

RPC 框架分层设计 01 基本概念 1.1 本地函数调用 以上步骤只是为了说明原理。事实上编译器经常会做优化&#xff0c;对于参数和返回值少的情况会直接将其存放在寄存器&#xff0c;而不需要压栈弹栈的过程&#xff0c;甚至都不需要调用call&#xff0c;而直接做inline操作 1.2 远…

Forrester首次面向中国的开源报告:阿里云在云原生领域开源布局最全面

Forrester 于近期发布了《Navigate The Cloud-Native Ecosystem In China, 2023》&#xff0c;报告概述了中国云原生领域的开源项目对构建云原生生态的促进作用&#xff0c;这些开源项目正深刻影响着企业的技术决策者以何种策略拥抱云原生这一现代 IT 基础设施的核心。 报告表…

SMC状态机 讲解2 从模型到SMC

SMC状态机 讲解2 从模型到SMC 1、实例化有限状态机&#xff08;FSM)2、简单转换 Simple Transition3、外部环回转换 External Loopback Transition4、内部环回转换 Internal Loopback Transition5、转换动作6、转换Guard7、转换参数8、Entry 和 Exit动作9、Push 转换10、Pop转换…

AI加持,创意设计效率百倍提升,探秘背后的数字化魔法

在当今创新潮流不断涌现的时代&#xff0c;人工智能正以惊人的速度和深度赋能各行各业&#xff0c;食品包装设计界也已来到了一个“拼创意、拼二创和拼审美”的时代。有了AI的加入&#xff0c;设计界正迎来一股AI创意风暴&#xff0c;不仅颠覆了设计流程&#xff0c;更为食品包…

机器学习教程(非常详细)从零基础入门到精通,看完这一篇就够了

一、机器学习的定义 从广义上来说&#xff0c;机器学习是一种能够赋予机器学习的能力以此让它完成直接编程无法完成的功能的方法。但从实践的意义上来说&#xff0c;机器学习是一种通过利用数据&#xff0c;训练出模型&#xff0c;然后使用模型预测的一种方法。 “训练”与“…