探究:kafka生产者/消费者与多线程安全

news2024/11/25 7:25:22

目录

1. 多线程安全

1.1. 生产者是多线程安全的么?

1.1. 消费者是多线程安全的么?

2. 消费者规避多线程安全方案

2.1. 每个线程维护一个kafkaConsumer

2.2. [单/多]kafkaConsumer实例 + 多worker线程

2.3.方案优缺点对比


1. 多线程安全

1.1. 生产者是多线程安全的么?

        Kafka生产者是线程安全的,可以在多个线程中共享一个Kafka生产者实例。这是因为Kafka生产者实例内部使用了一些同步机制来保证线程安全,例如使用了线程安全的队列来缓存消息,使用了同步锁来保护共享资源的访问等。

        同时,Kafka生产者的send()方法是非阻塞的,可以在多个线程中并发调用,不会阻塞线程。Kafka生产者还提供了异步发送和同步发送两种发送方式,可以根据实际需求选择不同的发送方式。

然而,如果多个线程共享同一个Kafka生产者实例,需要注意以下几点:

  1. 同一个线程中不要同时调用send()方法和flush()方法,可能会导致消息发送顺序不一致。

  2. 不同线程中调用send()方法时,需要注意消息的顺序,可以使用Kafka的分区机制来保证消息的顺序。

  3. 如果多个线程发送的消息都是针对同一个主题或分区,可能会导致消息的重复发送或丢失。因此,建议在多线程情况下,使用Kafka的分区机制,将消息发送到不同的分区中,以避免消息的重复和丢失。

        综上所述,Kafka生产者是线程安全的,可以在多个线程中共享一个Kafka生产者实例,但需要注意消息的顺序和分区的使用,以保证消息的可靠性和顺序性。

1.1. 消费者是多线程安全的么?

        Kafka消费者是非线程安全的,主要原因是因为:Kafka消费者使用了内部的状态来跟踪消费进度和偏移量。这种状态包括消费者的位置,消费者的偏移量,以及消费者的订阅主题和分区等信息。如果多个线程同时访问同一个Kafka消费者实例,就会导致这些状态信息的不一致,从而导致消费进度出现错误。

2. 消费者规避多线程安全方案

背景:Kafka消费者中具有poll()方法,该方法是一个阻塞方法,如果在同一个线程中多次调用poll()方法,会导致消费者阻塞在poll()方法中,无法及时消费新到达的消息。因此,为了提高Kafka消费者的并发性能,通常使用多个消费者线程来消费+处理消息。为了保证Kafka消费者的线程安全性,通常采用以下2种方案。

方案的本质:一个线程只能绑定一个消费者实例(不能多个线程共用一个消费者实例)

2.1. 每个线程维护一个kafkaConsumer

 1. 创建多个线程,去消费topic

2. 每个线程绑定固定数量的分区(最好的情况是一个消费者绑定一个分区)

        代码比较简单,唯一需要关注的点:创建的消费者线程和分区的数量可能不相等,此时,可以采用「分区副本自动分配策略」,该策略会将消费者线程和分区绑定在一起。

场景讨论:

        按照现在的代码,假设消费者线程获取了分区A 100个msg,然后处理,若还未处理完,此时,offset未提交。因为此时offset还未提交,消费者线程还会去分区A调用poll,获取到的msg依然会是刚才的100个msg。存在消费重复的场景,因此,消费者需要做好幂等处理。

        上面的代码,不会出现这个场景:消费者获取了消息1,2,3,3处理完成了,执行了提交偏移,但是消息1还未提交的场景。

代码示例:

package com.bie.kafka.kafkaThrea;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 *
 *       1、KafkaConsumer是非线程安全的,KafkaProducer是线程安全的。
 *       2、该案例是创建多个线程,每个线程维护一个KafkaConsumer实例
 *           用户创建多个线程消费topic数据,使用分区副本自动分配策略,将消费者线程与分区进行绑定
 *       3、ConsumerRunnable,消费线程类,执行真正的消费任务
 */
public class ConsumerRunnable implements Runnable {

    // 每个线程维护私有的kafkaConsumer实例
    private final KafkaConsumer<String, String> consumer;

