案例:ZooKeeper + Kafka消息队列集群部署

news2024/12/24 10:55:01

目录

消息队列

概念

使用场景

不适宜

适宜

消息队列的特征

存储

异步

异步的优点

同步

为什么需要消息队列

解耦

作用

冗余

扩展性

灵活性

峰值处理能力

可恢复性

顺序保证

Kafka

概念

Kafka技术名词

(1)Broker

(2)Topic

(3)Producer

(4)Consumer

(5)Partition

(6)Consumer Group

(7)Message

ZooKeeper

概念

工作原理

Master启动

Master故障

Master恢复

角色

Server

Leader

Follower

Observer

Client

ZooKeeper在Kafka中的作用

部署Kafka集群

初步设置

安装ZooKeeper

安装Kafka

测试

创建主题

生产消费测试


消息队列

概念

  • 消息(Message)是指在应用间传送的数据
  • 消息队列(Message Queue)是一种应用间的通信方式,确保消息的可靠传递
  • 临时存放数据的空间(缓存)
  • 消息队列是中间件的一种

比如服务端产生的数据比较多,客户端无法及时处理,这时就需要一个队列系统,给消息排队,每接收一个消息,就放到队列中,等待应用程序提取该数据,应用程序提取数据之后,队列就释放该数据


使用场景

不适宜

如果有一个程序A产生的数据要交给程序B,而程序A产生的数据量和程序B能够承载数据量是一样的,就没有必要加消息队列这个中间件

如果此时加了中间件,就打破了原有的架构,本来A和B可以直接通信,结果不仅增加了数据传输的延迟,还增加了开发者和运维者对整体架构的复杂度

或者两个程序要求数据同步性较高也不适合加消息队列

适宜

如果程序A产生的数据量远远高于程序B承载的数据量,那么这时就可以增加一个消息队列的中间件


消息队列的特征

存储

将消息存储在某种类型的缓冲区中(内存),直到目标进程读取这些消息或将其从消息队列中显式移除为止

异步

程序A和消息队列连接程序B也和消息队列连接,此时A和B就是异步连接(发送和接收消息不同步),程序A(发布者)只负责发送数据到队列中不在乎谁来用,程序B(使用者)只负责获取数据而不在乎谁发送的

消息发布者只管把消息发布到MQ中而不管谁来用,消息使用者只管从MQ中取消息而不管是谁发布。

异步的优点
  • 如果程序B处理数据比较慢,可以通过中间件暂存这些数据,慢慢处理

同步

同步就是:程序A发送数据,程序B接收数据,建立连接以后,数据的发送与接收是同步的(请求和应答)


为什么需要消息队列

  1. 解耦
  2. 冗余
  3. 扩展性
  4. 灵活性
  5. 峰值处理能力
  6. 可恢复性
  7. 顺序保证
  8. 异步通信

解耦

解耦:解耦合

耦合是指两个程序结合的非常紧密,比如程序A出故障了,那么程序B就不能正常运行,而程序B出故障了程序A就不能正常运行

有一方发生故障,那么两个程序的运行都会受到影响。

作用

在上面描述的环境中,如果有了消息队列以后,就可以把AB两个程序解耦,程序A和消息队列通信,程序B也和消息队列通信,发送方只管发送数据,使用方只管使用数据


冗余

这里的冗余是指可以给消息队列做备份,或分布式存储


扩展性

类似于前面讲的Redis集群可以扩展节点。

消息队列也可以实现良好的扩展,增大消息入队和处理频率


灵活性

比如对于程序A的开发者,更加灵活,不需要知道程序B的运作原理,只需要把封装一个数据报文,按照队列的协议接口,发给消息队列就行了


峰值处理能力

当一个程序B无法及时处理突发的大量访问时,此时就可以使用消息队列,使关键组件顶住突发的访问压力,而不会因为突发的超负荷请求而崩溃


可恢复性

即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理


顺序保证

写入数据和取出数据时都是要按顺序的,当接收者每接收一次数据,那么队列的数据就会往前移一位(偏移量+1)



Kafka

Kafka是众多消息队列的其中一个

概念

  • Kafka是一种高吞吐量(高并发)的分布式发布 / 订阅消息系统
  • Kafka是Apache组织下的一个开源系统
  • 可以实时的处理大量数据以满足各种需求场景

