Kafka-Windows搭建全流程(环境,安装包,编译,消费案例,远程连接,服务自启,可视化工具)

news2024/10/26 20:22:19

目录

一. Kafka安装包获取

  1. 官网地址 

  2. 百度网盘链接

二. 环境要求

1. Java 运行环境

(1) 对 java 环境变量进行配置

(2) 下载完毕之后进行解压

三. 启动Zookeeper

四. 启动Kafka

(1)  修改Conf下的server.properties文件,修改kafka的日志文件路径

(2)  Kafka开启远程连接

五. kafka-windwos 自启动脚本

六. python-demo生产者消费者案例

       (1)生产者

       (2)消费者

七. Docker搭建kafka-map (数据可视化)

八. 备注


Kafka是一款流行分布式消息分布订阅系统,除Kafka之外还有MQ、Redis等。我们可以把消息队列视为一个管道,管道的两端分别是消息生产者(producer)和消息消费者(consumer),消息生产者产生日志等消息后可以发送到管道中,这时消息队列可以驻留在内存或者磁盘上,直到消费者来把它读走为止。

上述就是Kafka的一个概括,我们只需要了解一下Kafka的架构和一些专业术语即可,下面就来介绍一下Kafka 中一些专业术语。

Producer:消息生产者,负责把产生的消息发送到Kafka服务器上。

Consumer:消息消费者,从Kafka服务器读取消息。

Consumer Group:消费者群组,每个消息消费者可以划分为一个特定的群组。

Topic:这是Kafka使用中非常重要的一个术语,它相当于消息的"身份标识",消息生产者产生消息时会给它贴上一个Topic标签,当消息消费者需要读取消息时,可以根据这个Topic读取特定的数据。

Broker:Kafka集群中包含的服务器。


一. Kafka安装包获取

  1. 官网地址 

Apache Kafkaicon-default.png?t=O83Ahttps://kafka.apache.org/downloads

     下载版本为kafka_2.13-3.5.0

  2. 百度网盘链接

百度网盘 请输入提取码 (baidu.com)icon-default.png?t=O83Ahttps://pan.baidu.com/share/init?surl=qD06L8_OLbe7NFmQI3Ja6g&pwd=2024

二. 环境要求


在安装 Kafka 之前,需要满足以下环境要求:

1. Java 运行环境

Kafka 是使用 Java 语言编写的,因此需要在安装 Kafka 之前先安装 Java 运行环境。Kafka 支持 Java 8 及以上版本。可以通过以下命令检查 Java 运行环境的版本:

java -version
(1) 对 java 环境变量进行配置

(2) 下载完毕之后进行解压

     因为Kafka的运行依赖于 Zookeeper,所以还需要下并安装Zookeeper,ZooKeeper和Kafka版本之间有一定的对应关系,不同版本的ZooKeeper和Kafka可以相互兼容,但需要满足一定的条件。
     Kafka 2.2.0 开始支持使用内置的ZooKeeper替代外部ZooKeeper。 所以3.5.0是不需要安装Zookeeper的,直接解压即可。

三. 启动Zookeeper


     因为Kafka中的Broker注册,Topic注册,以及负载均衡都是在Zookeeper中管理,所以需要先启动内置的Zookeeper

     打开Conf文件下的zookeeper.properties文件,修改dataDir目录路径

dataDir=D:\Kafka\kafka_2.13-3.5.0\zookeeperData

执行启动Zookeeper命令

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

当看到绑定到IP地址为0.0.0.0、端口号为2181的地址,表示ZooKeeper服务器监听在该地址,启动成功

四. 启动Kafka


(1)  修改Conf下的server.properties文件,修改kafka的日志文件路径

 (2)  Kafka开启远程连接

新开一个命令行窗口,在之前的目录中输入启动命令

.\bin\windows\kafka-server-start.bat .\config\server.properties

 

五. kafka-windwos 自启动脚本

@echo off

REM 设置绝对路径
set KAFKA_HOME=D:\Kafka\kafka_2.13-3.5.0
set ZOOKEEPER_CONFIG=%KAFKA_HOME%\config\zookeeper.properties
set KAFKA_CONFIG=%KAFKA_HOME%\config\server.properties

REM 删除 kafka_log 目录及其内容
if exist %KAFKA_HOME%\kafka_log (
    echo 删除 kafka_log 目录...
    rmdir /s /q %KAFKA_HOME%\kafka_log
)

REM 删除 zookeeperData 目录
if exist %KAFKA_HOME%\bin\windows\Kafkakafka_2.13-3.5.0zookeeperData (
    echo 删除 Kafkakafka_2.13-3.5.0zookeeperData 目录...
    rmdir /s /q %KAFKA_HOME%\bin\windows\Kafkakafka_2.13-3.5.0zookeeperData
)

REM 启动Zookeeper
cd /d %KAFKA_HOME%\bin\windows
start /b zookeeper-server-start.bat %ZOOKEEPER_CONFIG%

