kafka消费者API

news2024/12/22 18:59:17

kafka的消费方式

pull(拉模式)

消费者采用从broker中主动拉去数据 kafka采用这种方式

push(推模式)

kafka没有采用这种方式,因为由broker决定消费发送速率。很难适应所有消费者

pull模式不足之处是,如果kafka没有数据。消费者还是会进行监听操作。

两者区别

由 broker 决定消息推送的速率,对于不同消费速率的 consumer 就不太好处理 了。消息系统都致力于让 consumer 以最大的速率最快速的消费消息,但不幸的是, push 模式下, 当broker 推送的速率远大于 consumer 消费的速率时,consumer 恐怕就要崩溃了 。最终 Kafka 还是选取了传统的 pull 模式。

Pull 模式的另外一个好处是 consumer 可以自主决定是否批量的从 broker 拉取数据 。 Push 模式必须在不知道下游 consumer 消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推 送。如果为了避免 consumer 崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪 费。 Pull 模式下, consumer 就可以根据自己的消费能力去决定这些策略。

Pull 有个缺点 是, 如果 broker 没有可供消费的消息,将导致 consumer 不断在循环中轮询,直到新消息到达 。为了避免这点, Kafka 有个参数可以让 consumer 阻塞知道新消息到达 ( 当然也可以阻塞知道 消息的数量达到某个特定的量这样就可以批量发送 )

消费者工作流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pVVYs8PI-1673513915321)(./assets/bde38b7cf3a44abbaf979ef2117125af.png)]

  1. 生产者生产消息到kafka集群中。集群将产生的消息放入不同的kafka的broker 每一个broker表示一个kafka节点 对应的分区中
  2. kafka的分区存在副本,leader和follower,进行消息的安全性(丢失,重复,顺序)
  3. 每一个消费者都属于一个消费者组。即使只有一个消费者。默认也是存在一个消费者组
  4. 消费者组绑定一个Topic主题。Topic下的存在多个分区。
  5. 每一个分区的数据只能给一个消费者消费。因为如果一个分区数据同时给同一个消费者组的两个消费者消费会造成数据的重复
  6. 一个消费者可以同时消费多个Topic中的多个分区数据。
  7. 一个topic中的数据可以给多个消费者或消费者组消费。对相同的数据存在不同的业务操作

分区的分配规则

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eMR1mi1a-1673513915322)(./assets/df1fca36d8914e0b9bc0c43f62ff2dc6.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-awMsEbtw-1673513915323)(./assets/b06c18cef2f4495985e303ebf279ef5b.png)]

代码展示消费者组下的消费者消费情况

生产者

public class KafkaProduce {

    public static void main(String[] args) {

        // 创建Kafka生产者配置对象
        Properties props = new Properties();

        // 给kafka配置对象添加配置信息
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "202.168.130.10:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 创建生产者 指定配置
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 循环发送消息
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("KafkaStudy", "HelloWord" + i);

            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if(e == null){
                        System.out.println("topic:"+metadata.topic()+"--patition:"+metadata.partition());
                    }
                }
            });
        }
        // 关闭生产者
        producer.close();

    }
}

消费者


@Component
public class KafkaConsumerGroup {

    // 该kafka注解 指定Topic 等信息去监听kafka中的消息
    // 这里进行指定 消费者组和Topic
    // 当我们同时指定多个消费者 指定相同的消费者组。此时这些消费者就是同一个组下的
    // 此时消费者就属于同一个消费组 每个消费者消费到不同分区的数据
    @KafkaListener(groupId = "groupIdTest", topics = "kafkaStudy")
    public void consumerOne(ConsumerRecord<String,String> record){
        // 获取 分区 偏移量 值 等kafka信息
        int partition = record.partition();
        long offset = record.offset();
        String value = record.value();

        // 打印信息
        System.out.println("consumerOne 分区:" + partition + "  consumerOne 偏移量:" + offset + "  consumerOne 值:" + value);

    }

