如何在 Rocky Linux 上安装 Apache Kafka?

news2025/1/12 6:04:45

Apache Kafka 是一种分布式数据存储,用于实时处理流数据,它由 Apache Software Foundation 开发,使用 Java 和 Scala 编写,Apache Kafka 用于构建实时流式数据管道和适应数据流的应用程序,特别适用于企业级应用程序和关键任务应用程序,它是最受欢迎的数据流平台之一,被数千家公司用于高性能数据管道、流分析和数据集成。

Apache Kafka 将消息传递、存储和流处理结合在一个地方,允许用户设置高性能和强大的数据流,用于实时收集、处理和流式传输数据。

在本教程中,我们将在 Rocky Linux 服务器上安装 Apache Kafka,并学习 Kafka 作为消息代理的基本用法,通过 Kafka 插件流式传输数据。

先决条件

要学习本教程,您需要满足以下要求:

  • Rocky Linux 服务器,您可以使用 Rocky Linux v8 或 v9。
  • 具有 sudo root 权限的非 root 用户。

安装 Java OpenJDK

Apache Kafka 是一个基于 Java 的应用程序,要安装 Kafka,您将首先在您的系统上安装 Java,在撰写本文时,最新版本的 Apache Kafka 至少需要 Java OpenJDK v11。

在第一步中,您将从官方的 Rocky Linux 存储库安装 Java OpenJDK 11。

运行下面的 dnf 命令将 Java OpenJDK 11 安装到您的 Rocky Linux 系统。

sudo dnf install java-11-openjdk

当提示确认安装时,输入y并按ENTER继续。

安装 Java 后,使用以下命令验证 Java 版本,您将看到Java OpenJDK 11安装在您的 Rocky Linux 系统上。

java version

现在已经安装了 Java,接下来您将开始安装 Apache Kafka。

下载 Apache Kafka

Apache Kafka 为包括 Linux/Unix 在内的多种操作系统提供多种二进制包,在此步骤中,您将为 Kafka 创建一个新的专用系统用户,下载 Kafka 二进制包,并配置 Apache Kafka 安装。

运行以下命令创建一个名为kafka的新系统用户。这将为 Kafka 创建一个新的系统用户,默认主目录为“/opt/kafka”,该目录将用作 Kafka 安装目录。

sudo useradd -r -d /opt/kafka -s /usr/sbin/nologin kafka

现在将您的工作目录移动到“/opt”。然后,通过下面的 curl 命令下载 Apache Kafka 二进制包。您现在将看到文件kafka.tar.gz。

cd /opt 
sudo curl -fsSLo kafka.tgz https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz

通过 tar 命令提取文件“kafka.tar.gz”,并将提取的目录重命名为*“/opt/kafka* ”。

tar -xzf kafka.tgz 
sudo mv kafka_2.12-3.3.1 /opt/kafka

接下来,通过下面的 chmod 命令 将“/opt/kafka”目录的所有权更改为“ kafka ”用户。

sudo chown -R kafka:kafka /opt/kafka

之后,为 Apache Kafka 创建一个新的日志目录。然后,通过 nano 编辑器编辑默认配置“server.properties”。

sudo -u kafka mkdir -p /opt/kafka/logs 
sudo -u kafka nano /opt/kafka/config/server.properties

Kafka 日志目录将用于存储 Apache Kafka 日志,您必须在 Kakfka 配置 sertver.properties 上定义日志目录。

取消注释“log.dirs”选项并将值更改为/opt/kafka/logs。

# Apache Kafka 的日志配置
log.dirs=/opt/kafka/logs

完成后保存文件并退出编辑器。

您现在已经完成了 Apache Kafka 的基本安装和配置。接下来,您将设置并运行 Apache Kafka 作为系统服务。

将 Kafka 作为 Systemd 服务运行

Apache Kafka 软件包包括另一个应用程序 Zookeeper,用于集中服务和维护 Kafka 控制器选择、主题配置以及 Apache Kafka 集群的 ACL(访问控制列表)。

要运行 Apache Kafka,您必须先在您的系统上运行 Zookeeper。在此步骤中,您将为 Zookeeper 和 Apache Kafka 创建一个新的 systemd 服务文件。这两项服务也将在同一用户kafka下运行。

使用以下命令 为 Zookeeper /etc/systemd/system/zookeeper.service 创建一个新的服务文件。

sudo nano /etc/systemd/system/zookeeper.service

