Linux 搭建 Kafka 环境 - 详细教程

news2025/1/11 2:36:50

目录

一. Kafka介绍

1. 应用场景

2. 版本对比

二. Kafka安装

1. 前置环境

(1)安装JDK

2. 软件安装

(3)环境变量配置

(3)服务启动

三. Console测试

基础命令

(1)列出Kafka集群中所有存在的主题

(3)创建一个新的主题

(3)删除主题

(4)描述主题

(5)启动生产者

(6)启动消费者

四. 注册系统服务

1. Systemd服务配置

2. Kafka服务控制


一. Kafka介绍

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。

1. 应用场景

Kafka可以看作是一个能够处理消息队列的中间件,适用于实时的流数据处理,主要用于平衡好生产者和消费者之间的关系。

  • 生产者

生产者可以看作是数据源,可以来自于日志采集框架,如Flume,也可以来自于其它的流数据服务。当接收到数据后,将根据预设的Topic暂存在Kafka中等待消费。对于接收到的数据将会有额外的标记,用于记录数据的被消费【使用】情况。

  • 消费者

消费者即数据的使用端,可以是一个持久化的存储结构,如Hadoop,也可以直接接入支持流数据计算的各种框架,如Spark - Streaming。消费者可以有多个,通过订阅不同的Topic来获取数据。

2. 版本对比

Kafka的0.x和1.x可以看作是上古版本了,最近的更新也是几年以前,从目前的场景需求来看,也没有什么特别的理由需要使用到这两个版本了。

  • 2.x

在进行版本选择时,通常需要综合考虑整个数据流所设计到的计算框架和存储结构,来确定开发成本以及兼容性。目前2.x版本同样是一个可以用于生产环境的版本,并且保持着对Scala最新版本的编译更新。

  • 3.x

3.x是目前最新的稳定版,需要注意的是,Kafka的每个大版本之间的差异较大,包括命令参数以及API调用,所以在更换版本前需要做好详细的调查与准备,本文以3.x的安装为例。

二. Kafka安装

解压安装的操作方式可以适用于各种主流Linux操作系统,只需要解决好前置环境问题。

1. 前置环境

此前,运行Kafka需要预先安装Zookeeper。在Kafka 2.8.0版本以后,引入了Kraft(Kafka Raft)模式,可以使Kafka在不依赖外部Zookeeper的前提下运行。除此之外Kafka由Scala语言编写,需要JVM的运行环境。

(1)安装JDK

 Ubuntu/Debian:

sudo apt install openjdk-8-jdk

  CentOS/RedHat:

sudo yum install java-1.8.0-openjdk

安装完成后可以使用java-version命令验证【可省去环境变量配置】。

2. 软件安装

  • 下载Kafka ,链接如下:
# 离线下载安装包
https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz

# 在线利用wget远程下载​
wget https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz
  • 解压安装  
tar -zvxf kafka_2.12-3.5.2.tgz

(3)环境变量配置

需要在环境变量中指定Kafka的安装目录以及命令文件所在目录,系统环境变量与用户环境变量配置其中之一即可。

/etc/profile 文件最下方添加如下两行命令--配置全局环境。

export KAFKA_HOME=/home/ygsj/Config_files/kafka_server/kafka_2.12-3.5.2
export PATH=$PATH:$KAFKA_HOME/bin

在文件结尾添加以上内容后执行source命令,使其立即生效。

source /etc/profile

[Ubuntu/Debian] source ~/.bashrc

[CentOS/RedHat] source ~/.bash_profile

执行后可以输入kafka,然后按Tab尝试补全【需要按多次】,如果出现命令列表则证明配置成功。

(3)服务启动

使用Kraft模式,则需要先进行集群初始化【即使是单个节点】,以下为操作步骤:

  • 目录下创建 kafka-logs文件夹

  • 修改配置文件

修改Kafka的/config/kraft/server.properties文件,更换其中的log.dirs目录指向创建目录,防止默认的/tmp被清空:

log.dirs=/home/ygsj/Config_files/kafka_server/kafka-logs

  • 创建Kafka的集群ID 
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

调用 kafka-storage.sh 生成一个UUID

将获得的 UUID 放到 kafka_2.12-3.5.2/config/kraft/server.properties 文件中 如下:

相同文件内修改:远程连接开启 (红框内写服务器ip)---自己测试0.0.0.0无效

进入到Kafka的家目录后,执行以下命令 

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

# bin/kafka-server-start.sh config/kraft/server.properties

 这种方式并不是后台运行,需要保证终端开启,等测试稳定后可以在后台执行或者注册为系统服务。 

