kafka基本概念及操作

news2025/1/12 23:11:22

kafka介绍

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的 (replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、 Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

1.使用场景

  • 日志收集:可以用Kafka收集各种服务的log,通过kafka以统⼀接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。

  • 消息系统:解耦和生产者和消费者、缓存消息等。

  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、 搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。

  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

2.kafka基本概念

image-20230811105243277

整个流程应该是:producer通过网络发送消息到Kafka集群,然后consumer 来进行消费,如下图:

image-20230811105555825

服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。

kafka基本使用

1.安装

以下所有操作全部基于kafka_2.13-3.0.1.tgz**(3.0.1版本)**这个版本

配置文件server.properties(主要修改以下配置)

#broker.id属性在kafka集群中必须要是唯⼀
broker.id=0
listeners=PLAINTEXT://xx.xx.xx.xx(服务器内网IP地址):9092
advertised.listeners=PLAINTEXT://xx.xx.xx.xx(服务器对外IP地址):9092
#kafka的消息存储⽂件
log.dir=/usr/local/kafka/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181

启动

./kafka-server-start.sh -daemon ../config/server.properties

验证

# 查看端口是否占用
netstat -ntlp 

或者

进入到zk内查看是否有kafka的节点:/brokers/ids/0

./zkCli.sh

image-20230814134630858

2.创建topic

执行以下命令创建名为“test”的topic,这个topic只有⼀个partition,并且备份因子也设置为 1

./kafka-topics.sh --bootstrap-server kafkahost:9092 --create --topic test --partitions 1 --replication-factor 1

-- 新版本的kafka,已经不需要依赖zookeeper来创建topic,新版的kafka创建topic指令如下:
./kafka-topics.sh --bootstrap-server 124.222.253.33:9092 --create --topic test --partitions 1 --replication-factor 1

3.查看kafka中所有的主题

./kafka-topics.sh --bootstrap-server kafkahost:9092 --list

./kafka-topics.sh --bootstrap-server 124.222.253.33:9092 --list

4.发送消息

kafka自带了⼀个producer命令客户端,可以从本地文件中读取内容或者以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每⼀个行会被当做成⼀个独立的消息。使用kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic

把消息发送给broker中的某个topic,打开⼀个kafka发送消息的客户端,然后开始用客户端向kafka服务器发送消息

./kafka-console-producer.sh --bootstrap-server 124.222.253.33:9092 --topic test

5.消费消息

消费消息两种方式

对于consumer,kafka同样也携带了⼀个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息。 使用kafka的消费者消息的客户端,从指定kafka服务器的指定 topic中消费消息

  1. 从当前主题中的最后⼀条消息的offset(偏移量位置)+1开始消费

    ./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --topic test
    
  2. 从当前主题中的第⼀条消息开始消费

    ./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --from-beginning --topic test
    

6.消息的细节

image-20230811094410650

  • 生产者将消息发送给broker,broker会将消息保存在本地的日志文件中
/usr/local/kafka/data/kafka-logs/主题-分区/00000000.log
  • 消息的保存是有序的,通过offset偏移量来描述消息的有序性
  • 消费者消费消息时可以通过offset来描述当前要消费的那条消息的位置

7.单播&多播消息

单播还是多播消息取决于topic有多少消费组

1)单播

如果多个消费者在同⼀个消费组,那么只有⼀个消费者可以收到订阅的topic中的消息。(同⼀个消费组中只能有⼀个消费者收到订阅topic中的消息。)

./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --consumer-property group.id=testGroup --topic test

2)多播

不同的消费组订阅同⼀个topic,那么不同的消费组中只有⼀个消费者能收到消息。实际上也是多个消费组中的多个消费者收到了同⼀个消息。

./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --consumer-property group.id=testGroup1 --topic test
./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092 --consumer-property group.id=testGroup2 --topic test

3)区别

image-20230811111854111

8.查看消费组详细信息

# 查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server 124.222.253.33:9092 --list

# 查看消费组中的具体信息:⽐如当前偏移量、最后⼀条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server 124.222.253.33:9092 --describe --group testGroup

image-20230811113410786

  • Currennt-offset:当前消费组的已消费偏移量(最后被消费的消息的偏移量)
  • Log-end-offset:主题对应分区消息的结束偏移量(HW) 【消息总量,最后一条消息偏移量】
  • Lag:当前消费组未消费的消息数(积压消息量)

