【Kafka】常用操作

news2025/1/10 2:00:54

1、基本概念

在这里插入图片描述

1. 消息: Kafka是一个分布式流处理平台,它通过消息进行数据的传输和存储。消息是Kafka中的基本单元,可以包含任意类型的数据。

2. 生产者(Producer): 生产者负责向Kafka主题发送消息。它将消息发布到指定的主题,可以按照自定义的逻辑生成消息,并决定消息发送的频率和顺序。

3. 消费者(Consumer): 消费者从Kafka主题订阅并接收消息。它可以以不同的方式消费消息,如批量拉取、实时流式处理或订阅特定的消息主题。

4. 主题(Topic): 主题是Kafka中消息的分类标签,用于组织消息。每个主题可以有多个生产者和多个消费者。主题通常与特定的业务领域或数据类型相关联。

5. 分区(Partition): 主题可以被分割成多个分区,每个分区都是一个有序且持久化的消息队列。分区允许Kafka对消息进行水平扩展,并提供了并行处理和负载均衡的能力。

6. 偏移量(Offset): 偏移量是消息在分区中的唯一标识符,用于表示消息在分区内的顺序位置。消费者可以跟踪偏移量来记录已经读取的消息,以便实现精确的消费位置控制。

7. 消费者组(Consumer Group): 消费者组是一组具有相同逻辑的消费者,它们共同消费一个或多个主题中的消息。消费者组允许Kafka进行水平扩展和负载均衡,在该组内的每个消费者负责处理不同的分区。

8. 副本(Replication): Kafka使用副本机制来提供数据冗余和高可用性。每个分区都可以配置多个副本,这些副本保持分区数据的一致性,并可以替代主副本以提供故障恢复功能。

2、安装部署

参考:
https://juejin.cn/post/7158663198411849741

https://www.cnblogs.com/linjiqin/p/13196347.html

3、常用命令

配置文件解析:cat server.properties

#broker 的全局唯一编号,不能重复
broker.id=0
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600 #kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接 Zookeeper 集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

启动/关闭 kafka:

cd /usr/local/kafka/kafka_2.12-3.5.0/bin/

bin/kafka-server-start.sh config/server.properties
bin/kafka-server-stop.sh stop

验证kafka是否可以使用,仍在bin目录下

运行kafka生产者发送消息

./kafka-console-producer.sh --broker-list localhost:9092 --topic sun

运行kafka消费者接收消息

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sun --from-beginning

4、常用操作API

创建生产者并发送消息

from kafka import KafkaProducer
import time
# 创建生产者
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# 发送单条消息
producer.send('my_topic', b'Hello, Kafka!')

# Kafka的发送实际上是异步的
# 生产者在发送消息之后并不会等待确认消息是否已经成功到达Kafka broker
# 而是立即继续执行下一行代码或退出程序
# 在生产者发送完消息后,给消费者足够的时间来连接到Kafka broker并订阅主题

# 等待消费者订阅主题
time.sleep(2)  # 延迟2秒钟,给消费者足够的时间连接到Kafka并订阅主题

# 发送多条消息
messages = [b'Message 1', b'Message 2', b'Message 3']
for message in messages:
    producer.send('my_topic', message)
time.sleep(2)  # 延迟2秒钟,给消费者足够的时间连接到Kafka并订阅主题

创建消费者并订阅主题并消费消息

from kafka import KafkaConsumer

# 创建消费者
consumer = KafkaConsumer('my_topic', bootstrap_servers='localhost:9092')

# 消费消息
for message in consumer:
    print(message.value.decode())

指定消费者组和自动提交偏移量

from kafka import KafkaConsumer

# 创建消费者,并指定消费者组和自动提交偏移量
consumer = KafkaConsumer('my_topic', group_id='my_consumer_group',
                         bootstrap_servers='localhost:9092',
                         enable_auto_commit=True)

# 消费消息
for message in consumer:
    print(message.value.decode())

指定消费者组和自动提交偏移量

为什么需要指定消费者组呢?

在Kafka中,消费者组是一组消费者的逻辑名称,它们共同协作来消费一个或多个主题中的消息。通过将消费者组绑定到特定主题上,Kafka能够提供高可用性、负载均衡和容错能力。

