Kafka Beginner Tutorial


I. Kafka Fundamentals

1. Kafka is a high-performance message queue system that handles large-scale data streams with low-latency delivery; it can read and write hundreds of thousands of messages per second.

II. Kafka Advantages

1. Service Decoupling

(1) Improved maintainability

Decoupling splits a system into independent parts, so a single service can be updated or fixed on its own without disturbing the others. This greatly reduces the difficulty and time cost of maintenance.

(2) Better extensibility

A decoupled system is also easier to extend: new features or services can usually be added without touching the rest of the system, allowing fast response to changing market and user needs.

2. High Throughput, Low Latency

Kafka can process hundreds of thousands of messages per second, with latency as low as a few milliseconds. Each topic can be split into multiple partitions, and consumer groups consume those partitions in parallel.
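Actual figures depend heavily on hardware, replication settings, and message size. As a minimal sketch for measuring throughput on your own setup, Kafka ships a producer benchmark in its bin directory (the topic name perf-test is arbitrary; adjust the broker address to your own):

./bin/kafka-topics.sh --create --topic perf-test --partitions 3 --replication-factor 1 --bootstrap-server 10.11.22.122:9092
./bin/kafka-producer-perf-test.sh --topic perf-test --num-records 100000 --record-size 100 --throughput -1 --producer-props bootstrap.servers=10.11.22.122:9092

The second command sends 100,000 messages of 100 bytes each with no throttling (--throughput -1) and reports records/sec plus latency percentiles.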

3. Scalability

The cluster supports online expansion: partitions can be reassigned and migrated with the bundled kafka-reassign-partitions.sh tool.
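As a minimal sketch of how a reassignment works (the topic name, partition number, and target broker id below are hypothetical; --execute applies the plan and --verify checks its progress):

cat > move.json <<'EOF'
{"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[2]}]}
EOF
./bin/kafka-reassign-partitions.sh --bootstrap-server 10.11.22.122:9092 --reassignment-json-file move.json --execute
./bin/kafka-reassign-partitions.sh --bootstrap-server 10.11.22.122:9092 --reassignment-json-file move.json --verify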

4. Durability and Reliability


Messages are persisted to local disk, and replication protects against data loss.

5. Fault Tolerance


The cluster tolerates node failures (with a replication factor of n, up to n-1 nodes can fail without losing data).

6. High Concurrency


Thousands of clients can read and write concurrently.

III. Key Concepts


1. Topic


Messages in Kafka are organized by topic. Each topic represents one message stream: producers send messages to a topic, and consumers read messages from it.
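For example, topics are created with the kafka-topics.sh tool from Kafka's bin directory (the topic name test-topic is arbitrary; the broker address should match the listeners setting in your server.properties, shown later in this guide):

./bin/kafka-topics.sh --create --topic test-topic --partitions 3 --replication-factor 1 --bootstrap-server 10.11.22.122:9092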

2. Partition


Each topic can be divided into multiple partitions, and each partition is an ordered, immutable sequence of messages. Partitions are what let Kafka scale horizontally, handle large volumes of data, and deliver high throughput.

3. Replica


For high availability, each partition can have multiple replicas stored on different servers, so the data remains available even if one server fails.
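The partition layout — which broker leads each partition and which replicas are in sync (the Isr column) — can be inspected with the same tool:

./bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server 10.11.22.122:9092

With --replication-factor 1, as in the single-node setup later in this guide, each partition has exactly one replica; a real deployment would typically use 3 so the cluster survives a failed broker.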

4. Producer


A producer is a client that sends messages to a Kafka topic. It can send a message to a specific partition, or let Kafka pick the partition according to a strategy (such as round-robin).
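The simplest producer is the console tool that ships with Kafka; each line typed on stdin becomes one message (same hypothetical topic as above):

./bin/kafka-console-producer.sh --topic test-topic --bootstrap-server 10.11.22.122:9092

By default, messages with the same key always land in the same partition, while keyless messages are spread across partitions.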

5. Consumer


A consumer is a client that reads messages from a Kafka topic. Consumers usually belong to a consumer group; the consumers in one group read different partitions of the same topic in parallel, which raises consumption speed and efficiency.
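The matching console consumer illustrates consumer groups: start two copies of this command with the same --group (the name demo-group is arbitrary) and Kafka splits the topic's partitions between them:

./bin/kafka-console-consumer.sh --topic test-topic --group demo-group --from-beginning --bootstrap-server 10.11.22.122:9092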

6. Broker


A Kafka cluster consists of multiple brokers, each of which is one Kafka instance. Brokers store messages and serve read and write requests.
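A quick way to see the brokers a cluster knows about is the bundled API-versions tool, which connects to one broker and prints every broker's id, address, and supported protocol versions:

./bin/kafka-broker-api-versions.sh --bootstrap-server 10.11.22.122:9092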

7. ZooKeeper


ZooKeeper is a distributed coordination service. Kafka uses it to manage cluster metadata such as topics, partitions, and brokers.
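In ZooKeeper-based mode you can browse this metadata directly from the ZooKeeper CLI; a short sketch (these are the znode paths Kafka registers, and broker id 1 matches the broker.id configured later in this guide):

./zkCli.sh -server 10.11.22.122:2181
ls /brokers/ids          # ids of the live brokers
get /brokers/ids/1       # host, port, and metadata of broker 1
ls /brokers/topics       # all topics known to the cluster
get /controller          # which broker is currently the controller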

IV. Kafka Installation Tutorial

1. Open the official Kafka download page (https://kafka.apache.org/downloads).

2. Click the DOWNLOAD KAFKA button (Figure 1).

3. Click the highlighted binary download link (Figure 2).

4. Extract Kafka on the server, then edit kafka_2.13-4.0.0\config\zookeeper.properties. (Note: ZooKeeper-based mode exists only in Kafka 3.x and earlier; Kafka 4.0 runs exclusively in KRaft mode, so for this ZooKeeper-based walkthrough use a 3.x release even though the directory name here says 4.0.0.)

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# The AdminServer is enabled here (the stock config ships with it disabled to
# avoid port conflicts); set admin.serverPort below if 8080 is already taken.
admin.enableServer=true
audit.enable=true
# admin.serverPort=8080

5. Edit kafka_2.13-4.0.0\config\server.properties:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
#

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://10.11.22.122:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/app/test/kafka/kafka_2.13-3.2.3/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
#zookeeper.connect=10.11.22.121:2181,10.11.22.122:2181,10.11.22.124:2181
zookeeper.connect=10.11.22.122:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=180000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

6. Download ZooKeeper

Download ZooKeeper from the official Apache download page (https://zookeeper.apache.org/releases.html), as shown in Figure 3.

Extract it on the server, go to the apache-zookeeper-3.9.3-bin\conf directory, and create a new file named zoo.cfg:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/app/install-test/zk/zookeeper-3.4.6/zkdata
dataLogDir=/app/install-test/zk/zookeeper-3.4.6/logs
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
audit.enable=true
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true

server.1=10.11.22.122:2888:2889
#server.2=10.11.22.123:2888:2889
#server.3=10.11.22.124:2888:2889
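One step the comments above don't cover: whenever zoo.cfg contains server.N entries, each node also needs a myid file in its dataDir whose content is that node's N. A minimal sketch for this single-node setup, using the dataDir configured above:

mkdir -p /app/install-test/zk/zookeeper-3.4.6/zkdata
echo 1 > /app/install-test/zk/zookeeper-3.4.6/zkdata/myid

Without it, ZooKeeper fails to start with a "myid file is missing" error.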

7. With both configs updated, start the ZooKeeper service first. From the kafka_2.13-4.0.0 directory, run:

./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties

Once it is running, check for the process:

ps -ef | grep zookeeper

A successful start looks like Figure 4.

8. Switch to the apache-zookeeper-3.9.3-bin\bin directory and start the ZooKeeper client. (zkCli.sh is an interactive shell and takes no -daemon flag; -server points it at the address configured above.)

./zkCli.sh -server 10.11.22.122:2181

A successful connection drops you into an interactive prompt (Figure 5).

9. Switch back to the kafka_2.13-4.0.0 directory and start Kafka:

./bin/kafka-server-start.sh -daemon ./config/server.properties

Once it is running, check for the process (Figure 6):

ps -ef | grep kafka
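Finally, a quick end-to-end smoke test confirms the broker works (the topic name smoke-test is arbitrary; press Ctrl+C to stop the consumer):

# create a test topic
./bin/kafka-topics.sh --create --topic smoke-test --partitions 1 --replication-factor 1 --bootstrap-server 10.11.22.122:9092
# publish one message
echo "hello kafka" | ./bin/kafka-console-producer.sh --topic smoke-test --bootstrap-server 10.11.22.122:9092
# read it back
./bin/kafka-console-consumer.sh --topic smoke-test --from-beginning --bootstrap-server 10.11.22.122:9092

If "hello kafka" comes back, the broker, ZooKeeper, and the network listeners are all wired up correctly.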