Kafka中主题和分区的概念

1.主题

主题-topic在kafka中是⼀个逻辑的概念,kafka通过topic将消息进⾏分类。不同的topic会被订阅该topic的消费者消费。

但是有⼀个问题,如果说这个topic中的消息非常非常多,多到需要几T来存,因为消息是会被保存到log日志文件中的。为了解决单个文件过大的问题,kafka提出了Partition分区的概念。

2.分区

1)分区的概念

通过partition将⼀个topic中的消息分区来存储。

这样的好处有多个:

  • 分区存储,可以解决统⼀存储文件过大的问题
  • 提高了读写的吞吐量:读和写可以同时在多个分区中进行

image-20230813215613707

2)创建多分区的主题

./kafka-topics.sh --bootstrap-server 124.222.253.33:9092 --create --topic test1 --partitions 2 --replication-factor 1

3.kafka中消息日志文件中保存的内容

  • 00000.log: 这个文件中保存的就是消息

  • __consumer_offsets-49:

    kafka内部自己创建了__consumer_offsets主题包含了50个分区。这个主题用来存放消费者消费某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量,也就是说每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题:consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区(可以通过offsets.topic.num.partitions设置)。

    • 提交到哪个分区:通过hash函数:hash(consumerGroupId) % __consumer_offsets 主题的分区数
    • 提交到该主题中的内容是:key是consumerGroupId+topic+分区号,value就是当前offset的值
  • 文件中保存的消息,默认保存7天。七天到后消息会被删除,最后就保留最新的那条数据。

Kafka集群操作

1.搭建kafka集群(三个broker)

  • 创建三个server.properties文件
# 0 1 2
broker.id=2
# 9092 9093 9094
listeners=PLAINTEXT://xx.xx.xx.xx(服务器内网IP地址):9094
advertised.listeners=PLAINTEXT://xx.xx.xx.xx(服务器对外IP地址):9094
# kafka-logs kafka-logs-1 kafka-logs-2
log.dir=/usr/local/data/kafka-logs-2
  • 通过命令来启动三台broker
./kafka-server-start.sh -daemon ../config/server.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
  • 校验是否启动成功

进入到zk中查看/brokers/ids中过是否有三个znode(0,1,2)

2.副本的概念

在创建主题时,除了指明了主题的分区数以外,还指明了副本数 replication-factor参数

如下主题,创建了两分区、三副本(副本对应集群中broker数量)

./kafka-topics.sh --bootstrap-server 124.222.253.33:9092 --create --topic my-replicated-topic --partitions 2 --replication-factor 3

副本是为了为主题中的分区创建多个备份,多个副本在kafka集群的多个broker中,会有⼀个副本作为leader,其他是follower。

查看topic情况:

# 查看topic情况
./kafka-topics.sh --describe --bootstrap-server 124.222.253.33:9092 --topic my-replicated-topic

image-20230814105758165

image-20230814105921066

  • leader:

kafka的写和读的操作,都发生在leader上。leader负责把数据同步给follower。当leader挂了,经过主从选举,从多个follower中选举产⽣⼀个新的leader(follower通过poll的方式来同步数据)

  • follower:

接收leader的同步的数据,leader挂了,参与leader选举

  • replicas:

当前副本存在的broker节点

  • isr:

可以同步和已同步的broker节点会被存入到isr集合中。如果isr中的broker节点性能较差,会被踢出isr集合。

3.broker、主题、分区、副本

综上broker、主题、分区、副本概念已全部展示:

集群中有多个broker,创建主题时可以指明主题有多个分区(把消息拆分到不同的分区中存储),可以为分区创建多个副本,不同的副本存放在不同的broker⾥。

4.kafka集群消息的发送

./kafka-console-producer.sh --broker-list 124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094 --topic my-replicated-topic

5.kafka集群消息的消费

1)普通消费

./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094 --from-beginning --topic my-replicated-topic

2)指定消费组消费

./kafka-console-consumer.sh --bootstrap-server 124.222.253.33:9092,124.222.253.33:9093,124.222.253.33:9094 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic

6.分区分消费组的集群消费中的细节

