3.kafka-3.生产者,消费者

news2024/12/25 9:31:13

文章目录

    • 1.个性化配置,增加吞吐量
    • 2.发送事务消息
    • 3.消费组
    • 手动提交offset
    • 指定offset位置进行消费
    • 指定时间消费
    • 当新增消费者,或者消费组时,如何消费
    • 漏消息和重复消息
    • 如何解决消费解压问题

在这里插入图片描述

1.个性化配置,增加吞吐量

 private static void sendWithCustomerParameter(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //默认16k
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
        //默认1
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //默认32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32*1024*1024);
        //compression.type 压缩,默认是none,可配置为gzip,snappy,lz4,和zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        //创建生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "hello" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(exception==null){
                        System.out.println("消息发送到"+metadata.partition()+"分区");
                    }
                }
            });

        }

        kafkaProducer.close();
    }

消费端

[root@node3 kafka_2.12-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic first
hello0
hello1
hello2
hello3
hello4

如果想个性化设定消息推送的分区规则,可以自定义分区器
此外生产端,还可以设定ack机制,以及重试次数

 properties.put(ProducerConfig.ACKS_CONFIG, "all");// 等于配置ack=-1
 properties.put(ProducerConfig.RETRIES_CONFIG, 3);//默认是int的最大值

2.发送事务消息

private static void transactionSend(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092" );
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //设置事务id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_0" );

        KafkaProducer producer = new KafkaProducer(properties);
        //初始化事务
        producer.initTransactions();
        //开启事务
        producer.beginTransaction();
        try{
            for (int i = 0; i < 5; i++) {
                producer.send(new ProducerRecord("first", "I love you "+i));
            }
//            int i = 4/0;
            producer.commitTransaction();
        }catch (Exception e){
            e.printStackTrace();
            //丢弃事务
            producer.abortTransaction();
        }finally {
            producer.close();
        }
    }

broker重要参数
在这里插入图片描述
在这里插入图片描述

3.消费组

消费组中的消费者作为整体消费某个主题,而主题的每个分区只能被一个一个消费组中的一个消费者消费

同一个分区中的消息可以保证有序消费
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

消费者参数
在这里插入图片描述
在这里插入图片描述

 private static void consume() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //配置消费组名
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1");
        //创建消费者对象
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        //注册要消费的主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(20));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {

                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }
       
        

    }

key 是topic-分区编号
在这里插入图片描述
当发送的时候指定key

    private static void sendWithCustomerParameter(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //默认16k
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
        //默认1
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //默认32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32*1024*1024);
        //compression.type 压缩,默认是none,可配置为gzip,snappy,lz4,和zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");// 等于配置ack=-1
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);//默认是int的最大值
        //创建生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","customer-first-"+i, "hello" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(exception==null){
                        System.out.println("消息发送到"+metadata.partition()+"分区");
                    }
                }
            });

        }

        kafkaProducer.close();
    }

消费者接受到的key如下
在这里插入图片描述

消费者也可以消费指定partition
在这里插入图片描述

手动提交offset

  private static void commitByManual(){
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test" );
        //是否自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false );
        KafkaConsumer consumer = new KafkaConsumer(properties);
        consumer.subscribe(Arrays.asList("first"));
        while(true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("接受到:"+record);
            }
            //异步提交
            consumer.commitAsync();
            //同步提交
//            consumer.commitSync();;
        }

    }

指定offset位置进行消费

private static void assignOffset(){
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test" );
        //是否自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false );
        KafkaConsumer consumer = new KafkaConsumer(properties);
        consumer.subscribe(Arrays.asList("first"));

        Set<TopicPartition> assignment = new HashSet<>();
        while(assignment.size()==0){
            //获取分区消息
            consumer.poll(Duration.ZERO);
            assignment=consumer.assignment();
        }

        //遍历每个分区,指定offset从1700位置开始消费
        for (TopicPartition topicPartition : assignment) {
            consumer.seek(topicPartition, 1700);
        }


        while(true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("接受到:"+record);
            }
            //异步提交
            consumer.commitAsync();
            //同步提交
//            consumer.commitSync();;
        }

    }

获取的分区集合为
在这里插入图片描述

指定时间消费

 public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //配置消费组名
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1");
        //创建消费者对象
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        //注册要消费的主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);


        Set<TopicPartition> assignments = new HashSet<>();
        while (assignments.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(5));
            //获取分区信息
            assignments = kafkaConsumer.assignment();
        }

        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
        //封装集合存储,每个分区对应一天的数据
        for (TopicPartition topicPartition : assignments) {
            timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
        }

        //获取从一天前开始消费的每个分区的offset
        Map<TopicPartition, OffsetAndTimestamp> offsets =
                kafkaConsumer.offsetsForTimes(timestampToSearch);

        //遍历每个分区,对每个分区设置消费时间
        for (TopicPartition topicPartition : assignments) {
            OffsetAndTimestamp offsetAndTimestamp= offsets.get(topicPartition);
            kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
        }


        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(20));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {

                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }


    }

}

