Kafka第一课概述与安装

news2024/12/22 20:45:00

生产经验 面试重点
Broker面试重点
代码,开发重点
67 章了解

如何记录行为数据

1. Kafka概述

1.产生原因

前端 传到日志 日志传到Flume 传到HADOOP
但是如果数据特比大,HADOOP就承受不住了

2.Kafka解决问题

控流消峰
Flume传给Kafka
存到Kafka
Hadoop 从Kafka取数据 ,而不是Kafka强行发
类似 菜鸟驿站, 先存取来,我们主动去取,或者指定他去送

存到HDFS的,一定不是实时数据,因为HDFS太慢了
在这里插入图片描述

3.应用场景

1. 缓冲/消峰

消息队列存储数据,而不是直接发给处理系统,处理完一部分,再取,再处理
在这里插入图片描述

2.解耦

通过中间件接口,适配不同数据源和目的地

在这里插入图片描述

3.异步通信

允许用户将消息放入队列,但不立即处理,然后再需要的时候处理。
为什么异步处理快: 同步需要等待
点餐:
同步:服务员过来给我点餐 ,这里需要服务员过来
异步:扫桌子码自己点餐

4.消息队列模式

在这里插入图片描述
在这里插入图片描述
Kafka使用发布订阅模式
数据会保存一段时间

5.基础架构

生产者 - Broker - Group
TopicA是什么?
这里的分区是什么?
分区: 物理分割
为什么要分割:结合集群分散存储
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

1. 分区操作

topic 是一整个数据 ,分区是为了将数据分在不同的Broker上。类似于HDFS,
Broker是物理存储
Partition 类似于DN

2.消费者组的概念

类似权限管理把,组内并行消费,便于管理

  • Producer生产者: 向Kafka broker发消息的客户端(自主)
  • Consumer消费者: 从Kafka取消息的客户端(自主)
  • Group 组: 消费者组。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
    这个说明,分区不支持并行读取,一个分区只能同时一个组内消费者消费。
    组内消费者对不同分区进行读取,是为了优化读取速率.
  • Broker 一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  • Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic。
  • Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。
  • Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
  • Follower:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader。
  • Replica:副本。一个topic的每个分区都有若干个副本,一个Leader和若干个Follower。

3.副本

备份功能,备份partition 多个副本, 类似HDFS,类似DN(存入一下子3个)把
多副本时,一个副本叫leader 另一个副本叫follower,
也是选出来的角色 交互时只和leader交互
follower平时只有备份作用,但是当leader倒下时,他直接成为leader
在这里插入图片描述
这里是存储数据的目录,而不是存Kafka自己日志的目录
在这里插入图片描述
高可用, 配置多个
在这里插入图片描述
replicas 是存储副本的位置
lsr 是目前存活的副本

分区数只能改大,不能改小
副本数修改,
通过JSON手动修改

消费者按最新的offect进行消费

5.配置

1.解压

[atguigu@hadoop102 software]$ tar -zxvf kafka_2.12-3.3.1.tgz -C /opt/module/
[atguigu@hadoop102 module]$ mv kafka_2.12-3.3.1/ kafka

2.配置文件

配置文件目前只需要修改三个
broker编号 不同机器只需要编号不同即可
log.dir 数据存放位置
zookeeper.connect 连接集群的地址

