kafka(一)——简介

news2024/12/27 17:05:01

简介

Kafka 是一种分布式、支持分区、多副本的消息中间件,支持发布-订阅模式,多用于实时处理大量数据缓存的场景,类似于一个“缓存池”。

架构

在这里插入图片描述

  • Producer:消息生产者;
  • Consumer:消息消费者;
  • Broker:一台kafka服务器也称作一个broker,kafka集群包含多个broker;
  • Topic:一个topic为一个消息队列,生产者、消费者基于topic进行发布-订阅;
  • Partition:消息分区,一个topic可以分为多个partition,每个partition是一个消息队列;
  • Replica:副本。一个topic的每个分区都有若干个副本,一个Leader和若干个Follower
  • Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader;
  • Follower:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader;

安装部署

  1. 下载kafka安装包
# 下载地址,取最新包即可,最新版本为kafka_2.13-3.6.1.tgz
https://kafka.apache.org/downloads
  1. 解压
tar -zxvf kafka_2.13-3.6.1.tgz

mv kafka_2.13-3.6.1 kafka

cd kafka/
  1. 修改配置文件

    1)本次集群配置采用kraft模式,集群节点为192.168.1.12~14;

    2)在3个节点解压后,进入kafka/config/kraft目录,修改server.properties文件:

    • 192.168.1.12节点配置文件修改如下
    "node.id=1"
    "controller.quorum.voters=1@192.168.1.12:9093,1@192.168.1.13:9093,3@192.168.1.14:9093"
    "advertised.listeners=PLAINTEXT://192.168.1.12:9092"
    
    • 192.168.1.13节点配置文件修改如下
    "node.id=2"
    "controller.quorum.voters=1@192.168.1.12:9093,1@192.168.1.13:9093,3@192.168.1.14:9093"
    "advertised.listeners=PLAINTEXT://192.168.1.13:9092"
    
    • 192.168.1.14节点配置文件修改如下
    "node.id=3"
    "controller.quorum.voters=1@192.168.1.12:9093,1@192.168.1.13:9093,3@192.168.1.14:9093"
    "advertised.listeners=PLAINTEXT://192.168.1.14:9092"
    

    节点说明:

    # Broker节点
    Kafka集群中的数据节点(消息队列),它们负责接收客户端的消息和传递消息给客户端,默认情况下,每个Broker节点会监听9092端口,该端口用于与客户端进行通信,客户端可以将消息发送到这个端口,或者从这个端口接收消息,这个端口可以称作客户端通信端口。
    
    # Controller节点
    Kafka集群中的控制器节点,负责管理集群的状态和元数据,Controller节点监听的端口通常是9093,该端口用于集群中其他节点获取元数据或在混合节点选举新的Controller时进行通信,通过该端口,其他节点可以与Controller节点交互,获取集群的元数据信息或参与控制器的选举过程,这个端口可以称作控制器端口。
    
    # 混合节点
    同时担任Broker和Controller角色的节点,这两个端口都会被使用,默认情况下混合节点将监听9092端口接收和传递消息给客户端,并监听9093端口用于与其他节点进行元数据交换和控制器选举通信,可见混合节点会同时使用两个端口分别作为客户端通信端口与控制器端口。
    
  2. 生成集群id

# KRaft模式的kafka集群需要设定一个id,在任意节点执行如下命令:
sh /usr/local/incvs-kafka/bin/kafka-storage.sh random-uuid
  1. 格式化数据目录
# 在集群所有节点执行如下命令:
sh /home/kafka/bin/kafka-storage.sh format -t "步骤4)生成的id" -c /home/kafka/config/kraft/server.properties
  1. 启动Kakfa
# 在3个节点依次执行如下命令:
sh /home/kafka/bin/kafka-server-start.sh /home/kafka/config/kraft/server.properties

测试

  • 在192.168.1.13节点启动消费者脚本订阅“MykafkaTopic”
sh /home/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.13:9092 --topic MykafkaTopic
hello kafka
  • 在192.168.1.12节点启动生产者脚本往topic生产消息
sh /home/kafka/bin/kafka-console-producer.sh --bootstrap-server 192.168.1.12:9092 --topic MykafkaTopic
>hello kafka
>

配置文件说明

server.properties配置示例:

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
node.id=1

# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093

############################# Socket Server Settings #############################

