Kafka的Offset(偏移量)详解

news2024/11/15 10:35:36

Kafka的Offset详解

  • 1、生产者Offset
  • 2、消费者Offset
    • 2.1、消费者
    • 2.2、生产者
    • 2.3、实体类对象
    • 2.4、JSON工具类
    • 2.5、项目配置文件
    • 2.6、测试类
    • 2.7、测试
    • 2.8、总结

1、生产者Offset

在这里插入图片描述
在这里插入图片描述

2、消费者Offset

在这里插入图片描述

2.1、消费者

package com.power.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {

    /**
     * topics 用于指定从哪个主题中消费消息
     * concurrency 用于指定有多少个消费者
     * @param record
     */
    @KafkaListener(topics = {"offSetTopic"}, groupId = "offSetGroup")
    public void onEventA(ConsumerRecord<String, String> record) {
        System.out.println(Thread.currentThread().getId()+"---> 消费消息 record = " + record);
    }
}

2.2、生产者

package com.power.producer;

import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate;

    public void sendEvent(){
        for (int i = 0; i < 2; i++) {
            User user = User.builder().id(i).phone("1567676767"+i).birthday(new Date()).build();
            String userJson = JSONUtils.toJSON(user);
            kafkaTemplate.send("offSetTopic","k"+i, userJson);
        }
    }

}

2.3、实体类对象

package com.power.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {

    private Integer id;

    private String phone;

    private Date birthday;

}

2.4、JSON工具类

