【大数据学习 | kafka】简述kafka的消费者consumer

news2024/11/12 21:24:02

1. 消费者的结构

能够在kafka中拉取数据进行消费的组件或者程序都叫做消费者。

这里面要涉及到一个动作叫做拉取。

首先我们要知道kafka这个消息队列主要的功能就是起到缓冲的作用,比如flume采集数据然后交给spark或者flink进行计算分析,但是flume采用的就是消息的push方式,这个方式不能够保证推送的数据消费者端一定会消费完毕,会出现数据的反压问题,这个问题很难解决,所以才出现了消息队列kafka,它可以起到一个缓冲的作用,生产者部分将数据直接全部推送到kafka,然后消费者从其中拉取数据,这边如果也采用推送的方式,那么也就在计算端会出现反压问题,所以kafka的消费者一般都是采用拉的方式pull,并不是push

1.1 消费者组

在一个topic中存在多个分区,可以分摊压力实现负载均衡,那么整体topic中的数据会很多,如果消费者只有一个的话很难全部消费其中的数据压力也会集中在一个消费者中,并且在大数据行业中几乎所有的计算架构都是分布式的集群模式,那么这个集群模式中,计算的节点也会存在多个,这些节点都是可以从kafka中拉取数据的,所有消费者不可能只有一个,一般情况下都会有多个消费者。

正因为topic存在多个分区,每个分区中的数据是独立的,那么消费者最好也是一个一个和分区进行一一对应的,所以有几个分区应该对应存在几个消费者是最好的。

这个和分蛋糕是一样的,一个蛋糕分成几块,那么有几个人吃,应该是对应关系的

消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

2. 消费者实现

在实现消费者的时候我们需要知道几个消费者的配置重要参数

参数解释
bootstrap.servers集群地址
key.deserializerkey反序列化器
value.deserializervalue反序列化器
group.id消费者组id

首先创建消费者对象

消费者对象订阅相应的topic然后拉取其中的数据进行消费

整体代码如下

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

public class Consumer1 {
    public static void main(String[] args) {
        Properties pro = new Properties();
        pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
        pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");
        //设定组id
        pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //设定key的反序列化器
        pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //设定value的反序列化器
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
        List<String> topics = Arrays.asList("topic_a","topic_b");
        //一个消费者可以消费多个分区的数据
        consumer.subscribe(topics);
        //订阅这个topic
        while (true){
            //死循环要一直消费数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            //间隔一秒钟消费一次数据,拉取一批数据过来
            Iterator<ConsumerRecord<String, String>> it = records.iterator();
            while(it.hasNext()){
                ConsumerRecord<String, String> record = it.next();
                System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
            }
        }
    }
}
[hexuan@hadoop106 datas]$ kafka-console-producer.sh --bootstrap-server hadoop106:9092 --topic topic_b

>>1
>2
>3
>4
>5
>

3. 消费者与分区之间的对应关系

一个消费者组中的消费者和分区是一一对应的关系,一个分区应该对应一个消费者,但是如果消费者多了,那么有的消费者就没有分区消费,如果消费者少了那么会出现一个消费者消费多个分区的情况。

# 首先创建topic_c 用于测试分区和消费者的对应关系
kafka-topics.sh --bootstrap-server hadoop106:9092 --create --topic topic_c --partitions 3 --replication-factor 2
# 启动两个消费者 刚才我们写的消费者main方法运行两次
# 然后分别在不同的分区使用生产者发送数据,看数据在消费者中的打印情况

首先选择任务可以并行执行

选择任务修改配置

我们可以看到允许多实例并行执行

启动两次,这个时候我们就有了两个消费者实例

生产者线程:分别向三个分区中发送1 2 3元素

package com.hainiu.kafka.consumer;

/**
 * ClassName : test3_producer
 * Package : com.hainiu.kafka.consumer
 * Description
 *
 * @Author HeXua
 * @Create 2024/11/3 23:40
 * Version 1.0
 */

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class test3_producer {
    public static void main(String[] args) {
        Properties pro = new Properties();
        pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");
        pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
        ProducerRecord<String, String> record1 = new ProducerRecord<>("topic_d", 0,null,"1");
        ProducerRecord<String, String> record2 = new ProducerRecord<>("topic_d", 1,null,"2");
        ProducerRecord<String, String> record3 = new ProducerRecord<>("topic_d", 2,null,"3");
        producer.send(record1);
        producer.send(record2);
//        producer.send(record3);
        producer.close();
    }
}

