【kafka实战】03 SpringBoot使用kafka生产者和消费者示例

news2025/1/15 14:04:43

本节主要介绍用SpringBoot进行开发时,使用kafka进行生产和消费

一、引入依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.6</version>
    </dependency>
</dependencies>

二、添加topic

使用offset explore软件,添加一个topic
在这里插入图片描述

三、配置文件

server:
  port: 8080
spring:
  kafka:
    consumer:
      # Kafka服务器
      bootstrap-servers: 192.168.56.201:9092
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      #auto-commit-interval: 2s
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)
      # none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      #key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
      # 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
      # 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
      # 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
      # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
      # 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
      max-poll-records: 100
    properties:
      # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
      max:
        poll:
          interval:
            ms: 600000
      # 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
      session:
        timeout:
          ms: 10000
    listener:
      # 在侦听器容器中运行的线程数,一般设置为 机器数*分区数
      concurrency: 4
      # 自动提交关闭,需要设置手动消息确认
      ack-mode: manual_immediate
      # 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
      missing-topics-fatal: false
      # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
      poll-timeout: 600000

    producer:
      # Kafka服务器
      bootstrap-servers: 192.168.56.201:9092
      # 发生错误后,消息重发的次数,开启事务必须设置大于0。
      retries: 3
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      # 开启事务时,必须设置为all
      acks: all
      # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 生产者内存缓冲区的大小。
      buffer-memory: 1024000
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

四、生产者代码

@Slf4j
@RestController
@RequestMapping("/msg")
public class SendMessageController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("send")
    public String send() {
        for (int i = 0; i < 100; i++) {
            OrderInfo orderInfo = new OrderInfo();
            orderInfo.setAddress("成都市高新区");
            orderInfo.setOrderId(String.valueOf(i));
            orderInfo.setProductName("华为P60:" + i);
            kafkaTemplate.send("order.topic", "order:" + i, new Gson().toJson(orderInfo));
        }
        return "success";
    }
}

五、消费者代码

消费者代码采用手动ack的方式

@Slf4j
@Component
public class KafkaOrderConsumer {
    @KafkaListener(topics = "order.topic", groupId = "orderGroup")
    public void consumeOrder(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("消费订单消息key={},value={}", record.key(), record.value());
        ack.acknowledge();
    }
}

代码git仓库:https://gitee.com/syk1234/mqdmo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1031859.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

降低半导体金属线电阻的沉积和蚀刻技术

铜的电阻率取决于其晶体结构、空隙体积、晶界和材料界面失配&#xff0c;这在较小的尺度上变得更加重要。传统上&#xff0c;铜(Cu)线的形成是通过使用沟槽蚀刻工艺在低k二氧化硅中蚀刻沟槽图案&#xff0c;然后通过镶嵌流用Cu填充沟槽来完成的。不幸的是&#xff0c;这种方法产…

prometheus+process_exporter进程监控

一、需要监控进程的服务器上配置 1、进入到临时工作目录&#xff0c;传入process_exporter包 [root Nginx1 ~]# cd work/ [root Nginx1 work]# rz 2、解压&#xff0c;并移动至/usr/local/目录下 [root Nginx1 work]# tar xzf process-exporter-0.7.5.linux-amd64.tar.gz [root…

错过成考报名,今年你还有这两种方式升学!

2023年广东成人高考已经报名结束啦 错过报名或没有抢到考位的同学不用伤心 你还有另外两个提升学历的机会 开放大学or小自考 今天一起来了解一下吧~ 什么是开放大学&#xff1f; 开放教育其实也就是开放大学&#xff0c;也就是我们所说的中央广播电视大学&#xff0c;现在…

无源供电无线测温系统的应用意义

电力系统设备在长期的运行中&#xff0c;往往会产生老化或过热现象&#xff0c;如果没有及时发现和解决&#xff0c;可能会造成严重的火灾事故。由于变电站设备地理位置偏远&#xff0c;对于其维护和监控&#xff0c;管理人员不能做到面面俱到&#xff0c;巡检和维护的难度较大…

气体放电模拟装置中1Pa~101kPa范围内的真空度控制技术

摘要&#xff1a;针对微间隙气体放电特性分析中需要对不同真空压力进行精密控制的要求&#xff0c;本文提出了相应的解决方案。解决方案采用了双路调节技术&#xff0c;由真空计、电控针阀和真空压力控制器组成进气和排气控制回路&#xff0c;可实现真空度1Pa~101kPa全量程范围…

20230919后台面经整理

1.你认为什么是操作系统&#xff0c;操作系统有哪些功能 os是&#xff1a;管理资源、向用户提供服务、硬件机器的扩展 1.进程线程管理&#xff1a;状态、控制、通信等 2.存储管理&#xff1a;分配回收、地址转换 3.文件管理&#xff1a;目录、操作、磁盘、存取 4.设备管理&…

ctf做题小技巧

1 先打开一张图片 这是一道简单的题&#xff0c;猜猜图片中的人是谁。 直接用百度搜索 上传照片就可以知道她是刘亦菲了。 2 点击链接打开一张图片&#xff0c;比如 点击打开链接 就会得到一张图片&#xff0c;然后用notepad打开 可以看到下面的一段转义序列&#xff0c;用…