    @KafkaListener(groupId = "groupIdTest", topics = "kafkaStudy")
    public void consumerTwo(ConsumerRecord<String,String> record){
        // 获取 分区 偏移量 值 等kafka信息
        int partition = record.partition();
        long offset = record.offset();
        String value = record.value();

        // 打印信息
        System.out.println("consumerTwo 分区:" + partition + "  consumerTwo 偏移量:" + offset + "  consumerTwo 值:" + value);

    }

    @KafkaListener(groupId = "groupIdTest", topics = "kafkaStudy")
    public void consumerThree(ConsumerRecord<String,String> record){
        // 获取 分区 偏移量 值 等kafka信息
        int partition = record.partition();
        long offset = record.offset();
        String value = record.value();

        // 打印信息
        System.out.println("consumerThree 分区:" + partition + "  consumerThree 偏移量:" + offset + "  consumerThree 值:" + value);

    }

}

结果展示

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DHycZo0u-1673513915323)(./assets/image-20230112101223634.png)]

消费者API操作

很多中间件在整合SpringBoot之后都会简化操作。

Kafka也是一样没整合之前的监听我们是需要通过手动创建监听,poll消息等操作

整合SpringBoot之后,提供了一个@KafkaListener注解

未使用@KafkaListener注解

public class CustomConsumer {

    public static void main(String[] args) {

        // 创建配置信息 这里与生产者一个意思
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.182.143:9092");
        props.put("group.id", "testGroup1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // kafka生产者指定读取配置信息
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 消费者订阅 指定的Topic 注意这里是可以订阅多个Topic的
        consumer.subscribe(Arrays.asList("first"));

        // 循环 拉去信息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

@KafkaListener注解

@KafkaListener的主要属性

  1. id:监听器的id

  2. groupId:消费组id

  3. idIsGroup:是否用id作为groupId,如果置为false,并指定groupId时,消费组ID使用groupId;否则默认为true,会使用监听器的id作为groupId

  4. topics:指定要监听哪些topic(与topicPattern、topicPartitions 三选一)

  5. topicPattern: 匹配Topic进行监听(与topics、topicPartitions 三选一) 类似于表达式 通过表达式来匹配多个Topic

  6. topicPartitions: 显式分区分配,可以为监听器配置明确的主题和分区(以及可选的初始偏移量)

常用操作

监听一个Topic

指定消费者组,指定监听Topic

    /**
     * 指定一个消费者组,一个主题主题。
     * @param record
     */
    @KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP)
    public void simpleConsumer(ConsumerRecord<String, String> record) {
        System.out.println("进入simpleConsumer方法");
        System.out.printf(
                "分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
                record.partition(),
                record.offset(),
                record.key(),
                record.value(),
                record.timestamp()
        );
    }

监听多个主题(Topic)

    /**
     * 指定多个主题。
     *
     * @param record
     */
    @KafkaListener(topics = {IPHONE_TOPIC,IPAD_TOPIC},groupId = APPLE_GROUP)
    public void topics(ConsumerRecord<String, String> record) {
        
    }

监听一个Topic,指定多个分区

    /**
     * 监听一个主题,且指定消费主题的哪些分区。
     * 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2
     * @param record
     */
    @KafkaListener(
            groupId = APPLE_GROUP,
            topicPartitions = {
                    @TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})
            },
            concurrency = "2"
    )
    public void consumeByPattern(ConsumerRecord<String, String> record) {
       
    }

指定多个分区,指定起始偏移量消费消息

    /**
     * 指定多个分区从哪个偏移量开始消费。
     */
    @KafkaListener(
            groupId = APPLE_GROUP,
            topicPartitions = {
                    @TopicPartition(
                            topic = IPAD_TOPIC,
                            partitions = {"0","1"},
                            partitionOffsets = {
                                    @PartitionOffset(partition = "2", initialOffset = "10"),
                                    @PartitionOffset(partition = "3", initialOffset = "0"),
                            }
                    )
            },
            concurrency = "10"
    )
    public void consumeByPartitionOffsets(ConsumerRecord<String, String> record) {
       
    }

