Kafka 下载与启动

news2025/1/7 5:46:48

目录

一. 前言

二. 版本下载

2.1. 版本说明

三. 快速启动

3.1. 下载解压

3.2. 启动服务

3.3. 创建一个主题(Topic)

3.4. 发送消息

3.5. 消费消息

3.6. 使用 Kafka Connect 来导入/导出数据

3.7. 使用 Kafka Stream 来处理数据

3.8. 停止 Kafka

四. 以系统服务方式启动 Kafka

4.1. Linux 系统

4.1.1. 创建服务文件

4.1.2. 启动服务

4.2. Windows 系统

4.2.1. 准备工作

4.2.2. 注册服务

4.2.3. 编辑注册表

4.2.4. 启动并测试

4.3. 常用命令


一. 前言

    在前文《Kafka 入门介绍》中,主要介绍了 Kafka 的由来、使用场景、相关的重要术语等,本文就着重介绍下Kafka 的下载、安装和启动,以便为接下使用它做好准备。

二. 版本下载

登录 Apache kafka 官方下载,https://kafka.apache.org/downloads.html

推荐下载 Scala 2.13 版本的

2.1. 版本说明

    比如我们下载的包是这样:kafka_2.13-3.6.1.tgz,2.13 是 Scala 编译器的版本,后面的 3.6.1 才是 Kafka 的版本。

  • 0.8版本以后才加入副本机制;新老 API 的主要区别在于连接 Kafka 的时候是配置 ZK 地址还是 Broker 的地址,新版 API 都是配置 Broker 的地址。
  • 0.8.2.2这个版本比较稳定,其中的老版本 consumer API 也比较稳定,但是不要用新版本producer API,BUG 比较多。
  • 0.9 版本的中的新版本的 producer API 比较稳定,但是新版本的 consumer API 不稳定。
  • 0.10 加入了 Kafka Streams,但不要在生产中使用,不过该版本的消息引擎功能没问题。建议使用 0.10.2.2,该版本的新版 Consumer API 和 Producer API 都比较稳定。
  • 0.11.0.0 在 Producer API 中加入了幂等性以及事务,另外就是对 Kafka 消息格式做了重构。增加了幂等性和事务才能使 Kafka 在流处理方面更加得心应手,但是稳定性不好。所以可以使用 0.11.0.3 这个版本。
  • 1.0和2.0主要是针对 Kafka Streams 做了各种改进。在消息引擎方面没有重大变化。
  • 3.x 版本去掉了对 ZK 的依赖,弃用 Java8,但依然可以使用,弃用 Scala2.12 版本,但依然可以使用。

三. 快速启动

3.1. 下载解压

$ tar -xzf kafka_2.13-3.6.1.tgz
$ cd kafka_2.13-3.6.1

3.2. 启动服务

注意:你的本地环境必须安装有 Java 8+。如何配置请参见《JDK 环境变量设置》。

运行 Kafka 需要使用 Zookeeper,所以你需要先启动 Zookeeper,如果你没有 Zookeeper,你可以使用 Kafka 自带打包和配置好的 Zookeeper。

# 注意:Apache Kafka2.8版本之后可以不需要使用ZooKeeper,内测中,文章末尾有体验的安装方式。
$ bin/zookeeper-server-start.sh config/zookeeper.properties
...

打开另一个命令终端启动 Kafka 服务:

$ bin/kafka-server-start.sh config/server.properties &

一旦所有服务成功启动,那 Kafka 已经可以使用了。

3.3. 创建一个主题(Topic)

创建一个名为 test 的 Topic,只有一个分区和一个备份:

$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

创建好之后,可以通过运行以下命令,查看已创建的 Topic 信息:

$ bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0

或者,除了手工创建 Topic 外,你也可以配置你的 Broker,当发布一个不存在的 Topic 时自动创建Topic,点击这里查看如何配置自动创建topic时设置默认的分区和副本数自动创建 Topic 时设置默认的分区和副本数。

3.4. 发送消息

    Kafka 提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给 Kafka 集群。每一行是一条消息。

运行 Producer(生产者),然后在控制台输入几条消息到服务器:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

3.5. 消费消息

Kafka 也提供了一个消费消息的命令行工具,将存储的信息输出来,新打开一个命令控制台,输入:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

如果你有2台不同的终端上运行上述命令,那么当你在运行生产者时,消费者就能消费到生产者发送的消息。

可以使用 Ctrl-C 停止消费者客户端。

