Kafka的搭建及使用

news2025/1/13 10:42:28

Kafka搭建及使用

Kafka搭建

1、上传解压修改环境变量
# 解压
tar -zxvf kafka_2.11-1.0.0.tgz -C /usr/local/soft
mv kafka_2.11-1.0.0 kafka-1.0.0

tar -xvf 是一个在Unix和类Unix操作系统(如Linux和macOS)中用于解压缩或解包.tar文件的命令。
tar -zxvf 是一个在Unix和类Unix操作系统(如Linux和macOS)中用于解压缩.tar.gz或.tgz文件的命令。
tar:是“tape archive”的缩写,用于文件的归档和压缩。
-z:这个选项告诉tar命令接下来要处理的文件是通过gzip进行压缩的。因此,tar会在解压归档文件之前先解压gzip压缩。
-x:这个选项表示要进行的是解包(或解压)操作,即从归档文件中提取文件。
-v:这个选项代表“verbose”,意味着命令在执行时会显示详细的操作信息,比如正在解压的文件名。
-f:这个选项后面跟着要操作的文件名,告诉tar命令接下来要处理的文件名。注意,-f选项和文件名之间不应该有空格。


# 配置环境变量
vim /etc/profile

export KAFKA_HOME=/usr/local/soft/kafka-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile
2、修改配置文件

vim config/server.properties

# 2181:zookeeper端口
broker.id=0 每一个节点broker.id 要不一样
zookeeper.connect=master:2181,node1:2181,node2:2181/kafka
log.dirs=/usr/local/soft/kafka-1.0.0/data   数据存放的位置
3、将kafka文件同步到node1,node2
# 同步kafka文件   -r 选项表示递归复制,这对于复制目录及其所有子目录和文件是必要的。
scp -r kafka-1.0.0/ node1:`pwd`
scp -r kafka_2.11-1.0.0/ node2:`pwd`

# 修改node1、node2中的/etc/profile,增加Kafka环境变量
export KAFKA_HOME=/usr/local/soft/kafka-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin

#  在ndoe1和node2中执行source
source /etc/profile
4、修改node1和node2中的broker.id

vim config/server.properties

# node1
broker.id=1
# node2
broker.id=2
5、启动kafka
# 1、需要启动zookeeper,  kafka使用zk保存元数据
# 需要在每个节点中执行启动的命令
zkServer.sh start
# 查看启动的状体
zkServer.sh status

#若不清楚指令用法,例如:可以使用kafka-console-producer.sh help来查找对应参数用法

# 2、启动kafka,每个节点中都要启动(去中心化的架构)
# -daemon后台启动
kafka-server-start.sh -daemon /usr/local/soft/kafka-1.0.0/config/server.properties

#注:由于是去中心化的,所以指定一台即可(master:9092,node1:9092,node2:909),但是怕它会宕机,所以保险起见都写上
# 测试是否成功
#生产者 --topic shujia:指定topic名称;9092:kafka端口
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic shujia

# 消费者
 --from-beginning   从头消费,如果不在执行消费的新的数据
kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic shujia


#进入zookeeper的命令行界面,查看zookeeper上kafka的元数据信息
zkServer.sh
#可以help查询操作帮助
help

#错误???
ZooKeeper JMX enabled by default
Using config: /usr/local/soft/zookeeper-3.5.7/bin/../conf/zoo.cfg
Usage: ./zkServer.sh [--config <conf-dir>] {start|start-foreground|stop|restart|status|print-cmd}

Kafka使用

1、创建topic

在生产和消费数据时,如果topic不存在会自动创建一个分区为1,副本为1的topic

--replication-factor  ---每一个分区的副本数量, 同一个分区的副本不能放在同一个节点,副本的数量不能大于kafak集群节点的数量
--partition   --分区数,  根据数据量设置
--zookeeper zk的地址,将topic的元数据保存在zookeeper中;/kafka:指定创建Topic的元数据在zookeeper中的路径

kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181/kafka --replication-factor 2 --partitions 3 --topic bigdata

进入zookeeper的命令行,查看里面所存储的元数据信息

启动:zkServer.sh

进行命令行界面:zkCli.sh

在这里插入图片描述