监听多个主题,指定多个分区,指定起始偏移量消费消息

    /**
     * 指定多个主题。参数详解如上面的方法。
     * @param record
     */
    @KafkaListener(
            groupId = APPLE_GROUP,
            topicPartitions = {
                    @TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),
                    @TopicPartition(topic = IPAD_TOPIC, partitions = "1",
                            partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
            },
            concurrency = "4"
    )
    public void topics2(ConsumerRecord<String, String> record) {
      
    }

指定多个kafka监听器

    /**
     * 指定多个消费者组。参数详解如上面的方法。
     *
     * @param record
     */
    @KafkaListeners({
            @KafkaListener(
                    groupId = APPLE_GROUP,
                    topicPartitions = {
                            @TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),
                            @TopicPartition(topic = IPAD_TOPIC, partitions = "1",
                                    partitionOffsets = @PartitionOffset(partition = "0", initialOffset="5"))
                    },
                    concurrency = "3"
            ),
            @KafkaListener(
                    groupId = XM_GROUP,
                    topicPartitions = {
                            @TopicPartition(topic = XMPHONE_TOPIC, partitions = {"1", "2"}),
                            @TopicPartition(topic = XMPAD_TOPIC, partitions = "1",
                                    partitionOffsets = @PartitionOffset(partition = "0", initialOffset="5"))
                    },
                    concurrency = "3"
            )
    }
    )
    public void groupIds(ConsumerRecord<String, String> record) {
        System.out.println("groupIds");
        System.out.println("内容:" + record.value());
        System.out.println("分区:" + record.partition());
        System.out.println("偏移量:" + record.offset());
        System.out.println("创建消息的时间戳:" + record.timestamp());
    }

kafka偏移量

offset是指某一个分区的偏移量。

在Kafka0.9版本之前消费者保存的偏移量是在zookeeper中/consumers/GROUP.ID/offsets/TOPIC.NAME/PARTITION.ID。

新版消费者不在保存偏移量到zookeeper中,而是保存在Kafka的一个内部主题中“_consumer_offsets”

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ezkcjwtN-1673513915324)(./assets/image-20230112150315217.png)]

enable.auto.commit开启自动提交

Kafka 默认是定期帮你自动提交位移的(enable.auto.commit=true)。有时候,我们需要采用自己来管理位移提交,这时候需要设置 enable.auto.commit=false。

auto.commit.interval.ms 设置自动提交间隔 默认5s

auto.offset.reset

auto.offset.reset 值含义解释如下:

  1. ​ earliest :当各分区下有已提交的 Offset 时,从“提交的 Offset”开始消费;无提交的Offset 时,从头开始消费;
  2. ​ latest : 当各分区下有已提交的 Offset 时,从提交的 Offset 开始消费;无提交的 Offset时,消费新产生的该分区下的数
  3. ​ none : Topic 各分区都存在已提交的 Offset 时,从 Offset 后开始消费;只要有一个分区不存在已提交的 Offset,则抛出异常。

手动提交

因为自动提交当还未进行下次提交的时候,程序出现问题。提交失败就会导致下次消费数据重复或丢失数据等问题

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yXQprP5j-1673513915324)(./assets/da8fd89cb9ed4155b89649a6c84371e0.png)]

设置手动提交偏移量

/**
 * 设置手动提交偏移量
 *
 * @param record
 */
@KafkaListener(
        topics = IPHONE_TOPIC,
        groupId = APPLE_GROUP,
        //3个消费者
        concurrency = "3"
)
public void setCommitType(ConsumerRecord<String, String> record, Acknowledgment ack) {
    System.out.println("setCommitType");
    System.out.println("内容:" + record.value());
    System.out.println("分区:" + record.partition());
    System.out.println("偏移量:" + record.offset());
    System.out.println("创建消息的时间戳:" + record.timestamp());
    
    // 手动提交 偏移量
    ack.acknowledge();
}

注意

如果设置为自动 提交 enable.auto.commit=true

// 此时就不能在  方法中带入Acknowledgment 手动提交的类。
// 会抛出异常
public void setCommitType(ConsumerRecord<String, String> record, Acknowledgment ack) {}


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/157971.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux 的 Vim,gcc,makefile 的使用