image-20230814115045080

  • ⼀个partition只能被⼀个消费组中的⼀个消费者消费,目的是为了保证消费的顺序性,但是多个partion的多个消费者消费的总的顺序性是得不到保证的,那怎么做到消费的总顺序性呢?(Kafka只在partition的范围内保证消息消费的局部顺序性不能在同⼀个topic中的多个partition中保证总的消费顺序性。 ⼀个消费者可以消费多个partition。)

  • partition的数量决定了消费组中消费者的数量,建议同⼀个消费组中消费者的数量不要超过partition的数量,否则多的消费者消费不到消息

  • 如果消费者挂了,那么会触发rebalance机制,会让其他消费者来消费该分区

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/875734.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

技术解析丨主轴自动换刀系统是如何工作的?有哪些优点?

一、主轴气动自动换刀系统原理 1.当加工过程中需要更换主轴上的刀具时,操作人员通过控制系统发出换刀指令。 2.控制系统根据指令向气动系统发送动作信号,驱动气动马达带动换刀机构运动。 3.换刀机构中的刀具夹持器将现有刀具从主轴上取下,…

FreeRTOS(事件组)

资料来源于硬件家园:资料汇总 - FreeRTOS实时操作系统课程(多任务管理) 目录 一、事件的概念与应用 1、事件的概念 2、事件的应用 二、事件的运作机制 1、FreeRTOS中事件组的句柄 2、FreeRTOS 任务间事件标志组的实现 3、FreeRTOS 中断方式事件标志组的实现…

vue2学习:reduce方法和computed计算属性用法

reduce reduce可以遍历集合并将集合所有的值汇总为一个。 第一个参数是一个回调函数,函数第一个参数是汇总起来的最终值,默认是集合的第一项,函数第二个参数是集合遍历出来的集合元素; 第二个参数可以指定回调函数中第一个参数汇…

62、华为昇腾开发板Atlas 200I DK A2配置mmpose的hrnet模型推理python/c++

基本思想:适配mmpose模型,记录一下流水帐,环境配置和模型来自,请查看参考链接。 链接: https://pan.baidu.com/s/1IkiwuZf1anyKX1sZkYmD1g?pwdi51s 提取码: i51s 一、转模型 (base) rootdavinci-mini:~/sxj731533730# atc --mo…

优测云服务平台|【压力测试功能升级】轻松完成压测任务

一、本次升级主要功能如下: 1.多份报告对比查看测试结果 2.报告新增多种下载格式 Word格式Excel格式 3.新增多种编排复杂场景的控制器 漏斗控制器并行控制器事务控制器仅一次控制器分组控制器集合点 4.新增概览页面,包含多种统计维度 二、报告对比…

智慧工地源码,互联网+建筑工地,基于微服务+Java+Spring Cloud +Vue+UniApp开发

基于微服务JavaSpring Cloud VueUniApp MySql开发的智慧工地云平台源码 智慧工地概念: 智慧工地就是互联网建筑工地,是将互联网的理念和技术引入建筑工地,然后以物联网、移动互联网技术为基础,充分应用BIM、大数据、人工智能、移…

DoorGym:开源的可拓展的开门仿真环境,用于域随机化的强化学习、深度强化学习

0.概述 目的:创建一个可以改变门把手形状、类型、位置、环境颜色、照明条件、机械臂结构的仿真环境,以训练出鲁棒性更高、更能关注到任务本质特征、容易迁移到现实的模型 网址:环境下载, 1.领域随机化DR 假设很难对目标域进…

在Visual Studio上,使用OpenCV实现人脸识别

1. 环境与说明 本文介绍了如何在Visual Studio上,使用OpenCV来实现人脸识别的功能 环境说明 : 操作系统 : windows 10 64位Visual Studio版本 : Visual Studio Community 2022 (社区版)OpenCV版本 : OpenCV-4.8.0 (2023年7月最新版) 实现效果如图所示&#xff0…

SAP SM30 自动带出描述实现

需求: 在SM30中维护销售订单类型的时候,根据维护的销售订单类型自动带出订单类型描述 事务码: SE11 进入表维护生成器中 创建事件 选择【维护事件】: 05 自定义子例程: SET_DESCRIPTION 点击编辑器按钮进行代码编辑 具体代码…

浅学实战:探索PySpark实践,解锁大数据魔法!