Python计算机二级基本操作题和简单应用题

基本操作题1-13 这里使用 jieba.lcut()分割后默认使用的换行符&#xff0c;会一行一行的分开&#xff0c;需要加入 end 强行变成一行输出。 简单应用题1-10 1. 2&#xff0c; 3.

ImportError: cannot import name ‘OrderedDict‘ from ‘typing‘

唉&#xff0c;先给大家讲个故事听&#xff01;由于小张昨天被迫需要将Anaconda环境迁移至一个新的磁盘&#xff0c;在博客上查了超级多的资料&#xff0c;终于把环境迁移成功了&#xff0c;但这个时候我的python项目在选择解释器时&#xff0c;却一直出错&#xff0c;一直显示…

基于云服务器 EC2 的云上堡垒机的设计和自动化实现

背景 在很多企业的实际应用场景中&#xff0c;特别是金融类的客户&#xff0c;大部分的应用都是部署在私有子网中&#xff0c;如何能够让客户的开发人员和运维人员从本地的数据中心中安全的访问云上资源&#xff0c;堡垒机是一个很好的选择。传统堡垒机的核心实现原理是基于 S…

好用到哭!没想到听书神器这么适合我~

名称&#xff1a;听书神器 适用&#xff1a;安卓 好处&#xff1a;全网资源&#xff0c;无论是热门小说、经典文学&#xff0c;只要能搜索到的&#xff0c;这里都可以听 30专业主播朗读&#xff0c;可以在优美的声音中享受阅读的乐趣&#xff01; 可听网页、听本地文件、听…

指针进阶2(内含库函数qsort的模拟实现)

指针进阶2 函数指针数组 之前给大家介绍过函数指针的相关知识&#xff0c;下面我们进一步讲解一下指针的相关知识&#xff1a;喜欢的小伙伴可以给追秋点点关注&#xff0c;三连走一波&#xff01;&#xff01;&#xff01; 我们学习了函数指针数组之后&#xff0c;那肯定有朋…

Python入门自学进阶-Web框架——42、Web框架了解-bottle、flask

WEB框架的三大组件&#xff1a;路由系统、控制器&#xff08;含模板渲染&#xff09;、数据库操作 微型框架&#xff1a;依赖第三方写的socket&#xff0c;WSGI&#xff0c; 本身功能少 安装&#xff1a; pip install bottle pip install flask 安装flask&#xff0c;同时安…

低代码助力企业数字化转型

在当今这个数字化快速发展的时代&#xff0c;企业面临的竞争越来越激烈&#xff0c;数字化转型已成为企业发展的必经之路。低代码平台作为一种新型的开发工具&#xff0c;正在逐渐成为企业数字化转型的重要助力。本文将从数字化转型背景、低代码平台介绍、低代码平台的应用、低…

MySQL远程登录提示Access denied的场景

厂商给的某个MySQL库&#xff0c;通过客户端远程登录&#xff0c;提示这个错误&#xff0c; Access denied for user 用户名IP (using password: YES) 确认输入的账号密码都是正确的&#xff0c;出现这个错误说明端口是通的。 此时可以检索mysql.user&#xff0c;如果待登录账号…

渗透测试中的前端调试(一)

前言 前端调试是安全测试的重要组成部分。它能够帮助我们掌握网页的运行原理&#xff0c;包括js脚本的逻辑、加解密的方法、网络请求的参数等。利用这些信息&#xff0c;我们就可以更准确地发现网站的漏洞&#xff0c;制定出有效的攻击策略。前端知识对于安全来说&#xff0c;…

day03_基础语法

今日内容 零、复习昨日 一、Idea安装&#xff0c;配置 二、Idea使用 三、输出语句 四、变量 五、数据类型 附录: 单词 零、 复习昨日 1 装软件(typora,思维导图) 2 gpt(学会让他帮你解决问题) 3 java发展(常识) 4 HelloWorld程序 5 编码规范 6 安装jdk,配置环境变量 电脑常识 任…

计算机竞赛 深度学习YOLOv5车辆颜色识别检测 - python opencv

文章目录 1 前言2 实现效果3 CNN卷积神经网络4 Yolov56 数据集处理及模型训练5 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **基于深度学习YOLOv5车辆颜色识别检测 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0…

【CNN-FPGA开源项目解析】03--单格乘加运算单元PE 单窗口卷积块CU 模块

03–单格乘加运算单元PE & 单窗口卷积块CU 文章目录 03--单格乘加运算单元PE & 单窗口卷积块CU前言单格乘加运算单元PE代码模块结构时序逻辑分析对其上层模块CU的要求 单窗口卷积块CU代码逻辑分析 前言 ​ 第一和第二篇日志已经详细阐述了"半精度浮点数"的加…

Unity截图生成图片 图片生成器 一键生成图片

使用Unity编辑器扩展技术实现快速截图功能 效果&#xff1a; 里面没有什么太难的技术&#xff0c;直接上源码吧 注意&#xff01;代码需要放在Editor文件下才能正常运行 using System; using UnityEditor; using UnityEngine;[ExecuteInEditMode] public class Screenshot …