坚持看完&#xff0c;结尾有思维导图总结 这里写目录标题vimvim的安装vim的配置vim 的使用vim 的三种模式三种模式对应的命令通用命令模式底行模式gcc 和 g编译和执行预编译编译汇编链接过程make总结vim Vim 是Linux 中使用的编辑器&#xff0c;一般的程序要经历一个过程才能运…

Codeforces Round #841 (Div. 2) and Divide by Zero 2022(A~E)

A. Joey Takes Money给出一个序列a&#xff0c;每次操作可以选择两个数&#xff0c;将两个数分别改成与原数乘积相同的两个数&#xff0c;问最后得到的最大的数组和是多少。思路&#xff1a;乘积一定&#xff0c;和最大一定是与1相乘。则整个数组的积与n - 1个1的和就是最大值。…

探索云原生技术之容器编排引擎-Kubernetes/K8S详解(7)

❤️作者简介&#xff1a;2022新星计划第三季云原生与云计算赛道Top5&#x1f3c5;、华为云享专家&#x1f3c5;、云原生领域潜力新星&#x1f3c5; &#x1f49b;博客首页&#xff1a;C站个人主页&#x1f31e; &#x1f497;作者目的&#xff1a;如有错误请指正&#xff0c;将…

缓存(redis)与数据库(MYSQL)数据一致性问题

在MYSQL数据库集文章中&#xff0c;仔细的学习了一些MYSQL数据库的知识。但是&#xff0c;随着我们的业务越来越好&#xff0c;那么我们不可能直接去操作MYSQL数据库。因为直接去操作MYSQL终究会有比较多的I/O操作&#xff0c;而使整个系统的性能最终受到数据库I/O的制约而无法…

教外篇(6):C++ qrencode 实现二维码生成

系列文章目录 文章目录 系列文章目录前言一、qrencode库的基本使用二、BMP图片生成原理三、二维码生成四、放大图像、解决编码问题前言 该系列教程目录与说明可以查看这篇文章::C/C++教程 本文主要介绍如何使用C++来实现二维码的生成,使用到了开源库:qrencode 代码生成结…

C++入门--vector

目录 vector的介绍 vector的使用 对象的定义 遍历 reserve与resize insert与erase 迭代器失效 vector的模拟实现 vector的介绍&#xff1a; vector是表示可变大小数组的序列容器。 vector的使用&#xff1a; 对象的定义&#xff1a; void test_vector1() {vector<int…

ZYNQ图像-腐蚀膨胀笔记

大磊fpga 腐蚀 下图从左到右依次为a&#xff0c;b&#xff0c;c step1&#xff1a;将b中的黄色十字架在a中遍历 step2&#xff1a;当b的黄色方格在a中 没有碰到白色方格 时输出中心坐标 step3&#xff1a;将step2中所有输出的坐标涂成黄色&#xff0c;得出c图 膨胀 step1…

Redhat 7 安装 iftop软件

1.关闭subscription-manager vi /etc/yum/pluginconf.d/subscription-manager.conf enable 0 2.通过浏览器下载Centis-7.repo http://mirrors.aliyun.com/repo/Centos-7.repo 3.上传至/etc/yum.repos.d/ 4.修改Centos-7.repo文件 #cd /etc/yum.repos.d/ #ls #vim CentOS…

怎么看电脑是32位还是64位?超级简单的方法!

熟悉计算机的朋友都知道&#xff0c;电脑系统可以分为32位和64位系统。它们之间有什么区别&#xff1f;它们支持不同的内存&#xff1a;32位操作系统最多支持4G内存&#xff0c;但64位系统可以支持4G、8G、16G、32G、64G、128G、256G等。兼容软件也不同&#xff1a;32位只支持3…

老照片修复方法是什么?这篇文章来告诉你

我们每年回老家时&#xff0c;都喜欢看看以前的老物件&#xff0c;尤其是照片&#xff0c;因为它承载了我们一代又一代人的回忆&#xff0c;不管过去了多久&#xff0c;家里的长辈拿到一张照片时&#xff0c;都可以准确的说出当时在哪里&#xff1f;在做什么&#xff1f;由此引…

