【Kafka】概述与集群部署

news2025/1/10 20:35:50

文章目录

    • Kafka概述
      • 定义
      • 应用场景
        • 缓冲/削峰
        • 解耦
        • 异步通信
      • 应用模式
        • 点对点模式
        • 发布/订阅模式
      • 基础架构
    • Kafka集群部署
      • 集群规划
      • 下载解压
      • 修改配置文件
      • 分发安装包
      • hadoop103、hadoop104修改配置文件
      • 配置环境变量
      • 启动集群
        • 先启动Zookeeper集群
        • 然后启动Kafka
      • 关闭集群
    • 集群启停脚本
      • 脚本编写
      • 添加执行权限
      • 启动集群脚本命令
      • 停止集群脚本命令
    • Docker启动Kafka集群
      • docker-compose.yml编写
      • 启动compose
      • 命令行验证
      • Python验证

Kafka概述

定义

kafka是一种分布式的,基于发布/订阅的消息队列 (MessageQueue)。它可以处理消费者在网站中的所有动作流数据。

Kafka是一个开源的分布式事件流平台(Event StreamingPlatform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。(既想处理消息队列,又想处理数据)

应用场景

缓冲/削峰

有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

解耦

允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

异步通信

允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。

应用模式

点对点模式

消费者主动拉去数据,消息收到后清除消息

发布/订阅模式

• 可以有多个topic主题(浏览、点赞、收藏、评论等)
• 消费者消费数据之后,不删除数据
• 每个消费者相互独立,都可以消费到数据

基础架构

(1)Producer:消息生产者,就是向Kafka broker发消息的客户端。
(2)Consumer:消息消费者,向Kafka broker取消息的客户端。
(3)Consumer Group(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
(4)Broker:一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
(5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic。
(6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。
(7)Replica:副本。一个topic的每个分区都有若干个副本,一个Leader和若干个Follower。
(8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
(9)Follower:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader。

Kafka集群部署

集群规划

hadoop102hadoop103hadoop104
zkzkzk
kafkakafkakafka

下载解压

官网地址;https://kafka.apache.org/downloads

# 下载

cd /opt/module

wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz

# 解压

tar -zxvf kafka_2.12-3.4.0.tgz -C /opt/module/

# 改名

mv kafka_2.12-3.4.0/ kafka

修改配置文件

路径:config/server.properties

  • broker.id=0
  • log.dirs=/opt/module/kafka/datas
  • zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
#broker的全局唯一编号,不能重复,只能是数字。
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

分发安装包

这一步的作用是让另外几台服务器也都有 kafka/ 这套文件

  • 命令:xsync kafka/
  • 下面是xsync.sh脚本的内容
#!/bin/bash

#1. 判断参数个数
if [ $# -lt 1 ]
then
    echo Not Enough Arguement!
    exit;
fi

#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
    echo ====================  $host  ====================
    #3. 遍历所有目录,挨个发送

    for file in $@
    do
        #4. 判断文件是否存在
        if [ -e $file ]
            then
                #5. 获取父目录
                pdir=$(cd -P $(dirname $file); pwd)

                #6. 获取当前文件的名称
                fname=$(basename $file)
                ssh $host "mkdir -p $pdir"
                rsync -av $pdir/$fname $host:$pdir
            else
                echo $file does not exists!
        fi
    done
done

hadoop103、hadoop104修改配置文件

# broker.id不得重复,整个集群中唯一
# hadoop103对应的
broker.id=1

# hadoop104对应的
broker.id=2

配置环境变量

路径:vim /etc/profile.d/my_env.sh

注意:集群内的机子都需要配一遍

# KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

刷新一下环境变量

source /etc/profile

启动集群

先启动Zookeeper集群

kafka/zk.sh start


vim kafka/zk.sh
#!/bin/bash

case $1 in
"start"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo  ------------- zookeeper $i 启动 ------------
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
	done
}
;;
"stop"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo  ------------- zookeeper $i 停止 ------------
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
	done
}
;;
"status"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo  ------------- zookeeper $i 状态 ------------
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
	done
}
;;
esac

然后启动Kafka

bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties

关闭集群

bin/kafka-server-stop.sh

集群启停脚本

脚本编写

vim kf.sh

#! /bin/bash

case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------启动 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------停止 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
    done
};;
esac

添加执行权限

chmod +x kf.sh

启动集群脚本命令

kf.sh start

停止集群脚本命令

kf.sh stop

注意: 停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。

Docker启动Kafka集群

docker-compose.yml编写

version: '3'
services:
  li-zookeeper:
    image: wurstmeister/zookeeper:latest
    ports:
      - "7181:2181"
    networks:
      - li-kafka-net

  li-kafka-1:
    image: wurstmeister/kafka
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://139.196.169.148:7091
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: li-zookeeper:2181
    ports:
      - "7091:9092"
    networks:
      - li-kafka-net

  li-kafka-2:
    image: wurstmeister/kafka
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://139.196.169.148:7092
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: li-zookeeper:2181
    ports:
      - "7092:9092"
    networks:
      - li-kafka-net

  li-kafka-3:
    image: wurstmeister/kafka
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://139.196.169.148:7093
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: li-zookeeper:2181
    ports:
      - "7093:9092"
    networks:
      - li-kafka-net

  li-kafka-map:
    image: dushixiang/kafka-map:latest
    environment:
      KAFKA_MAP_KAFKA_SERVERS: li-kafka-1:9092,li-kafka-2:9092,li-kafka-3:9092
      KAFKA_MAP_USERNAME: admin
      KAFKA_MAP_PASSWORD: admin
    ports:
      - "8080:8080"
    networks:
      - li-kafka-net

networks:
  li-kafka-net:
    driver: bridge

启动compose

docker-compose up -d

命令行验证

docker exec -it kafka-server bash
cd /opt/bitnami/kafka

列出所有topics
kafka-topics.sh --bootstrap-server 139.196.169.148:7091 --list

创建一个topic
kafka-topics.sh --bootstrap-server 139.196.169.148:7092 --create --partitions 1 --replication-factor 3 --topic first

生产者
kafka-console-producer.sh --bootstrap-server 139.196.169.148:7091  --topic first 

消费者
kafka-console-consumer.sh --bootstrap-server 139.196.169.148:7092 --from-beginning --topic first

Python验证

  • 安装包

    pip install kafka-python
    
  • 生产者

    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    
    # Kafka 集群地址
    bootstrap_servers = ['139.196.169.148:7091', '139.196.169.148:7092', '139.196.169.148:7093']
    
    # Kafka 主题名称
    topic = 'first'
    
    # 创建 Kafka 生产者
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    
    
    # 发送消息到 Kafka 主题
    def send_message(message):
        try:
            producer.send(topic, message.encode('utf-8'))
            producer.flush()
            print('Message sent successfully:', message)
        except KafkaError as e:
            print('Failed to send message:', e)
    
    
    # 测试发送消息
    send_message('Hello, Kafka!')
    
  • 消费者

    from kafka import KafkaConsumer
    
    # Kafka 集群地址
    bootstrap_servers = ['139.196.169.148:7091', '139.196.169.148:7092', '139.196.169.148:7093']
    
    # Kafka 主题名称
    topic = 'first'
    
    # 创建 Kafka 消费者
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',  # 从最早的消息开始消费
        enable_auto_commit=True,  # 自动提交消费位移
        group_id='my-group')  # 消费者组名称
    
    
    # 从 Kafka 主题消费消息
    def consume_message():
        for message in consumer:
            print('Message received:', message.value.decode('utf-8'))
    
    
    # 测试消费消息
    consume_message()
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/492534.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

QT自学笔记2:使用介绍(函数)

一、 setAttribute(按F1)—>void QWidget::setAttribute(Qt::WidgetAttribute attribute, bool on true) —>Qt::WidgetAttribute attribute(有一个属性) ----> p->setAttribute(Qt::WA_DeleteOnClose); MainWind…

眼见为实,来瞧瞧MySQL中的隐藏列!

在介绍mysql的多版本并发控制MVCC的过程中,我们提到过mysql中存在一些隐藏列,例如行标识、事务ID、回滚指针等,不知道大家是否和我一样好奇过,要怎样才能实际地看到这些隐藏列的值呢? 本文我们就来重点讨论一下诸多隐…

ThingsBoard集群部署之k8s

1、概述 今天终于有时间去搞这个啦,拖了很久了,一直没时间,因为我本地没有那么多机器资源,开虚拟机不够,如果租用阿里云服务器,需要有充值的时间,因为这个费用是按小时付费,需要有连贯的时间来搞才行,今天恰好有时间,就开始搞了,弄成功搞出来了,特地写博客记录下来…

Linux基本指令【Linux操作系统】

本文将开启Linux操作系统学习新篇章,欢迎与博主一起交流学习。 目录 一、ls指令 二、pwd指令 三、mkdir与rm指令 四、cd指令 五、touch指令 六、man指令(重要) 七、cp指令(重要) 八、mv指令(重要&…

keepalived如何手动切换主备

概述 主备部署中使用keepalived可以很方便的实现,安装维护简单,功能稳定。 最近在使用过程中有小的发现,记录一下。 环境 CentOS Linux release 7.9.2009 (Core) keepalived.x86_64 1.3.5-19.el7 安装配置 centos7自带的keepalived版本…

提升自我数据分析能力的根本,是方法论!

很多人问,我是财务,能转行业做BI吗?我该学些什么?该掌握哪些技能?该如何学习?我是学生,在校期间专业不扎实,该怎么办?我是小白,还能学会数据分析吗&#xff1…

第二届“强国青年科学家”获奖者均有海外经历

今天是五四青年节,为了弘扬五四精神,知识人网小编本期专门介绍10名第二届“强国青年科学家”获奖者。特别提示:这些科学家均具有海外留学或研究经历。 五四精神的核心内容为“爱国、进步、民主、科学”。 进一步弘扬科学精神,营造…

找不到vcruntime140.dll,无法继续执行代码,解决方法分享

找不到vcruntime140.dll,无法继续执行代码?vcruntime140.dll 是 Visual Studio 2015 运行库的 Dynamic Link Library 文件,许多 Windows 应用程序需要它才能正常运行。当你尝试安装或运行某些应用程序时,有时可能会遇到找不到 vcruntime140.d…

电压放大器的实际应用有哪些方面

电压放大器是一种电子设备,用于增加信号的电压,使得信号具备更大的电压和功率去驱动负载,或者是更容易被检测、传输和处理。电压放大器的基本原理是将输入信号增加一个固定的电压值,以使得输出信号的幅值与输入信号的幅值相同或更…

怎样才能学好数据分析?

俗话说,先入行再求发展,好比你想彩票中奖,得先有买这个过程才行。想成为优秀数据分析也是如此,先掌握好这门技能入行后,再谈发展和深研。很多人都想拥有高薪双休又不加班的工作,比如数据分析,但…

在制造业的工业2.0中应用MOM系统

介绍 什么是制造运营管理 (MOM) 系统和 IT 架构的最佳实践? 行业专家对制造类型和供应网络有何建议? 管理思维和企业文化是否因不断变化的全球市场而过时? MOM 技术是否过于昂贵,IT 架构是否无法快速适应市场变化?…

波奇学c++:类和对象:类,构造函数,析构函数和拷贝构造函数

面向对象和面向过程 面向对象:划分事务参与的对象,关注对象的交互,现实关系更真实的模拟现实 面向对象三大特性:封装,继承,多态 封装:私有,公有,为了更好的管理 c语言…

第 7 章 与 Hive 的集成--以及最后的HBase回顾

7.1 使用场景 如果大量的数据已经存放在 HBase 上面,需要对已经存在的数据进行数据分析处理,那 么 Phoenix 并不适合做特别复杂的 SQL 处理,此时可以使用 hive 映射 HBase 的表格,之后 写 HQL 进行分析处理。 插入一条&#xff…

鸿蒙Hi3861学习五-Huawei LiteOS(任务管理)

一、任务简介 关于任务的相关介绍,之前文章有比较详细的介绍,这里不做过多解释,可以参考如下文章:FreeRTOS学习二(任务)_t_guest的博客-CSDN博客 而LiteOS的主要特性可以总结为如下几点: LiteO…

一个文章学会使用Git

GIT版本控制系统 版本控制系统 : ​ 1.记录历史版本信息 (记录每一次修改的记录) ​ 2.方便团队相互之间协作开发 ​ … 常用的版本控制系统 cvs / svn : 集中式版本控制系统git : 分布式版本控制系统 svn git GIT工作原理 工作区 : 我们能看到的,并且用来写代码的…

nodejs的安装以及Dos的命令

1.0 nodeJS nodejs是基于谷歌v8引擎的执行环境,他没有BOM、DOM nodeJS安装 找官网 ->下载 -> 傻瓜式下一步 -> win键 r -> 输入cmd 进入dos操作命令 -> node -v 查看版本 1.1 DOS 命令【掌握】 进入指定文件夹 cd 文件目录 退出到上一层 cd .…

112.【Vue-细刷-03】

Vue-03 (二十)、过渡和动画1.过渡案列_原生实现2.过渡案列_Vue实现3.动画案列_Vue实现 (1)4.动画案列_Vue实现(2)5.Vue实现时间格式化6.Vue实现过滤器7.Vue常用内置指令 (二十一)、Vue的自定义指令1.自定义非内嵌指令(不保留h2中的原有text)2.自定义非内…

在线教育机构视频加密防下载和防盗用的方法有哪些可以借鉴

阿酷TONY / 原创 / 2023-5-5 / 长沙 在线教育机构防止视频被盗用和视频被下载,可以采取以下措施,一共10条,总有一条适用于您吧,收藏一下吧~~~~~ 1.VRM分片错序视频加密 2.Html5全链路视频加密 3.用户ID跑马灯 4.数字化动态水印 …

【科普帖】晶振 OCXO、VCXO、TCXO、VC-TCXO、DCXO、SPXO区别

一、前言 晶体振荡器用作频率基准,以生成非常稳定的频率源。它用于许多应用中,如频率合成器,本地振荡器,并在调制解调器和其他电路中提供稳定的时钟。 晶体输出频率漂移主要受温度、电源电压和老化的影响。其中温度变化是最为重要…

PS VR创始成员:瑕不掩瑜,PS VR2是跨世代的飞跃

今年2月,索尼次世代VR头显PS VR2正式发售,这款立项近7年的产品受到了游戏玩家和从业者广泛关注,市面上也有很多种不同的测评报告。PS VR项目创始成员、前索尼沉浸式体验专家、高级VR游戏设计师Jed Ashforth也发表了自己对于该头显的一些看法&…