将配置添加到文件中。

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

完成后保存文件并退出编辑器。

接下来,使用以下命令 为 Apache Kafka /etc/systemd/system/kafka.service创建一个新的服务文件。*

sudo nano /etc/systemd/system/kafka.service

将以下配置添加到文件中。您可以在[Unit]部分看到,Kafka 服务需要先运行zookeeper.service,并且它始终在zookeeper.service之后运行。

[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/kafka/logs/start-kafka.log 2>&1'
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

完成后保存文件并退出编辑器。

接下来,运行以下 systemctl 命令以重新加载 systemd 管理器并应用新服务。

sudo systemctl daemon-reload

现在使用以下命令启动 zookeeper 和 kafka 服务。

sudo systemctl start zookeeper
sudo systemctl start kafka

通过下面的 systemctl 命令使 kafka 和 zookeeper 服务在系统启动时自动运行。

sudo systemctl enable zookeeper
sudo systemctl enable kafka

最后,使用以下命令验证 zookeeper 和 kafka 服务。

sudo systemctl status zookeeper
sudo systemctl status kafka

在下面的输出中,您可以看到 zookeeper 服务的当前状态正在运行并且它也已启用。

下面是 kafka 服务状态,它正在运行并且服务已启用。

现在您已经完成了 Apache Kafka 安装并且它现在已经启动并正在运行。接下来,您将学习 Apache Kafka 作为生成消息的消息代理的基本用法,还将学习如何使用 Kafka 插件实时流式传输数据。

使用Kafka Console Producer和Consumer的基本操作

在开始之前,将用于此示例的所有命令均由“/opt/kafka/bin”目录中可用的 Kafka 包提供。

在此步骤中,您将学习如何创建和列出 Kafka 主题、启动生产者并插入数据、通过消费者脚本流式传输数据,最后,您将通过删除 Kafka 主题来清理您的环境。

运行以下命令创建一个新的 Kafka 主题。您将使用脚本“ kafka-topics.sh ”创建一个名为“ TestTopic ”的新主题,其中包含一个复制和分区。

sudo -u kafka /opt/kafka/bin/kafka-topics.sh \ 
--create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TestTopic

现在运行以下命令来验证 Kafka 上的主题列表。您应该看到“ TestTopic ”已在您的 Kafka 服务器上创建。

sudo -u kafka /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

接下来,要生成消息,您可以使用脚本kafka-console-producser.sh,然后将数据插入将被处理的内容中。

运行以下命令启动 Kafka Console Producer 并将主题指定为TestTopic。

sudo -u kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning

获得 Kafka Console Producer 后,输入将要处理的任何消息。

接下来,打开一个新的终端会话并登录到服务器。然后通过脚本kafka-conosle-consumer.sh打开 Kafka Console Consumer 。

运行以下命令启动 kafka 控制台消费者并将主题指定为TestTopic。

sudo -u kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TestTopic --from-beginning

在下面的屏幕截图中,您可以看到来自 Kafka 控制台生产者的所有消息都被处理到消费者控制台。您还可以在 Console Producer 上键入其他消息,消息将自动处理并显示在 Console Consumer 屏幕上。

现在按 Ctrl+c 退出 Kafka Console Producer 和 Kafka Console Consumer。

要清理您的 Kafka 环境,您可以通过以下命令 删除和移除TestTopic.

sudo -u kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic TestTopic

使用 Kafka Connect 插件流式传输数据

Apache Kafka 提供了多个插件,可用于从多个源流式传输数据。默认情况下,附加的 Kafka 库插件在*“/opt/kafka/libs* ”目录中可用,您必须通过配置文件*“/opt/kafka/config/connect-standalone.properties* ”启用 kafka 插件。在这种情况下,对于 Kafka 独立模式。

运行以下命令编辑 Kafka 配置文件“ /opt/kafka/config/connect-standalone.properties ”。

sudo -u kafka nano /opt/kafka/config/connect-standalone.properties

取消注释“plugin.path”行并将值更改为插件的库目录“ /opt/kakfa/libs ”。

plugin.path=/opt/kafka/libs

完成后保存文件并退出编辑器。

接下来,运行以下命令创建一个新文件“ /opt/kafka/test.txt ”,该文件将用作 Kafka 流的数据源。

sudo -u kafka echo -e "Test message from file\nTest using Kafka connect from file" > /opt/kafka/test.txt

现在运行以下命令,使用配置文件connect-file-source.properties和connect-file-sink.properties在独立模式下启动 Kafka Consumer。

此命令和配置是 Kafka 数据流的默认示例,其中包含您刚刚创建的源文件test.txt,此示例还将自动创建一个新主题“connect-test”,您可以通过 Kafka 控制台消费者访问该主题。

cd /opt/kafka 
sudo -u kafka /opt/kafka/bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

现在,打开另一个终端会话并运行以下命令以启动 Kafka 控制台消费者。此外,将主题指定为“connect-test”。您将看到来自文件“ test.txt ”的消息。

sudo -u kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

现在您可以更新文件test.txt,新消息将在 Kafka 控制台消费者上自动处理和流式传输。

运行以下命令以使用新消息 更新文件test.txt.

sudo -u kafka echo "Another test message from file test.txt" >> test.txt

在以下输出中,您可以看到当文件test.txt发生更改时,Kafka 会自动处理新消息。您现在已经完成了 Kafka connect 插件的基本用法,通过文件流式传输消息。

结论

通过本指南,您了解了如何在 Rocky Linux 系统上安装 Apache Kafka,您还了解了用于生成和处理消息的 Kafka Producer Console 以及用于接收消息的 Kafka Consumer 的基本用法,最后,您还学习了如何启用 Kafka 插件并使用 Kafka Connect 插件从文件实时流式传输消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/10992.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

robots.txt漏洞

robots.txt漏洞描述: 搜索引擎可以通过robots文件可以获知哪些页面可以爬取,哪些页面不可以爬取。Robots协议是网站国际互联网界通行的道德规范,其目的是保护网站数据和敏感信息、确保用户个人信息和隐私不被侵犯,如果robots.txt文件编辑的太过详细,反而会泄露网站的敏感…

[附源码]java毕业设计基于学生信息管理系统

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM mybatis Maven Vue 等等组成,B/S模式 M…

Delphi中关于PChar、Char数组、string[](ShortString)及结构体长度及占用空间的一些特性说明和测试

关于特性 1,string和Char数组都是一块内存, 其中存放连续的字符. string保存具体字符的内存对用户 是透明的, 由Delphi管理它的分配, 复制和释放, 用户不能干预2,关于ShortString,内存中用第一个字节来表示字符串的长度。FF255,所以这个特性…

【MySQL】MySQL复制与高可用水平扩展架构实战(MySQL专栏启动)

📫作者简介:小明java问道之路,专注于研究 Java/ Liunx内核/ C及汇编/计算机底层原理/源码,就职于大型金融公司后端高级工程师,擅长交易领域的高安全/可用/并发/性能的架构设计与演进、系统优化与稳定性建设。 &#x1…

天王刘德华走红毯,到哪他都是最耀眼的明星

第三十五届金鸡奖,已经在福建厦门落下帷幕,如果要说本届金鸡奖谁收获最大,无疑是天王刘德华。在金鸡奖颁奖典礼现场,功夫巨星吴京登上热搜,然而热搜的主角却不是他,而是天王刘德华。 在本届金鸡奖颁奖典礼现…

cubeIDE开发, stm32调试信息串口通信输出显示

关于cubeIDE开发基本技巧及流程,本文不详细叙述,请参考:cubeIDE快速开发流程_py_free的博客-CSDN博客_cubeide汉化 一、stm32串口配置 本文采用的开发板是stm32L496VGT3,其有两个 USB 接口,一个为 USB ST-link 复用接口&#xff…

代码随想录——最长递增子序列的个数

题目 给定一个未排序的整数数组,找到最长递增子序列的个数。 示例 1: 输入: [1,3,5,4,7] 输出: 2 解释: 有两个最长递增子序列,分别是 [1, 3, 4, 7] 和[1, 3, 5, 7]。 示例 2: 输入: [2,2,2,2,2] 输出: 5 解释: 最长递增子序列的长度是1,并且…

Oracle 表创建和表管理

1.表的命名 必须以字母开头字符长度在1-30之间只能包含A-Z,a-z,0-9,_,$和#被同一个用户拥有的对象不能有重复的名字 2.表的创建 SQL> create table t01(id number(4),name varchar2(15));Table created.SQL> desc t01Name …

现场直击!维视智造携多款明星产品亮相VisionChina 2022深圳机器视觉展

11月15日,2022年中国(深圳)机器视觉展在深圳国际会展中心(宝安新馆)盛大开幕,维视智造携MV-CR读码相机、3D线激光相机、VisionBank AI多相机智能视觉系统等多款行业领先产品及解决方案亮相。 1 ►现场速击 …

C基础--内存对齐问题(结构体对齐)

问题现象 在调试一个软件功能时,发现一个结构体对齐的问题,以前没有太关注,现在把它总结出来。先看示例: 结构体1: typedef struct {char magic[4];uint32_t crc32;uint32_t lenght;uint16_t ver;uint16_t IFrameCnt…

多线程DPDK应用的内存优化

作者 Conor Walsh is a software engineering intern with the Architecture Team of Intel’s Network Platform Group (NPG), based in Intel Shannon (Ireland). 引言 高速包处理是一种资源密集型应用。一种解决方案是将包处理流水线(pipeline)分离到多线程以提高程序性能…

大一新生HTML期末作业,网页制作作业——海鲜餐饮网站登录页面(单页面)HTML+CSS+JavaScript

👨‍🎓静态网站的编写主要是用HTML DIVCSS JS等来完成页面的排版设计👩‍🎓,常用的网页设计软件有Dreamweaver、EditPlus、HBuilderX、VScode 、Webstorm、Animate等等,用的最多的还是DW,当然不同软件写出的…

STM32G0开发笔记-Platformio+libopencm3-FreeRTOS和FreeModbus库使用

title: STM32G0开发笔记-Platformiolibopencm3-FreeRTOS和FreeModbus库使用 tags: STM32MCUSTM32G070libopencm3MonkeyPiFreeRTOSModbus categories: STM32 date: 2022-9-11 19:52:05 [原文:makerinchina.cn] 使用Platformio平台的libopencm3开发框架来开发STM32…

docker -- 入门篇 (数据卷、自定义镜像、安装mysql redis)

1 数据卷 采用上一章节创建的centos镜像启动容器 doc01 docker run -it --name doc01 lhy/centos:1.00 2 数据卷容器 启动子容器doc02 实现继承doc01的关系 docker run -it --name doc02 --volumes-from doc01 lhy/centos:1.00 启动子容器doc03 实现继承doc01的关系 docker…

【计算机毕业设计】病人跟踪治疗信息管理系统源码

一、系统截图(需要演示视频可以私聊) 摘 要 病人跟踪治疗信息管理系统采用B/S模式,促进了病人跟踪治疗信息管理系统的安全、快捷、高效的发展。传统的管理模式还处于手工处理阶段,管理效率极低,随着病人的不断增多&a…

mac pro M1(ARM)安装:安装zookeeper可视化工具PrettyZoo、ZooKeeperAssistant

0. 引言 今天安装zookeeper的可视化工具遇到一些问题,将其记录下来,以供后续的同学参考,在mac软件安装上少走弯路。同时也让大家体会下这两款不同的zk可视化工具的差别 1. 安装PrettyZoo 1、下载 直接在github上选择版本下载: …

8 - 复习总结java中的继承与多态

1. 继承 1.1 为什么需要继承 先看一个例子: 比如猫和狗都是动物,都可以用一个类来描述。 使用java语言来描述: class Cat{String name;int age;float wight;public void bark(){System.out.println(name"汪汪汪叫");}public void eat(){S…

PyTorch使用快速梯度符号攻击(FGSM)实现对抗性样本生成(附源码和数据集MNIST手写数字)

需要源码和数据集请点赞关注收藏后评论区留言或者私信~~~ 一、威胁模型 对抗性机器学习,意思是在训练的模型中添加细微的扰动最后会导致模型性能的巨大差异,接下来我们通过一个图像分类器上的示例来进行讲解,具体的说,会使用第一…

Reactor 模型

文章目录1、网络编程关注的事件2、网络 IO 的职责2.1、IO 检测2.1.1、连接建立2.1.2、连接断开2.1.3、消息到达2.1.4、消息发送2.2、IO 操作2.2.1、连接建立2.2.2、连接断开2.2.3、连接到达2.2.4、消息发送3、Reactor 模式3.1、概念3.2、面试:Reactor 为什么使用非阻…

利用jenkins直接构件docker镜像并发布

一、本服务器构建 1.jenkins安装完成之后,打jenkins,选择新建任务,如: 2.进行〔源码管理〕配置,如: 3.构建执行配置,如: APP_NAMEtest-project APP_PORT8083 RUN_ENVprod cd /var/…