【大数据之Kafka】二、Kafka入门

news2025/1/12 7:48:42

1 安装部署

1.1 集群规划

在这里插入图片描述

1.2 集群部署

官方下载地址:http://kafka.apache.org/downloads.html
(1)解压安装包:

tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/

(2)修改解压后的文件名称:

mv kafka_2.12-3.0.0/ kafka

(3)进入到/opt/module/kafka/config 目录,修改配置文件server.properties:
修改:
  参数值broker.id=0(broker 的全局唯一编号,不能重复,只能是数字);
  kafka 运行日志(数据)存放的路径log.dirs=/opt/module/kafka/datas;
  配置连接Zookeeper 集群地址zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka。

#broker 的全局唯一编号,不能重复,只能是数字。
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的线程数量 
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小 
socket.request.max.bytes=104857600
#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔 
log.dirs=/opt/module/kafka/datas
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量 
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认时1 个副本 
offsets.topic.replication.factor=1
#segment 文件保留的最长时间,超时将被删除 
log.retention.hours=168
#每个 segment 文件的大小,默认最大 1G 
log.segment.bytes=1073741824
# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理) 
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

(4)分发安装包

xsync kafka/

(5)分别在 hadoop103 和 hadoop104 上修改配置文件/opt/module/kafka/config/server.properties中的 broker.id=1、broker.id=2。(broker.id 不得重复,整个集群中唯一。)

(6)在/etc/profile.d/my_env.sh 文件中增加kafka 环境变量配置:

sudo vim /etc/profile.d/my_env.sh

#添加:
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

(7)刷新一下环境变量,并将环境变量分发到其它节点,并source。

source /etc/profile
sudo xsync /etc/profile.d/my_env.sh

(8)启动集群时需要先启动zookeeper集群再启动Kafka,关闭集群时要先保证Kafka都关闭了才能关闭zookeeper。

1.3 集群启停脚本

(1)在/home/用户名/bin 目录下创建文件 kf.sh 脚本文件:

#!/bin/bash
case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------启动 $i Kafka--------"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh - daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------停止 $i Kafka--------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
    done
};;
esac

(2)添加执行权限:

chmod 777 kf.sh

(3)启动集群:

zk.sh start
kf.sh start

注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止, Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死Kafka 进程了。
在这里插入图片描述

2 Kafka命令行操作

2.1 主题命令行操作

#查看操作主题命令参数
bin/kafka-topics.sh

在这里插入图片描述

#查看当前服务器中所有topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --list

#创建first topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --create --partitions 1 --replication-factor 3 --topic first
#选项说明:
#--topic 定义 topic 名
#--replication-factor 定义副本数
#--partitions 定义分区数

#查看first主题详情
bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --describe --topic first

#修改分区数,分区数只能增不能减
bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --alter --topic first --partitions 3

#删除topic
bin/kafka-topics.sh --bootsrtap-server hadoop102:9092,hadoop103:9093 --delete --topic first

2.2 生产者命令行操作

#查看操作生产者命令参数
bin/kafka-console-producer.sh

在这里插入图片描述

#发送消息
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first
>hello

2.3 消费者命令行操作

#查看操作消费者命令参数
bin/kafka-console-consumer.sh

在这里插入图片描述

#消费first主题中的数据
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first

#把主题中所有的数据读取出来(包括历史数据)
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --from-beginning --topic first

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/875955.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

uboot通过NFS挂载根文件系统卡死解决办法(VFS: Unable to mount root fs via NFs, trying floppy)

uboot通过NFS挂载根文件系统时,加载完内核后,卡死在这里 gpio_dvfs: disabling can-3v3: disabling ALSA device list:#0: wm8960-audio原因: 从Ubuntu17.04开始,nfs默认只支持协议3和协议4,而kernel中默认支持协议2…

posefs1.perception.cs.cmu.edu 无法访问

我尝试练习openpose时,发现运行的代码缺乏coffee的model,需要执行models 下的bat或sh。但是 posefs1.perception.cs.cmu.edu 无法访问。 从Kaggle上下载 https://www.kaggle.com/changethetuneman/openpose-model 在根据它的getModels脚本放置相应的mo…

数据结构笔记--优先队列(大小根堆)经典题型

1--项目的最大利润 题目描述: 输入:正数数组 costs,costs[i] 表示项目 i 的花费;正数数组 profits,profits[i] 表示项目 i 的花费;正数 k 表示只能串行完成最多 k 个项目;m 表示拥有的资金&…

应急响应-钓鱼邮件的处理思路溯源及其反制

0x00 钓鱼邮件的危害 1.窃取用户敏感信息,制作虚假网址,诱导用户输入敏感的账户信息后记录 2.携带病毒木马程序,诱导安装,使电脑中病毒木马等 3.挖矿病毒的传输,勒索病毒的传输等等 0x01 有指纹的钓鱼邮件的溯源处理…

非计算机专业的能当程序员吗?

非计算机专业的能当程序员吗? 😇博主简介:我是一名正在攻读研究生学位的人工智能专业学生,我可以为计算机、人工智能相关本科生和研究生提供排忧解惑的服务。如果您有任何问题或困惑,欢迎随时来交流哦!😄 …