# The address the socket server listens on.
# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
# with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,CONTROLLER://:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kraft-combined-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
  • process.roles:服务器角色,包含broker和controller。如示例中使用kraft模式使用的混合模式,即“process.roles=broker,controller”;
  • node.id:节点ID,一个集群中的node.id不能重复;
  • controller.quorum.voters:控制选举的投票节点,格式为“node-id@host:port”;
  • listeners:服务器监听的地址,格式为“listener_name://host_name:port”;
  • inter.broker.listener.name:用于broker之间通信的监听器名称;
  • advertised.listeners:服务器向客户端宣告的监听器名称、主机名和端口;
  • controller.listener.names:控制器使用的监听器名称列表;
  • listener.security.protocol.map:监听器名称到安全协议的映射。默认情况下,它们是相同的;
  • num.network.threads:服务器用于从网络接收请求和向网络发送响应的线程数;
  • num.io.threads:服务器用于处理请求(可能包括磁盘 I/O)的线程数;
  • socket.send.buffer.bytes:服务器用于发送数据的缓冲区大小;
  • socket.receive.buffer.bytes:服务器用于接收数据的缓冲区大小;
  • socket.request.max.bytes:服务器接受的请求的最大大小(用于防止内存溢出);
  • log.dirs:用于存储日志文件的目录列表;(重要,需清楚日志存储的原理)
  • num.partitions:每个主题的默认日志分区数;
  • num.recovery.threads.per.data.dir:每个数据目录在启动时用于日志恢复和关闭时用于刷新的线程数;
  • offsets.topic.replication.factor:内部主题 “__consumer_offsets” 和 “__transaction_state” 的复制因子;
  • transaction.state.log.replication.factor:事务状态日志的复制因子;
  • transaction.state.log.min.isr:事务状态日志的最小同步副本数;
  • log.flush.interval.messages:强制将数据刷新到磁盘之前接受的消息数;
  • log.flush.interval.ms:消息在日志中停留的最大时间,超过这个时间就会强制刷新到磁盘;
  • log.retention.hours:由于年龄而使日志文件有资格被删除的最小年龄;
  • log.retention.bytes:基于大小的日志保留策略,默认1G;
  • log.segment.bytes:日志段文件的最大大小;
  • log.retention.check.interval.ms:检查日志段是否可以根据保留策略被删除的间隔;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1400673.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

6. Spring Boot的starters

6. Spring Boot的starters(重要) 一般认为,SpringBoot 微框架从两个主要层面影响 Spring 社区的开发者们: 基于 Spring 框架的“约定优先于配置(COC)”理念以及最佳实践之路。提供了针对日常企业应用研发各种场景的 spring-boot…

人工智能原理实验2(2)——罗马尼亚问题(贪婪搜索、A*搜索、BFS、DFS)

🧡🧡实验内容🧡🧡 根据上图以Zerind为初始状态,Bucharest为目标状态实现搜索,分别以贪婪搜索(只考虑直线距离)和A算法求解最短路径。 按顺序列出贪婪算法探索的扩展节点和其估价函数…

读书笔记-《数据结构与算法》-摘要8[桶排序]

桶排序和归并排序有那么点点类似,也使用了归并的思想。大致步骤如下: 设置一个定量的数组当作空桶。Divide - 从待排序数组中取出元素,将元素按照一定的规则塞进对应的桶子去。对每个非空桶进行排序,通常可在塞元素入桶时进行插入…

springboot113健身房管理系统

简介 【毕设源码推荐 javaweb 项目】基于springbootvue 的健身房管理系统 适用于计算机类毕业设计,课程设计参考与学习用途。仅供学习参考, 不得用于商业或者非法用途,否则,一切后果请用户自负。 看运行截图看 第五章 第四章 获取…

文件操作和IO(1)

认识文件 先来认识狭义上的文件(存储在硬盘(磁盘)上).针对硬盘这种持久化的I/O设备,当我们想要进行数据保存时,往往不是保存成一个整体,而是独立成一个个的单位进行保存,这个独立的单位就被抽象成文件的概念,就类似办公桌上的一份份真实的文件一般. 注意:硬盘 ! 磁盘 磁盘属于…

【算法与数据结构】1049、LeetCode 最后一块石头的重量 II

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引,可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析:本题需要得到石头之间两两粉碎之后的最小值,那么一个简单的思路就是将这堆石头划分成大小相…

HDFS节点故障的容错方案

HDFS节点故障的容错方案 1. NN高可用1.1 选主逻辑1.2 HA切换1.3 注意点1.3.1 fencing的处理1.3.2 健康状态的定义1.3.3 确保zk中的父节点存在1.3.4 确保NN的ID与IP保持一致 2. DN高可用2.1 感知DN节点异常2.1.1 NN感知DN心跳超时2.1.1 客户端链接DN异常 2.2 异常DN上的数据处理…

6.4.4释放音频

6.4.4释放音频 许多Flash动画里的音乐或歌曲非常好听,能不能在没有源文件的情况下把里面的声音文件取出来呢?利用Swf2VideoConverter2可以轻松做到这一点。 1.单击“添加”按钮,在弹出的下拉菜单中选择“添加文件”,…

蓝桥备战--分糖果OJ2928 贪心 分类讨论