2、查看topic描述信息
kafka-topics.sh --describe --zookeeper master:2181,node1:2181,node2:2181/kafka --topic bigdata
  • Topic: 主题名称,这里是bigdata
  • Partition: 分区编号,用于标识主题中的具体分区,从0开始编号。(除partition:0中的数字代表分区,后面的数字均代表节点
  • Leader: 领导副本(Leader Replica)的编号。每个分区都有一个领导副本,所有的生产者(Producer)和消费者(Consumer)都通过这个领导副本来读写数据。如果领导副本不可用,Kafka会自动从ISR中选择一个新的领导副本。
  • Replicas: 副本列表,列出了所有副本的编号。这些副本分布在不同的Kafka broker上,以提供数据的冗余和容错。
  • ISR: 同步副本(In-Sync Replicas)列表,是Replicas的一个子集。ISR中的副本与领导副本保持同步,即它们已经复制了领导副本上的所有消息。只有当副本与领导副本保持同步时,它才会被包含在ISR中。ISR的存在是为了确保在发生故障转移时,新的领导副本能够拥有最新的数据。

在这里插入图片描述

3、获取所有topic

__consumer_offsetsL kafka用于保存消费便宜量的topic

kafka-topics.sh --list --zookeeper master:2181,node1:2181,node2:2181/kafka
4、创建控制台生产者
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bigdata
5、创建控制台消费者
 --from-beginning   从头消费 如果不在执行消费的新的数据
kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node3:9092 --from-beginning --topic bigdata

kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node3:9092 --from-beginning --topic students1000
 
6、kafka数据保存的方式
# 1、保存的文件
/usr/local/soft/kafka_2.11-1.0.0/data

# 2,每一个分区每一个副本对应一个目录

# 3、每一个分区目录中可以有多个文件, 文件时滚动生成的
00000000000000000000.log
00000000000000000001.log
00000000000000000002.log

# 4、滚动生成文件的策略
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# 5、文件删除的策略,默认时7天,以文件为单位删除
#可以使用 --delete实现删除的标记,再次查询时则查不到该topic,但是数据依旧存在;可以通过配置实现真正的删除,但是我们无需配置
log.retention.hours=168

Flink整合kafka

1、idea中整合
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
2、集群中整合
# 将flink-sql-connector-kafka-{flink.version}.jar包上传到Flink的lib目录下
cd /usr/local/soft/flink-{flink.version}/lib

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1964007.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java调用WebService接口

案例&#xff1a; 接口&#xff1a; http://xxxxx:8080/GetSPService.asmx 调用方法&#xff1a;GetSPByStnCodeToJsonStr 参数1&#xff1a;begin 开始时间 格式 yyyymmdd hh:mi &#xff08;日和小时之间有空格&#xff09; 例如&#xff1a;20230718 06:00 参数2: end …

IO模型思维导图

背景 &#xff1a; 并发服务器模型可以在同一时刻响应多个用户请求 多路复用IO&#xff1a; 4.多路复用IO 1.select 2.poll 3.epoll 1.select 缺点: 1.select监听文件描述符最大个数为1024 &#xff08;数组&#xff…

【CN】Argo 持续集成和交付(二)

7.25.通知 概述 Argo CD 通知持续监控 Argo CD 应用程序&#xff0c;并提供一种灵活的方式来通知用户应用程序状态的重要变化。使用灵活的触发器和模板机制&#xff0c;可以配置何时发送通知以及通知内容。Argo CD 通知包含有用的触发器和模板目录。因此&#xff0c;可以直接…

什么是住宅IP代理?有何用途?

在大数据时代&#xff0c;互联网成为我们生活与工作中不可或缺的一部分。然而&#xff0c;随着网络环境的日益复杂&#xff0c;隐私保护、网络访问限制等问题也逐渐凸显&#xff1b;以及跨境业务蓬勃发展。在这样的背景下&#xff0c;住宅IP代理作为一种技术解决方案&#xff0…

android studio 无法识别androidTest模块Test模块

android studio 无法识别androidTest模块Test模块 问题案例解决方法 androidTest是UI测试&#xff0c;可以运行在设备或虚拟设备上&#xff0c;需要编译打包为APK在设备上运行 版本依赖 android studio 2023.2.1 gradle kts架构 问题案例 下面无法识别到androidTest&#xff…

供应链|OR论文解读:具有内生随机交货期的双源库存最优策略

编者按 本次解读的文章发表于 Operations Research&#xff0c;原文信息&#xff1a;Song, J. S., Xiao, L., Zhang, H., & Zipkin, P. (2017). Optimal policies for a dual-sourcing inventory problem with endogenous stochastic lead times. Operations Research, 65…

11. 统计(均值、方差、正态分布)和聚类(接近kmeans的聚类)分类(python和c++代码)

以下代码的每个函数功能都做了注释&#xff0c;分别用python和c代码做了具体的实现&#xff0c;不是最终效果&#xff0c;后续会继续优化。以下代码中&#xff0c;python代码在每个步骤处理完数据后都画了散点图显示了处理后的数据效果&#xff0c;c代码是从python代码翻译过来…

学习大数据DAY28 python基础语法

目录 思维导图 数据类型 变量 输入 输出 运算符 bool 类型 常量变量拼接 流程控制 选择结构 if 循环结构 while 及 for 循环 字符串(String) 字符串的索引截取 索引 1.len() #获取字符串长度 2.str.find()字符查找 ,找到返回索引&#xff0c;没找到返回-1 3.st…

[Linux安全运维] iptables包过滤

前言 防火墙是网络安全中非常重要的设备&#xff0c;是一种将内部网络和外部网络隔离开的技术。简单来说&#xff0c;防火墙技术就是访问控制技术&#xff0c;由规则和动作组成。 目录 前言1. Linux 包过滤防火墙1 .1 概述1 .2 四表五链结构1 . 2 .1 规则表1 . 2 .2 规则链1 .…

扩散模型系列ControlNet: Adding Conditional Control to Text-to-Image Diffusion Models

向文本到图像扩散模型添加条件控制 摘要解读&#xff1a; 我对摘要英文的理解&#xff1a; 我们提出了一个神经网络架构ControlNet&#xff0c;可以向大规模的预训练好的文本到图像的扩散模型中添加空间条件控制。ControlNet锁住了准备生产的大规模扩散模型&#xff0c;并且重…

SLAM特征提取新变革:神经符号学结合自适应优化,实现环境适应性大飞跃!

论文标题&#xff1a; A Neurosymbolic Approach to Adaptive Feature Extraction in SLAM 论文作者&#xff1a; Yasra Chandio, Momin A. Khan, Khotso Selialia, Luis Garcia, Joseph DeGol, Fatima M. Anwar 导读&#xff1a; 本研究提出了一种创新的神经符号学方法&a…

Mybatis进阶提升-(一)Mybatis入门

前言 Mybatis是Java 项目开发使用率非常高的一款持久层框架&#xff0c;它支持自定义 SQL、存储过程以及高级映射。MyBatis 免除了几乎所有的 JDBC 代码以及设置参数和获取结果集的工作。MyBatis 可以通过简单的 XML 或注解来配置和映射原始类型、接口和 Java POJO&#xff08…

python+selenium+unittest自动化测试框架

前言 关于自动化测试的介绍&#xff0c;网上已有很多资料&#xff0c;这里不再赘述&#xff0c;UI自动化测试是自动化测试的一种&#xff0c;也是测试金字塔最上面的一层&#xff0c;selenium是应用于web的自动化测试工具&#xff0c;支持多平台、多浏览器、多语言来实现自动化…

【实战】SpringBoot整合ffmpeg实现动态拉流转推

SpringBoot整合ffmpeg实现动态拉流转推 在最近的开发中&#xff0c;遇到一个 rtsp 协议的视频流&#xff0c;前端vue并不能直接播放&#xff0c;因此需要对流进行处理。在网上查阅后&#xff0c;ffmpeg和webrtc是最多的解决方案&#xff0c;但是使用webrtc的时候没成功&#x…

交通 | 不确定条件下旅行者路径选择的K阶均值偏差模型

摘要 现实世界中的交通网络通常具有随机特性&#xff0c;旅行时间可靠性自然成为影响旅行者路线选择的关键因素。在这种情况下&#xff0c;仅凭平均路径旅行时间可能无法充分代表路径对旅行者的吸引力&#xff0c;本研究引入了 k k k 阶均值偏差模型&#xff0c;用于优化大型…

紫辉创投开启Destiny of Gods首轮投资,伯乐与千里马的故事仍在继续

近日&#xff0c;上海紫辉创业投资有限公司&#xff08;以下简称“紫辉创投”&#xff09;宣布开启GameFi链游聚合平台Destiny of Gods首轮投资500,000美金&#xff0c;并与其达成全面战略及业务层合作&#xff0c;双方将协同布局链上生态&#xff0c;共同推动链游行业健康发展…

研究人员可以采用什么策略来批判性地评估和综合其领域的不同文献

VersaBot Literature Review 一键生成文献综述 研究人员可以采用各种策略来批判性地评估和综合其领域内的不同文献&#xff1b; 评估策略 审查方法论&#xff1a; 分析每个来源中使用的研究设计、样本选择、数据收集和分析方法。考虑每种方法的潜在偏见、局限性和优势。评估…

宝通科技携手昇腾技术首席陈仲铭,共探工业大模型与生态发展

在人工智能技术的浪潮中&#xff0c;宝通科技始终致力于探索和应用前沿技术&#xff0c;推动工业智能化的发展。7月26日&#xff0c;宝通科技特邀昇腾生态技术首席陈仲铭博士&#xff0c;为宝通员工带来了一场主题为《工业大模型与业界发展生态》的技术分享会。本次分享会不仅为…

从CNN到Transformer:基于PyTorch的遥感影像、无人机影像的地物分类、目标检测、语义分割和点云分类教程

原文链接&#xff1a;从CNN到Transformer&#xff1a;基于PyTorch的遥感影像、无人机影像的地物分类、目标检测、语义分割和点云分类教程https://mp.weixin.qq.com/s?__bizMzUzNTczMDMxMg&mid2247610610&idx5&snf973c3e430c89d6123ca8f4892086c55&chksmfa8271…

nameparser,一个强大的 Python 库!

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 大家好&#xff0c;今天为大家分享一个强大的 Python 库 - nameparser。 Github地址&#xff1a;https://github.com/derek73/python-nameparser 在处理用户数据时&#xff0c;尤其是涉及到用户姓名的场景下&am…