React源码解析18(6)------ 实现useState

摘要 在上一篇文章中,我们已经实现了函数组件。同时可以正常通过render进行渲染。 而通过之前的文章,beginWork和completeWork也已经有了基本的架子。现在我们可以去实现useState了。 实现之前,我们要先修改一下我们的index.js文件&#x…

Redis数据结构——链表list

链表是一种常用的数据结构,提供了顺序访问的方式,而且高效地增删操作。 Redis中广泛使用了链表,例如:列表的底层实现之一就是链表。 在Redis中,链表分为两部分:链表信息 链表节点。 链表节点用来表示链表…

Leetcode-每日一题【剑指 Offer 30. 包含min函数的栈】

题目 定义栈的数据结构,请在该类型中实现一个能够得到栈的最小元素的 min 函数在该栈中,调用 min、push 及 pop 的时间复杂度都是 O(1)。 示例: MinStack minStack new MinStack(); minStack.push(-2); minStack.push(0); minStack.push(-3); minStack…

1+X Web前端开发职业技能等级证书建设方案

一 、系统概述 1X Web前端开发技术是计算机类专业重要的核心课程,课程所包含的教学内容多,实践性强,并且相关技术更新快。传统的课堂讲授模式以教师为中心,学生被动式接收,难以调动学生学习的积极性和主动性。混合式教…

C语言指针笔试真题整理(8道)

前言 本篇文章整理了一些指针的笔试题,适合初学者以及对于指针掌握并不是很牢固的朋友阅读,当然,大佬想做着玩的话可以看一看第八题~ 分类:循序渐进的难度:前三题和第七题是简单题,第四题有陷阱&#xff…

什么是Web应用程序防火墙,WAF与其他网络安全工具差异在哪?

一、什么是Web 应用程序防火墙 (WAF) ? WAF软件产品被广泛应用于保护Web应用程序和网站免受威胁或攻击,它通过监控用户、应用程序和其他互联网来源之间的流量,有效防御跨站点伪造、跨站点脚本(XSS攻击)、SQL注入、DDo…

最新版本 Stable Diffusion 开源 AI 绘画工具之 VAE 篇

✨ 目录 🎈 什么是VAE🎈 开启VAE🎈 下载常见的VAE🎈 对比不同VAE生成的效果 🎈 什么是VAE VAE:是 Variational Auto-Encoder 的简称,也就是变分自动编码器可以把它理解成给图片加滤镜&#xff…

ChatGPT or BingChat

你相信我们对大模型也存在「迷信权威」吗? ChatGPT 的 GPT-4 名声在外,我们就不自觉地更相信它,优先使用它。但我用 ChatALL 比较 AI 大模型们这么久,得到的结论是: ChatGPT GPT-4 在大多数情况下确实是最强&#xf…

【elementUi】绘制自定义表格、绘制曲线表格

要求绘制下图系列表格: 实现步骤: 1.绘制树,实现树勾选字段—>表格绘制字段 逻辑: 树:check-change“treeChart.handleCheckChange” 绑定点击选择事件,改变data.column3数据项;表格:columns"data…

Unity智慧园区夜景制作

近期使用Unity做了一个智慧园区场景的demo,初步了解了3D开发的一些步骤和知识,以下为制作的步骤,比较简略,备忘: 1. 制作前的设计分析: 1. 分析日光角度,阴影长度,效果 2. 分析冷暖…

Idea 快捷键整理

Idea快捷键和自动代码补全汇总 idea快捷键汇总 Ctrl 快捷键说明Ctrl F在当前文件进行文本查找 (必备)Ctrl R在当前文件进行文本替换 (必备)Ctrl Z撤销 (必备)Ctrl Y删除光标所在行 或 删除选中的行 &am…

双向最佳路径优先搜索算法

概念 双向最佳优先搜索(Bidirectional Best-First Search)是一种图搜索算法,用于在给定的图或树中找到两个节点之间的最短路径。该算法尝试从起始节点和目标节点同时扩展搜索,直到两个搜索方向相遇。 双向最佳优先搜索的步骤如下…

Rx.NET in Action 第三章学习笔记

3 C#函数式编程思想 本章内容包括 将 C# 与函数式技术相结合使用委托和 lambda 表达式使用 LINQ 查询集合 面向对象编程为程序开发提供了巨大的生产力。它将复杂的系统分解为类,使项目更易于管理,而对象则是一个个孤岛,你可以集中精力分别处理…

table 根据窗口缩放,自适应

element-plus中,直接应用在页面样式上, ::v-deep .el-table{width: 100%; } ::v-deep .el-table__header-wrapper table,::v-deep .el-table__body-wrapper table{width: 100% !important; } ::v-deep .el-table__body,::v-deep .el-table__footer,::v-d…

试卷转电子版怎样处理?分享个好用的扫描转换方法

试卷转电子版是一个常见的需求,可以通过扫描纸质试卷来实现。但是,扫描后的文件可能会有一些问题,例如模糊、颜色失真、文字识别错误等。在这篇文章中,我将分享一个好用的扫描转换方法,可以帮助您快速而准确地将试卷转…