kafka环境搭建以及基本原理

news2024/10/5 1:50:01

kafka最先是作为日志数据采集,后用于消息传递,kafka能承担tb级别数据存储,确保服务的可用性,允许少量数据的丢失

作为消息中间件就有异步、解耦、削峰三个作用

一、单机搭建

单机ip:192.168.64.133

下载地址:Apache Kafka 选择kafka_2.13-3.4.0.tgz进行下载

关于kafka的版本,前面的2.13是开发kafka的scala语言的版本,后面的3.4.0是kafka应用的版本。

下载Zookeeper,下载地址 Apache ZooKeeper ,kafka有内置的zookeeper,Zookeeper的版本并没有强制要求,这里我们选择比较新的3.6.1版本。

#下载解压
cd /usr/local/kafka
wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -zxvf kafka_2.13-3.4.0.tgz

 1、启动Kafka之前需要先启动Zookeeper。**这里就用Kafka自带的Zookeeper。启动脚本在bin目录下。

cd kafka_2.13-3.4.0/
nohup bin/zookeeper-server-start.sh config/zookeeper.properties & 

​ 从nohup.out中可以看到zookeeper默认会在2181端口启动。通过jps指令看到一个QuorumPeerMain进程,确定服务启动成功。 

 2、启动Kafka。

nohup bin/kafka-server-start.sh config/server.properties &

启动完成后,使用jps指令,看到一个kafka进程,确定服务启动成功。服务会默认在9092端口启动。

3、简单收发消息

​ Kafka的基础工作机制是消息发送者可以将消息发送到kafka上指定的topic,而消息消费者,可以从指定的topic上消费消息。

  首先,可以使用Kafka提供的客户端脚本创建Topic

#查看帮助命令
bin/kafka-topics.sh --help
#创建Topic
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
#查看Topic
bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092

 然后,启动一个消息发送者端。往一个名为test的Topic发送消息。

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

当命令行出现 > 符号后,随意输入一些字符然后enter。Ctrl+C 退出命令行。这样就完成了往kafka发消息的操作。

  然后启动一个消息消费端,从名为test的Topic上接收消息。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

4、其他消费模式

​ 之前我们通过kafka提供的生产者和消费者脚本,启动了一个简单的消息生产者以及消息消费者,实际上,kafka还提供了丰富的消息消费方式。

指定消费进度

​ 通过kafka-console.consumer.sh启动的控制台消费者,会将获取到的内容在命令行中输出。如果想要消费之前发送的消息,可以通过添加--from-begining参数指定。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test

如果需要更精确的消费消息,甚至可以指定从哪一条消息开始消费。 

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition 0 --offset 4 --topic test

这表示从第0号Partition上的第四个消息开始读起。Partition和Offset是什么呢,可以用以下指令查看。

分组消费

对于每个消费者,可以指定一个消费者组。kafka中的同一条消息,只能被同一个消费者组下的某一个消费者消费。而不属于同一个消费者组的其他消费者,也可以消费到这一条消息。在kafka-console-consumer.sh脚本中,可以通过--consumer-property group.id=testGroup来指定所属的消费者组。例如,可以启动三个消费者组,来验证一下分组消费机制: 

#两个消费者实例属于同一个消费者组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGrroup --topic test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGrroup --topic test
#这个消费者实例属于不同的消费者组
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGrroup2 --topic test

查看消费者组的偏移量

​ 接下来,还可以使用kafka-consumer-groups.sh观测消费者组的情况。包括他们的消费进度。

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup

二、集群搭建

集群就为了保证服务的高可用和数据的安全性,这就是我的理解,万一挂了其它节点依旧可以对外提供服务

1、部署zookeeper集群

#下载解压
cd /usr/local/zookeeper
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
#配置
cd /usr/local/zookeeper/apache-zookeeper-3.6.1-bin/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg


#zoo.cfg文件中修改如下配置
#Zookeeper的本地数据目录,默认是/tmp/zookeeper。这是Linux的临时目录,随时会被删掉。
dataDir=/usr/local/zookeeper
#Zookeeper的服务端口
clientPort=2181
#集群节点配置
server.1=192.168.64.133:2888:3888
server.2=192.168.64.134:2888:3888
server.3=192.168.64.128:2888:3888

 

其中,clientPort 2181是对客户端开放的服务端口。

