kafka 集群 ZooKeeper 模式搭建

news2025/1/10 23:29:26

Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序

Kafka 官网:Apache Kafka

关于ZooKeeper的弃用

根据 Kafka官网信息,随着Apache Kafka 3.5版本的发布,Zookeeper现已被标记为已弃用。未来计划在Apache Kafka(4.0版)的下一个主要版本中删除ZooKeeper,该版本最快将于2024年4月发布。在弃用阶段,ZooKeeper仍然支持用于Kafka集群元数据的管理,但不建议用于新的部署。新的部署方式使用 KRaft 模式,KRaft 模式部署可以看笔者的文章《kafka 集群 KRaft 模式搭建》,考虑到一些公司仍然在使用老版本的 Kafka,故笔者写这篇文章记录 Kafka 集群Zookeeper 模式搭建

官网信息截图

笔者使用3台服务器,它们的 ip 分别是 192.168.3.232、192.168.2.90、192.168.2.11

目录

1、官网下载 Kafka

2、配置 Kafka

3、启动 Kafka 集群

4、关闭 Kafka 集群

5、使用Kafka 可视化工具查看

6、测试Kafka集群


1、官网下载 Kafka

这里笔者下载最新版3.6.0

3.6.0 版本需要至少 java8 及以上版本,笔者使用的是 java8 版本

关于 linux 安装 java,没安装过的朋友可以参考《linux 系统安装 jdk》

下载完成

将 kafka分别上传到3台linux

在3台服务器上分别创建 kafka 安装目录

mkdir /usr/local/kafka

在3台服务器上分别将 kafka 安装包解压到新创建的 kafka 目录

tar -xzf kafka_2.13-3.6.0.tgz -C /usr/local/kafka

2、配置 Kafka

进入配置目录

cd /usr/local/kafka/kafka_2.13-3.6.0/config

编辑配置文件 server.properties

vi server.properties

配置 broker.id,advertised.listeners,zookeeper.connect

broker.id 每个节点的id

advertised.listeners 本机的外网访问地址

zookeeper.connect zookeeper 地址

192.168.3.232 节点配置

advertised.listeners 笔者配置为本机地址

192.168.2.90 节点

192.168.2.11 节点

笔者zookeeper 地址是 192.168.2.130:2181

zookeeper 版本是3.8.3

关于zookeeper单机安装和集群安装可以参考:《Linux环境 安装 zookeeper》《windows环境 安装 zookeeper》《linux 使用 nginx 搭建 zookeeper 集群》

3、启动 Kafka 集群

首先启动 zookeeper

然后在3台机器上依次启动 Kafka

进入 kafka 目录

cd /usr/local/kafka/kafka_2.13-3.6.0

下面2个命令皆可

bin/kafka-server-start.sh config/server.properties

bin/kafka-server-start.sh -daemon config/server.properties

4、关闭 Kafka 集群

关闭命令

bin/kafka-server-stop.sh

在 3 个节点上分别执行关闭命令

5、使用Kafka 可视化工具查看

下载地址:https://www.kafkatool.com/download.html

运行效果

6、测试Kafka集群

新建 maven 项目,添加 Kafka 依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>

笔者新建 maven项目 kafka-learn

kafka-learn 项目 pom 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.wsjzzcbq</groupId>
    <artifactId>kafka-learn</artifactId>
    <version>1.0-SNAPSHOT</version>
 
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.6.0</version>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

新建生产者 ProducerDemo

package com.wsjzzcbq;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
 
/**
 * Demo
 *
 * @author wsjz
 * @date 2023/11/24
 */
public class ProducerDemo {
 
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        //配置集群节点信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");
        //配置序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
 
        Producer<String, String> producer = new KafkaProducer<>(properties);
 
        //topic 名称是demo_topic
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_topic", "明月别枝惊鹊");
        RecordMetadata recordMetadata = producer.send(producerRecord).get();
        System.out.println(recordMetadata.topic());
        System.out.println(recordMetadata.partition());
        System.out.println(recordMetadata.offset());
 
    }
}

新建消费者 ConsumerDemo

package com.wsjzzcbq;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
 
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
 
/**
 * ConsumerDemo
 *
 * @author wsjz
 * @date 2023/11/24
 */
