Kafka快速入门

news2025/1/12 2:44:49

文章目录

    • 安装部署
      • 集群规划
      • 集群部署
      • kafka群起脚本
    • Kafka命令行操作
      • 主题命令行操作
      • 生产者命令行操
      • 消费者命令行操作

安装部署

集群规划

在这里插入图片描述

集群部署

  1. 官方下载地址:http://kafka.apache.org/downloads.html
  2. 上传安装包到02的/opt/software目录下
[atguigu@hadoop02 software]$ ll
-rw-rw-r--. 1 atguigu atguigu  86486610 3月  10 12:33 kafka_2.12-3.0.0.tgz
  1. 解压安装包到/opt/module/目录下
[atguigu@hadoop02 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/
  1. 进入到/opt/module目录下,修改解压包名为kafka
[atguigu@hadoop02 module]$ mv kafka_2.12-3.0.0 kafka
  1. 修改config目录下的配置文件server.properties内容如下
[atguigu@hadoop02 kafka]$ cd config/
[atguigu@hadoop02 config]$ vim server.properties
#broker的全局唯一编号,不能重复,只能是数字。
broker.id=02
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop02:2181,hadoop03:2181,hadoop04:2181/kafka

  1. 配置环境变量
[atguigu@hadoop02 kafka]$ sudo vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[atguigu@hadoop02 kafka]$ source /etc/profile
  1. 分发环境变量文件并source
[atguigu@hadoop02 kafka]$ xsync /etc/profile.d/my_env.sh
==================== hadoop02 ====================
sending incremental file list

sent 47 bytes  received 12 bytes  39.33 bytes/sec
total size is 371  speedup is 6.29
==================== hadoop03 ====================
sending incremental file list
my_env.sh
rsync: mkstemp "/etc/profile.d/.my_env.sh.Sd7MUA" failed: Permission denied (13)

sent 465 bytes  received 126 bytes  394.00 bytes/sec

total size is 371  speedup is 0.63
rsync error: some files/attrs were not transferred (see previous errors) (code 23) at main.c(1178) [sender=3.1.2]
==================== hadoop04 ====================
sending incremental file list
my_env.sh
rsync: mkstemp "/etc/profile.d/.my_env.sh.vb8jRj" failed: Permission denied (13)

sent 465 bytes  received 126 bytes  1,182.00 bytes/sec
total size is 371  speedup is 0.63
rsync error: some files/attrs were not transferred (see previous errors) (code 23) at main.c(1178) [sender=3.1.2]# 这时你觉得适用sudo就可以了,但是真的是这样吗?
[atguigu@hadoop02 kafka]$ sudo xsync /etc/profile.d/my_env.sh
sudo: xsync:找不到命令
# 这时需要将xsync的命令文件,copy到/usr/bin/下,sudo(root)才能找到xsync命令
[atguigu@hadoop02 kafka]$ sudo cp /home/atguigu/bin/xsync /usr/bin/
[atguigu@hadoop02 kafka]$ sudo xsync /etc/profile.d/my_env.sh
# 在每个节点上执行source命令,如何你没有xcall脚本,就手动在三台节点上执行source命令。
[atguigu@hadoop02 kafka]$ xcall source /etc/profile

  1. 分发安装包
[atguigu@hadoop02 module]$ xsync kafka/
  1. 修改配置文件中的brokerid
    分别在hadoop03和hadoop04上修改配置文件server.properties中broker.id=03、broker.id=04
[atguigu@hadoop03 kafka]$ vim config/server.properties
broker.id=03

[atguigu@hadoop04 kafka]$ vim config/server.properties
broker.id=04

  1. 启动集群
    ① 先启动Zookeeper集群
[atguigu@hadoop02 kafka]$ zk.sh start 

② 依次在hadoop02、hadoop03、hadoop04节点上启动kafka

[atguigu@hadoop02 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[atguigu@hadoop03 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[atguigu@hadoop04 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
  1. 关闭集群
[atguigu@hadoop02 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop03 kafka]$ bin/kafka-server-stop.sh
[atguigu@hadoop04 kafka]$ bin/kafka-server-stop.sh

kafka群起脚本

  1. 脚本编写
    在/home/atguigu/bin目录下创建文件kafka.sh脚本文件:
#! /bin/bash
if (($#==0)); then
  echo -e "请输入参数:\n start  启动kafka集群;\n stop  停止kafka集群;\n" && exit
fi

case $1 in
  "start")
    for host in hadoop03 hadoop02 hadoop04
      do
        echo "---------- $1 $host 的kafka ----------"
        ssh $host "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
      done
      ;;
  "stop")
    for host in hadoop03 hadoop02 hadoop04
      do
        echo "---------- $1 $host 的kafka ----------"
        ssh $host "/opt/module/kafka/bin/kafka-server-stop.sh"
      done
      ;;
    *)
        echo -e "---------- 请输入正确的参数 ----------\n"
        echo -e "start  启动kafka集群;\n stop  停止kafka集群;\n" && exit
      ;;
esac

  1. 脚本文件添加权限
[atguigu@hadoop02 bin]$ chmod +x kafka.sh

Kafka命令行操作

主题命令行操作

  1. 查看操作主题命令需要的参数
[atguigu@hadoop02 kafka]$ bin/kafka-topics.sh
  1. 重要的参数如下
    在这里插入图片描述
  2. 查看当前服务器中的所有topic
[atguigu@hadoop02 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
  1. 创建一个主题名为first的topic
[atguigu@hadoop02 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop02:9092 --create --replication-factor 3 --partitions 3 --topic first
  1. 查看Topic的详情
[atguigu@hadoop02 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop02:9092 --describe --topic first
Topic: first    TopicId: EVV4qHcSR_q0O8YyD32gFg PartitionCount: 1       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: first    Partition: 0    Leader: 02     Replicas: 02,03,04   Isr: 02,03,04
  1. 修改分区数(注意:分区数只能增加,不能减少)
[atguigu@hadoop02 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop02:9092 --alter --topic first --partitions 3
  1. 再次查看Topic的详情
[atguigu@hadoop02 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop02:9092 --describe --topic first
Topic: first    TopicId: EVV4qHcSR_q0O8YyD32gFg PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: first    Partition: 0    Leader: 02     Replicas: 02,03,04   Isr: 02,03,04
        Topic: first    Partition: 1    Leader: 03     Replicas: 03,04,02   Isr: 03,04,02
        Topic: first    Partition: 2    Leader: 04     Replicas: 04,02,03   Isr: 04,02,03
  1. 删除topic
[atguigu@hadoop02 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop02:9092 --delete --topic first

生产者命令行操

  1. 查看命令行生产者的参数
[atguigu@hadoop02 kafka]$ bin/kafka-console-producer.sh 
  1. 重要的参数如下:
    在这里插入图片描述

  2. 生产消息

[atguigu@hadoop02 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop02:9092 --topic first
>hello world
>atguigu  atguigu

消费者命令行操作

  1. 查看命令行消费者的参数
[atguigu@hadoop02 kafka]$ bin/kafka-console-consumer.sh 
  1. 重要的参数如下:
    在这里插入图片描述

  2. 消费消息

[atguigu@hadoop02 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop02:9092 --topic first
  1. 从头开始消费
[atguigu@hadoop02 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop02:9092 --from-beginning --topic first

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/149554.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

5.hadoop系列之HDFS NN和2NN工作机制

1.第一阶段:NameNode启动 1.第一次启动NameNode格式化后,创建Fsimage和Edits文件,如果不是第一次启动,直接加载Fsimage和Edits到内存 2.客户端对元数据进行增删改请求 3.NameNode记录操作日志,更新滚动日志 4.NameNod…

elasticsearch-head使用问题汇总

1、elasticsearch-head数据预览、基本查询、复合查询模块无法查询es文档记录(1)解决办法复制vendor.jsdocker cp elasticsearch-head1:/usr/src/app/_site/vendor.js vendor.js修改vendor.js第6886行,将“contentType: "application/x-w…

高性能分布式缓存Redis-第三篇章

高性能分布式缓存Redis-第三篇章一、分布式锁1.1、高并发下单超卖问题1.2、何为分布式锁1.3、分布式锁特点1.4、基于Redis实现分布式锁1.4.1、实现思路:1.4.2、实现代码版本1.4.3、错误解锁问题解决1.4.4、锁续期/锁续命1.4.5、锁的可重入/阻塞锁(rediss…

微服务 Spring Boot 整合 Redis BitMap 实现 签到与统计

文章目录⛄引言一、Redis BitMap 基本用法⛅BitMap 基本语法、指令⚡使用 BitMap 完成功能实现二、SpringBoot 整合 Redis 实现签到 功能☁️需求介绍⚡核心源码三、SpringBoot 整合Redis 实现 签到统计功能四、关于使用bitmap来解决缓存穿透的方案⛵小结⛄引言 本文参考黑马 …

【第24天】SQL进阶-查询优化- performance_schema系列实战一:利用等待事件排查MySQL性能问题(SQL 小虚竹)

回城传送–》《32天SQL筑基》 文章目录零、前言一、背景二、performance_schema配置配置表启用等待事件的采集与记录三、sysbench基准测试工具3.1 安装和使用sysbench3.1.1 yum安装3.1.2 查看版本信息3.1.3 sysbench 使用说明3.2 sysbench 测试服务器cpu性能3.3 sysbench测试硬…

Hadoop 入门基础 及HiveQL

一、hadoop 解决了什么问题?即hadoop 产生背景 一个能够轻松方便、经济实惠地存储和分析大量数据的非常流行的开源项目。 二、hadoop 是如何低成本地解决大数据的存储和分析的?即hadoop 原理,hadoop 的组成部分 Hadoop的创始人、Cloudera首…

Java图形化界面---基本组件

目录 一、基本组件介绍 二、Diaolg对话框 (1)Dialog (2) FileDialog 一、基本组件介绍 Button 按钮 Canvas 用于绘图的画布 Checkbox 复选框组件 CheckboxGroup 用于将多个…

【阶段三】Python机器学习06篇:模型评估函数介绍(分类模型)

本篇的思维导图: 模型评估函数介绍(分类模型) accuracy_score()函数 作用:accuracy_score函数计算了模型准确率。在二分类或者多分类中,预测得到的标签,跟真实标签比较,计算准确率。 注意事项:在正负样本不平衡的情况下,准确率这个评价指标有很大的缺陷。比如数据样本…

数据库管理-第五十一期 新年新气象(20230108)

数据库管理 2023-01-08第五十一期 新年新气象1 新年快乐2 旧账3 软硬件对比4 新气象总结第五十一期 新年新气象 1 新年快乐 2023年来了,我也没有第一时间写一篇写文章给大家祝福,第一呢是因为某些原因元旦假期也没咋休息,其次就是因为本周又…

Allegro174版本新功能介绍之新增几种沿着目标打过孔模式

Allegro174版本新功能介绍之新增几种沿着目标打过孔模式 Allegro在低版本的时候,就已经有了沿着目标打过孔的功能,在升级到了174版本后,又新增了几种打过孔的模式,类似下图 以第一种模式举例介绍说明 点击Place

DFT知识点扫盲——DFT scan chain

先说一下tsmc的std celltsmc 7nm工艺下有专门的std synccell 命名如下:SDFSYNC1RPQD1XXXXVTSDFSYNC1SNQD1XXXXVTSDFSYNC1QD1XXXXVT不考虑VT, PWR和track,电压等差别,整个工艺库下只有这三种实际在项目中synccell一般直接上ULVT,既…

2022年第四届全国高校计算机能力挑战赛c++组决赛

A 题目描述 小丽好朋友的生日快到了,她打算做一些折纸放在幸运罐中作为生日礼物。小丽计划总共 需要a颗星星以及b只纸鹤。现在市场上卖的到的星星纸(折小星星的专用纸)一张可以折c颗小星星,一张纸鹤纸(折纸鹤的专用纸)可以折d只小纸鹤。她准备一共买k张…

【C++】模板进阶

​🌠 作者:阿亮joy. 🎆专栏:《吃透西嘎嘎》 🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根 目录👉非类型模…

三次握手四次挥手

三次握手&四次挥手 三次握手 四次挥手

RA-Net:一种混合深度注意感知网络,用于提取CT扫描中的肝脏和肿瘤

摘要 本文提出了一种三维混合残差注意感知分割网络,称为RA-UNet,用于提取肝脏感兴趣区域(VOI)并从这个感兴趣区域(VOI)中分割肿瘤。这个网络的基本架构为三维UNet。它结合了低层次特征图和高层次特征图提取…

【从零开始学习深度学习】39. 梯度下降优化之动量法介绍及其Pytorch实现

动量法的提出主要是为了优化在多变量目标函数中不同自变量梯度下降过程中更新速度快慢不均的问题,并且使目标函数向最优解更快移动。 目录1. 梯度下降中的问题2. 动量法介绍及原理2.1 动量法的数学解释---指数加权移动平均2.2 由指数加权移动平均理解动量法3. 从零实…

【HTML | CSS | Javascript】一款响应式精美简历模板分享(万字长文 | 附源码)

💂作者简介: THUNDER王,一名热爱财税和SAP ABAP编程以及热爱分享的博主。目前于江西师范大学会计学专业大二本科在读,同时任汉硕云(广东)科技有限公司ABAP开发顾问。在学习工作中,我通常使用偏后…

JS面试题--深入JavaScript运行原理

深入JavaScript运行原理 JavaScript让人迷惑的知识点 JavaScript是一门编程语言 浏览器的工作原理 一般的浏览器有以下主要部分组成:1. 用户界面包括浏览器中可见的地址输入框,浏览器前进返回按钮,打开书签,打开历史记录等用户可…

NEUQ week10 题解

P1636 Einstein学画画 题目描述 Einstein 学起了画画。 此人比较懒~~,他希望用最少的笔画画出一张画…… 给定一个无向图,包含 nnn 个顶点(编号 1∼n1 \sim n1∼n),mmm 条边,求最少用多少笔可以画出图中…

对于NPS 的学习和认知

企业存在的唯一使命是创造顾客 —— 彼得德鲁克对于现代的多数组织而言,净推荐值(NPS)是一种衡量顾客满意度的“温度计”。NPS看似是一种管理工具,其实更多的是对企业基因的一种改变,其倡导的是内生性的问题&#xff0…