REM 等待Zookeeper启动
timeout /t 10 /nobreak

REM 检查 meta.properties 文件
if exist %KAFKA_HOME%\kafka_log\meta.properties (
    echo "Meta properties already exist. Starting Kafka server..."
) else (
    echo "Meta properties not found. Starting Kafka server for the first time..."
)

REM 启动Kafka服务器
start /b kafka-server-start.bat %KAFKA_CONFIG%

exit

 将脚本后缀改为bat,放到windwos自启中

六. python-demo生产者消费者案例

(运行如下代码可以对kafka数据进行推送与拉取)

   (1)生产者

from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers='192.168.14.93:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# kafka 队列生产者
def producer_demo():
    # 配置Kafka生产者
    producer = KafkaProducer(bootstrap_servers='192.168.14.93:9092',
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))

    # 发送消息到指定的topic
    producer.send('my-topic', {'data_time':'2024-09-10 17:06:12',
                               'data_image':'http://192.168.7.51:9001/var/model_algorithm_package/output/17/2024-09-10-17-06-11.jpg',
                               'data_result':
                                   "['[323, 356, 437, 665, 0, [226, 255, 0], 0.5512826]', '[1514, 238, 1619, 447, 0, [226, 255, 0], 0.5094569]']"
                               })

    # 关闭生产者连接
    producer.close()

if __name__ == '__main__':
    while 1:
        producer_demo()




  (2)消费者

import json
from kafka import KafkaConsumer

lisr = []

# kafka 队列消费者
def consumer_demo():
    # 配置Kafka消费者
    consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='192.168.14.93:9092',
                         # auto_offset_reset='latest',  # 从最新消息开始消费
                         auto_offset_reset='earliest',  # 从最新消息开始消费
                         enable_auto_commit=True,      # 自动提交offset
                         group_id='my-consumer-group')  # 指定消费者组

    # 消费消息
    for message in consumer:
        res = message.value.decode('utf-8')
        try:
            # 解析 JSON 字符串为 Python 字典
            parsed_dict = json.loads(res)
            # 打印解析后的字典
            print(parsed_dict)
        except Exception as e:
            print(f"Error: {e}, 消息内容: {res}")


if __name__ == '__main__':
    consumer_demo()

七. Docker搭建kafka-map (数据可视化)

docker run -d --name kafka-map \
    -p 9001:8080 \
    -v /opt/kafka-map/data:/usr/local/kafka-map/data \
    -e DEFAULT_USERNAME=admin \
    -e DEFAULT_PASSWORD=admin \
    --restart always dushixiang/kafka-map:latest

登录端口地址为9001

可以对相应分区数据进行拉取

八. 备注

Kafka定位为分布式消息发布-订阅系统,提及分布式就可以想象,只有当在多节点环境下才能最大的发挥它的价值,前面所介绍的Kafka配置方式都是基于单节点的配置,由于本文主要是为了系统的梳理一下Kafka的配置及使用,因此对于配置这一块不再花费大的篇幅去详细介绍,如果需要到多节点配置Kafka可以自行查阅其他资料。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2224164.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

软件分享丨Marktext 编辑器

Marktext是一款开源免费的Markdown编辑器,它具有简洁优雅的界面设计和强大的功能,支持多种Markdown语法,包括表格、流程图、甘特图、数学公式、代码高亮等。Marktext还支持导出HTML和PDF格式的文档,非常适合需要编写Markdown文档的…

sersync实时同步部署案例

目录 sersync介绍 案例信息 操作步骤 服务端部署 客户端部署 创建存储目录 安装sersync 修改配置文件 启动服务 停止服务 测试 sersync介绍 sersync是一个基于inotifyrsync的实时文件同步工具,通过监控目录的变动达到实时同步的目的。 案例信息 拓扑…

【微软商店平台】如何将exe打包上传微软商店

打开微软合作者中心:https://partner.microsoft.com/en-us/dashboard/home点击App and Games板块可以创建项目。 3. 重新生成包含私钥的自签名证书 运行以下命令,确保生成的证书包含私钥: New-SelfSignedCertificate -Type CodeSigning -Su…

Git的初次使用

一、下载git 找淘宝的镜像去下载比较快 点击这里 二、配置git 1.打开git命令框 2.设置配置 git config --global user.name "你的用名"git config --global user.email "你的邮箱qq.com" 3.制作本地仓库 新建一个文件夹即可,然后在文件夹…

从零开始:构建一个高效的开源管理系统——使用 React 和 Ruoyi-Vue-Plus 的实战指南

✅作者简介:2022年博客新星 第八。热爱国学的Java后端开发者,修心和技术同步精进。 🍎个人主页:Java Fans的博客 🍊个人信条:不迁怒,不贰过。小知识,大智慧。 💞当前专栏…

QT界面开发:图形化设计、资源文件添加