3.6. 使用 Kafka Connect 来导入/导出数据

    你可能在现有的系统中拥有大量的数据,如关系型数据库或传统的消息传递系统,以及许多已经使用这些系统的应用程序。Kafka Connect 允许你不断地从外部系统提取数据到 Kafka,反之亦然。用 Kafka 整合现有的系统是非常容易的。为了使这个过程更加容易,有数百个这样的连接器现成可用。

查看 Kafka Connect 部分,了解更多关于如何将你的数据持续导入和导出 Kafka。

3.7. 使用 Kafka Stream 来处理数据

    一旦你的数据存储在 Kafka 中,你就可以用 Kafka Streams 客户端库来处理这些数据,该库适用于 Java/Scala。它允许你实现自己的实时应用程序和微服务,其中输入和/或输出数据存储在Kafka 主题中。Kafka Streams 将在客户端编写和部署标准 Java 和 Scala 应用程序的简单性与Kafka 服务器端集群技术的优势相结合,使这些应用程序具有可扩展性、弹性、容错性和分布式。该库支持精确的一次性处理、有状态操作和聚合、窗口化、连接、基于事件时间的处理等等。

    为了给你一个初步的体验,这里是如何实现流行的 WordCount 算法的:

KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((keyIgnored, word) -> word)
            .count();

wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

Kafka 流演示例子和应用开发教程展示了如何从头到尾编码和运行这样一个流式应用。

3.8. 停止 Kafka

现在你已经完成了快速入门,可以随时卸载 Kafka 环境了,或者继续玩下去。

  1. 使用 Ctrl-C 停止生产者和消费者客户端。
  2. 使用 Ctrl-C 停止 Kafka Broker。
  3. 最后,用 Ctrl-C 停止 ZooKeeper。

如果你还想删除你的本地 Kafka 环境的数据,包括你创建的消息,运行以下命令:

$ rm -rf /tmp/kafka-logs /tmp/zookeeper

四. 以系统服务方式启动 Kafka

4.1. Linux 系统

4.1.1. 创建服务文件

创建 /usr/lib/systemd/system/zookeeper.service 并写入:

[Unit]
Requires=network.target
After=network.target
[Service]
Type=simple
LimitNOFILE=1048576
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
ExecStop=/usr/local/kafka/bin/zookeeper-server-stop.sh
Restart=Always
[Install]
WantedBy=multi-user.target

创建 /usr/lib/systemd/system/kafka.service 并写入:

