kafka-消费者服务搭建配置简单消费(SpringBoot整合Kafka)

news2025/1/16 14:01:59

文章目录

  • 1、使用efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本
  • 2、创建生产者发送消息
  • 3、application.yml配置
  • 4、创建消费者监听器
  • 5、创建SpringBoot启动类
  • 6、屏蔽 kafka debug 日志 logback.xml
  • 7、引入spring-kafka依赖

1、使用efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述在这里插入图片描述

2、创建生产者发送消息

[root@localhost ~]# kafka-console-producer.sh --bootstrap-server 192.168.74.148:9095,192.168.748:9096,192.168.74.148:9097 --topic my_topic1
>1
>2
>3
>

在这里插入图片描述

[
  [
    {
      "partition": 1,
      "offset": 0,
      "msg": "1",
      "timespan": 1717592203289,
      "date": "2024-06-05 12:56:43"
    },
    {
      "partition": 1,
      "offset": 1,
      "msg": "2",
      "timespan": 1717592204046,
      "date": "2024-06-05 12:56:44"
    },
    {
      "partition": 1,
      "offset": 2,
      "msg": "3",
      "timespan": 1717592204473,
      "date": "2024-06-05 12:56:44"
    }
  ]
]

3、application.yml配置

server:
  port: 8120

# v1
spring:
  Kafka:
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    consumer:
      # read-committed读事务已提交的消息 解决脏读问题
      isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息
      # 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量
      enable-auto-commit: true # ??????offset
      # 消费者提交ack时多长时间批量提交一次
      auto-commit-interval: 1000
      # 消费者第一次消费主题消息时从哪个位置开始
      auto-offset-reset: earliest  #指定Offset消费:earliest | latest | none
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

4、创建消费者监听器

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaListener {

    @KafkaListener(topics ={"my_topic1"},groupId = "my_group1")
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("消费者获取到消息:topic = "+ record.topic()
                +",partition:"+record.partition()
                +",offset = "+record.offset()
                +",key = "+record.key()
                +",value = "+record.value());
    }

}

5、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaConsumerApplication.class, args);
    }

}

6、屏蔽 kafka debug 日志 logback.xml

<configuration>      
    <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug -->
    <logger name="org.apache.kafka.clients" level="debug" />
</configuration>

7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <!-- Generated by https://start.springboot.io -->
    <!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn -->
    <groupId>com.atguigu</groupId>
    <artifactId>spring-kafka-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-kafka-consumer</name>
    <description>spring-kafka-consumer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

此时启动SpringKafkaConsumerApplication,控制台会打印数据

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.0.5)

消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = null,value = 1
消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = null,value = 2
消费者获取到消息:topic = my_topic1,partition:1,offset = 2,key = null,value = 3

如果此时重新启动SpringKafkaConsumerApplication,控制台将不会打印数据,因为已经消费过数据

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.0.5)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1790935.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

VS2022,lib调用dll工程的一个函数

lib工程本身是一个静态库工程&#xff0c;没有链接器设置。然而&#xff0c;我们依然可以在lib工程中调用DLL工程中的函数&#xff0c;只需要确保头文件正确导入&#xff0c;并在最终使用lib的可执行文件项目中正确链接DLL的.lib文件。下面是一个详细的步骤说明&#xff1a; 假…

做自媒体素材哪里找?做自媒体必备的几个高质量素材网站分享

在自媒体的世界里&#xff0c;内容是王道。无论是视频还是文章&#xff0c;优秀的自媒体作品都需要有力的内容和高质量的素材作支撑。今天&#xff0c;我为大家整理了一些优质的素材网站&#xff0c;帮助每一位自媒体创作者&#xff0c;无论新手还是老手&#xff0c;都能找到适…

Vuforia AR篇(七)— 二维码识别

目录 前言一、什么是Barcode &#xff1f;二、使用步骤三、点击二维码显示信息四、效果 前言 在数字化时代&#xff0c;条形码和二维码已成为连接现实世界与数字信息的重要桥梁。Vuforia作为领先的AR开发平台&#xff0c;提供了Barcode Scanner功能&#xff0c;使得在Unity中实…

正则表达式运用

已经写了表达式&#xff0c;下一步就是匹配字符串得到结果 使用matcher的源码&#xff08;匹配&#xff09;普通方法&#xff0c;find&#xff08;寻找&#xff09;合适的代码&#xff0c;看字符串是否匹配成功 是否可以匹配上 匹配么&#xff0c;匹配就留下&#xff0c;fin…

DBeaver连接Elasticsearch

一、下载DBeaver 二、连接&#xff1a; 1、一定要选择开源的 Open Distro Elasticsearch 2、填写地址&#xff1a; 3、选择“URL”&#xff0c;将https改为http 否则会报SSL错误 4、测试连接

python学习笔记-05

函数 基本上所有的高级语言都支持函数&#xff0c;函数就是一种代码抽象的方式。之前所使用的len、print等都是python的内置函数。 1.初识函数 在编写程序过程中&#xff0c;如果一段代码经常出现&#xff0c;为了提高编写效率&#xff0c;将这类实现某个功能的代码作为一个…

【网络安全】Web安全基础 - 第二节:前置基础知识- HTTP协议,握手协议,Cookie及Session