    /**
     * 默认每个消费者的配置参数初始化
     *
     * @param brokerList
     * @param groupId
     * @param topic
     */
    public ConsumerRunnable(String brokerList, String groupId, String topic) {
        // 带参数的构造方法
        Properties props = new Properties();
        // kafka的列表
        props.put("bootstrap.servers", brokerList);
        // 消费者组编号
        props.put("group.id", groupId);
        // 自动提交
        props.put("enable.auto.commit", true);
        // 提交提交每个一秒钟
        props.put("auto.commit.interval.ms", "1000");
        // 反序列化key
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 反序列化value
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 将配置信息进行初始化操作
        this.consumer = new KafkaConsumer<>(props);
        // 定义响应的主题信息topic:使用分区副本自动分配策略
        consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        // 消费者保持一直消费的状态
        while (true) {
            // 将获取到消费的信息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(200));
            // 遍历出每个消费的消息
            for (ConsumerRecord<String, String> record : records) {
                // 处理消息:deal msg
            }
            // 提交offset
        }
    }
}
package com.bie.kafka.kafkaThrea;

import java.util.ArrayList;
import java.util.List;

/**
 *       1、消费线程管理类,创建多个线程类执行消费任务
 */
public class ConsumerGroup {

    // 消费者群组,多消费者。
    private List<ConsumerRunnable> consumers;

    public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
        // 初始化消费者组
        consumers = new ArrayList<>(consumerNum);
        // 初始化消费者,创建多少个消费者
        for (int i = 0; i < consumerNum; i++) {
            // 根据消费者构造方法,创建消费者实例
            ConsumerRunnable consumerRunnable = new ConsumerRunnable(brokerList, groupId, topic);
            // 将创建的消费者实例添加到消费者组中
            consumers.add(consumerRunnable);
        }
    }

    public void execute() {
        // 将消费者组里面的消费者遍历出来
        for (ConsumerRunnable task : consumers) {
            // 创建一个消费者线程,并且启动该线程
            new Thread(task).start();
        }
    }
}
package com.bie.kafka.kafkaThrea;


public class ConsumerMain {

    public static void main(String[] args) {
        // kafka即broker列表
        String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092";
        // group组名称
        String groupId = "group1";
        // topic主题名称
        String topic = "topic1";
        // 消费者的数量
        int consumerNum = 3;
        // 通过构造器创建出一个对象
        ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
        // 执行execute的方法,创建出ConsumerRunnable消费者实例。多线程多消费者实例
        consumerGroup.execute();
    }
}

上面代码,在实际工程应用中存在问题,描述见下:

  1. 每个消费者线程,先执行poll拉取一批batchsize命令后
  2. 批量处理这批消息
  3. 提交offset

        步骤2,处理batchsize的消息,可能中间的某一个失败了,但是步骤3提交了整体的offset,会导致失败的消息丢失了。

解决方案:处理每个消息的流程中,增加重试机制(本地消息表),保证该消息能执行成功

2.2. [单/多]kafkaConsumer实例 + 多worker线程

 与2.1方案一的区别在于,将「消息的获取」与「消息的处理」解耦开

1. 消息的获取:维护一个or多个kafkaConsumer实例,获取消息,获取到消息后,将消息丢到消息处理线程池中

2. 消息的处理:创建一个线程池,里面存放了worker线程,每个worker线程处理获取到的消息

ConsumerWorker类

        worker线程:执行msg的处理,更新offsets信息

package huxi.test.consumer.multithreaded;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
 
import java.util.List;
import java.util.Map;
 
public class ConsumerWorker<K, V> implements Runnable {
 
    private final ConsumerRecords<K, V> records;
    private final Map<TopicPartition, OffsetAndMetadata> offsets;
 
    public ConsumerWorker(ConsumerRecords<K, V> record, Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.records = record;
        this.offsets = offsets;
    }
 