指定消费者组有以下几个原因:

  1. 负载均衡: 当多个消费者以相同的消费者组订阅同一个主题时,Kafka会自动分配分区给每个消费者,从而实现负载均衡。每个消费者只处理被分配的分区,这样可以确保所有分区被均匀地消费。
  2. 容错能力: 如果有消费者发生故障或离线,指定消费者组可以确保其他消费者接管该消费者组失去的分区,从而实现容错能力。这意味着即使某些消费者不可用,消息仍然可以被处理。
  3. 消费者协作: 消费者组允许多个消费者协同工作,以实现更高的消费并行度。每个消费者可以独立地处理其分配的分区,并且可以扩展系统的整体处理能力。

需要注意的是,如果您没有为消费者指定消费者组,则它将成为一个独立的消费者。这种情况下,每个消费者将独立地消费所有分区中的消息,而不会共享负载或具备容错能力。

因此,在大多数情况下,为了实现负载均衡、容错和提高处理能力,您应该指定消费者组,尤其是在需要同时处理大量消息或要求高可用性的场景中。如果您只需要简单地消费主题中的消息,而不关注这些特性,那么可以选择不指定消费者组。

手动提交偏移量

from kafka import KafkaConsumer

# 创建消费者,并禁用自动提交偏移量
consumer = KafkaConsumer('my_topic', group_id='my_consumer_group',
                         bootstrap_servers='localhost:9092',
                         enable_auto_commit=False)

# 消费消息并手动提交偏移量
for message in consumer:
    print(message.value.decode())
    consumer.commit()

自动提交偏移量和手动提交偏移量有什么区别呢?

自动提交偏移量(Auto Commit Offset)和手动提交偏移量(Manual Commit Offset)是两种不同的消费者偏移量管理方式。

自动提交偏移量:

  • 在自动提交模式下,消费者会定期自动将已消费的消息偏移量提交给Kafka。
  • 消费者无需显式调用提交偏移量的方法,Kafka会在后台自动处理。
  • 自动提交偏移量可以简化代码,减少了手动提交的复杂性。
  • 然而,自动提交偏移量可能会导致一些问题。例如,如果消费者在处理消息之前发生故障,那么已经消费但尚未提交的偏移量将丢失,造成消息重复或丢失。

手动提交偏移量:

  • 在手动提交模式下,消费者需要显式地调用提交偏移量的方法,将已消费的消息偏移量提交给Kafka。
  • 手动提交偏移量提供了更好的控制能力,可以确保消息的准确处理和可靠提交。
  • 消费者可以在适当的时机调用commit()方法来提交偏移量。通常,在成功处理消息后再进行提交是一个常见的模式。
  • 手动提交偏移量需要额外的代码来管理和处理偏移量的提交,但它提供了更高的灵活性和可靠性。

选择使用自动提交偏移量还是手动提交偏移量取决于具体的使用场景和需求。如果您的应用程序对消息处理的准确性和可靠性要求较高,或者需要更精细的控制以避免重复消费或消息丢失,那么手动提交偏移量可能更适合。否则,自动提交偏移量可以提供一种简化的方式来管理偏移量,尤其在简单的消费者应用中很常见。

手动提交偏移量与自动提交偏移量在性能方面可能存在一些差异,但这取决于具体的使用情况和配置。

性能方面的考虑:

  1. 提交频率: 自动提交偏移量会定期提交偏移量到Kafka服务器,默认情况下是每隔一段时间提交一次。相比之下,手动提交偏移量可以根据应用程序的需求选择何时提交,可以控制提交的频率。如果手动提交偏移量过于频繁,可能会影响性能。
  2. 网络延迟: 手动提交偏移量需要与Kafka服务器进行通信来提交偏移量。如果手动提交偏移量的操作导致频繁的网络调用,而且网络延迟较高,可能会对性能产生一定的影响。
  3. 消息处理时间: 如果消息处理时间很长,手动提交偏移量可能会在处理消息之前进行提交,以保证消息处理的可靠性。然而,这样也会增加提交偏移量的开销,可能降低整体性能。

需要注意的是,性能差异通常是微小的,并且在大多数情况下不会成为主要限制因素。如果性能是一个关键问题,可以根据实际情况进行测试和优化。