本章节主要介绍一些基础知识 d(^_^o) HTTP协议 什么是HTTP 超文本传输协议&#xff08;HyperText Transfer Protocol&#xff09;是一种用于分布式、协作式和超媒体信息系统的应用层协议。 HTTP是一个基于请求与响应&#xff0c;无状态的&#xff0c;应用层协议&#xff0c;…

推荐系统学习 一

参考&#xff1a;一文看懂推荐系统&#xff1a;召回08&#xff1a;双塔模型——线上服务需要离线存物品向量、模型更新分为全量更新和增量更新_数据库全量更新和增量更新流程图-CSDN博客 一文看懂推荐系统&#xff1a;概要01&#xff1a;推荐系统的基本概念_王树森 小红书-CSD…

Web网站攻击技术

文章目录 Web应用体系结构脆弱性分析HTTP协议安全问题Cookie的安全问题 常见Web应用攻击及防范SQL注入攻击及防范SQL注入原理 防御注入漏洞跨站脚本(XSS)攻击及防范跨站脚本(XSS)攻击原理 跨站脚本攻击类型储存式XSS反射式XSSDOM式XSS Cookie欺骗及防范CSRF攻击及防范防御CSRF攻…

python图像识别库-pytesseract

内容目录 一、安装1.安装tesseract OCR1) MAC中安装2) Windows中安装3) 中文报下载 二、pytesseract的简单使用 pytesseract是python的一个用于图像提取的库, 它实际上是对Tesseract OCR引擎的封装。pytesseract使得在Python项目中调用Tesseract变得更加简便&#xff0c;主要用…

Python实现定时任务的方式

大家好&#xff0c;在当今数字化的时代&#xff0c;定时任务的需求在各种应用场景中频繁出现。无论是数据的定时更新、周期性的任务执行&#xff0c;还是特定时间点的操作触发&#xff0c;Python 都为我们提供了强大而灵活的手段来实现这些定时任务。当我们深入探索 Python 的世…

All-in-One WP Migration插件+汉化包+扩展优化版

下载地址&#xff1a;All-in-One WP Migration插件汉化包扩展优化版 此插件支持大量的 WordPress 主机&#xff0c;不用担心网站数据搬家不完全&#xff0c;它使用区块方式导入数据&#xff0c;可避免大多数主机的上传限制&#xff08;还原网站的时候&#xff09;。

C#WPF数字大屏项目实战04--设备运行状态

1、引入Livecharts包 项目中&#xff0c;设备运行状态是用饼状图展示的&#xff0c;因此需要使用livechart控件&#xff0c;该控件提供丰富多彩的图形控件显示效果 窗体使用控件 2、设置饼状图的显示图例 通过<lvc:PieChart.Series>设置环状区域 3、设置饼状图资源样…

Ubuntu系统配置DDNS-GO【笔记】

DDNS-GO 是一个基于 Go 语言的动态 DNS (DDNS) 客户端&#xff0c;用于自动更新你的 IP 地址到 DNS 记录上。这对于经常变更 IP 地址的用户&#xff08;如使用动态 IP 的家庭用户或者小型服务器&#xff09;非常有用。 此文档实验环境为&#xff1a;ubuntu20.04.6。 在Ubuntu…

bison flex 实现tiny语言的编译器

bison flex 实现tiny语言的编译器 项目地址&#xff1a;tiny-compiler 完成了词法分析&#xff0c;语法分析&#xff0c;中间代码生成&#xff0c;虚拟机执行&#xff0c;没有进行类型检查、错误处理和中间代码优化。 词法分析 %{ #include <iostream> #include "…

STM32——hal_SPI_(介绍)

SPI&#xff08;串行外围设备接口&#xff09;是一种高速的、全双工、同步的通信协议&#xff0c;通常用于短距离通信&#xff0c;尤其是在嵌入式系统中与各种外围设备进行通信。SPI接口由摩托罗拉公司推出&#xff0c;由于其简单和灵活的特性&#xff0c;它被广泛用于多种应用…

运行软件缺失vcruntime140.dll怎么办?vcruntime140.dll缺失的详细解决方法分享

vcruntime140.dll 是一个动态链接库文件&#xff0c;它是 Microsoft Visual C Redistributable Package 的一部分&#xff0c;为使用 Visual C 编译器开发的应用程序提供必要的运行时环境。该文件包含了大量应用程序运行时需要调用的库函数&#xff0c;这些函数是实现 C 标准库…

基于GFlowNets的蚁群抽样组合优化

本文将基于GFACS论文&#xff0c;探讨其核心思想、技术细节以及在实际应用中的优势。 GFlowNet&#xff1a;摊销MCMC成本的有效工具 GFACS的核心是GFlowNet&#xff0c;它通过训练学习状态转移的概率分布&#xff0c;从而替代传统的MCMC采样方法。GFlowNet的优势在于&#xff1…

真实场景 这周的任意一天,获取上周一到周日的时间范围-作者:【小可耐教你学影刀RPA】

用户场景 我想在这周的任意一天&#xff0c;获取上周一到周日的时间范围&#xff0c;应该怎么做 解决办法1 用指令解决 最简单 解决办法2 自己写逻辑 不过要用到 获取当前日期指令 当前是礼拜几