    @Override
    public void run() {
        for (TopicPartition partition : records.partitions()) {
            // 获取到分区的消息记录
            List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
            // 遍历获取到的消息记录
            for (ConsumerRecord<K, V> record : partitionRecords) {
                // 消息处理逻辑
            }
 
            /* 下面操作,是更新各个分区的offset信息到offsets变量,并没有真正的提交位移 */
            /* 真正的更新位移操作,不是在worker线程,而是在消费者线程 */

            // 待更新的位移
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            // 同步锁,锁住offsets位移
            synchronized (offsets) {
                // 如果offsets位移不包含partition这个key信息,就将位移信息设置到map集合里面
                if (!offsets.containsKey(partition)) {
                    offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                } else {
                    // 否则,offsets位移包含partition这个key信息,获取到offsets的位置信息
                    long curr = offsets.get(partition).offset();
                    if (curr <= lastOffset + 1) { // 如果获取到的位置信息小于等于上一次位移信息大小,将这个partition的位置信息设置到map集合中。并保存到broker中。
                        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    }
                }
            }
        }
    }
}

 ConsumerThreadHandler类

        一个主线程,定时去poll消息,然后将消息投递到worker线程中,最后,当offsets信息发生变更后,提交offset

package huxi.test.consumer.multithreaded;
 
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
 
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
public class ConsumerThreadHandler<K, V> {
 
    // KafkaConsumer实例
    private final KafkaConsumer<K, V> consumer;
    // ExecutorService实例
    private ExecutorService executors;
    // 位移信息offsets
    private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
 
    public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
        // 构造kafkaConsumer配置
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false"); // 非自动提交位移信息
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // 创建kafkaConsumer,赋值给成员变量consumer
        consumer = new KafkaConsumer<>(props);

        // 消费者订阅消息,并实现重平衡rebalance
        // rebalance监听器,创建一个匿名内部类。使用rebalance监听器前提是使用消费者组(consumer group)。
        // 监听器最常见用法就是手动提交位移到第三方存储以及在rebalance前后执行一些必要的审计操作。
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            // 在coordinator开启新一轮rebalance前onPartitionsRevoked方法会被调用。
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                consumer.commitSync(offsets); // 提交位移
            }

            // rebalance完成后会调用onPartitionsAssigned方法。
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                offsets.clear(); // 清除位移信息
            }
        });
    }
 
    /**
     * 消费主方法
     * @param threadNumber  线程池中线程数
     */
    public void consume(int threadNumber) {
        // 创建一个worker线程池,线程数量为threadNumber个
        executors = new ThreadPoolExecutor(
                threadNumber,
                threadNumber,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // 只有一个消费者线程
        try {
            // 消费者一直处于等待状态,等待消息消费
            while (true) {
                // 从主题中获取消息
                ConsumerRecords<K, V> records = consumer.poll(1000L);
                // 如果获取到的消息不为空
                if (!records.isEmpty()) {
                    // submit: 将一批msg交给worker线程处理,消息处理完后,更新offsets信息
                    executors.submit(new ConsumerWorker<>(records, offsets));
                }
                // 调用提交位移信息
                commitOffsets();
            }
        } catch (WakeupException e) {
            // swallow this exception
        } finally {
            commitOffsets();  // 调用提交位移信息
            consumer.close(); // 关闭consumer
        }
    }
 
    private void commitOffsets() {
        Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;
        // 保证线程安全、同步锁,锁住offsets
        synchronized (offsets) {
            // 判断如果offsets位移信息为空,直接返回,节省同步锁对offsets的锁定的时间
            if (offsets.isEmpty()) {
                return;
            }
            // 如果offsets位移信息不为空,将位移信息offsets放到集合中,方便同步
            unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
            // 清除位移信息offsets
            offsets.clear();
        }
        // 将封装好的位移信息unmodfiedMap集合进行同步提交
        // 手动提交位移信息
        consumer.commitSync(unmodfiedMap);
    }
 
    public void close() {
        consumer.wakeup();
        // 关闭ExecutorService实例
        executors.shutdown();
    }
}

Main类

        包装了所有的工具类,启动整个程序

package huxi.test.consumer.multithreaded;
 
public class Main {
 
    public static void main(String[] args) {
        // broker列表、topic、group id
        String brokerList = "localhost:9092";
        String topic = "test-topic";
        String groupID = "test-group";
        
        // 1. 根据ConsumerThreadHandler构造方法,创建出消费者handler
        final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupID, topic);
        final int cpuCount = Runtime.getRuntime().availableProcessors();
 
        // 创建线程的匿名内部类
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // 执行consume,在此线程中执行消费者消费消息。
                handler.consume(cpuCount); 
            }
        };
        new Thread(runnable).start();  // 3. 该函数会调用上面的run()方法
 
        try {
            // 20秒后自动停止该测试程序
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            // swallow this exception
        }
        System.out.println("Starting to close the consumer...");
        handler.close();
    }
}  