此外,可以通过调整参数来改善性能,例如增加自动提交的间隔时间、批量提交偏移量等。使用合适的配置和优化技术可以平衡性能和可靠性之间的权衡。

总而言之,手动提交偏移量可能会稍微影响性能,但仍然取决于具体的使用情况和配置。对于大多数应用程序而言,差异通常是可以接受的,并且可以根据实际需求进行调整和优化。

查看当前有哪些topic

from kafka import KafkaAdminClient

# 创建AdminClient连接到Kafka集群
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092')

# 获取主题列表
topic_list = admin_client.list_topics()

# 打印主题列表
print(topic_list)

# ['my_topic', 'sun', '__consumer_offsets']
# __consumer_offsets是Kafka中的一个系统内置主题
# 这个特殊的主题用于存储消费者组的偏移量(offsets)
# 以跟踪消费者在每个分区中读取消息的位置
# __consumer_offsets主题的目的是为了支持Kafka的消费者组功能
# 当消费者组启用自动提交偏移量时,Kafka会将消费者组的偏移量信息存储在__consumer_offsets主题中
# 以便能够在重平衡、故障恢复等情况下为消费者提供正确的偏移量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/786289.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

智慧园区楼宇合集:数字孪生管控系统

智慧园区是指将物联网、大数据、人工智能等技术应用于传统建筑和基础设施,以实现对园区的全面监控、管理和服务的一种建筑形态。通过将园区内设备、设施和系统联网,实现数据的传输、共享和响应,提高园区的管理效率和运营效益,为居…

2023年一建学霸笔记

考点:单方取消或辞去委托承担的民事责任女《民法典》规定,因解除合同造成对方损失的,除不可归责于该当事人的事由外,无偿委托合同的解除方应当赔偿因解除时间不当造成的直接损失,有偿委托合同的解除方应当赔偿对方的直接损失和合同…

光模块高低温消光比差异大的原因分析

用于高速数字通信的光模块,需要具备一些特定的参数条件。其中的一个参数,就是消光比。消光比被用来描述最优的偏置条件和激光发射功率转化成调制功率的效率。今天就跟着小易来了解一下在实际应用中消光比产生差异的原因吧! 一、消光比的定义…

给照片加水印软件让你保护版权不麻烦

嘿!想要保护你的照片免受盗用吗?或者想为你的作品增添独特的标识?好消息是现在有一种水印技术可以帮你解决这些问题,那么,你知道照片加水印软件有哪些吗?还不清楚的朋友请你关注下这篇文章哦。接下来让我来…

125K天线驱动器芯片UM12020D 最大直流驱动电流高达1.8A