文章目录 Spark和PySpark概述1.1 Spark简介1.2 PySpark简介 二 基础准备2.1 PySpark库的安装2.2 构建SparkContext对象2.3 SparkContext和SparkSession2.4 构建SparkSession对象2.5 PySpark的编程模型 三 数据输入3.1 RDD对象3.2 Python数据容器转RDD对象3.3 读取文件转RDD对象…

【力扣每日一题】1572. 矩阵对角线元素的和 8.11打卡

文章目录 题目思路代码 题目 1572. 矩阵对角线元素的和 难度: 简单 描述: 给你一个正方形矩阵 mat,请你返回矩阵对角线元素的和。 请你返回在矩阵主对角线上的元素和副对角线上且不在主对角线上元素的和。 返回合并后的二叉树。 注意…

(leecode)密码检查

有点感觉&#xff0c;试试看~ 先贴解法&#xff0c;再说题目和思路 题解 #include <stdio.h> #include <string.h> #include <ctype.h>int main() {int N 0;scanf("%d",&N);getchar();while(N--) {char str[101] {0};scanf("%s&…

Android性能优化——内存优化

一、内存问题 内存抖动&#xff0c;锯齿状&#xff0c;GC导致卡顿内存泄漏&#xff0c;可用内存减少&#xff0c;频繁GC 内存溢出&#xff0c;OOM&#xff0c;程序异常 二、内存分析工具 Memory ProfilerMemory Analyzer LeakCanary Memory Profiler 实时图表展示应用内存使…

10分钟极速入门dash应用开发

大家好我是费老师&#xff0c;几天前我发布了由我开源维护的dash通用网页组件库fac的0.2.x全新版本&#xff0c;为大家介绍了其具有的诸多实用特性功能&#xff0c;也吸引了很多对基于dash的Python全栈应用开发感兴趣的朋友&#xff0c;为了方便更多对dash应用开发不甚了解的朋…

stable diffusion 电商应用技术(插图部分重绘)

1.下载inpaint anything插件 2.下载识别模型 3.使用全景分割 4.分割模版,获取蒙版 5.发送到图生图重绘制 6.固定姿势 7.clip反推提示词 8.生成重绘衣服

msvcr110.dll缺失的解决方法分享,多种方法教你修复msvcr110.dll

我们在使用电脑的时候会遇到各种各样的问题&#xff0c;特别是dll文件缺失的这一块更是经常可以看到的&#xff0c;如你在使用电脑的时候&#xff0c;突然弹出一个电脑缺失了msvcr110.dll文件&#xff0c;一些程序无法运行&#xff0c;这时候我们就要针对于这方面来进行一些解决…

【C++】vector容器

0.前言 1.vector构造函数 #include <iostream> using namespace std; #include <vector>void printVector(vector<int>& v) //此处&代表 引用 &#xff1b;若取地址&#xff0c;则是 数据类型* 变量名 {for (vector<int>::iterator it v.begi…

PLUS操作流程、应用与实践,多源不同分辨率数据的处理、ArcGIS的应用、PLUS模型的应用、InVEST模型的应用

PLUS模型是由中国地质大学&#xff08;武汉&#xff09;地理与信息工程学院高性能空间计算智能实验室开发&#xff0c;是一个基于栅格数据的可用于斑块尺度土地利用/土地覆盖(LULC)变化模拟的元胞自动机(CA)模型。PLUS模型集成了基于土地扩张分析的规则挖掘方法和基于多类型随机…

从LeakCanary看Fragment生命周期监控

前文中我们已经了解到LeakCanary中Service生命销毁的监听方式&#xff0c;那么Fragment的生命周期监听又是怎么实现的呢&#xff1f; Activity生命周期监听&#xff0c;在Application里面有ActivityLifecycleCallbacks&#xff0c;那么Fragment是否相似呢&#xff1f;我们的第…

Docker 本地镜像发布到私有仓库

1. 本地镜像发布到私有库流程 2. 是什么 1 官方Docker Hub地址&#xff1a;https://hub.docker.com/&#xff0c;中国大陆访问太慢了且准备被阿里云取代的趋势&#xff0c;不太主流。 2 Dockerhub、阿里云这样的公共镜像仓库可能不太方便&#xff0c;涉及机密的公司不可能提供镜…