2.3.方案优缺点对比

优点缺点
方案1

实现简单

速度较快,因为无线程交互开销

方便位移管理

易于维护分区之间的消息消费顺序

socket连接开销大

consumer受限于topic的分区数,扩展性差

broker端处理负载高(因为发往broker的请求较多)

reblance可能性增大

方案2

消息获取和处理解耦

可独立扩展consumer和worker数,伸缩性好

实现负载

难于维护分区内的消息顺序

处理链路变长,导致位移管理困难

worker线程异常可能导致消费数据丢失

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/401455.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

我的Git stash不小心清空了怎么办,提了代码能反悔吗

文章目录1. 前言2. git stash清空场景2. git stash clear后如何还原3.Git撤销已经推送(push)至远端仓库的信息1. 前言 本文总结的知识很实用哈&#xff0c;虽然是git工具的不常用操作&#xff0c;但是绝对不是冷知识&#xff0c;学会可以从会用git升级到git高手。 主要是两种场…

Centos7 安装Mysql8.0

1、到指定目录下下载安装包[rootVM-0-14-centos ~]# cd /usr/local/src2、下载mysql8[rootVM-0-14-centos src]# wget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.20-linux-glibc2.12-x86_64.tar.xz3、解压mysql8, 通过xz命令解压出tar包&#xff0c; 然后通过t…

KDZD耐电压高压击穿强度测试仪

一、技术参数 01、输入电压&#xff1a; 交流 220 V。 02、输出电压&#xff1a; 交流 0--50KV ; 直流 0—50kv 。 03、电器容量&#xff1a;3KVA。 04、高压分级&#xff1a;0—50KV&#xff0c;&#xff08;全程可调&#xff09;。 05、升压速率&#xff1a;0.1KV/s-…

c++11 标准模板(STL)(std::unordered_map)(八)

定义于头文件 <unordered_map> template< class Key, class T, class Hash std::hash<Key>, class KeyEqual std::equal_to<Key>, class Allocator std::allocator< std::pair<const Key, T> > > class unordered…

【C++】你不得不爱的——继承

凡是面向对象的语言&#xff0c;都有三大特性&#xff0c;继承&#xff0c;封装和多态&#xff0c;但并不是只有这三个特性&#xff0c;是因为者三个特性是最重要的特性&#xff0c;那今天我们一起来看继承&#xff01; 目录 1.继承的概念及定义 1.概念 2.继承的定义 2.基类…

Linux进程学习【进程地址】

✨个人主页&#xff1a; Yohifo &#x1f389;所属专栏&#xff1a; Linux学习之旅 &#x1f38a;每篇一句&#xff1a; 图片来源 &#x1f383;操作环境&#xff1a; CentOS 7.6 阿里云远程服务器 Perseverance is not a long race; it is many short races one after another…

Dynabook笔记本电脑无法开机怎么重装新系统?

Dynabook笔记本电脑无法开机怎么重装新系统&#xff1f;有用户使用Dynabook笔记本电脑出现了无法正常开机的情况。遇到这样的问题是我们的电脑系统出现了损坏&#xff0c;可以尝试进行系统修复。如果无法修复的话&#xff0c;就需要进行系统重装了。以下为大家带来Dynabook笔记…

SQLMap安装教程

注意&#xff1a;在python3环境下安装sqlmap的时候会提示需要在python2的环境下才能安装&#xff0c;其实在python3.6以后也都支持sqlmap了。 sqlmap安装步骤&#xff1a; 一、下载python&#xff1b; 下载地址 https://www.python.org/downloads/ 下载教程参考&#xff08…

通过反射获取注解的属性值(内含源代码)

通过反射获取注解的属性值&#xff08;内含源代码&#xff09; 源代码下载链接地址&#xff1a;https://download.csdn.net/download/weixin_46411355/87554543 目录通过反射获取注解的属性值&#xff08;内含源代码&#xff09;源代码下载链接地址&#xff1a;[https://downl…

做互联网自媒体创业的月薪收入真的能过万吗?