集群配置部分, server.x这个x就是节点在集群中的myid。后面的2888端口是集群内部数据传输使用的端口。3888是集群内部进行选举使用的端口。

​ 接下来将整个Zookeeper的应用目录分发到另外两台机器上。就可以在三台机器上都启动Zookeeper服务了。

#切换到zookeeper的home目录下,运行如下命令启动
bin/zkServer.sh --config conf start

(总是启动失败)

启动完成后,使用jps指令可以看到一个QuorumPeerMain进程就表示服务启动成功。

​ 三台机器都启动完成后,可以查看下集群状态。

[root@hadoop02 zookeeper-3.5.8]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /app/zookeeper/zookeeper-3.5.8/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader

这其中Mode 为leader就是主节点,follower就是从节点。

2、部署kafka集群

​ kafka服务并不需要进行选举,因此也没有奇数台服务的建议。

​ 部署Kafka的方式跟部署Zookeeper差不多,就是解压、配置、启服务三板斧。

​ 首先将Kafka解压到/app/kafka目录下。

​ 然后进入config目录,修改server.properties。这个配置文件里面的配置项非常多,下面列出几个要重点关注的配置。

#broker 的全局唯一编号,不能重复,只能是数字。
broker.id=0
#数据文件地址。同样默认是给的/tmp目录。
log.dirs=/usr/local/kafka/logs
#默认的每个Topic的分区数
num.partitions=1
#zookeeper的服务地址
zookeeper.connect=192.168.64.133:2181,192.168.64.134:2181,192.168.64.128:2181
#可以选择指定zookeeper上的基础节点。
#zookeeper.connect=192.168.64.133:2181,192.168.64.134:2181,192.168.64.128:2181/kafka

broker.id需要每个服务器上不一样,分发到其他服务器上时,要注意修改一下。

多个Kafka服务注册到同一个zookeeper集群上的节点,会自动组成集群。

配置文件中的注释非常细致,可以关注一下。下面是server.properties文件中比较重要的核心配置

PropertyDefaultDescription
broker.id0broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为一的即可。
log.dirs/tmp/kafka-logskafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。
listenersPLAINTEXT://127.0.0.1:9092server接受客户端连接的端口,ip配置kafka本机ip即可
zookeeper.connectlocalhost:2181zookeeper连接地址。hostname:port。如果是Zookeeper集群,用逗号连接。
log.retention.hours168每个日志文件删除之前保存的时间。
num.partitions1创建topic的默认分区数
default.replication.factor1自动创建topic的默认副本数量
min.insync.replicas1当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常
delete.topic.enablefalse是否允许删除主题

​ 接下来就可以启动kafka服务了。启动服务时需要指定配置文件。

bin/kafka-server-start.sh -daemon config/server.properties

-daemon表示后台启动kafka服务,这样就不会占用当前命令窗口。

​ 通过jps指令可以查看Kafka的进程。

三、Kraft集群

​ 在Kafka的config目录下,提供了一个kraft的文件夹,在这里面就是Kraft协议的参考配置文件。在这个文件夹中有三个配置文件,broker.properties,controller.properties,server.properties,分别给出了Kraft中三种不同角色的示例配置。

  • broker.properties: 数据节点
  • controller.properties: Controller控制节点
  • server.properties: 即可以是数据节点,又可以是Controller控制节点。

这里同样列出几个比较关键的配置项,按照自己的环境进行定制即可

#配置当前节点的角色。Controller相当于Zookeeper的功能,负责集群管理。Broker提供具体的消息转发服务。
process.roles=broker,controller
#配置当前节点的id。与普通集群一样,要求集群内每个节点的ID不能重复。
node.id=1
#配置集群的投票节点。其中@前面的是节点的id,后面是节点的地址和端口,这个端口跟客户端访问的端口是不一样的。通常将集群内的所有Controllor节点都配置进去。
controller.quorum.voters=1@worker1:9093,2@worker2:9093,3@worker3:9093
#Broker对客户端暴露的服务地址。基于PLAINTEXT协议。
advertised.listeners=PLAINTEXT://worker1:9092
#Controller服务协议的别名。默认就是CONTROLLER
controller.listener.names=CONTROLLER
#配置监听服务。不同的服务可以绑定不同的接口。这种配置方式在端口前面是省略了一个主机IP的,主机IP默认是使用的java.net.InetAddress.getCanonicalHostName()
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
#数据文件地址。默认配置在/tmp目录下。
log.dirs=/app/kafka/kraft-log
#topic默认的partition分区数。
num.partitions=2