可以看到有的消费者消费了两个分区的数据

如果启动三个消费者会发现每个人消费一个分区的数据

如果启动四个消费者

我们发现有一个消费者没有数据

3. 1 消费多topic的数据

不同组消费不同的topic或者一个组可以消费多个topic都是可以的

3.2 多个组消费一个topic

同一个topic可以由多个消费者组进行消费数据,并且相互之间是没有任何影响的

修改同一份代码的组标识不同。启动两个实例查看里面的消费信息

   pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group1");
   pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group2");
   //分别修改消费者组的id不同
package com.hainiu.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

public class Consumer1 {
    public static void main(String[] args) {
        Properties pro = new Properties();
        pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
        pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");
        pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
        List<String> topics = Arrays.asList("topic_c");
        //订阅多个topic的数据变化
        consumer.subscribe(topics);

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> it = records.iterator();
            while(it.hasNext()){
                ConsumerRecord<String, String> record = it.next();
                System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2237121.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从零开始训练一个大语言模型需要多少天?

一&#xff0c;前言 在AI领域&#xff0c;训练一个大型语言模型&#xff08;LLM&#xff09;是一个耗时且复杂的过程。几乎每个做大型语言模型&#xff08;LLM&#xff09;训练的人都会被问到&#xff1a;“从零开始&#xff0c;训练大语言模型需要多久和花多少钱&#xff1f;”…

【SQL50】day 1

目录 1.可回收且低脂的产品 2.寻找用户推荐人 3.使用唯一标识码替换员工ID 4.产品销售分析 I 5.有趣的电影 6.平均售价 7.每位教师所教授的科目种类的数量 8.平均售价 1.可回收且低脂的产品 # Write your MySQL query statement below select product_id from Products w…

Qt菜单功能实现

本文介绍Qt菜单功能实现。 Qt开发过程中&#xff0c;菜单功能用的还是比较多的&#xff0c;本文针对菜单栏和右键菜单功能实现作简要描述。 1.菜单栏 1)界面设计 在界面中添加菜单栏&#xff08;本例中名为“menubar”&#xff09;&#xff0c;并依次添加需要的菜单&#x…

Jupyter Notebook添加kernel的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

Java:二维数组

目录 1. 二维数组的基础格式 1.1 二维数组变量的创建 —— 3种形式 1.2 二维数组的初始化 \1 动态初始化 \2 静态初始化 2. 二维数组的大小 和 内存分配 3. 二维数组的不规则初始化 4. 遍历二维数组 4.1 for循环 ​编辑 4.2 for-each循环 5. 二维数组 与 方法 5.1…

手机内卷下一站,AI Agent

作者 | 辰纹 来源 | 洞见新研社 2024年除夕夜&#xff0c;OPPO在央视春晚即将开始前举办了一场“史上最短发布会”&#xff0c;OPPO首席产品官刘作虎宣布&#xff0c;“OPPO正式进入AI手机时代”。 春节假期刚过&#xff0c;魅族又公开表示&#xff0c;将停止“传统智能手机…

Python实战:调用淘宝API以抓取商品页面数据

在数据驱动的商业决策中&#xff0c;获取电商平台的商品数据至关重要。淘宝作为中国最大的在线购物平台&#xff0c;其商品数据对于市场分析、价格监控和竞品研究等方面都具有极高的价值。本文将通过一个Python实战案例&#xff0c;展示如何调用淘宝API来抓取商品页面的数据。 …

SpringBoot14-任务

任务 14.1异步任务 所谓异步&#xff0c;在某些功能实现时可能要花费一定的时间&#xff0c;但是为了不影响客户端的体验&#xff0c;选择异步执行 案例&#xff1a; 首先创建一个service&#xff1a; Service public class AsyncService {public void hello(){try {Threa…

如何在Android中自定义property

在Android中创建自定义的属性&#xff08;Android property&#xff09;通常用于调试、性能调优或传递应用和系统之间的信息。 以下是如何在Android中创建和使用自定义属性的步骤&#xff1a; 1. 定义属性 在Android中&#xff0c;属性是以“属性名称属性值”形式定义的键值对…

SSH实验5密钥登录Linuxroot用户(免密登录)

当用户尝试通过SSH连接到远程服务器时&#xff0c;客户端会生成一对密钥&#xff1a;公钥和私钥。公钥被发送到远程服务器&#xff0c;并存储在服务器的~/.ssh/authorized_keys文件中。而私钥则由客户端保管&#xff0c;不会传输给服务器。 在连接过程中&#xff0c;客户端使用…

CelebV-Text——从文本生成人脸视频的数据集

概述 近年来&#xff0c;生成模型在根据文本生成和编辑视频方面受到了广泛关注。然而&#xff0c;由于缺乏合适的数据集&#xff0c;生成人脸视频领域仍然是一个挑战。特别是&#xff0c;生成的视频帧质量较低&#xff0c;与输入文本的相关性较弱。在本文中&#xff0c;我们通…

天地图入门|标注|移动飞行|缩放,商用地图替换

“天地图”是国家测绘地理信息局建设的地理信息综合服务网站。集成了来自国家、省、市&#xff08;县&#xff09;各级测绘地理信息部门&#xff0c;以及相关政府部门、企事业单位 、社会团体、公众的地理信息公共服务资源&#xff0c;如果做的项目是政府部门、企事业单位尽量选…

Python、Delphi 和 C++ 复制文件速度比较

比较 Python、Delphi 和 C 在文件处理上的速度&#xff0c;可以分为以下几个方面进行测试和分析&#xff1a;文件读写速度&#xff1a;指的是在这三种语言中执行相同的文件读写操作所花费的时间。文件大小影响&#xff1a;不同语言对小文件和大文件的处理是否有显著不同。并发性…

复现LLM:带你从零认识语言模型

前言 本文会以Qwen2-0.5B模型为例&#xff0c;从使用者的角度&#xff0c;从零开始一步一步的探索语言模型的推理过程。主要内容如下&#xff1a; 从使用的角度来接触模型本地运行的方式来认识模型以文本生成过程来理解模型以内部窥探的方式来解剖模型 1. 模型前台使用 1.1…

企业IT架构转型之道:阿里巴巴中台战略思想与架构实战感想

文章目录 第一章&#xff1a;数据库水平扩展第二章&#xff1a;中台战略第三章&#xff1a;阿里分布式服务架构HSF&#xff08;high speed Framework&#xff09;、早期Dubbo第四章&#xff1a;共享服务中心建设原则第五章&#xff1a;数据拆分实现数据库能力线性扩展第六章&am…

R语言实战——一些批量对地理数据进行操作的方法

各位朋友在进行数据处理时&#xff0c;当有多张栅格影像时&#xff0c;如果我们都要进行同一操作时&#xff0c;一张一张做很繁琐&#xff0c;用ArcGIS模型构建器是一种比较好的方法。当然&#xff0c;今天小编新学了R语言上面进行批量裁剪&#xff0c;一起来学习一下吧&#x…

详解如何创建SpringBoot项目

目录 点击New Project 选择依赖 简单使用SpringBoot 前面已经讲解了如何获取IDEA专业版&#xff0c;下面将以此为基础来讲解如何创建SpringBoot项目。 点击New Project 选择依赖 注意&#xff0c;在选择SpringBoot版本时&#xff0c;不要选择带SNAPSHOT的版本。 这样&#…

点云分割总结

点云分割总结 point transformerbackground 标量自注意力和向量自注意力&#xff08;可参考论文&#xff09;标量自注意力向量注意力 Point Transformer Layer下采样上采样整体结构 point transformer v2group vector attentionPosition Encoding MultiplerPartition-based Poo…

智象未来(HiDream.ai):从科技创新启程,绘制智能未来新篇章

在人工智能领域飞速演进的当下&#xff0c;智象未来&#xff08;HiDream.ai&#xff09;作为全球领先的多模态生成式人工智能技术供应商&#xff0c;正以其独树一帜的视觉多模态大模型及创新应用&#xff0c;推动行业趋势的前进。智象未来&#xff08;HiDream.ai&#xff09;自…

CSP/信奥赛C++刷题训练:经典例题 - 栈(2):洛谷P1981 :[NOIP2013 普及组] 表达式求值

CSP/信奥赛C刷题训练&#xff1a;经典例题 - 栈&#xff08;2&#xff09;&#xff1a;洛谷P1981 &#xff1a;[NOIP2013 普及组] 表达式求值 题目背景 NOIP2013 普及组 T2 题目描述 给定一个只包含加法和乘法的算术表达式&#xff0c;请你编程计算表达式的值。 输入格式 …