三. Console测试

基础命令

(1)列出Kafka集群中所有存在的主题

kafka-topics.sh --list --bootstrap-server localhost:9092

--bootstrap-server localhost:9092 指定了Kafka集群的连接地址(在这里是本地的Kafka服务器)
如果集群中没有主题,命令不会返回任何内容
当你创建主题后,这条命令会返回集群中存在的主题列表

(3)创建一个新的主题

kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092

这条命令用于创建一个名为 my-topic 的新主题。
--create 指定了创建操作。
--topic my-topic 指定了要创建的主题名称。
--bootstrap-server localhost:9092 指定了Kafka集群的连接地址。
Created topic my-topic. 表示主题 my-topic 已成功创建。

(3)删除主题

kafka-topics.sh --delete  --topic my-topic --bootstrap-server localhost:9092

--delete: 指定要删除一个主题。
--topic my-topic: 指定要删除的主题名称是 my-topic。
--bootstrap-server localhost:9092: 指定Kafka集群的连接地址(在此是本地的Kafka服务器)。

(4)描述主题

 kafka-topics.sh --describe  --topic my-topic --bootstrap-server localhost:9092

获取指定主题 my-topic 的详细信息。
--describe 指定了描述操作。
--topic my-topic 指定了要描述的主题名称。
--bootstrap-server localhost:9092 指定了Kafka集群的连接地址。

(5)启动生产者

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic

启动一个基于console的生产者脚本,可以方便的进行数据输入的测试,直接进行数据输入即可。

(6)启动消费者

 kafka-console-consumer.sh --help  打印所有参数

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

添加from-beginning参数来从头消费数据。

四. 注册系统服务

 为了方便的控制Kafka服务的启动和停止,可以将其注册为系统服务。

1. Systemd服务配置

创建Systemd服务文件

sudo vim /etc/systemd/system/kafka.service

在文件中添加以下内容,需要手动替换ExecStartExecStop中关于路径的部分:

[Unit]
Description=Apache Kafka
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
ExecStart=/home/ygsj/Config_files/kafka_server/kafka_2.12-3.5.2/bin/kafka-server-start.sh /home/ygsj/Config_files/kafka_server/kafka_2.12-3.5.2/config/kraft/server.properties
ExecStop=/home/ygsj/Config_files/kafka_server/kafka_2.12-3.5.2/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target

 重新加载Systemd配置 

sudo systemctl daemon-reload

2. Kafka服务控制

  • 开机自动启动
sudo systemctl enable kafka.service
  • 启动Kafka服务
sudo systemctl start kafka.service
  • 检查Kafka状态 
sudo systemctl status kafka.service

  • 停止Kafka服务
sudo systemctl stop kafka.service
  • 重启Kafka服务
sudo systemctl restart kafka.service

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1889294.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大模型分布式训练的第四种境界

主要内容包括: \1. 历史背景 \2. 分布式训练挑战 \3. 分布式训练技术体系 \4. 未来挑战 \5. Q&A 01 历史背景 自 2019 年以来,大语言模型发展迅猛,不断有新的研究成果涌现,包括各类预训练模型及其应用,如 LL…

Vue86-Vuex中的getters属性

一、getters的使用 1-1、index.js中getters的书写 计算属性computed靠return获得返回值! 1-2、组件中getters的调用 state是数据源,getters是拿着数据源里的东西进行一番加工。像极了:data和computed 二、小结

从0到1手写vue源码

模版引擎 数组join法(字符串) es6反引号法(模版字符串换行) mustache (小胡子) 引入mustache 模版引擎的使用 mustache.render(templatestr,data)

从零开始学LangChain(3):数据接入层

LangChain 主体分为 6 个模块,分别是对(大语言)模型输入输出的管理、外部数据接入、链的概念、(上下文记忆)存储管理、智能代理以及回调系统,通过文档的组织结构,你可以清晰了解到 LangChain的侧…

视频网关的作用

在数字化时代,视频通信已经成为了人们日常生活和工作中的重要部分。为了满足不同设备和平台之间的视频通信需求,各种视频协议应运而生。然而,这些协议之间的差异使得相互通信变得复杂。因此,视频网关作为一种重要的网络设备&#…

2024年【安全生产监管人员】考试题及安全生产监管人员试题及解析

题库来源:安全生产模拟考试一点通公众号小程序 安全生产监管人员考试题根据新安全生产监管人员考试大纲要求,安全生产模拟考试一点通将安全生产监管人员模拟考试试题进行汇编,组成一套安全生产监管人员全真模拟考试试题,学员可通…