将配置文件分发,并修改每个服务器上的node.id属性和advertised.listeners属性。

​ 由于Kafka的Kraft集群对数据格式有另外的要求,所以在启动Kraft集群前,还需要对日志目录进行格式化。

[oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-storage.sh random-uuid
j8XGPOrcR_yX4F7ospFkTA
[oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-storage.sh format -t j8XGPOrcR_yX4F7ospFkTA -c config/kraft/server.properties 
Formatting /app/kafka/kraft-log with metadata.version 3.4-IV0.

-t 表示集群ID,三个服务器上可以使用同一个集群ID。

​ 接下来就可以指定配置文件,启动Kafka的服务了。 例如,在Worker1上,启动Broker和Controller服务。

[oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-server-start.sh -daemon config/kraft/server.properties 
[oper@worker1 kafka_2.13-3.4.0]$ jps
10993 Jps
10973 Kafka

​ 等三个服务都启动完成后,就可以像普通集群一样去创建Topic,并维护Topic的信息了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1048943.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

npm ,yarn 更换使用国内镜像源,淘宝源

背景 文章首发地址 在平时开发当中,我们经常会使用 Npm,yarn 来构建 web 项目。但是npm默认的源的服务器是在国外的,如果没有梯子的话。下载速度会特别慢。那有没有方法解决呢? 其实是有的,设置国内镜像即可&#x…

PHP8的静态变量和方法-PHP8知识详解

我们在上一课程讲到了public、private、protected这3个关键字,今天我们来讲解static关键字,明天再讲解final关键字。 如果不想通过创建对象来调用变量或方法,则可以将该变量或方法创建为静态变量或方法,也就是在变量或方法的前面…

Docker学习_镜像和容器篇

简介 Docker是一种容器化的技术,可以实现在一台宿主机电脑上运行多个不同的容器,每个容器之间都相互独立,具有完整的一套文件,网络和端口。 可以将其理解为一种虚拟机技术,只不过和VMware等虚拟化技术不同&#xff0…

Day05-循环高级和数组

循环高级 1.无限循环 概念: 又叫死循环。循环一直停不下来。 for格式: for(;;){System.out.println("循环执行一直在打印内容"); } 解释: 初始化语句可以空着不写,表示循环之前不定义任何的控制变量。 条件判断…

C++标准模板(STL)- 输入/输出操纵符-(std::setbase,std::setfill)

操纵符是令代码能以 operator<< 或 operator>> 控制输入/输出流的帮助函数。 不以参数调用的操纵符&#xff08;例如 std::cout << std::boolalpha; 或 std::cin >> std::hex; &#xff09;实现为接受到流的引用为其唯一参数的函数。 basic_ostream::…

深信服云桌面用户忘记密码后的处理

深信服云桌面用户忘记了密码&#xff0c;分两种情况&#xff0c;一个是忘记了登录深信服云桌面的密码&#xff0c;另外一个是忘记了进入操作系统的密码。 一、忘记了登录深信服云桌面的密码 登录虚拟桌面接入管理系统界面&#xff0c;在用户管理中选择用户后&#xff0c;点击后…

C++ | 计算几何:判断点与多边形的关系

在屏幕坐标上有一个多边形&#xff0c;给定一个点&#xff0c;判断该点和多边形的关系。 例如在下图中&#xff0c;点A位于多边形内&#xff0c;点B位于多边形外&#xff0c;点C位于多边形上。 注意: 1.屏幕坐标系中&#xff0c;x轴从左往右为正方向&#xff0c;y轴从上到下为…

10分钟巩固多线程基础

10分钟巩固多线程基础 前言 多线程是并发编程的基础&#xff0c;本篇文章就来聊聊多线程 我们先聊聊概念&#xff0c;比如进程与线程&#xff0c;串行、并行与并发 再去聊聊线程的状态、优先级、同步、通信、终止等知识 进程与线程 什么是进程&#xff1f; 操作系统将资…

外汇天眼:假投资平台COINPAYEX非法吸金2158万,项目小组循线破获诈团据点

近年来加密货币投资愈来愈普及&#xff0c;许多人们开始关注并投入这个市场&#xff0c;期望能够获得高额报酬&#xff0c;甚至达到财务自由的目标。 但需要注意的是&#xff0c;市面上充斥各种黑平台&#xff0c;一不小心恐怕蒙受极大的损失。 9/12&#xff0c;警政署刑事警察…

c语言 - 实现每隔1秒向文件中写入当前系统时间

实现思路 主要是通过库函数和结构体获取当前系统时间&#xff08;年月日和时分秒&#xff09;保存到变量里&#xff0c;然后通过格式化输出函数将当前系统时间输出到文件中去。 但是需要注意的是题目要求每隔 1 s对系统时间进行输出&#xff0c;所以需要加入 sleep()函数进行调…

windows 环境变量设置

方法一&#xff1a;打开环境变量设置窗口&#xff08;或者cmd 执行sysdm.cpl命令&#xff09; 方法二&#xff1a; powershell执行如下命令&#xff0c;注意修改<要增加的环境变量值>值为要设置的环境变量值 [Environment]::SetEnvironmentVariable("PATH", $E…

2023最新团购社群小程序源码 社区商城小程序源码开源版 带完整后台+部署教程

随着互联网的不断发展&#xff0c;社群团购成为了一种新型的商业模式。在这种模式下&#xff0c;通过社群分享和团购折扣&#xff0c;消费者可以享受到更优惠的价格和更便捷的购物体验。给大家分享一个2023最新团购社群小程序源码&#xff0c;包含社区商城小程序源码开源版和完…

openGauss学习笔记-83 openGauss 数据库管理-内存优化表MOT管理-内存表特性-MOT使用内存和存储规划

文章目录 openGauss学习笔记-83 openGauss 数据库管理-内存优化表MOT管理-内存表特性-MOT使用内存和存储规划83.1 MOT内存规划83.2 存储IO83.3 容量需求 openGauss学习笔记-83 openGauss 数据库管理-内存优化表MOT管理-内存表特性-MOT使用内存和存储规划 本节描述了为满足特定…

No132.精选前端面试题,享受每天的挑战和学习

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…

基于python的智慧城市社区服务平台及养老服务子系统的设计与实现

文章目录 导文摘要前言绪论课题背景国内外现状与趋势国内现状:国外现状:趋势:课题内容相关技术与方法介绍系统分析系统设计系统实现导文 基于python的智慧城市社区服务平台及养老服务子系统的设计与实现 摘要 本文基于Python开发了一款智慧城市社区服务平台及养老服务子系统…

ant-design-pro的可编辑表格editprotable的遇到的一些小问题

需求&#xff1a;1、使用可编辑表格实现对某一行表格的数据进行编辑&#xff0c;对输入内容控制必须低于同行某一数据 2、解决搜索数据和列名展示不同&#xff0c;和不可编辑的列不展示在搜索框 实现效果 解决搜索和readonly之间的问题 因为本次需求只要求修改一个数据&…

ElementPlus· tab切换/标签切换 + 分页

tab切换 ---> <el-tabs><el-tab-pane>... 分页 --------> <el-pagination> tab切换 // tab标签切换 // v-model双向绑定选项中的name&#xff0c;tab-change事件在 activeName改变时触发 <script setup> const tabChange (tab, event)>{…

【JAVA】飞机大战

代码和图片放在这个地址了&#xff1a; https://gitee.com/r77683962/fighting/tree/master 最新的代码运行&#xff0c;可以有两架飞机&#xff0c;分别通过WASD&#xff08;方向&#xff09;&#xff0c;F&#xff08;发子弹&#xff09;&#xff1b;上下左右&#xff08;控…

大型企业网如何部署NAT实现需求

1.企业中堕胎电脑如何共享上网&#xff1f; 2.NAT地址转换原理讲解&#xff1b; 3.企业机房如何用NAT让服务器更安全&#xff1f; - NAT - 网络地址转换 - 什么式网络地址 IP地址 -通信时候的设备标识 - 为什么要把IP地址做转换呢&#xff1f; -- 公网IP&#xff…

迷你无人车 Navigation 导航(5)— 基础框架学习

迷你无人车 Navigation 导航&#xff08;5&#xff09;— 基础框架学习 整个功能包整个功能包集合以move_base为核心&#xff0c;将里程计信息、传感器信息、定位信息、地图以及目标点输入给move_base&#xff0c;move_base经过规划后会输出速度指令 move_base包括三个关键部分…