kafka 消费者线程安全问题详细探讨

news2024/9/24 1:19:40

内容概要

图片

主要内容

常见错误案例

下面这段代码大概逻辑

  • 初始化时 实例化KafkaConsumer, 开启线程拉取消息并且处理

  • 资源释放回调 停止线程、调用kafkaConsumer.close进行资源释放

表面上没有问题,但实际上可能出现线程安全问题,因为poll 和 close 两个操作可能同时执行,因此存在线程安全问题, 如何修改,读者自己思考下。

    @PostConstruct
    public void consumer(){
        kafkaConsumer = new KafkaConsumer(getConfig());
        kafkaConsumer.subscribe(Arrays.asList("test_partition_num"));

        new Thread(new Runnable() {
            @Override
            public void run() {
                while(running){
                    ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
                    records.forEach(record->{
                        System.out.println(" partition =" + record.partition()  +" offset  = " + record.offset() + " value = " + record.value());
                    });
                }
            }
        }).start();
    }

    @PreDestroy
    public void close(){
        running = false;
        if(kafkaConsumer != null){
            kafkaConsumer.close();
        }
    }

消费者非线程安全代码解读

kafka生成者是线程安全的,但消费者是非线程安全的。KafkaConsumer

  • 相关操作前

    • 调用acquire()方法,校验线程安全问题,如果发现其他线程也在操作,则直接抛出异常。

  • 操作完成后

    • 调用release()清除痕迹

acquire()相对于加锁,release()相当于释放锁。

参看poll 方法实现,一目了然。

    private void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        } else {
            this.refcount.incrementAndGet();
        }
    }
    
      private void release() {
        if (this.refcount.decrementAndGet() == 0) {
            this.currentThread.set(-1L);
        }
    }

图片

poll源码

如何实现消费者多线程消费消息呢

思路1

每次实例化一个 KafkaConsumer

这种方式实现简单,但每次都需要建立TCP 链接


思路2

相关操作方法 加上  synchronized,获取使用Lock 加锁保证线程安全

这种方式性能较差

思路3

拉取消息使用一个线程, 消息处理使用多线程

因为通常拉取消息比较快,消息处理比较耗时,由于消息处理不涉及KafkaConsumer 相关API 操作,因此不存在线程安全问题。这种方式建议消息位移设置自动提交,否则编程复杂度较高。

示例代码

ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(1000));

executorService.execute(()->{
    //处理消息
    records.forEach(record->{
        System.out.println(" partition =" + record.partition()  +" offset  = " + record.offset() + " value = " + record.value());
    });
});

旁敲侧击 举一反三

面试题回顾 Dubbo 线程模型

通常我们线程分为两类

  • IO 线程:负责网络通信的读写操作,接收和发送请求与响应。

  • 业务线程:处理具体的业务逻辑,避免因业务处理耗时过长而阻塞 IO 线程。


Dubbo 线程模型有几种你还记得否?该如何选择?

  • AllDispatcher:所有消息都派发到线程池,包括请求、响应、连接事件、断开事件、心跳等。

  • DirectDispatcher:所有消息都不派发到线程池,全部在 IO 线程上直接执行。

  • MessageOnlyDispatcher:只有请求和响应消息派发到线程池,其它连接断开、心跳等消息直接在 IO 线程上执行。

  • ExecutionDispatcher:只把请求消息派发到线程池,响应和其它连接、断开、心跳等消息直接在 IO 线程上执行。

其实选择的依据 业务处理的快慢,如果业务处理很快则建议让业务处理逻辑放到 IO线程中执行,这样避免线程上下文切换影响性能。反之则处理逻辑需要放到具体的业务线程中执行。

一般来说业务执行需要查询数据库,绝大数场景建议使用默认的 AllDispatcher 

是不是又和我一起温故知新了,加油吧 少年 !!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2158972.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python按照财年分组案例

有如下数据&#xff1a;需要按照如下要求进行分组。 需求是对Site进行分组 条件当值是Act得时候&#xff0c;分组名字就是 条件当值是Rebase*得时候&#xff0c;分组名字就是FY?1/?1 条件当值是FIRM 得时候&#xff0c;分组名字就是 每年得7月到次年得6月为一个财年&#xff…

C++之初识STL(概念)

STL&#xff08;标准模板库&#xff09; STL广义分类为&#xff1a;容器&#xff0c;算法&#xff0c;迭代器 * **容器**和**算法**之间通过**迭代器**进行无缝连接 意义&#xff1a;C的**面向对象**和**泛型编程**思想&#xff0c;目的就是**复用性的提升** STL六大组件 1. 容…

MODELS 2024:闪现奥地利,现场直击报道

周末出逃&#xff01;小编闪现至奥地利林茨&#xff0c;亲临第27届MODELS 2024国际会议&#xff0c;以第一视角引领你深入会议现场&#xff0c;领略其独特风采。利用午饭时间&#xff0c;小编紧急码字&#xff0c;只为第一时间将热点资讯呈现给你~ 会议介绍&#xff1a; MODEL…

计算机毕业设计之:微信小程序的校园闲置物品交易平台(源码+文档+讲解)

博主介绍&#xff1a; ✌我是阿龙&#xff0c;一名专注于Java技术领域的程序员&#xff0c;全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师&#xff0c;我在计算机毕业设计开发方面积累了丰富的经验。同时&#xff0c;我也是掘金、华为云、阿里云、InfoQ等平台…

JavaEE: 深入探索TCP网络编程的奇妙世界(六)

文章目录 TCP核心机制TCP核心机制九: 面向字节流TCP核心机制十: 异常处理 小小的补充(URG 和 PSH)~TCP小结TCP/UDP 对比用UDP实现可靠传输(经典面试题) 结尾 TCP核心机制 上一篇文章JavaEE: 深入探索TCP网络编程的奇妙世界(五) 书接上文~ TCP核心机制九: 面向字节流 TCP是面…