基于python和cv2、pytorch实现的车牌定位、字符分割、字符识别项目

问题描述车牌的检测和识别的应用非常广泛&#xff0c;比如交通违章车牌追踪&#xff0c;小区或地下车库门禁。在对车牌识别和检测的过程中&#xff0c;因为车牌往往是规整的矩形&#xff0c;长宽比相对固定&#xff0c;色调纹理相对固定&#xff0c;常用的方法有&#xff1a;基…

linux C -- 内存管理

链接: linux C学习目录 linux C 共享内存机制共享内存物理位置shared memory常用函数编程模型范例write.cread.c修改参数实验共享内存 二个或者多个进程,共享同一块由系统内核负责维护的内部内存区域其地址空间通常被映射到堆和栈之间无需复制信息,最快的一种IPC机制需要考虑同…

web应用 —— HTML

web应用 一、HTML 1.插件 1.Live Server 模拟网站服务器 2.Auto Rename Tag 自动修改标签对 3.设置settings-format-勾选Format On Save &#xff08;创建文件&#xff1a;File-Open Folder-新建文件夹-命名文件&#xff09; 2.html文档结构 html所有标签为树形结构&…

基于YOLOv5+C3CBAM+CBAM注意力的海底生物[海参、海胆、扇贝、海星]检测识别分析系统

在我前面的一些文章中也有用到过很多次注意力的集成来提升原生检测模型的性能&#xff0c;这里同样是加入了注意力机制&#xff0c;区别在于&#xff0c;这里同时在两处加入了注意力机制&#xff0c;第一处是讲CBAM集成进入原生的C3模块中&#xff0c;在特征提取部分就可以发挥…

Microsoft系统漏洞修复

近期收到服务器系统漏洞扫描&#xff0c;发现很多关于Microsoft本身的系统漏洞。 有很多新手不知道怎么去修复系统漏洞&#xff0c;害怕一旦修复出问题&#xff0c;自己要担责。 我这里讲解下怎么准备的去寻找漏洞&#xff0c;并把它修复的过程。 我已下列的漏洞为例&#x…

RK3588平台开发系列讲解(日志篇)syslog介绍

平台内核版本安卓版本RK3588Linux 5.10Android 12文章目录 一、syslog介绍二、syslog的架构三、syslog日志组成四、syslog接口说明1、openlog2、syslog3、closelog五、syslog.conf接口说明1、selector2、level3、action4、示例沉淀、分享、成长,让自己和他人都能有所收获!&am…

计算机网络第三章

目录 1.数据链路层 1.数据链路层的基本概述 2.数据链路层的功能概述 3.封装成帧 4.差错控制 1.检错编码 2.纠错编码 5.流量控制 1.停止-等待协议 2.选择重传协议(SR) 3.后退N帧协议(GBN) 6.介质访问控制 1.静态划分信道(信道划分介质访问控制) 2.动态分配信道 7.局域网 8.链路…

【经验分享】美赛报名以及注册方法-以2023年美赛为例

首先点击COMAP的官网链接&#xff1a; https://www.comap.com/ 然后选择Contests目录下的MCM/ICM 选择 Learn More and Register 然后选择 Click here to register for the 2023 MCM/ICM contest 注册分为两个步骤&#xff1a;顾问&#xff08;指导教师&#xff09;注册和填…

uni-app中自定义TabBar

1.由于原生的tabBar不能做到事件的拦截处理所以才自定义 注意点&#xff1a;自定义tabBar后则原生的uni.switchTab(OBJECT)不能再使用了 第一步&#xff1a;需要把原生的tabBar注释掉 第二步&#xff1a;在components下新建TabBar.vue文件&#xff08;那个页面用那个页面引入…

RHCE-Web服务器在linux上的部署,了解hash算法以及常见的加密方式

目录 1.WEB服务器&#xff08;Web Server&#xff09; 浏览器 工作原理 常见状态码&#xff1a; www服务器的基本配置 2.web服务配置样例 3.了解hash算法以及常见的加密方式 hash算法&#xff1a; 常用HASH函数 处理冲突方法 常用hash算法的介绍&#xff1a; ssh协议…