[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
LimitNOFILE=1048576
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties 
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=Always
[Install]
WantedBy=multi-user.target

注意:示例的 Kafka 安装地址在 /usr/local/kafka。

4.1.2. 启动服务

重载系统服务并启动:

systemctl daemon-reload
systemctl enable zookeeper && systemctl enable kafka
systemctl start zookeeper && systemctl start kafka
systemctl status zookeeper && systemctl status kafka

4.2. Windows 系统

4.2.1. 准备工作

下载 Kafka:https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz

解压 Kafka 至 D:\bigdata\kafka_2.13-3.6.1

下载 instsrv.exe / srvany.exe

将 instsrv.exe / srvany.exe 拷贝至 D:\bigdata\kafka_2.13-3.6.1\bin\windows

4.2.2. 注册服务

以管理员身份打开 cmd:

cd D:\bigdata\kafka_2.13-3.6.1\bin\windows

instsrv KafkaService D:\bigdata\kafka_2.13-3.6.1\bin\windows\srvany.exe

卸载服务:

instsrv KafkaService remove
或
sc delete KafkaService

4.2.3. 编辑注册表

打开注册表(运行窗口中输入 regedit):

  1. 定位到以下路径:HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\KafkaService
  2. 如果该服务名下没有 Parameters 项目,则对服务名称项目右击新建项,名称为 Parameters;
  3. 定位到 Parameters 项,新建以下几个字符串值:
    1. 名称 Application 值为你要作为服务运行的 BAT 文件地址:D:\bigdata\kafka_2.13-3.6.1\bin\windows\kafka-server-start.bat;
    2. 名称 AppDirectory 值为你要作为服务运行的 BAT 文件所在文件夹路径:D:\bigdata\kafka_2.13-3.6.1\bin\windows;
    3. 名称 AppParameters 值为你要作为服务运行的 BAT 文件启动所需要的参数:D:\bigdata\kafka_2.13-3.6.1\config\server.properties。

4.2.4. 启动并测试

做完以上3步,启动服务即可。

测试 Kafka:

1. 创建 Topic

打开一个命令行窗口,进入到目录 D:\bigdata\kafka_2.13-3.6.1\bin\windows:

cd D:\bigdata\kafka_2.13-3.6.1\bin\windows
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

2. 创建生产者 Producer

打开一个命令行窗口,进入到目录 D:\bigdata\kafka_2.13-3.6.1\bin\windows:

cd D:\bigdata\kafka_2.13-3.6.1\bin\windows
kafka-console-producer.bat --broker-list localhost:9092 --topic test

3. 创建消费者

打开一个命令行窗口,进入到目录 D:\bigdata\kafka_2.13-3.6.1\bin\windows:

cd D:\bigdata\kafka_2.13-3.6.1\bin\windows
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test

    在生产者命令行窗口内随便输入一段字符,然后回车,你应该能看到同样的消息出现在消费者的命令行窗口内,如果在消费者端能看到你推送的消息,那么你已经成功的安装了 Kafka。

4.3. 常用命令

1. 列举 Topic:

kafka-topics.bat --list --zookeeper localhost:2181

2. 描述 Topic:

kafka-topics.bat --describe --zookeeper localhost:2181 --topic [Topic Name]

3. 从头消费消息:

kafka-console-consumer.bat --zookeeper localhost:2181 --topic [Topic Name] --from-beginning

4. 删除 Topic:

kafka-run-class.bat kafka.admin.TopicCommand --delete --topic [topic_to_delete] --zookeeper localhost:2181

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1441013.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python中HTTP隧道的基本原理与实现

HTTP隧道是一种允许客户端和服务器之间通过中间代理进行通信的技术。这种隧道技术允许代理服务器转发客户端和服务器之间的所有HTTP请求和响应&#xff0c;而不需要对请求或响应内容进行任何处理或解析。Python提供了强大的网络编程能力&#xff0c;可以使用标准库中的socket和…

疑似针对安全研究人员的窃密与勒索

前言 笔者在某国外开源样本沙箱平台闲逛的时候&#xff0c;发现了一个有趣的样本&#xff0c;该样本伪装成安全研究人员经常使用的某个渗透测试工具的破解版压缩包&#xff0c;对安全研究人员进行窃密与勒索双重攻击&#xff0c;这种双重攻击的方式也是勒索病毒黑客组织常用的…

攻防世界 CTF Web方向 引导模式-难度1 —— 1-10题 wp精讲

目录 view_source robots backup cookie disabled_button get_post weak_auth simple_php Training-WWW-Robots view_source 题目描述: X老师让小宁同学查看一个网页的源代码&#xff0c;但小宁同学发现鼠标右键好像不管用了。 不能按右键&#xff0c;按F12 robots …

springboot+vue居民小区设备报修系统

小区报修系统可以提高设施维护的效率&#xff0c;减少机构的人力物力成本&#xff0c;并使得维修人员可以更好地了解维护设备的情况&#xff0c;及时解决问题。 对于用户来说&#xff0c;报修系统也方便用户的维修请求和沟通&#xff0c;提高了用户的满意度和信任。其次小区报修…

CTFshow web(命令执行 41-44)

web41 <?php /* # -*- coding: utf-8 -*- # Author: 羽 # Date: 2020-09-05 20:31:22 # Last Modified by: h1xa # Last Modified time: 2020-09-05 22:40:07 # email: 1341963450qq.com # link: https://ctf.show */ if(isset($_POST[c])){ $c $_POST[c]; if(!p…

ubuntu原始套接字多线程负载均衡

原始套接字多线程负载均衡是一种在网络编程中常见的技术&#xff0c;特别是在高性能网络应用或网络安全工具中。这种技术允许应用程序在多个线程之间有效地分配和处理网络流量&#xff0c;提高系统的并发性能。以下是关于原始套接字多线程负载均衡技术的一些介绍&#xff1a; …

【已解决】:pip is configured with locations that require TLS/SSL

在使用pip进行软件包安装的时候出现问题&#xff1a; WARNING: pip is configured with locations that require TLS/SSL, however the ssl module in Python is not available. 解决&#xff1a; mkdir -p ~/.pip vim ~/.pip/pip.conf然后输入内容&#xff1a; [global] ind…

vue3 之 商城项目—详情页

整体认识 路由配置 准备组件模版 <script setup></script><template><div class"xtx-goods-page"><div class"container"><div class"bread-container"><el-breadcrumb separator">">&…

C++进阶(十三)异常

&#x1f4d8;北尘_&#xff1a;个人主页 &#x1f30e;个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上&#xff0c;不忘来时的初心 文章目录 一、C语言传统的处理错误的方式二、C异常概念三、异常的使用1、异常的抛出和捕获2、异常的重新…

Vue.js2+Cesium1.103.0 十五、绘制视锥,并可实时调整视锥姿态

Vue.js2Cesium1.103.0 十五、绘制视锥&#xff0c;并可实时调整视锥姿态 Demo <template><divid"cesium-container"style"width: 100%; height: 100%;"/> </template><script> /* eslint-disable no-undef */ /* eslint-disable …

Flink从入门到实践(一):Flink入门、Flink部署

文章目录 系列文章索引一、快速上手1、导包2、求词频demo&#xff08;1&#xff09;要读取的数据&#xff08;2&#xff09;demo1&#xff1a;批处理&#xff08;离线处理&#xff09;&#xff08;3&#xff09;demo2 - lambda优化&#xff1a;批处理&#xff08;离线处理&…

深入解析Linux中HTTP代理的工作原理

亲爱的Linux探险家们&#xff0c;准备好一起探索HTTP代理背后的神秘面纱了吗&#xff1f;在这个数字世界里&#xff0c;HTTP代理就像是一个神秘的中间人&#xff0c;默默地在你和互联网之间穿梭&#xff0c;为你传递信息。那么&#xff0c;这个神秘的中间人到底是如何工作的呢&…

github拉取项目,pycharm配置远程服务器环境

拉取项目 从github上拉取项目到pycharmpycharm右下角选择远程服务器上的环境 2.1. 如图 2.2. 输入远程服务器的host&#xff0c;port&#xff0c;username&#xff0c;password连接 2.3. 选择服务器上的环境 链接第3点 注&#xff1a;如果服务器上环境不存在&#xff0c;先创建…

【动态规划】【C++算法】2188. 完成比赛的最少时间

作者推荐 【动态规划】【前缀和】【C算法】LCP 57. 打地鼠 本文涉及知识点 动态规划汇总 LeetCode2188. 完成比赛的最少时间 给你一个下标从 0 开始的二维整数数组 tires &#xff0c;其中 tires[i] [fi, ri] 表示第 i 种轮胎如果连续使用&#xff0c;第 x 圈需要耗时 fi…

微策略爆买3万枚!

号外&#xff1a;教链内参2.7《内参&#xff1a;美股续创新高》 日前&#xff0c;因囤积比特币&#xff08;BTC&#xff09;而知名的美股上市公司微策略&#xff08;Microstrategy&#xff09;放出了2023四季度财报。看了这篇财报&#xff0c;才发现在2024年初现货ETF通过之前&…

【分布式】雪花算法学习笔记

雪花算法学习笔记 来源 https://pdai.tech/md/algorithm/alg-domain-id-snowflake.html概述 雪花算法是推特开源的分布式ID生成算法&#xff0c;以划分命名空间的方式将64位分割成多个部分&#xff0c;每一个部分代表不同的含义&#xff0c;这种就是将64位划分成不同的段&…

【JavaWeb】头条新闻项目实现 基本增删改查 分页查询 登录注册校验 业务功能实现 第二期

文章目录 一、为什么使用token口令二、登录注册功能2.1 登录表单提交后端代码&#xff1a; 2.2 根据token获取完整用户信息代码实现&#xff1a; 2.3 注册时用户名占用校验代码实现&#xff1a; 2.4 注册表单提交代码实现&#xff1a; 三、头条首页功能3.1 查询所有头条分类3.2…

免费生成ios证书的方法(无需mac电脑)

使用hbuilderx的uniapp框架开发移动端程序很方便&#xff0c;可以很方便地开发出移动端的小程序和app。但是打包ios版本的app的时候却很麻烦&#xff0c;官方提供的教程需要使用mac电脑来生成证书&#xff0c;但是mac电脑却不便宜&#xff0c;一般的型号都差不多上万。 因此&a…

网络学习:数据链路层VLAN原理和配置

一、简介&#xff1a; VLAN又称为虚拟局域网&#xff0c;它是用来将使用路由器的网络分割成多个虚拟局域网&#xff0c;起到隔离广播域的作用&#xff0c;一个VLAN通常对应一个IP网段&#xff0c;不同VLAN通常规划到不同IP网段。划分VLAN可以提高网络的通讯质量和安全性。 二、…

彻底学会系列:一、机器学习之线性回归(一)

1.基本概念(basic concept) 线性回归&#xff1a; 有监督学习的一种算法。主要关注多个因变量和一个目标变量之间的关系。 因变量&#xff1a; 影响目标变量的因素&#xff1a; X 1 , X 2 . . . X_1, X_2... X1​,X2​... &#xff0c;连续值或离散值。 目标变量&#xff1a; …