搞自媒体创业有前途吗&#xff1f;收入月薪过万是真的吗&#xff1f; 自媒体创业是一种新兴的创业方法&#xff0c;它的远景十分广阔。自媒体创业能够让人们在自己的兴趣爱好和专业范畴上发挥自己的才能&#xff0c;一起也能够获得不错的收入。可是&#xff0c;月薪过万并不是…

ArangoDB——AQL编辑器

AQL 编辑器 ArangoDB 的查询语言称为 AQL。AQL与关系数据库管理系统 (RDBMS)区别在于其更像一种编程语言&#xff0c;更自然地适合无模式模型&#xff0c;并使查询语言非常强大&#xff0c;同时保持易于读写。数据建模概念 数据库是集合的集合。集合存储记录&#xff0c;称为文…

三维人脸实践:基于Face3D的人脸生成、渲染与三维重建 <三>

face3d: Python tools for processing 3D face git code: https://github.com/yfeng95/face3d paper list: PaperWithCode 基于BFM模型&#xff0c;估计3DMM的参数&#xff0c;可以实现线性的人脸表征&#xff0c;该方法可用于基于关键点的人脸生成、位姿检测以及渲染等。推荐…

信息收集之搜索引擎

Google Hacking 也可以用百度&#xff0c;不过谷歌的搜索引擎更强大 site 功能&#xff1a;搜索指定域名的网页内容&#xff0c;可以用来搜索子域名、跟此域名相关的内容 示例&#xff1a; site:zhihu.com 搜索跟zhihu.com相关的网页“web安全” site:zhihu.com 搜索zhihu…

提升学习 Prompt 总结

NLP现有的四个阶段&#xff1a; 完全有监督机器学习完全有监督深度学习预训练&#xff1a;预训练 -> 微调 -> 预测提示学习&#xff1a;预训练 -> 提示 -> 预测 阶段1&#xff0c;word的本质是特征&#xff0c;即特征的选取、衍生、侧重上的针对性工程。 阶段2&…

C++核心编程

一、内存分区模型概述&#xff1a;C程序在执行时&#xff0c;将内存划分为4个区域程序运行前&#xff1a;代码区&#xff1a;存放函数体的二进制代码&#xff0c;由操作系统管理①共享。共享的目的是对于频繁被执行的程序&#xff0c;在内存中只需有一份代码即可②只读。使其只…

组合预测 | MATLAB实现EMD-KPCA-LSTM、EMD-LSTM、LSTM多输入单输出回归预测对比

组合预测 | MATLAB实现EMD-KPCA-LSTM、EMD-LSTM、LSTM多输入单输出回归预测对比 目录 组合预测 | MATLAB实现EMD-KPCA-LSTM、EMD-LSTM、LSTM多输入单输出回归预测对比预测效果基本介绍模型描述程序设计参考资料预测效果 基本介绍 MATLAB实现EMD-KP

传输层协议 TCP UDP

目录 协议前菜 端口号 ​编辑端口号范围划分 认识知名端口号(Well-Know Port Number) netstat pidof 传输层协议 UDP协议 UDP协议端格式 UDP的特点 面向数据报 UDP的缓冲区 UDP使用注意事项 基于UDP的应用层协议 TCP协议 TCP协议概念 TCP协议段格式 标志…

深度分析中国高端投教市场第一股“九方财富”的投资价值

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 猛兽财经获悉&#xff0c;九方财富&#xff08;09636&#xff09;已于3月10在港交所成功IPO上市&#xff0c;并成为了“中国在线高端投教市场第一股”。 作为中国领先的在线投资决策方案提供商&#xff0c;九方财富…

一起来学习配置Combo接口吧!

Combo接口是一个光电复用的逻辑接口&#xff0c;一个Combo接口对应设备面板上一个GE电接口和一个GE光接口。电接口与其对应的光接口是光电复用关系&#xff0c;两者不能同时工作&#xff08;当激活其中一个接口时&#xff0c;另一个接口就自动处于禁用状态&#xff09;&#xf…

常用存储芯片-笔记本上固态硬盘PTS11系列推荐

在存储领域中&#xff0c;除了存储颗粒之外&#xff0c;还有一种极其重要的芯片&#xff1a;存储控制芯片。存储控制芯片是CPU与存储器之间数据交换的中介&#xff0c;决定了存储器最大容量、存取速度等多个重要参数。特别是在AI、5G、自动驾驶时代&#xff0c;对于数据处理及存…