[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vim server.properties
# 修改1 broker的全局唯一编号,不能重复,只能是数字。 
broker.id=0

#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#修改2 这里其实是是存放到Kafka的数据的地方 kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#修改3  连接集群的位置 配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

3. 环境变量

sudo vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

这个是为了启动的时候不需要输入一长串地址
比如:bin/kafka-server-start.sh -daemon config/server.properties
这里的config是kafka的路径 启动需要输入全路径

bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties

修改后

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

4.集群脚本

#! /bin/bash
if [ $# -lt 1 ]
then 
 echo "参数错误,请输入start或者stop"
 exit
fi
case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104 
    do 
	echo "---------------启动 $i Kafka ----------------------"
	ssh  $i  "$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do 
	echo "---------------停止 $i Kafka ---------------------"
	ssh $i  "$KAFKA_HOME/bin/kafka-server-stop.sh -daemon $KAFKA_HOME/config/server.properties"
	done
};;
esac

2.命令

1.主题命令

1. --bootstrap-server <String: server toconnect to>

连接Broker 操作Kafka必须有这个命令
既可以输入一个,也可以输入多个

kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 

2.主题的创建和删除

– create +空格 ±-topic+空格+主题名
– delete +空格 ±-topic+空格+主题名
主题 主题名 一般放最后

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first

在这里插入图片描述

–topic 定义topic名
–replication-factor 定义副本数
–partitions 定义分区数
在这里插入图片描述

3.查看所有主题

–list

4.查看主题详细描述

可以看单个主题,可以看全部主题
不加后缀默认查看全部
查看单个需要+空格 ±-topic+空格+主题名
在这里插入图片描述
在这里插入图片描述

5.修改–alter

设置分区数
–partitions <Integer: # of partitions>
在这里插入图片描述
分区只能调大,不能调小

设置分区副本
–replication-factor<Integer: replication factor>

// 手动调整kafka topic分区的副本数 

{
  // 1. 版本号 这个是自定义的版本号
  "version":1,
  // 2. 分区是重点,因为副本改变分区也要改变。
  //   其实就是将分区的副本重新进行布局
  "partitions":
  [
  {"topic":"first","partition":0,"replicas":[1,2,0]},
  {"topic":"first","partition":1,"replicas":[2,0,1]},
  {"topic":"first","partition":2,"replicas":[2,0,1]}
  ] 
}
// 运行命令
//kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file ./rep.json --execute

在这里插入图片描述

更新系统默认的配置。
–config <String: name=value>
临时调配参数

2.生产者命令

1.操作

--topic <String: topic>
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first

在这里插入图片描述

3.消费者

kafka-console-consumer.sh --bootstrap-server +集群+ 主题

  • –bootstrap-server <String: server toconnect to> 连接的Kafka Broker主机名称和端口号。
  • –topic <String: topic> 操作的topic名称。
  • –from-beginning 从头开始消费。
  • –group <String: consumer group id> 指定消费者组名称。
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

为什么消费者未开启时,生产者发送的消息,等消费者起来了收不到?
没有指定消费者组时,每次开启,消费者属于的消费者组就是随机的,那么就无法进行断点续传
当主动指定组后,再次登录,在指定组后,会自动开启断点续传功能
在这里插入图片描述
在这里插入图片描述
想要提前的顺序,就需要,–from-beginning
但是不能和用户组一起跑

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/860385.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

geoserver编辑样式 【开发工具QGis的初次使用】

geoserver编辑样式 开发工具配置中文语言 geoserver样式的更改 开发工具 链接: geoserver样式style的更改 链接: QGis开发工具的安装及使用 配置中文语言 setting > options > general > 中文 geoserver样式的更改 链接: geoserver样式style的更改 利用QGIs Q…

3DMAX动力学布料模拟插件DynamoCloth使用方法

3DMAX动力学布料模拟DynamoCloth是一个&#xff08;实时&#xff09;GPU加速的3ds Max Cloth动力学插件&#xff0c;与原生Cloth修改器相比&#xff0c;性能提高了10-100倍。 3DMAX动力学布料模拟是实时的&#xff0c;能够实现实时的自然互动&#xff0c;并将创作过程从试错转…

代码随想录算法训练营第十五天| 层序遍历(即广度优先搜索), 226.翻转二叉树,101. 对称二叉树

层序遍历(即广度优先搜索) 需要借用一个队列来实现&#xff0c;队列先进先出&#xff0c;符合一层一层遍历的逻辑&#xff0c;而用栈先进后出适合模拟深度优先遍历也就是递归的逻辑。 思路是先把根节点加入队列&#xff0c;然后在遍历下一层前&#xff0c;先将队列拥有的当前层…

LeetCode150道面试经典题--找出字符串中第一个匹配项的下标(简单)

1.题目 给你两个字符串 haystack 和 needle &#xff0c;请你在 haystack 字符串中找出 needle 字符串的第一个匹配项的下标&#xff08;下标从 0 开始&#xff09;。如果 needle 不是 haystack 的一部分&#xff0c;则返回 -1 。 2.示例 3.思路 回溯算法&#xff1a;首先将…

【CSS3】CSS3 动画 ⑥ ( 动画属性示例 | 精灵图帧动画效果实现 )

文章目录 一、需求说明二、代码分析1、动画属性2、布局分析3、动画实现 三、完整代码示例 一、需求说明 给定一张精灵图 , 其中有多个 动画帧 对应的图片 , 下图的大小是 1600 x 100 像素 , 截图展示如下 : 实际图片 : 二、代码分析 1、动画属性 使用上图实现 逐帧动画 效果 …

智慧工地云平台源码

智慧工地可以实现对人员管理、施工进度、安全管理、材料管理、设备管理、环境监测等方面的实时监控和管理&#xff0c;提高施工效率和质量&#xff0c;降低安全风险和环境污染。在道路施工中&#xff0c;智慧工地可以实现对道路状况、交通流量、施工进度等方面的实时监控和管理…

Wlan——802.11协议物理层关键技术(OFDM、MIMO、BSS)和CSMA/CD机制的介绍

目录 802.11协议的发展 802.11协议物理层关键技术 信道捆绑技术 OFDM/OFDMA技术 Short-Gi短保护间隔技术 MIMO/MU-MIMO技术 QAM技术 BSS Color快速识别 802.11MAC层关键技术CSMA/CD机制 为什么无线提出了CSMA/CD机制 CSMA/CD的工作机制 CSMA/CD的工作原理 CSMA/CD…

matlab进行mex时出现 error LNK2019: 无法解析的外部符号

解决方法分成三个步骤&#xff1a; 1、直接在simulink模块运行出现错误&#xff0c;找不到该s函数&#xff1b; 2、需要确认安装了编译器。mex -setup 确认安装了编译器&#xff0c;再次mex xxx.c未解决&#xff1b; 3、再次查找资料发现可能编译器不知道具体的位置&#xff0c…

Java训练六

目录 一、除数不能为0 二、校验年龄格式 三、终端循环 四、 计算最大公约数 一、除数不能为0 使用静态变量、静态方法以及throws关键字&#xff0c;实现当两个数相除且除数为0时&#xff0c;程序会捕获并处理抛出的ArithmeticException异常&#xff08;算术异常&#xff09…

电脑合上盖子无线网络不会断开

控制面板\硬件和声音\电源选项\系统设置 最终选择不会采取任何操作 选择不会采取任何操作

学习C语言第三天 :关系操作符、逻辑操作符

1.关系操作符 C语言用于比较的表达式&#xff0c;称为“关系表达式”里面使用的运算符就称(relationalexpression)&#xff0c;为“关系运算符” (relationaloperator) &#xff0c;主要有下面6个。 > 大于运算符 < 小于运算符 > 大于等于运算符 < 小于等…

分布式 - 消息队列Kafka:Kafka生产者架构和配置参数

文章目录 1. kafka 生产者发送消息整体架构2. Kafka 生产者重要参数配置01. acks02. 消息传递时间03. linger.ms04. buffer.memory05. batch.size06. max.in.flight.requests.per.connection07. compression.type08. max.request.size09. receive.buffer.bytes和 send.buffer.b…

如何使用appuploader制作apple证书​

转载&#xff1a;如何使用appuploader制作apple证书​ 如何使用appuploader制作apple证书​ 一.证书管理​ 点击首页的证书管理 二.新建证书​ 点击“添加”&#xff0c;新建一个证书文件 免费账号制作证书只有7天有效期&#xff0c;没有推送消息功能&#xff0c;推送证书…

anaconda 基本指令

1.anaconda创建环境 例如我们创建一个名称为img2word&#xff0c;python版本为3.9的环境 conda create -n img2word python3.9在这个命令中&#xff1a; create 是告诉 Conda 你要创建一个新的环境。-n img2word 是设置新环境的名称为 img2word。python3.9 是告诉 Conda 在这…

PS AI版本安装教程

好久没写博客了&#xff0c;今天更新一下子吧&#xff01; 随着chatGPT的提出&#xff0c;各种软件逐渐开始镶嵌人工智能&#xff0c;为我们的生活带来了极大的便利&#xff01;话不多说&#xff0c;开始介绍今天的主角&#xff0c;PS的AI版本。 安装教程&#xff1a; 1.安装…

Linux学习之sed删除、追加、插入、更改、读写文件、下一行、打印、退出和seq命令

cat /etc/redhat-release看到操作系统是CentOS Linux release 7.6.1810&#xff0c;uname -r看到内核版本是3.10.0-957.el7.x86_64&#xff0c;sed --version可以看到sed版本是4.2.2。 echo a : 1 : good : g >> sed_daicpnrwq.txt echo b : 2 : well : w >> sed…

LC-删除排序链表中的重复元素

LC-删除排序链表中的重复元素 链接&#xff1a;https://leetcode.cn/problems/remove-duplicates-from-sorted-list/description/ 思路&#xff1a;这题其实不难&#xff0c;链表已经排序&#xff0c;我们只要把相邻的两个节点的值进行比较&#xff0c;如果相同&#xff0c;删…

基于SpringBoot的社区团购系统设计【附开题|万字文档(LW)和搭建文档】

主要功能 前台界面&#xff1a; ①首页、商品信息推荐、社区信息、商品信息展示、查看更多等 ②商品信息、名称类型查询、添加购物车、立即购买、积分兑换、点我收藏、赞一下、踩一下、评论等 ③团购信息、社区信息、购物车等 ④个人中心、我的订单、我的地址、我的收藏等 后台…

VBA技术资料MF42:VBA_从Excel中上面的单元格复制公式

【分享成果&#xff0c;随喜正能量】唯有梦想才配让你不安&#xff0c;唯有行动才能解除你的不安.绳锯木断&#xff0c;水滴石穿。也许你现在做的事情很小&#xff0c;只要你能日积月累的坚持下去&#xff0c;才会发现意义非凡。所谓的成功&#xff0c;便是别人失败的时候你还在…

matplotlib FormatStrFormatter设置坐标轴的标注为整数和小数【设置小数点的数目】

利用FormatStrFormatter 进行设置 1 设置为整数 import matplotlib.pyplot as plt from matplotlib.ticker import FormatStrFormatter# 创建一个图表 fig, ax plt.subplots()# 生成一些示例数据 x [1, 2, 3, 4, 5] y [1000, 2000, 3000, 4000, 5000]# 在 x 轴上设置刻度标…