public class ConsumerDemo {
 
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 配置集群节点信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");
 
        // 消费分组名
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");
        // 序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
        // 消费者订阅主题
        consumer.subscribe(Arrays.asList("demo_topic"));
 
        while (true) {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String,String> record:records) {
                System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),
                        record.offset(),record.key(),record.value());
            }
        }
    }
}

运行测试

效果图

至此完

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1275061.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

看这里!精确的用户画像是如何一步步构建的?

1. 用户画像的定义 用户画像是指根据用户的个人信息、行为特征和偏好等数据来描绘和分析用户的一种方法。它是通过收集和分析用户的各种数据&#xff0c;以便更好地了解用户需求和行为模式&#xff0c;从而为企业提供个性化、精准化的产品和服务。 2. 构建用户画像的步骤 &…

If和else的紧缩版本(Python)

epsilon_max 3 epsilon 2 epsilon_increment 1 epsilon epsilon * (1 epsilon_increment) if epsilon < epsilon_max else epsilon_max print(epsilon)

读书笔记-《数据结构与算法》-摘要1[数据结构]

文章目录 [数据结构]1. String - 字符串2. Linked List - 链表2.1 链表的基本操作2.1.1 反转链表单向链表双向链表 2.1.2 删除链表中的某个节点2.1.3 链表指针的鲁棒性2.1.4 快慢指针 3. Binary Tree - 二叉树3.1 树的遍历3.2 Binary Search Tree - 二叉查找树 4. Queue - 队列…

训练 CNN 对 CIFAR-10 数据中的图像进行分类-keras实现

1. 加载 CIFAR-10 数据库 import keras from keras.datasets import cifar10# 加载预先处理的训练数据和测试数据 (x_train, y_train), (x_test, y_test) cifar10.load_data() 2. 可视化前 24 个训练图像 import numpy as np import matplotlib.pyplot as plt %matplotlib …

桶装水订水送水小程序具备以下主要功能

桶装水订水送水小程序具备以下主要功能&#xff1a; 对比传统的电话订水&#xff0c;订水小程序展现出显著的优势&#xff1a; 1. 便捷性&#xff1a;用户通过小程序就能轻松预订水桶&#xff0c;无需亲自出门&#xff0c;极大提升了生活的便捷度。 2. 即时性&#xff1a;送水…

element-ui表格滚动效果,el-table滚动条样式重置

项目首页需要展示一个表格滚动区域&#xff0c;特此来记录一下 HTML <div class"table-box" mouseenter"mouseenter" mouseleave"mouseleave"><el-table :data"tableList" border height"400px" v-loading"…

2023_Spark_实验二十四:Kafka集群环境搭建

Kafka集群环境搭建 一、环境说明 二、安装步骤 一、环境说明 目前的Kafka版本还是需要借助zookeeper来存储cluster、brokers、consumer等相关元信息&#xff0c;在当前版本即 在本案例中&#xff0c;我们采用了外部的zookeeper&#xff0c;即搭建了三节点的集群zookeeper环境…

Python Scrapy分布式爬虫

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 在当今信息爆炸的时代&#xff0c;获取大规模数据对于许多应用至关重要。而分布式爬虫作为一种强大的工具&#xff0c;在处理大量数据采集和高效爬取方面展现了卓越的能力。 本文将深入探讨分布式爬虫的实际应用…

最新Graphviz python安装教程及使用

文章目录 Graphviz 安装python安装graphviz库 Graphviz 安装 Graphviz是一个独立的软件&#xff0c;在用python的pip下载之前&#xff0c;需要先下载软件。 网址&#xff1a;https://graphviz.org/download/ 找到合适的版本进行下载安装。记住自己的安装位置&#xff0c;完…

如何通过数据文化加速企业管理的转型升级?

#01 企业管理 更需要“转型升级” 中国企业管理在某种程度上来看&#xff0c;受到中国传统文化、社会价值观及现代化趋势等多方面影响的结果&#xff0c;比如说&#xff0c;中国传统文化强调长期思考和计划&#xff0c;这在企业管理中体现为对长期业务发展和可持续的关注。但…

小白备战蓝桥杯:Java常用API