题目&#xff1a; 思路&#xff1a; 首先排序(经验之谈) 分类讨论 我们要做到不重不漏的分类 代码&#xff1a; #include <iostream> #include <algorithm> using namespace std;const int N 1e6 10;char dist[N]; int n, x;int main() {string str;cin >…

以超市数据微案例-fineBI可视化分析

一、入门案例&#xff1a; 2.分析思路&#xff1a; 数据清晰界面中添加毛利额计算 **所以在新增步骤之后&#xff0c;必须点击保存并更新&#xff0c;否则可视化界面中无法使用最新的数据 4、数据可视化分析 1&#xff09;销售额最高的十大商品种类 为1-8月超市数据&#xff…

测试用例评审流程

1:评审的过程 A:开始前做好如下准备 1、确定需要评审的原因 2、确定进行评审的时机 3、确定参与评审人员 4、明确评审的内容 5、确定评审结束标准 6、提前至少一天将需要评审的内容以邮件的形式发送给评审会议相关人员。并注明详审时间、地点及偿参与人员等。 7、 在邮件中提醒…

每日一题——1295.统计位数为偶数的数字

方法一 个人方法&#xff1a; 想知道整数型数字有多少位&#xff0c;可以直接把数字转字符&#xff0c;看字符的长度就是数字的位数 var findNumbers function(nums) {let count0for(let num of nums){let strnumif(str.length%20) count}return count }; 消耗时间和内存情况…

【LLM-agent】function call功能、AgentTuning微调

note function call本质&#xff1a;准确识别用户的语义&#xff0c;将其转为结构化的指令&#xff0c;其中通过LLM理解指令和上下文判断需要调用哪个函数、抽取出input中函数所需的参数。是用户和界面交互方式产生质变的一个trick。所以为了提高模型准确识别和调用函数的能力…

HTML+JavaScript-01

说明 之前有一篇JavaWeb-JavaScript中只是简单介绍了一点JavaScript的内容&#xff0c;这篇笔记算是续写的&#xff0c;但是从01开始编号。 引入js文件 html、css、js俗称前端三剑客&#xff0c;一般都是分开写&#xff0c;先写框架、再写css、最后写js。因此在工程量大的情…

第36集《佛法修学概要》

请大家打开讲义第九十六面&#xff0c;我们讲到禅定的修学方便。 在我们发了菩提心&#xff0c;安住菩萨种性以后&#xff0c;我们开始操作六度的法门。六度的法门&#xff0c;它有两个不同的差别的内容&#xff0c;一种是成就我们的善业力&#xff0c;另外一种&#xff0c;是…

Unity中URP下的SimpleLit的 Lambert漫反射计算

文章目录 前言一、Lambert漫反射计算11、MixRealtimeAndBakedGI 函数有三个重载2、3号 调用了 2号3、1号调用了 SubtractDirectMainLightFromLightmap函数4、我们重点来看 Lambert漫反射的实现部分5、其余部分 二、Lambert漫反射计算21、LightingLambert 前言 在之前的文章中&…

电脑pdf如何转换成word格式?用它实现pdf文件一键转换

pdf转word格式可以用于提取和重用pdf文档中的内容&#xff0c;有时候&#xff0c;我们可能需要引用或引用pdf文档中的一些段落、表格或数据&#xff0c;通过将pdf转换为可编辑的Word文档&#xff0c;可以轻松地复制和粘贴所需内容&#xff0c;节省我们的时间&#xff0c;那么如…

windows下载安装ffmpeg最新版

windows环境搭建专栏&#x1f517;点击跳转 win系统环境搭建&#xff08;十六&#xff09;——windows下载安装ffmpeg最新版 文章目录 win系统环境搭建&#xff08;十六&#xff09;——windows下载安装ffmpeg最新版1.下载2.安装3.验证 1.下载 下载页面地址是https://ffmpeg.…

Windows WSL2 占用磁盘空间清理释放

目前工作中时常用到WSL2&#xff08;Ubuntu20.04&#xff09;&#xff0c;在使用一段时间后会发现WSL2所占用磁盘空间越来越多&#xff0c;体现在WSL2之上安装Linux分发对应的vhdx虚拟磁盘文件体积越来越大&#xff0c;会占用Windows自身空间&#xff0c;即使手动清理了Linux分…

计算机毕设thinkphp+mysql+_vue房屋租赁系统h3sem

运行环境:phpstudy/wamp/xammp等 开发语言&#xff1a;php 后端框架&#xff1a;Thinkphp5 前端框架&#xff1a;vue.js 服务器&#xff1a;apache 数据库&#xff1a;mysql 数据库工具&#xff1a;Navicat/phpmyadmin 房屋租赁管理系统有不同的用户角色。不同的用户权限对应不…