当新增消费者,或者消费组时,如何消费

auto.offset.reset=earliest | latest | none

当kafka没有偏移量(消费组第一次消费)或服务上不再存在当前偏移量时(例如数据已经被删除),该怎么办呢?
1.earliest 自动将偏移量设置为最早的偏移量 ,–from-beginning
在这里插入图片描述

2.latest 默认值,自动将偏移量设置为最新偏移量
3.none,如果未找到消费组以前的偏移量,则向消费者抛出异常

    private static void sendWithCustomerParameter(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //默认16k
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
        //默认1
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //默认32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32*1024*1024);
        //compression.type 压缩,默认是none,可配置为gzip,snappy,lz4,和zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");// 等于配置ack=-1
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);//默认是int的最大值
        //创建生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","customer-first-"+i, "hello" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(exception==null){
                        System.out.println("消息发送到"+metadata.partition()+"分区");
                    }
                }
            });

        }

        kafkaProducer.close();
    }

漏消息和重复消息

重复消费:已经消费了消息,但是offset未提交
先提交了offset,但是消息未消费
在这里插入图片描述

如何解决消费解压问题

在这里插入图片描述
如何消费端为手动提交,且涉及到数据库交互IO操作,数据库操作慢,也会影响消费速度,大量的消费线程阻塞到数据库连接上,等待保存数据到db中

网络问题慢,也会导致消息挤压

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/168630.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用 .NET 7、Blazor 和 .NET MAUI 构建你自己的 Podcast App

.NET Podcast App 首次在 .NET Conf 2021上推出&#xff0c;最近进行了更新以在 .NET Conf 2022 keynote 中突出显示 .NET 7 中的新功能。该 Podcast App 已准备好使用展示 .NET&#xff0c;ASP.NET Core&#xff0c;Blazor&#xff0c;.NET MAUI&#xff0c;Azure Container A…

Android 蓝牙开发——概述(一)

一、蓝牙简介 蓝牙技术是一种无线数据和语音通信开放的全球规范&#xff0c;它是基于低成本的近距离无线连接&#xff0c;为固定和移动设备建立通信环境的一种特殊的近距离无线技术连接。 其中将1.x~3.0之间的版本称之为经典蓝牙&#xff0c;4.x开始的蓝牙称之为低功耗蓝牙&…

Memcache学习总结

这里写自定义目录标题介绍一致性哈希寻找节点一致性哈希介绍内存管理slab结构寻找存储chunkChunk中存储的Item数据结构grow factor 调优回收删除一些特性介绍 基于内置内存Key-Value形式存储数据(字符串、对象)集群服务器是通过数组链表方式存储K-V数据<分布式>基于哈希…

编程语言那么多,我为什么推荐你学Java?

Java一直都是稳居排行榜第一的语言&#xff0c;在未来10年Java都会是最热门的语言之一&#xff0c;因为Java技术具有卓越的通用性、高效性、安全性和平台移植性&#xff0c;它可以跨平台的应用到不同的领域&#xff0c;工作需求足够大。 为什么选择学习Java编程语言&#xff1…

更具科技感的中塔机箱,模块设计兼容性强,鑫谷昆仑御风机箱上手

大家装机的时候应该都接触过鑫谷的机箱和散热器外设&#xff0c;作为一家有年头的外设品牌&#xff0c;这两年鑫谷推陈出新&#xff0c;像是在电源方面&#xff0c;就有不少很受欢迎的产品&#xff0c;像是昆仑系列等&#xff0c;前端鑫谷在昆仑系列中带来了一款设计新颖的机箱…

琥珀酰亚胺-双硫键-琥珀酰亚胺NHS-SS-NHS双端活性酯二硫键交联剂

名称:NHS-SS-NHS 中文名称:活性酯-双硫键-活性酯 琥珀酰亚胺-双硫键-琥珀酰亚胺 分子式 :C14H16N2O10S2 分子量 :436.41 存储条件&#xff1a;-20C&#xff0c;避光&#xff0c;避湿 用 途&#xff1a;仅供科研实验使用&#xff0c;不用于诊治 外观: 固体或粘性液体&am…

VMwareWorkstationPro16的下载与安装,以及vm账号注册的问题

VMwareWorkstationPro16的下载与安装&#xff0c;以及vm账号注册的问题查看虚拟化支持是否开启vm的安装vm账号注册的常见问题VM 16的安装步骤查看虚拟化支持是否开启 可以从任务管理器中的性能去查看CPU是否开启虚拟化支持 vm的安装 访问 vm 的官网: https://www.vmware.co…

I2C_Adapter驱动框架讲解与编写

I2C_Adapter驱动框架讲解与编写 文章目录I2C_Adapter驱动框架讲解与编写参考资料&#xff1a;一、 回顾1.1 2C驱动程序的层次1.2 I2C总线-设备-驱动模型二、 I2C_Adapter驱动框架2.1 核心的结构体1. i2c_adapter2. i2c_algorithm2.2 驱动程序框架1. 所涉及的函数2. i2c_algorit…