UM12020D一个集成的天线驱动器,该芯片提供高达1.8A的输出直流电流,可在0至11V的天线电源(VM)和1.8V至5V的器件电源电压 (VCC) 上工作。该产品具有超低的rds-on,采用SOP-8封装。UM12020D具有PWM(IN1-IN2&…

防止超卖的7种实现

高并发场景在现场的日常工作中很常见,特别是在互联网公司中,这篇文章就来通过秒杀商品来模拟高并发的场景。 本文环境: SpringBoot 2.5.7 MySQL 8.0 X MybatisPlus Swagger2.9.2模拟工具: Jmeter模拟场景: 减库存-…

一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布

之前介绍了RabbitMQ以及如何在SpringBoot项目中整合使用RabbitMQ,看过的朋友都说写的比较详细,希望再总结一下目前比较流行的MQTT。所以接下来,就来介绍什么MQTT?它在IoT中有着怎样的作用?如何在项目中使用MQTT&#x…

【Android】setContentView的学习笔记

启动一个Activity performLaunchActivity() ActivityThread.performLaunchActivity() 方法是 Android 系统中负责启动一个 Activity 的关键方法。 当调用startActivity()方法启动一个 Activity 时,ActivityThread 对象会接收到该请求&…

小白必看系列之图书管理系统-登录和注册功能示例代码

文章目录 前言变量定义区域实体部分区域注册账号逻辑用户登录逻辑退出程序打印用户信息完整代码完结 前言 在现代社会中,计算机科学和编程技术的重要性日益凸显。作为开发者和技术爱好者,我们时刻追求着创新和实用性,希望通过技术的力量改善…

Springboot+Netty

目录 一、netty入门 二、启动方式 三、netty服务启动类 四、handler链 五、具体业务 六、 线程或者非spring管理的bean中获取spring管理的bean 七、效果 一、netty入门 Netty-导学_哔哩哔哩_bilibili 入门视频量比较大,最主要是了解netty的架构 netty官网&am…

Chapter 9 Port Delays (端口延迟)set input/output delay

文章目录 9.1 Input Availability---输入有效9.1.1 Min and Max Availability Time---最小和最大有效时间9.1.2 Multiple Clocks9.1.3 Understanding Input Arrival Time 9.2 Output Requirement9.2.1 Min and Max Required Time9.2.2 Multiple Reference Events9.2.3 Understa…

【梦辛工作室】IF判断优化、责任链模式 IfChain

大家好哇,我是梦辛工作室的灵,在最近的开发中,有许多需要判断的分支处理,且处理内容较多且复杂,代码就容易越写越复杂,导致后期无法继续更新跌打,然后基于这个环境,我用责任链模式写…

热备盘激活失败导致raid5阵列崩溃的服务器数据恢复案例

服务器数据恢复环境: 一台Linux Redhat操作系统服务器上有一组由5块硬盘组建的raid5阵列,包含一块热备盘。上层部署一个OA系统和Oracle数据库。 服务器故障: raid5阵列中的1块磁盘离线,硬盘离线却没有激活热备盘,直到…

系统集成|第四章(笔记)

目录 第四章 项目管理一般知识4.1 项目与项目管理4.1.1 项目4.1.2 项目的组织4.1.3 项目生命周期4.1.4 典型的信息系统项目的生命周期模型4.1.5 单个项目管理过程 上篇:第三章、系统集成专业技术 第四章 项目管理一般知识 4.1 项目与项目管理 4.1.1 项目 定义&…

SQL注入实操二

文章目录 一、sqli-lab靶场1.轮子模式总结2.Less-21a.注入点判断b.轮子测试c.获取数据库名称d.获取表信息e.获取列信息f.获取表内数据 3.Less-22a.注入点判断b.轮子测试c.获取数据库名称d.获取表信息e.获取列信息f.获取表内数据 4.Less-23a.注入点判断b.轮子测试c.获取数据库名…

如何模拟实现分布式文件存储

如何解决海量数据存不下的问题 传统做法是是在宕机存储。但随着数据变多,会遇到存储瓶颈 单机纵向扩展:内存不够加内存,磁盘不够家磁盘。有上限限制,不能无限制加下去 多机横向扩展:采用多台机器存储,一…

vue+axios实现点击取消请求功能

代码片段 <template> <el-button type"primary" click"clickExportData">导出</el-button><el-dialog title"正在导出&#xff0c;请稍等" :visible.sync"progressShow" :close-on-click-modal"false"…

sql优化:为什么通常选用根据id查询而不是根据name?

先来看一个最常见的问题,下面两个sql语句哪个效率更高一些&#xff1f; select * from user where id 1; select * from user where name 张三 在没有给name加索引的时候&#xff0c;id是有主键索引的&#xff0c;也就是聚集索引&#xff0c;这样就是一个BTree结构&#xf…

成为一名数字IC后端工程师需要掌握哪些技能?(内附学习视频)

众所周知&#xff0c;数字后端设计是IC设计中必不可少的一个环节&#xff0c;数字后端工程师是将门级网表转换成标准的GDS文件&#xff0c;又称为设计实现或物理设计。正所谓前端保证功能正确&#xff0c;后端保证芯片的实现正确。 数字后端工程师是做什么的&#xff1f; 数字…

操作系统启动相关概念(BIOS、MBR、GPT、BRUB)

不管是 Windows 还是 Linux 操作系统&#xff0c;底层设备一般均为物理硬件&#xff0c;操作系统启动之前会对硬件进行检测&#xff0c;然后硬盘引导启动操作系统&#xff0c;如下为操作系统启动相关的各个概念。 一、BIOS 基本输入输出系统&#xff08;Basic Input Output Sy…