开关频率与谐振频率对应的模态图

当fsfr时 当fr2<fs<fr1时 当fs>fr1时 开关频率对应输入电压的频率 谐振频率对应的是谐振电流的频率

JavaSE - 面向对象编程05

01 正则表达式 【1】概念&#xff1a;正则表达式是由一些特定字符组成的&#xff0c;代表的是一个规则。 【2】可以用来做什么&#xff1f; ① 用于校验数据格式的合法性 ② 用于在文本中爬取满足要求的内容 ③ 用于String类的replace方法&#xff0c;split方法的替换和分割 …

动态时间【JavaScript】

这个代码实现了一个动态显示当前日期和时间的功能。具体来说&#xff0c;它会每秒更新一次时间并在页面上显示出来。 实现效果&#xff1a; 代码&#xff1a; <!DOCTYPE html> <html lang"zh"> <head><meta charset"UTF-8"><…

GUI编程之MATLAB入门详解(01)

⛄前言 图形用户界面的设计是MATLAB的核心应用之一。当用户与计算机之间或用户与计算机程序之间进行交互操作时&#xff0c;舒服高效的用户接口功能则会对用户产生极大的吸引力。图形用户界面&#xff08;GUI&#xff09;则通过窗口、图标、按钮、菜单、文本等图形对象构成用户…

美业SaaS收银系统如何收银?博弈美业实操/美业门店管理系统源码

1.打开博弈美业APP 2.工作台上方的【收银台】、【扫码核销】、【密码核销】均可完成收银 3.【收银台】可直接选择商品/服务/课程&#xff0c;再选择客户后提交订单收款 4.【扫码核销】【密码核销】可直接扫描二维码、输入核销码进行收银

大模型日报|7 篇必读的大模型论文

大家好&#xff0c;今日必读的大模型论文来啦&#xff01; 1.中科大团队提出人像视频编辑方法 PortraitGen 中国科学技术大学团队提出了 PortraitGen&#xff0c;这是一种功能强大的人像视频编辑方法&#xff0c;它能通过多模态提示实现一致且富有表现力的风格化。 传统的人…

SLAM面经1(百度)

百度面经 百度共三面,如果面试效果俱佳,会增加一个hr面。前二面主要是技术面,分为在线coding+代码知识+专业知识+工程能力。第三面是主管面,偏向于管理方面,和hr面相似。 一面 1)在线coding 在线coding的考试内容为下面力扣的变种。 2)专业面 (1)VINS-FUSION与ORB…

鲲鹏计算这五年:硬生态基本盘稳住,才能放手进击软生态

文 | 智能相对论 作者 | 叶远风 数智化深入发展、新质生产力成为主旋律的当下&#xff0c;本土计算产业的发展被寄予越来越多的关注和期待。自2019年开启以来&#xff0c;鲲鹏计算产业生态已经整整走过5个年头。 因此&#xff0c;今年华为全联接大会的鲲鹏之夜&#xff0c;在…

【网络安全】依赖混淆漏洞实现RCE

未经许可&#xff0c;不得转载。 文章目录 正文 依赖混淆是一种供应链攻击漏洞&#xff0c;发生在企业的内部依赖包错误地从公共库&#xff08;如npm&#xff09;下载&#xff0c;而不是从其私有注册表下载。攻击者可以在公共注册表中上传一个与公司内部包同名的恶意包&#xf…

java基础(2)方法的使用

目录 1.前言 2.正文 2.1方法的定义 2.2方法的调用过程 2.3方法的实参与形参 2.3.1形参 2.3.2实参 2.3.3参数传递 2.4方法的重载 3.小结 1.前言 哈喽大家好啊&#xff0c;今天博主继续带领大家学习java的基本语法&#xff0c;java的基础语法部分打算用六到七篇博文完…

关于uniapp wifi调用走过的坑

1. uniapp老脚手架与uni-wif带来的兼容性问题 且几乎找不到解决方法 2. uni-wif需要插件市场安装 3.还有一种可以使用导入安卓类的方式&#xff0c;可以正常获取到已经连接ssid&#xff08;wifi名称&#xff09;&#xff0c;也可以获取到wifi列表 &#xff0c; 但ScanResul…

p18 docker镜像原理之联合文件系统,p19 docker镜像分层的理解

镜像是什么 镜像其实就是一种轻量级的&#xff0c;可执行的一种软件包&#xff0c;用来打包基于环境开发的软件&#xff0c;里面可以包括代码&#xff0c;环境&#xff0c;数据库&#xff0c;配置文件等信息 如何得到镜像&#xff1f; 可以从镜像仓库下载比方说dockerhub 比…

道路车辆功能安全 ISO 26262标准(2)—功能安全管理

写在前面 本系列文章主要讲解道路车辆功能安全ISO26262标准的相关知识&#xff0c;希望能帮助更多的同学认识和了解功能安全标准。 若有相关问题&#xff0c;欢迎评论沟通&#xff0c;共同进步。(*^▽^*) 1. 道路车辆功能安全ISO 26262标准 2. ISO 26262-2 功能安全管理 IS…

基于SpringBoot+Vue的旅游攻略平台管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码 精品专栏&#xff1a;Java精选实战项目…

论文不会写快来看!分享4款ai改写论文软件

在当今学术研究和写作领域&#xff0c;AI论文改写工具已经成为不可或缺的助手。这些工具不仅能够帮助研究人员提高写作效率&#xff0c;还能确保论文的质量和原创性。以下是四款值得推荐的AI改写论文软件&#xff0c;其中特别推荐千笔-AIPassPaper。 千笔-AIPassPaper 传送门&…