一、什么是API 就是别人写好的一些类&#xff0c;给咱们程序员直接拿去调用即可解决问题的 我们之前接触过的Scanner和Random都是API 但java中提供的API很多&#xff0c;我们没有必要去学习所有的API&#xff0c;只需要知道一些常用的API&#xff0c;再借助帮助文档去使用AP…

蓝桥杯每日一题2023.12.1

题目描述 蓝桥杯大赛历届真题 - C 语言 B 组 - 蓝桥云课 (lanqiao.cn) 题目分析 对于此题目而言思路较为重要&#xff0c;实际可以转化为求两个数字对应的操作&#xff0c;输出最前面的数字即可 #include<bits/stdc.h> using namespace std; int main() {for(int i 1…

【前缀和]LeetCode1862:向下取整数对和

本文涉及的基础知识点 C算法&#xff1a;前缀和、前缀乘积、前缀异或的原理、源码及测试用例 包括课程视频 作者推荐 动态规划LeetCode2552&#xff1a;优化了6版的1324模式 题目 给你一个整数数组 nums &#xff0c;请你返回所有下标对 0 < i, j < nums.length 的 …

steam搬砖项目怎么赚钱?附详细拆解教程

互联网上的赚钱项目如林&#xff0c;为何他人能斩获成果&#xff0c;你却只能望洋兴叹&#xff1f;归根结底&#xff0c;还是因为你的心态过于急功近利。今天&#xff0c;我要为你揭示的Steam实体项目&#xff0c;是专门为热门竞技游戏CS:GO量身打造的。CS:GO&#xff0c;这款风…

自动机器学习AutoML

自动机器学习AutoML AutoMLAuto-SklearnAutoKerasAutoGluonGoogle AutoMLAzure自动机器学习 AutoML 模型的选择和超参数的调节等等任务对于机器学习算法的开发者来说是一件繁琐的工作&#xff0c;为了使得机器可以自动地设计模型并调优&#xff0c;自动机器学习AutoML便应运而…

打破语言障碍:跨境电商中的多语言营销策略

随着全球市场的不断扩大&#xff0c;跨境电商成为企业拓展国际业务的重要途径。然而&#xff0c;语言障碍往往成为企业在跨境电商中面临的挑战之一。为了打破这一障碍&#xff0c;实现全球市场的可持续发展&#xff0c;多语言营销策略变得至关重要。 多语言市场的挑战 在跨境电…

春秋云境:CVE-2022-32991(sql注入)

靶标介绍&#xff1a; 该CMS的welcome.php中存在SQL注入攻击。 获取登录地址 http://eci-2zeb0096que0556y47vq.cloudeci1.ichunqiu.com:80 登录注册 注册成功登录进入注册接口 参数接口一 发现接口参数q http://eci-2zeb0096que0556y47vq.cloudeci1.ichunqiu.com/welcome.p…

使用visual Studio MFC 平台实现对灰度图添加椒盐噪声,并进行均值滤波与中值滤波

平滑处理–滤波 本文使用visual Studio MFC 平台实现对灰度图添加椒盐噪声&#xff0c;并进行均值滤波与中值滤波 关于其他MFC单文档工程可参考 01-Visual Studio 使用MFC 单文档工程绘制单一颜色直线和绘制渐变颜色的直线 02-visual Studio MFC 绘制单一颜色三角形、渐变颜色边…

【openssl】Window系统如何编译openssl

本文主要记录如何编译出windows版本的openss的lib库 1.下载openssl&#xff0c;获得openssl-master.zip。 a.可以通过github&#xff08;网址在下方&#xff09;上下载最新的代码、今天是2023.12.1我用的master版本&#xff0c;下载之后恭喜大侠获得《openssl-master.zip》 …

iPhone苹果手机如何将词令网页添加到苹果iPhone手机桌面快捷打开?

iPhone苹果手机如何将词令网页添加到苹果iPhone手机桌面快捷打开&#xff1f; 1、在iPhone苹果手机上找到「Safari浏览器」,并点击打开&#xff1b; 2、打开Safari浏览器后&#xff0c;输入词令官方网站地址&#xff1a;ciling.cn ; 3、打开词令官网后&#xff0c;点击Safari…