kafka之消费者进阶

news2025/1/22 8:21:15

一、几个概念

1. 消费者组

消费者组:一个消费者组包含多个消费者。同一个消费组的消费者,分别消费不同的partition,便于加快消费。

kafka约定在一个消费者组中,对于同一个topic,每个consumer会分配不同partition,即topic下的一个patition只能被同一个消费者组的一个消费者消费,所以当消费组中的消费者个数大于partition个数时,会存在消费者闲置的情况,因为分不到partition.

2. 点对点(P2P,point to point)和发布订阅模型(Publish/Sub)

消息中间件模型有两种经典模型:点对点(P2P,point to point)和发布订阅模型(Publish/Sub)

点对点模式是基于队列,消息生产者将消息发送到队列,消息消费者从队列拉取消息

发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(Topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题订阅消息

 kafka通过消费者和消费者组的契合可以实现点对点(P2P,point to point)和发布订阅模型(Publish/Sub):

如消费者都属于同一个消费组,那么partition的消息发给这些消费者时,一条消息只发给一个消费者,不会多发,相当于点对点模式

消费者隶属于不同消费组,那么同一个partition可能发给不同消费者组的多个消费者手中,相当于发布订阅模式;

3. 消费者再均衡:

同一个消费组内的消费者消费不同分区,当消费者组内再增加消费者时,原来的消费者对应的partition会被回收掉,然后重新分配给最新的所有消费者,这就是再均衡。

        consumer.subscribe(Collections.singletonList("topic-module"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.err.println("回收partitions:"+partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.err.println("再分配partitions:"+partitions);
            }
        });

二、消费者多线程模型

kafka的consumer是非线程安全的,如果多个线程操作一个consumer实例,会报异常;因为KafkaConsumer中定义了一个acquire方法用来检测是否只有一个线程在操作,当检测到有其他线程时,会抛出ConcurrentModifactionException;

KafkaConsumer在执行所有动作时都会先执行acquire方法检测是否线程安全。

不过要实现消费者的多线程模型,也是有办法的

1. 多线程模型1

 一个partition对应一个consumer,一个consumer也只运行在一个线程中。

首先新建一个 consumer线程类KafkaConsumerMultiThread1.java

public class KafkaConsumerMultiThread1 implements Runnable {

    private KafkaConsumer<String, String> consumer;

    private volatile boolean isRunning = true;

    private String threadName;

    private static AtomicInteger num = new AtomicInteger(0);

    public KafkaConsumerMultiThread1(Properties properties, String topic) {
        this.consumer = new KafkaConsumer<String, String>(properties);
        this.consumer.subscribe(Collections.singletonList(topic));
        this.threadName = "consumer-thread-" + num.getAndIncrement();
        System.err.println(this.threadName + " started ");
    }

    @Override
    public void run() {
        try{
            while (isRunning){
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for(TopicPartition topicPartition : records.partitions()){
                    List<ConsumerRecord<String, String>> consumerRecords = records.records(topicPartition);
                    int size = consumerRecords.size();
                    for(int i=0; i<size; i++){
                        ConsumerRecord<String, String> record = consumerRecords.get(i);
                        String value = record.value();
                        long messageOffset = record.offset();
                        System.err.println("当前消费者:"+ threadName
                                + ",消息内容:" + value
                                + ", 消息的偏移量: " + messageOffset
                                + "当前线程:" + Thread.currentThread().getName());
                    }
                }
            }
        }finally {
            if(consumer != null){
                consumer.close();
            }
        }
    }

    public boolean isRunning() {
        return isRunning;
    }

    public void setRunning(boolean running) {
        isRunning = running;
    }
}

 然后再创建一个用于生成consumer线程的线程池:

public class MultiThreadTest {

    public static void main(String[] args) {

        Properties properties = new Properties();

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-module");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        // 改成手动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        int coreSize = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(coreSize);
        for(int i=0; i<coreSize; i++){
            executorService.execute(new KafkaConsumerMultiThread1(properties, "topic-module"));
        }


    }
}

之后在生产者端发送消息,可以观察到有五个线程共同处理发来的消息:

2. 消费者多线程模型2

 

采用Master-worker模型(master负责分发,worker负责处理)。master初始化阶段,创建多个worker实例,然后master负责拉取消息,并负责分发给各个worker执行。各个worker执行完成后,将执行结果回传给master,由master统一进行commit。

这里的master为consumer,在 consumer master 中,会有多个任务队列,用来接收生产者端的消息,consumer会创建多个worker实例,并且将任务队列中的消息交给每个队列对应的worker对象处理,,worker处理完成的结果放到任务结果集中,然后master单线程最后做 consumer.commitSync()或者consumer.commitAsync()提交操作。这里的consumer作为master是单线程的,worker是多线程的。

这样避免了多个线程操作consumer,避免出现异常ConcurrentModifacationException

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/522255.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity搭建VR全景图

VR全景图片显示和相机旋转 ** 如果需要内置面材质球文件&#xff0c;可以私信下我 ** 场景构建 创建项目后拖进所需文件 文件有内置面材质球、图片等 创建文件 拖拽内置面材质球进入场景&#xff0c;并设置相机在球内部 再创建一个材质球&#xff0c;命名和图片相同 选…

springcloud+springboot+vue学生信息管理系统(选课,成绩,奖惩,奖学金,缴费)xnt81

后端语言&#xff1a;Java 框架&#xff1a;springcloudspringboot 数据库&#xff1a;mysql 数据库工具&#xff1a;Navicat 学生信息管理系统主要实现角色有管理员和学生,教师,管理员在后台管理学生模块、学籍信息模块、选择课程模块、用户表模块、收藏表模块、课程信息模块…

国产航顺HK32F030M: 448byte EEPROM

EEPROM (~Electrically Erasable Programmable read only memory~)是指带电可擦可编程只读存储器。是一种掉电后数据不丢失的存储芯片。 HK32F030M用户手册V1.1.9.pdf bsp_eeprom.c #include "bsp_eeprom.h"/*****************************************************…

springboot+vue房产销售平台(源码+文档)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的房产销售平台。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 &#x1f495;&#x1f495;作者&#xff1a;风歌&a…

切换以太网接口MAC地址的批处理脚本

MAC&#xff08;媒体访问控制&#xff09;地址是网络设备的唯一标识符&#xff0c;用于网络通信。MAC 地址通常在设备制造时被分配并写入硬件。然而&#xff0c;在某些情况下&#xff0c;你可能需要或想要更改设备的 MAC 地址。以下是一些可能的理由&#xff1a; 1. **隐私和安…

Linux期末复习总结

一、Linux基础及安装 LINUX是在UNIX基础上开发,具有UNIX全部功能。 **Linux特点&#xff1a;**开放性、多用户、多任务、出色的稳定性、良好的用户界面、设备独立性、丰富的网络功能、安全性、可移植性 Linux由4个主要部分组成&#xff1a;内核、Shell、文件系统、应用程序 …

【Python入门篇】——Python中循环语句(while循环的基础语法和基础案例)

作者简介&#xff1a; 辭七七&#xff0c;目前大一&#xff0c;正在学习C/C&#xff0c;Java&#xff0c;Python等 作者主页&#xff1a; 七七的个人主页 文章收录专栏&#xff1a; Python入门&#xff0c;本专栏主要内容为Python的基础语法&#xff0c;Python中的选择循环语句…

如何获得高并发经验?看这篇文章就够了

如何获得高并发经验&#xff1f; 这是我今天逛知乎的时候系统邀请我回答的一个问题&#xff0c;由此也引发了我的一些思考&#xff1a;为什么人人都想要获得高并发经验&#xff1b;想拥有高并发系统设计技能&#xff1f; 其原因LZ认为主要有以下三点&#xff1a; 涨薪&#x…

MD-MTSP:麻雀搜索算法SSA求解多仓库多旅行商问题(提供MATLAB代码,可以修改旅行商个数及起点)

一、多仓库多旅行商问题 多旅行商问题&#xff08;Multiple Traveling Salesman Problem, MTSP&#xff09;是著名的旅行商问题&#xff08;Traveling Salesman Problem, TSP&#xff09;的延伸&#xff0c;多旅行商问题定义为&#xff1a;给定一个&#x1d45b;座城市的城市集…

【C++】内联函数----inline函数的详细使用教程

&#x1f339;作者:云小逸 &#x1f4dd;个人主页:云小逸的主页 &#x1f4dd;Github:云小逸的Github &#x1f91f;motto:要敢于一个人默默的面对自己&#xff0c;强大自己才是核心。不要等到什么都没有了&#xff0c;才下定决心去做。种一颗树&#xff0c;最好的时间是十年前…

分享一个一键换肤按钮(超酷)

先上效果图&#xff1a; 效果图中转换为黑夜会有一个宇航员小猴子飞上来&#xff0c;途中没有截到。。。。 废话不多说&#xff0c;上代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title…

通过VS开发人员命令提示符(developer command prompt)查看类

1.首先打开开始菜单栏&#xff0c;找到相应的VS版本。这里以VS2022为例 2.找到developer command prompt选项&#xff0c;点击进入 3.进入控制台&#xff0c;进入源文件所在的盘符&#xff08;这里以D盘为例&#xff0c;如果是默认C盘可以不用改&#xff09; 4.输入cd 文件地址…

printBanner

主类 package com.example.demo;import com.application.Application;public class Demo {public static void main(String[] args) {Application application new Application("application.properties");application.run();}}创建对象&#xff0c;需要的参数&…

第十三届蓝桥杯青少组省赛Python真题,包含答案

目录 一、选择题 二、编程题 第十三届蓝桥杯青少组省赛Python真题,包含答案 一、选择题 第 1 题 单选题 下列关于函数的说法正确的是 () 。 答案&#xff1a;C 第 2 题 单选题 十进制数55转换成十六进制是 () 。 答案&#xff1a;C 第 3题 单选题 下列方法中&#xff0c;哪…

Sysfs简介

Sysfs学习记录 主要参考文献&#xff1a;https://xuesong.blog.csdn.net/article/details/109522945 Sysfs(System File System)是一种虚拟文件系统&#xff0c;它提供了一种在Linux和Unix系统中管理设备和内核参数的机制。Sysfs基于内核&#xff0c;将底层硬件信息和内核参数…

WPF:WPF原生布局说明

前言 WPF在国内讨论度很小&#xff0c;我在这里记录一下WPF简单的原生控件是如何使用的&#xff0c;顺便回忆一下WPF的基础知识&#xff0c;有些忘记的比较厉害了 WPF简介 WPF是微软推出的桌面UI软件&#xff0c;是我觉得最早实现MVVM&#xff08;数据驱动事务&#xff09;&…

三、H3C-NE实验-配置管理实验

实验拓扑图&#xff1a; 实验描述&#xff1a; 1&#xff09;把路由器R配置为FTP服务器&#xff1b; 2&#xff09;在R上save配置文件后&#xff0c;主机登录FTP&#xff0c;把R的配置文件copy备份&#xff1b; 3&#xff09;删掉R的配置文件后&#xff0c;利用主机恢复R的配…

42个网工高效率工具,我只告诉你(一)

晚上好&#xff0c;我是老杨。 不知道上一篇书单总结&#xff0c;你是否觉得干货 今天更新第四篇&#xff0c;也是最后一篇总结——2022年全年&#xff0c;我安利给你的网工好用工具&#xff0c;整整42个。 它是什么&#xff0c;为什么好用&#xff0c;哪里下载&#xff0c;…

【Linux】进程信号详解(二)

文章目录 前言一、信号阻塞1.信号其他相关常见概念信号递达&#xff1a;信号未决&#xff1a;信号阻塞&#xff1a;信号阻塞vs信号递达的忽略动作 2. 在内核中的表示3. sigset_t4. 信号集操作函数5.sigprocmask函数6.sigpending 二、深入理解捕捉信号1. 虚拟地址空间2.用户态和…

mybatis是如何集成到spring的?

文章目录 1 前言1.1 集成spring前使用mybatis的方式1.2 集成mybatis到spring的关键步骤 2 SqlSessionFactoryBean对象分析2.1 buildSqlSessionFactory做了什么事情&#xff1f;2.2 为什么是SqlSessionFactoryBean却可以使用SqlSessionFactory&#xff1f; 3 验证demo4 举一反三…