Kafka技术名词

(1)Broker

  • Kafka集群包含一个或多个服务器,每个服务器被称为Broker(经纪人)。
  • 安装有Kafka的服务器,就可以称为Broker(经纪人)。

(2)Topic

  • 每条发布到Kafka集群的消息都有一个分类,这个类别被称为Topic(主题)。
  • Kafka消息队列中可以存放各式各样的消息,把消息做分类,不同类别的消息,属于不同的Topic(主题)。

(3)Producer

  • 消息的生产者,负责发布消息到Kafka broker。
  • 负责产生消息的应用程序

(4)Consumer

  • 消息的消费者,从Kafka broker拉取数据,并消费这些已发布的消息。
  • 负责接收消息的应用程序。

(5)Partition

  • Partition(分区)是物理上的概念,每个Topic包含一个或多个Partition,每个Partition都是一个有序的队列。
  • 同一类的消息隶属于同一个主题(Topic),Partition为每个主题的消息创建存储区域(分区),每个主题都要由内存提供存储空间,为该主题的消息提供空间,这个空间就是Partition
  • 同时,Partition中的每一条消息都会被分配一个有序的id
  • 每一个服务器可以有多个Topic,每个Topic可以有多个partition

(6)Consumer Group

  • 消费者组,可以给每个Consumer指定消费者组,若不指定消费者组,则属于默认的Group
  • 比如接收消息的程序不止一个服务器,构建成一个组,让一些消费者属于该组,便于管理

(7)Message

  • 消息,通信的基本单位,每个Producer可以向一个Topic发布一些消息
  • 接收到的每一条消息,每一个生产者会向消息队列发送存储消息的请求,放置到对应的主题中


ZooKeeper

概念

ZooKeeper是一种分布式协调技术,所谓分布式协调技术主要是用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种共享资源,防止造成资源竞争(脑裂)的现象。

协调消息的生产者把消息放到哪个Kafka,协调消息的消费者从哪个Kafka提取数据,是整个Kafka环境中的协调者


工作原理

在Kafka集群中,安装Kafka的服务器也要安装ZooKeeper

Master启动

各节点向ZooKeeper中注册节点信息,ZooKeeper会为其分配一个id,以编号最小算法选举出一个主节点,另外的设备就是备用节点,这个Master就是对外服务的消息队列,其他应用程序连接的就是这个Master

哪个节点先注册,id就最小,就越可能成为Master

Master故障

如果主节点A发生故障,这时它在ZooKeeper所注册的节点信息会被自动删除,ZooKeeper会再次以编号最小算法选举出新的Master

Master恢复

如果故障的主节点恢复了,它会再次向ZooKeeper注册自身的节点信息,但注册的节点信息编号会变小,因此不会成为Master,而是另一台节点继续担任Master


角色

ZooKeeper集群主要角色有Server和Client

其中Server又分为3个角色

Server

  • 服务器端角色
Leader
  • 领导者角色,主要负责发起投票和决议,以及更新系统状态。
Follower
  • 跟随者角色,用于接受客户端请求并返回结果给客户端,在选举过程中参与投票
Observer
  • 观察者角色,用户接受客户端请求,并将请求转发给Leader,同时同步Leader的状态,但是不参与投票。Observer的目的是扩展系统,提高伸缩性(增加节点、减少节点)

Client

  • 客户端角色,用于向ZooKeeper发起请求

ZooKeeper在Kafka中的作用

  1. Broker注册
  2. Topic注册
  3. 生产者负载均衡(把消息分布存储在不同的节点)
  4. 消费者负载均衡(消费者可以选择其中任意一个节点去连接)
  5. 记录消息分区与消费者的关系
  6. 消息消费进度和Offset(偏移量)记录
  7. 消费者注册

部署Kafka集群

本案例的目的是构建一个Kafka消息队列的集群,实现了负载均衡、高可用(数据冗余)、故障转移

初步设置

开启3台虚拟机并连接上XShell

为每一台kafka服务器修改主机名(Kafka1,Kafka2,Kafka3)

[root@localhost ~]# hostnamectl set-hostname kafka1
[root@localhost ~]# bash

开启会话同步

为了方便实验关闭防火墙和内核安全机制

然后在hosts文件末尾,追加3台服务器的ip+主机名,方便通信