设计界面介绍 此时我们创建项目时就可以选择添加UI选项了。 添加完之后,我们可以看到,文件中多出了一个存放界面文件的目录,下面有个.ui的界面文件。甚至pro的项目文件中也会添加一项内容。 我们点击界面文件中的.ui文件,我们可以…

mono源码交叉编译 linux arm arm64全过程

初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github:codetoys,所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C的,可以在任何平台上使用。 源码指引:github源…

有道在线翻译外,这三款翻译工具值得一试!

在众多翻译工具中,有道在线翻译是很多小伙伴都会用的。而市场上当然也有很多好用的翻译工具,这里就来给大家介绍几个! 福昕在线翻译 直达链接: fanyi.pdf365.cn/ 操作教程:立即获取 这也是一款在线翻译工具。它以…

解决 VScode 每次打开都是上次打开的文件问题

每次使用 VScode 打开总是上次的文件,可以简单设置即可,记录一下。 VScode Visual Studio Code(简称VSCode)是一个由微软开发的免费、开源的代码编辑器。它支持多种编程语言,并提供了代码高亮、智能代码补全、代码重构…

【分布式知识】分布式对象存储组件-Minio

文章目录 什么是minio核心特点:使用场景:开发者工具:社区和支持: 核心概念什么是对象存储?MinIO 如何确定对对象的访问权限?我可以在存储桶内按文件夹结构组织对象吗?如何备份和恢复 MinIO 上的…

【力扣 + 牛客 | SQL题 | 每日4题】牛客大厂笔试真题SQLW6, W7, W8

1. 牛客大厂笔试真题SQLW6:统计所有课程参加培训人次 1.1 题目: 描述 某公司员工培训信息数据如下: 员工培训信息表cultivate_tb(info_id-信息id,staff_id-员工id,course-培训课程),如下所示: 注:该公…

不依赖F8键,如何快速进入电脑安全模式

如果不希望通过狂按F8键进入安全模式,可以尝试以下几种方法来进入电脑的安全模式: 方法一:使用Ctrl键 关闭电脑并重新启动。在电脑启动过程中,按下Ctrl键(需要在Windows系统启动画面出现之前就开始按)。当…

基于Django+Python的房屋信息可视化及价格预测系统设计与实现(带文档)

项目运行 需要先安装Python的相关依赖:pymysql,Django3.2.8,pillow 使用pip install 安装 第一步:创建数据库 第二步:执行SQL语句,.sql文件,运行该文件中的SQL语句 第三步:修改源…

Linux: network: wireshark IO图的一个问题

今天遇到一个问题,发现wireshark画的IO图,前几秒没有数据,但是根据Raw的pcap看,是有包的,这就迷惑了。 经同事提醒,这个IO在设置了多个画图filter的时候,可能导致开始前几秒没有输出。如下图 这…

Java中Thread类的基本认识与使用(如果想知道Java中有关Thread类的基本知识,那么只看这一篇就足够了!)

前言:在Java中,Thread类是实现多线程编程的核心。它允许程序同时执行多个任务,提高应用程序的响应能力和性能。通过Thread类,开发者可以轻松创建和管理线程,并实现复杂的并发操作。接下来,我们将探讨Thread…

Java中自增自减,赋值,逻辑,三元运算符

自增自减运算符 在某个变量前面或者后面加一--在某个变量前面或者后面减一 可以看见,当a输出时,a是没有变化的,说明如果是在变量后就是先使用再增加,而b输出时,b增加了1,说明如果是在变量前面就是先增加再…

机器人转人工时,开启实时质检(mod_cti基于FreeSWITCH)

文章目录 前言联系我们实现步骤1. 修改拨号方案2. 启用拨号方案 前言 在客户与机器人对话中,是不能开启质检功能的。因为机器人识别会与质检识别产生冲突。如果用户想通过机器人转接到人工时,开启质检功能,记录客户与人工之间的对话。应该如…

《a16z : 2024 年加密货币现状报告》解析

加密社 原文链接:State of Crypto 2024 - a16z crypto译者:AI翻译官,校对:翻译小组 当我们两年前第一次发布年度加密状态报告的时候,情况跟现在很不一样。那时候,加密货币还没成为政策制定者关心的大事。 比…

Python Numpy 实现神经网络自动训练:反向传播与激活函数的应用详解

Python Numpy 实现神经网络自动训练:反向传播与激活函数的应用详解 这篇文章介绍了如何使用 Python 的 Numpy 库来实现神经网络的自动训练,重点展示了反向传播算法和激活函数的应用。反向传播是神经网络训练的核心,能够通过计算梯度来优化模…

unity项目导出安卓工程后,在AndroidStudio打包报错:unityLibrary:BuildIl2CppTask‘.

下面这个是我在unity开发者社区提问后,他们回答得: 解决方案:我这边按照这几个方案检查了下,NDK和JDK都没问题,最后重启电脑才解决的,应该是文件被锁定了,我用的windows系统的。 验证&#xff…