package com.power.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JSONUtils {

    private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();

    public static String toJSON(Object object){
        try {
            return OBJECTMAPPER.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static <T> T toBean(String json,Class<T> clazz){
        try {
            return OBJECTMAPPER.readValue(json,clazz);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

2.5、项目配置文件

spring:
  application:
    #应用名称
    name: spring-boot-06-kafka-offset

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的kafka服务器IP>:9092

    #配置消费者的反序列化
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2.6、测试类

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot07KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void sendInterceptor(){
        eventProducer.sendEvent();
    }

}

2.7、测试

  • 先启动生产者,会发送两条消息到kafka服务器

  • 再启动消费者监听,此时我们发现,启动后的消费者并不会监听到生产者已发送的两条消息

  • 在kafka安装目录的bin文件夹下执行命令:

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group offSetGroup --describe
  • 根据命令结果:查看kafka消费者的偏移量offset,我们发现当前消费者偏移量CURRENT-OFFSET值为2 ,当前日志记录的生产者消息偏移量LOG-END-OFFSET值为2,消费者偏移量和日志记录的生产者消息偏移量差值LAG值为0 ,所以消费者查询不到生产者发送的消息。

在这里插入图片描述

  • 关闭消费者,再次使用生产者发送消息,再次执行命令查看消费者偏移量

在这里插入图片描述

  • 此时我们发现消费者偏移量为4,日志记录的偏移量为6,两者差值为2,此时启动消费者,读取到了差值为2的数据

2.8、总结

在这里插入图片描述

  • 消费者从什么地方开始消费,就看消费者的offset是多少,消费者启动后他的offset是多少。
  • 消费者offset是多少,可以通过命令查看
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group offSetGroup --describe

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2074617.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android - 自定义view

为什么要自定义view&#xff1f; 在Android开发中有很多业务场景&#xff0c;原生的控件无法满足需求&#xff0c;并且经常也会遇到一个UI在多处重复使用情况&#xff0c;于是可以通过自定义View的方式来实现这些UI效果。 自定义view的分类 自定义属性 Window window是一个…

图数据库查询语言 cypher 与 memgraph

Cyper 作为声明式查询语言, SQL 在计算机行业无人不晓, 无人不知. 而 Cypher 就是 Graph Database 图数据库的 SQL. Cypher 用"圆括号"来表示节点, 用"方括号,连接线及箭头"表示关系 这样一句话 - "Sally likes Graphs. Sally is friends with John. …

完成控制器方法获取参数-@RequestParam

文章目录 1.将方法的request和response参数封装到参数数组1.SunDispatcherServlet.java1.根据方法信息&#xff0c;返回实参列表2.具体调用 2.测试 2.封装Http请求参数到参数数组1.自定义RequestParam注解2.MonsterController.java 增加参数3.SunDispatcherServlet.java1.resol…

软件架构的发展经历了从单体结构、垂直架构、SOA架构到微服务架构的过程剖析

1.单体架构 特点: 1、所有的功能集成在一个项目工程中。 2、所有的功能打一个war包部署到服务器。 3、应用与数据库分开部署。 4、通过部署应用集群和数据库集群来提高系统的性能。 优点: 1、项目架构简单,前期开发成本低,周期短,小型项目的首选。 缺点: 1、全部…

c++实现mysql关系型数据库连接与增删改查操作

最近老师让我实现这个功能&#xff0c;顺便发个东西&#xff0c;我感觉mysql从入门到精通这本书写的蛮好的&#xff0c;其实连接数据库就是调用mysql-c-api库里面的函数mysql_real_connect,下来的增删改查&#xff0c;也无非就是cmd命令台里面的语句&#xff0c;插入&#xff1…

Javaweb学习之Vue实践小界面(四)

目录 前情回顾 本期介绍 效果图 第一步&#xff1a;前期工作 第二步&#xff1a;建立页眉 效果图 第三步&#xff1a;建立导航栏 效果图 第四步&#xff1a;主要内容放置 效果图 第五步&#xff1a;建立页脚 效果图 综合&#xff1a;将文字和背景更改成自己喜欢的…

PEM燃料电池启停控制策略优化的simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 5.完整工程文件 1.课题概述 PEM燃料电池启停控制策略优化的simulink建模与仿真。 1.燃料电池提供是燃料转换为电能和热能的装置。 2.功率的输出的改变通过很多因素&#xff0c;如温度&#xff0c;压力…

谷歌、火狐及Edge等浏览器如何使用allWebPlugin中间件响应ActiveX插件事件

allWebPlugin简介 allWebPlugin中间件是一款为用户提供安全、可靠、便捷的浏览器插件服务的中间件产品&#xff0c;致力于将浏览器插件重新应用到所有浏览器。它将现有ActiveX控件直接嵌入浏览器&#xff0c;实现插件加载、界面显示、接口调用、事件回调等。支持Chrome、Firefo…

模型 OGSM(战略规划)

系列文章 分享 模型&#xff0c;了解更多&#x1f449; 模型_思维模型目录。目标引领&#xff0c;策略驱动&#xff0c;量化衡量。 1 OGSM模型的应用 1.1 电商企业年度增长战略 某电商企业面临激烈的市场竞争&#xff0c;决定运用OGSM模型来规划其年度战略&#xff0c;以实现…

代码随想录Day 25|回溯篇完结,题目:491.递增子序列、46、全排列、47.全排列Ⅱ

提示&#xff1a;DDU&#xff0c;供自己复习使用。欢迎大家前来讨论~ 文章目录 第七章 回溯算法part05一、题目题目一&#xff1a;491.递增子序列解题思路&#xff1a;回溯三部曲优化 题目二&#xff1a;46.全排列[46. 全排列](https://leetcode.cn/problems/permutations/)解…

日撸Java三百行(day34:图的深度优先遍历)

目录 一、深度优先搜索 二、图的深度优先遍历 三、代码实现 总结 一、深度优先搜索 深度优先搜索&#xff08;Depth First Search&#xff1a;DFS&#xff09;是一种用于遍历树或图的算法&#xff0c;具体来说就是从起始节点开始&#xff0c;沿某一分支路径不断深入&#…

Linux内核定时器、阻塞_非阻塞IO

一.内核时间管理 Linux 内核中有大量的函数需要时间管理,比如周期性的调度程序、延时程序、对于我们驱动编写者来说最常用的定时器。硬件定时器提供时钟源,时钟源的频率可以设置, 设置好以后就周期性的产生定时中断,系统使用定时中断来计时。中断周期性产生的频率就是系统频率…

吴恩达谈AI未来:Agentic Workflow、推理成本下降与开源的优势

近年来&#xff0c;人工智能&#xff08;AI&#xff09;领域的发展势如破竹&#xff0c;然而随着技术的普及&#xff0c;市场也开始出现对AI泡沫的质疑声。2024年8月&#xff0c;AI领域的权威专家吴恩达&#xff08;Andrew Ng&#xff09;在与ARK Invest的对谈中&#xff0c;分…

利用Matlab求解高阶微分方程(ode45)

1、高阶微分方程的基本概念 二阶以及二阶以上的微分方程称之为高阶微分方程&#xff0c;一般来说&#xff0c;微分方程的阶数越高&#xff0c;求解的难度也就越大。求高阶方程的一个常用方法就是降低阶数。对二阶方程 &#xff0c;如果能用变量代换把它化成一阶方程&#xff0c…

【Tesla FSD V12的前世今生】从模块化设计到端到端自动驾驶技术的跃迁

自动驾驶技术的发展一直是全球汽车行业的焦点&#xff0c;Tesla的Full-Self Driving&#xff08;FSD&#xff09;系统凭借其持续的技术革新和强大的数据支持&#xff0c;在这个领域独占鳌头。本文将深入介绍Tesla FSD V12的演进历史&#xff0c;从自动驾驶的基础概念入手&#…

【XML详解】

XML基本概念 XML&#xff08;全称EXtensible Markup Language&#xff0c;可扩展标记语言&#xff09;&#xff1a;是一种用于存储和传输数据的标记语言&#xff0c;通过标签&#xff08;tags&#xff09;来定义数据的结构和含义。数据格式&#xff1a;XML本质上是一种数据的格…

【异常错误】pycharm可以在terminal中运行,但是无法在run中运行(没有输出错误就停止了)

问题&#xff1a; pycharm的命令可以在terminal中运行&#xff0c;但是复制到无法在run中运行&#xff08;没有输出错误就停止了&#xff09; run中运行后什么错误提示都没有 搞不懂为什么 解决&#xff1a; 降低run中batch-size的大小&#xff0c;即可以运行 我并没有观察到…

视频在线去水印解析相册怎么弄,轻松掌握五大技巧

在当前短视频流行的时代&#xff0c;我们常常需要下载一些短视频来进行剪辑或分享&#xff0c;但视频中的水印却成了一个不小的烦恼。为了帮助大家解决这个问题&#xff0c;本文将介绍五款高效的短视频去水印免费软件&#xff0c;让你轻松告别水印烦恼。 工具一&#xff1a;奈…

在VB.net中,LINQ有什么方法与属性

标题 在VB.net中&#xff0c;LINQ有什么方法与属性 正文 在VB.NET中使用LINQ&#xff08;Language Integrated Query&#xff09;&#xff0c;你可以利用一系列的方法和属性来查询和操作内存中的集合&#xff08;如数组、列表等&#xff09;以及数据库等数据源。LINQ提供了丰富…

Python相关系数导图

&#x1f3af;要点 量化变量和特征关联绘图对比皮尔逊相关系数、斯皮尔曼氏秩和肯德尔秩汽车性价比相关性矩阵热图大流行病与资产波动城镇化模型预测交通量宝可梦类别特征非线性依赖性捕捉向量加权皮尔逊相关系数量化图像相似性 Python皮尔逊-斯皮尔曼-肯德尔 皮尔逊相关系…