lq-递归

1、递归实现指数型枚举从 1∼n 这 n个整数中随机选取任意多个&#xff0c;输出所有可能的选择方案。输入格式输入一个整数 n。输出格式每行输出一种方案。同一行内的数必须升序排列&#xff0c;相邻两个数用恰好 1个空格隔开。对于没有选任何数的方案&#xff0c;输出空行。本题…

AStar(A*)算法核心思想( for unity)

AStar算法算法思想举例理解核心代码A* 算法&#xff0c;A* (A-Star)算法是一种静态路网中求解最短路径最有效的直接搜索方法&#xff0c;也是解决许多搜索问题的有效算法。算法中的距离估算值与实际值越接近&#xff0c;最终搜索速度越快。 注意:AStar的类应该作为一种单例类只…

软考初级哪个好考

其实软考初级的实用性差不多。只是看自身怎么看&#xff0c;考哪一科对你来说&#xff0c;产生的意义更大&#xff0c;对自己以后的发展前景帮助大&#xff0c;那你就选择哪一科就行。 软考初级科目有&#xff1a;程序员、网络管理员、信息处理技术员、信息系统运行管理员、网…

C语言之vs2022安装教程,还不会的快来看

下载安装包官方无毒下载链接:https://visualstudio.microsoft.com/zh-hans/vs/点击链接或者复制到浏览器出现如下按钮点击下载社区版本Visual Studio 2022Community 2022: 社区版&#xff0c;也可以理解为个人版。适用于学生、开源和个人。一些新手用来学习是个不错的选择Profe…

第十三届蓝桥杯省赛 C++ B组 - 修剪灌木

✍个人博客&#xff1a;https://blog.csdn.net/Newin2020?spm1011.2415.3001.5343 &#x1f4da;专栏地址&#xff1a;蓝桥杯题解集合 &#x1f4dd;原题地址&#xff1a;付账问题 &#x1f4e3;专栏定位&#xff1a;为想参加蓝桥别的小伙伴整理常考算法题解&#xff0c;祝大家…

sentinel部署配置

sentinel部署配置sentinel 部署&#xff08;V1.8.6&#xff09;获取 Sentinel 控制台启动修改用户名密码控制台登录客户端集成sentinel 部署&#xff08;V1.8.6&#xff09; 获取 Sentinel 控制台 您可以从 release 页面 下载最新版本的控制台 jar 包。 官网&#xff1a; ht…

【C语言】深度剖析数据在内存中的存储---(附源码 | 建议收藏)

&#x1f680;write in front&#x1f680; &#x1f4dd;个人主页&#xff1a;认真写博客的夏目浅石. &#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐️ 留言&#x1f4dd; &#x1f4e3;系列专栏&#xff1a;凡人修C传 &#x1f4ac;总结&#xff1a;希望你看完之后&…

让Tomcat服务器运行在Windows后台

让tomcat运行在Windows后台一、设置环境变量二、安装tomcat服务三、设置后台自动运行本机环境&#xff1a;win10一、设置环境变量 进入tomcat主目录的bin文件夹&#xff0c;复制路径 快捷键winr键唤出运行框&#xff0c;输入sysdm.cpl然后点击“确定”进入系统属性面板 在系统…

传统工科应该怎么学习机器学习or深度学习?

还是老生常谈。建议先不要直接上手机器学习/深度学习&#xff0c;先把你研究对象/信号的机理给搞清楚&#xff0c;然后再开始上现代信号处理&#xff0c;机器学习/深度学习算法&#xff0c;只有对你研究对象的机理深入了解&#xff0c;才能更好地对信号处理算法&#xff0c;机器…

【职工管理系统】C++全栈体系(十三)

职工管理系统 第一章 管理系统需求 职工管理系统可以用来管理公司内所有员工的信息 公司中职工分为三类&#xff1a;普通员工、经理、老板&#xff0c;显示信息时&#xff0c;需要显示职工编号、职工姓名、职工岗位、以及职责 普通员工职责&#xff1a;完成经理交给的任务 …

运算符重载及组合与继承

目录 一、运算符重载 1.1普通运算符重载 1.2特殊运算符重载 二、标准输入输出流 三、组合与继承 3.1组合 3.2继承 1) public继承方式 2) protected继承方式 3) private继承方式 小作业&#xff1a;模仿c的string类&#xff0c;自己实现string类 一、运算符重载 百度…

【文件随机读写和文件缓冲区】

1.1fseek函数 1.2ftell函数1.3rewind函数2. 文件读取结束的判定2.1文件缓冲区 1.1fseek函数 根据文件指针的位置和偏移量来定位文件指针。 int fseek ( FILE * stream, long int offset, int origin );看不懂没关系&#xff0c;举个例子你就明白了。 我们首先在text.txt文…