[root@kafka1 ~]# systemctl stop firewalld
[root@kafka1 ~]# setenforce 0
[root@kafka1 ~]# vim /etc/hosts
192.168.10.101 kafka1
192.168.10.102 kafka2
192.168.10.103 kafka3

安装ZooKeeper

安装java环境,导入软件包

[root@kafka1 ~]# yum -y install java

解压ZooKeeper软件包,因为是解压即用的程序,所以直接移动到/etc下并重命名

[root@kafka1 ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz
[root@kafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper

进入ZooKeeper目录,创建出一会在配置文件中指定的数据目录,然后进入存放配置文件的目录,通过ZooKeeper提供的配置文件拷贝出一份自己使用的文件

[root@kafka1 ~]# cd /etc/zookeeper
[root@kafka1 zookeeper]# mkdir zookeeper-data
[root@kafka1 zookeeper]# cd conf/
[root@kafka1 conf]# cp zoo_sample.cfg zoo.cfg

编辑ZooKeeper配置文件,指定存储数据的目录,然后在14行下方配置ZooKeeper集群中的服务器

[root@kafka1 conf]# vim zoo.cfg
# 指定ZooKeeper存储数据的目录。这个目录用于保存ZooKeeper的持久数据和快照文件
dataDir=/etc/zookeeper/zookeeper-data  # 第12行
server.1=192.168.10.101:2888:3888
server.2=192.168.10.102:2888:3888
server.3=192.168.10.103:2888:3888
# 保存并退出

  • 2181:对cline端提供服务
  • 2888:集群内部通信的端口
  • 3888:选举Leader的端口

关闭会话同步

在3台服务器上分别执行下方操作

在ZooKeeper的集群中,需要为每台服务器生成一个唯一的id

[root@kafka1 conf]# cd /etc/zookeeper/zookeeper-data/

不同的是每台服务器的id不能相同,这里我们把3台服务器的id分别设置为1、2、3

第一台服务器
[root@kafka1 zookeeper-data]# echo "1" > myid
第二台服务器
[root@kafka1 zookeeper-data]# echo "2" > myid
第三台服务器
[root@kafka1 zookeeper-data]# echo "3" > myid

开启会话同步

分配完id后,开启会话同步

执行bin目录下的zkServer.sh脚本,追加start参数,启动ZooKeeper服务,启动服务后显示STARTED表示正常启动

还是执行bin目录下的akServer.sh脚本,追加status参数,输出ZooKeeper的状态

  • Mode: follower: 该节点当前处于follower模式,说明它不是领导者(leader),而是从节点,正在跟随集群中的领导者。

此时这3台主机就一台的mode显示是:Mode: leader

[root@kafka1 zookeeper-data]# cd ..
[root@kafka1 zookeeper]# bin/zkServer.sh start
Starting zookeeper ... STARTED
[root@kafka1 zookeeper]# bin/zkServer.sh status
Mode: follower

安装Kafka

还是解压Kafka软件包,然后将解压目录移动到/etc下再重命名直接使用,cd进入Kafka的配置文件目录

[root@kafka1 ~]# tar zxvf kafka_2.13-2.4.1.tgz
[root@kafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka
[root@kafka1 ~]# cd /etc/kafka/config

暂时关闭会话同步,在3台服务器上分别修改对应的id

打开Kafka的配置文件,先在第21行修改每台服务器对应的id

[root@kafka1 config]# vim server.properties
第一台服务器
broker.id=1
第二台服务器
broker.id=2
第三台服务器
broker.id=3

然后在第31行,分别为三台服务器修改对应的IP

第一台服务器
listeners=PLAINTEXT://192.168.10.101:9092
第二台服务器
listeners=PLAINTEXT://192.168.10.102:9092
第三台服务器
listeners=PLAINTEXT://192.168.10.103:9092

再开启会话同步

然后在第60行指定Kafka存储数据的目录,再在第123行指定Kafka连接到Zookeeper集群的地址。

Kafka使用Zookeeper来进行集群协调和管理,所以需要提供Zookeeper服务器的IP地址和端口。

log.dirs=/etc/kafka/kafka-logs  # 第60行
zookeeper.connect=192.168.10.101:2181,192.168.10.102:2181,192.168.10.103:218  # 第123行

保存并退出,在指定的位置创建出刚才在配置文件中指定存储数据的目录

[root@kafka1 config]# cd ..
[root@kafka1 kafka]# mkdir kafka-logs
[root@kafka1 kafka]# bin/kafka-server-start.sh config/server.properties &

测试

关闭会话同步

创建主题

执行kafka-topics.sh脚本的--create选项创建一个主题,然后再使用--list选项查看该主题是否能被查到

  • --create:指定创建一个新的主题。
  • --zookeeper kafka1:2181:指定 ZooKeeper 服务器的位置。注意,Kafka 需要连接 ZooKeeper 来管理集群元数据。
  • --replication-factor 1:因为是测试环境,所以设置主题的副本树为 1,意味着只有一个副本(没有冗余)。在生产环境中,建议使用大于 1 的副本树以提高容错能力。
  • --partitions 1:设置主题的分区数为 1。分区决定了消息的并行处理能力和存储。
[root@kafka1 kafka]# cd bin/
[root@kafka1 bin]# ./kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test
[root@kafka1 bin]# ./kafka-topics.sh --list --zookeeper kafka1:2181
test

生产消费测试

选一个作为消费的服务器,执行kafka-console-consumer.sh脚本,指定一个或多个Kafka代理的地址,再指定要读取消息的主题,等待接收消息

在结尾还有一个选项:

  • 不加 --from-beginning:只输出实时数据
    • 加 --from-beginning:输出从主题创建以来的所有数据
[root@kafka3 bin]# ./kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test

然后再选一个作为发布消息的服务器,执行kafka-console-producer.sh脚本,再指定Kafka代理列表,每个代理地址通常包含主机名(或 IP 地址)和端口号,用逗号分隔

[root@kafka1 bin]# ./kafka-console-producer.sh --broker-list kafka1:9092 -topic test
>123
>222
>333

然后再来到座位消费的服务器,就会发现输入的消息都会在消费服务器延迟显示(异步)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2047833.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于Orangepi全志H616智能视觉垃圾分类系统

目录 一、功能需求 二、Python的安装和环境搭建 三、Python基础 3.1 Python的特点: 3.2 Python的基础学习: 3.3 字典的多层嵌套: 四、C语言调用Python 4.1 搭建编译环境: 4.2 C语言执行Python语句: 4.3 C语言…

22 注意力机制—Transformer

目录 TransformerTransformer 架构对比 seq2seq多头注意力(Multi-head attention)带掩码的多头注意力(Masked Multi-head attention)基于位置的前馈网络(Positionwise FFN)残差连接和归一化(Add & norm)(加 & 规范化)1、加入归一化能够更好地训练比较深的网络…

UE基础 —— 项目与模板

虚幻引擎 项目 包含游戏和应用程序的所有内容,并将所有内容联系在一起;包含磁盘上的许多文件夹和资产,如蓝图、材质、3D资产、动画等;内容浏览器与磁盘上的文件夹和文件夹结构相同; 每个项目都有与之关联的.uproject文…

性能优化理论篇 | 彻底弄懂系统平均负载

Linux 上的进程状态 要讨论系统平均负载,首先要了解Linux 上的进程状态。 标志名称内核名称及解释R运行中或可运行TASK_RUNNING。进程正在执行或等待执行。可以在用户空间(用户代码)或内核空间(内核代码)中运行。S可…

【项目】基于Vue3.2+ElementUI Plus+Vite 通用后台管理系统

构建项目 环境配置 全局安装vue脚手架 npm install -g vue/cli-init打开脚手架图形化界面 vue ui创建项目 在图形化界面创建项目根据要求填写项目相关信息选择手动配置勾选配置项目选择配置项目然后我们就搭建完成啦🥳,构建可能需要一点时间&#xff0…

Navicat Premium Lite For Linux,一款免费的专业可视化 SQL 数据库设计工具,支持各种数据库并行连接,在业界可是大名鼎鼎!

Navicat Premium Lite For Linux,一款免费的专业可视化 SQL 数据库设计工具,支持各种数据库并行连接,在业界可是大名鼎鼎! Navicat 是一个可视化数据库、数据表设计软件,支持MySQL、MariaDB、SQLite、MongoDB、Redshi…

论文阅读笔记:ST-MetaNet-1

目录 前言 摘要 CCS 关键词 介绍 时空相关性的复杂组合 空间相关性 时间相关性 时空相关性的多样性 本篇博客结语 前言 读这篇论文边读边学,每天坚持发博客,看到哪学到哪,这系列文章既有翻译,又有深度详细解释&#xff…

Rust学习笔记1--下载安装和使用

一、下载和安装: 官网:https://www.rust-lang.org/ 直接下载即可,windows:按照教程执行步骤。 二、使用: 2.1 在vscode中安装rust 2.2 编译与运行rust文件: 后缀名rs: 编译: …

org.springframework.boot.autoconfigure.AutoConfiguration.imports 配置没有生效

在spring3.x以后,自动配置需要配置在org.springframework.boot.autoconfigure.AutoConfiguration.imports 文件中 如果你配置了却没生效,有可能是创建的目录不对,正常情况下, META-INF.spring 是一个两层目录,如果是从别的地方复制…

第51集《大佛顶首楞严经》

请大家打开讲义第 111 页。癸三,结责迷情。 当我们在修学首楞严王三昧的时候,要把握两个很重要的原则:第一个就是它修学的方法,第二个就是它修学的目标。 那么,首楞严王的修学方法是什么呢(这一点蕅益大师…

零基础读懂 DDPM 数学推导

零基础读懂 DDPM 数学推导 完整PDF文件可以在工坊获得,以下是内容截图。

为何显示keyerror fruit,如何解决??

🏆本文收录于《CSDN问答解惑-专业版》专栏,主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收…

【docker综合篇】关于我用docker搭建了6个应用服务的事

最近一直在捣鼓docker,利用测试服务器,本着犯错就重来(重装系统)的大无畏精神,不断尝试,总结经验,然后在网上搜寻一些关于docker有关的服务镜像,并搭建起来。看着一个个服务在我的服务器跑起来,…

【QT】基于UDP/TCP/串口 的Ymodom通讯协议客户端

【QT】基于UDP/TCP/串口的Ymodom通讯协议客户端 前言Ymodom实现QT实现开源库的二次开发-1开源库的二次开发-2 串口方式实现TCP方式实现UDP方式实现补充:文件读取补充:QT 封装成EXE 前言 Qt 运行环境 Desktop_Qt_5_11_2_MSVC2015_64bit ,基于…

PowerShell自动化Windows系统管理任务

​ 大家好,我是程序员小羊! 前言 Windows系统管理涉及许多繁琐的任务,如用户管理、文件操作、系统更新、网络配置等。PowerShell作为Windows的命令行工具和脚本语言,可以极大地简化这些管理任务。本文将探讨如何使用PowerShell自动…

【教学类-75-01】20240817“通义万相图片最大化+透明png”的修图流程

背景需求: 打印了袜子配对的PDF模版,做预测试 【教学类-74-02】彩色袜子配对02--左右配对-CSDN博客文章浏览阅读497次,点赞10次,收藏9次。【教学类-74-02】彩色袜子配对02--左右配对https://blog.csdn.net/reasonsummer/article…

09:链表的介绍

链表 1、算法的定义2、链表 1、算法的定义 通俗的定义:解题的方法与步骤。       狭义的定义:对存储的数据的操作。       广义的定义:无论数据是如何存储的,对数据从操作都是一样的。 到目前为止我们可以通过2种结构来存储…

关于订单最终一致性解决方案

背景 整体的交易架构主要由两部分组成:C端交易平台 - B端交易平台 由于组织架构的特殊性,并没有采用两阶段提交、三阶段提交这种刚性分布式事务的方案。 主要采用了基于TCC思想的TOC柔性事务补偿方案。 柔性事务:遵循BASE原则,…

Redis7.x安装系列教程(四)集群部署原理详解

1、什么是集群部署 Redis集群(cluster)是Redis的一种分布式运行模式,通过分片(sharding)提供数据的自动分区和管理,实现数据的高可用性和可扩展性。 在集群模式下,数据分布在多个Redis节点上,节点分为主节点和从节点。主节点负责…

Pytorch如何判断两个模型的权重是否相同(比较权重差异/参数字典)

参考资料: GPT-4o mini的回答 第一种方法是使用md5sum这个命令(Linux上),但是由于环境的不同,哪怕是load之后转存似乎都会有差,所以效果不大。 第二种方法是使用代码比较,这段代码是我找GPT要的,感觉非常不…