65.Python-web框架-Django-免费模板django-datta-able的admin_datta

目录 1.起源 2.admin_datta admin_datta\urls.py admin_datta\views.py 1.起源 前面有一篇文章介绍了django-datta-able:54.Python-web框架-Django-免费模板django-datta-able_danjon web框架商用免费-CSDN博客 页面是这个样子。 从template\include\sidebar.…

人民数据“数据资产入表高级研修班”正式上线

人民数据与北京龙腾亚太教育咨询有限公司定于8月10日至8月11日联合举办“数据资产入表高级研修班”,欢迎报名。 课程介绍: 01 公共数据的价格形成与价值评估 02 数据资产化实践探索 03《企业数据资源相关会计处理暂行规定》要点解析与入表意义 04 …

干式电抗器与油浸电抗器有什么区别?

干式电抗器和油浸电抗器是电力系统中常用的两种电抗器,它们在结构、性能、应用等方面存在一定的区别。 1. 结构上的区别: 干式电抗器主要由线圈、铁芯和绝缘材料组成,没有油箱,线圈和铁芯直接暴露在空气中。因此,干式…

详解微服务应用灰度发布最佳实践

作者:子丑 本次分享是站在 DevOps 视角的灰度发布实践概述,主要内容包括以下四个方面: 第一,灰度发布要解决的问题; 第二,灰度发布的四种典型场景; 第三,如何把灰度发布融入到应…

【FFmpeg】avcodec_send_frame函数

目录 1.avcodec_send_frame1.1 将输入的frame存入内部buffer(encode_send_frame_internal)1.1.1 frame的引用函数(av_frame_ref )1.1.1.1 帧属性的拷贝(frame_copy_props)1.1.1.2 buffer的引用函数&#xf…

从零开始了解视频直播美颜SDK与主播美颜工具

时下,美颜SDK与主播美颜工具应运而生,为视频直播的画质提升提供了强有力的技术支持。 一、什么是美颜SDK? 美颜SDK是一种软件开发工具包,开发者可以将其集成到自己的应用程序中,以实现实时美颜功能。通过美颜SDK&…

巴西社交APP出海热潮!本土网盟CPI流量助力海外广告引流新方向

巴西社交APP出海热潮!本土网盟CPI流量助力海外广告引流新方向 在巴西这个充满活力与多元文化的国家,社交APP的推广和营销策略需要深入了解和适应其独特的市场环境。本土网盟流量投放作为一种有效的推广手段,正逐渐成为众多企业和开发者关注的…

Flask 中的 Session 和 Cookies、wtforms

Flask 中的 Session 和 Cookies 在构建 web 应用时,管理用户的状态和数据是至关重要的。Flask,作为一个灵活的微型 web 框架,提供了会话(Session)和 Cookies 管理的能力。本文将深入探讨 Flask 中的会话和 Cookies 的…

七项确保AI监管得当的原则

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领…

Java8环境安装(jdk1.8安装)详细教程

Java 8环境安装(jdk1.8安装)详细教程 Java 8(也称为JDK 1.8),是Oracle公司于2014年3月发布的一个重要的Java语言版本。这个版本自发布以来,因其众多的新特性和改进,被认为是Java语言发展历程中…

宁波职业技术学院DSP实验室建设案例—以及H264编码实验案例简介

宁波职业技术学院:勤信实 宁波职业技术学院是1999年由教育部批准成立的从事高等职业教育的全日制普通高校,2005年被评为全国职业教育先进单位,是国家首批示范性高等职业院校,教育部首批现代学徒制试点院校、浙江省五所重点建设高…

z-index的工作原理

z-index的工作原理 HTML文档中的元素却是存在于三个维度之中。除了大家熟知的平面画布中的x轴和y轴,还有控制第三维度的z轴。 像 margin , float , offset 这些属性,控制着元素在x轴和y轴上的表现形式一样。 z-index 这个属性控制着元素在z轴上的表现形…

一文详解多层感知机(MLP)

文章目录 What(是什么)Where(用在哪)How(怎么用)多层感知机解决分类问题(以minist分类为例)多层感知机解决回归问题多层感知机解决噪声处理的问题 What(是什么) 多层感知机(Multilayer Perceptr…

PyTorch入门笔记

学习参考: PyTorch简单入门视频 深入浅出PyTorch 小土堆笔记 前置知识 AI vs ML vs DL AI(Artificial Intelligence):通过让机器模仿人类进而超越人类ML(Machine Learning):让机器模仿人类的一…