kafka集群搭建-使用zookeeper

news2025/1/17 2:56:31

1.环境准备:

使用如下3台主机搭建zookeeper集群,由于默认的9092客户端连接端口不在本次使用的云服务器开放端口范围内,故端口改为了8093。
172.2.1.69:8093
172.2.1.70:8093
172.2.1.71:8093

2.下载地址

去官网下载,或者使用如下仓库地址下载,本次使用的时kafka_2.13-3.6.1.tgz ,即3.6.1版本,前面的2.13是scala版本,该版本是较新的版本,可以使用zookeeper,也可以不使用zookeeper搭建集群,本次记录使用了zk,zk集群的部署可以参考上一篇记录。

# 软件包下载地址,可以切到/kafka/路径,选择自己需要的版本
https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz

3.软件包下载解压

在上面3台服务器上分别执行wget下载,或者本地下载后上传,本次使用的环境为堡垒机接入,如果使用的是宿主机账密登陆,可以下载配置一台,其余使用SCP命令拷贝过去即可。

cd  /usr/local/

wget https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz

tar -zxvf kafka_2.13-3.6.1.tgz

mv kafka_2.13-3.6.1 kafka

4.修改配置

需要修改logs路径的话,可以在/kafka路径下新建logs路径,并配置到server.properties中,这里使用默认的/tmp/kafka-logs路径。

cd kafka
vim conf/server.properties

修改内容如下,

# 每个节点唯一的id,这里.69、.70、.71服务器分别设置为了1、2、3
broker.id=1

# 默认为9092,云服务器开放端口问题,改为了8093
port=8093

# 上一篇记录博客搭建的zk集群地址
zookeeper.connect=172.2.1.69:8092,172.2.1.70:8092,172.2.1.71:8092

# 配置监听访问、绑定地址,这里都是PLAINTEXT协议,不需要认证(相当于内网访问)
listeners=PLAINTEXT://0.0.0.0:8093                                                                                                 
advertised.listeners=PLAINTEXT://172.2.1.71:8093

# 日志路径
log.dirs=/tmp/kafka-logs

5.启动kafka集群

分别在每个节点的bin路径下执行启动脚本

# 在3个节点分别执行如下命令,-daemon表示后台启动,不带该参数前台启动
./bin/kafka-server-start.sh -daemon config/server.properties

使用jps命令,或者去kafka启动日志,查看kafka是否启动成功。

6.创建topic`

# 创建topic,在任一节点执行都可以。
 ./bin/kafka-topics.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --create --topic topic-demo --partitions=3 --replication-factor=3

# 查看topic是否创建成功,在任一节点执行
./bin/kafka-topics.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo --list

7.模拟生产消费消息

需要注意的是网上搜到的一些老的博客kafka命令在高版本中是不再支持的,如:
sh ./bin/kafka-topics.sh --zookeeper=zk集群地址,可能出现命令无法识别:zookeeper is not a recognized option
需要替换为:sh ./bin/kafka-topics.sh --bootstrap-server=kafka集群地址,注意–bootstrap-server后面跟的是kafka集群地址,不是zookeeper地址。

# 在2个节点启动消费者模拟客户端接收消息,在第3个节点启动生产者模拟客户端发送消息
./bin/kafka-console-consumer.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo
# 在第3个节点启动生产者客户端模拟发送消息:hello
./bin/kafka-console-producer.sh --bootstrap-server=172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093 --topic topic-demo

生产者客户端发送hello
在这里插入图片描述
此时可以看到2个消费者模拟客户端都受到了消息:hello
在这里插入图片描述

8.集成springboot

坐标如下:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

kafka配置类:

package com.example.kafka.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
//@RefreshScope
public class KafkaConfig {

    @Value("${xxxx:172.2.1.69:8093,172.2.1.70:8093,172.2.1.71:8093}")
    private String kafkaServers;
    
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        //props.put(ConsumerConfig.GROUP_ID_CONFIG, "0");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }
}

消费者客户端:

    @KafkaListener(
            topics = {"topic-demo"},
            groupId = "test1",
            properties = {"auto-offset-reset:latest", "enable.auto.commit:true"}
    )
    public void listen(ConsumerRecord<String, String> consumerRecord) {
        log.info("consumer Received: " + consumerRecord);
    }

生产者发送消息:

@RestController
@RequiredArgsConstructor
@RequestMapping("producer")
public class ProducerController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping(path = "/sendCommonMsg")
    public String sendCommonMsg(String topic, String msg) {
        ListenableFuture<SendResult<String, String>> hello_kafka = this.kafkaTemplate.send(topic, "hello kafka");
        SendResult<String, String> sendResult = hello_kafka.completable().join();
        System.out.println(sendResult);
        return "send topic: " + topic + ", msg: " + msg;
    }
}

发送测试:
在这里插入图片描述

消费者可以接收到消息:

consumer Received: ConsumerRecord(topic = topic-demo, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1721720091071, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)

9.IDEA客户端工具

可以使用kafkalytic工具本地开发环境可视化操作kafka服务器,如查看topic,创建topic

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1943552.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

达梦索引组织表和堆表

达梦数据库默认创建的是索引组织表&#xff0c;‌而Oracle数据库默认创建的是堆表。‌这两种表类型的区别主要体现在数据存储和组织方式上&#xff1a; 索引组织表(‌Index Organized Table, IOT)&#xff1a;‌ 索引组织表‌有且仅有一个聚簇索引键。索引组织表也称“普通表”…

UGUI优化篇--UGUI合批

UGUI合批 UGUI合批规则概述UGUI性能查看工具合批部分的特殊例子一个白色image、蓝色image覆盖了Text&#xff0c;白色image和Text哪个先渲染 Mask合批Mask为什么会产生两个drawcallMask为什么不能合批Mask注意要点 RectMask2D为什么RecMask2D比Mask性能更好主要代码RectMask2D注…

【笔记】学习记录

2024年7月23日 1.图的5中存储方式 2.二叉树的先序&#xff0c;中序&#xff0c;后序遍历。 学了图的存储方式之后&#xff0c;二叉树好像就是小菜一碟一样。注意一下名词的顺序就可以了。 所谓先中后序&#xff0c;就是先根&#xff0c;中根&#xff0c;后根的差别。没有其…

数据库练习-3

查询要求&#xff1a; 查询代码&#xff1a;

Apache2服务介绍

apache2 安装使用配置web访问配置虚拟主机配置代理正向代理反向代理 官网 互联网上排名第一的 HTTP 服务器&#xff0c;Apache HTTP 服务器项目致力于开发和维护适用于现代操作系统&#xff08;包括 UNIX 和 Windows&#xff09;的开源 HTTP 服务器。该项目的目标是提供安全、…

【系列教程之】1、点亮一个LED灯

1、点亮一个LED灯 作者将狼才鲸创建日期2024-07-23 CSDN教程目录地址&#xff1a;【目录】8051汇编与C语言系列教程本Gitee仓库原始地址&#xff1a;才鲸嵌入式/8051_c51_单片机从汇编到C_从Boot到应用实践教程 本源码包含C语言和汇编工程&#xff0c;能直接在电脑中通过Keil…

SQL性能优化秘籍:如何避免计算导致索引失效

适用于MySQL、PostgreSQL、Oracle等各种数据库的优化技巧 问题剖析 设想我们为customer表的c_acctbal列创建了一个B树索引c_acctbal_idx&#xff0c;以加速相关查询。然而&#xff0c;一个看似无害的计算可能会阻碍索引的使用。比如这样的查询&#xff1a; SELECT * FROM cus…

二次元手游《交错战线》游戏拆解

交错战线游戏拆解案 游戏亮点即核心趣味 一、关键词&#xff1a; 回合制游戏、二次元、机甲、横板、剧情、养成、异星探索。 二、游戏亮点&#xff1a; 符合目标群体审美的原画。 三、核心趣味&#xff1a; 抽卡、肝或者氪金解锁新皮肤。 核心玩法及系统规则 核心玩法&…

海外IP代理科普:代理池有什么用?代理池大小的影响

在当今数字化时代&#xff0c;网络爬虫已经成为获取各类信息必不可少的工具。在大规模数据抓取中&#xff0c;使用单一 IP 地址或同一 IP 代理往往会面临抓取可靠性降低、地理位置受限、请求次数受限等一系列问题。为了克服这些问题&#xff0c;构建代理池成为一种有效的解决方…

【Android Compose】ListView效果

【Android Compose】ListView效果 1、Column、Row 和 Box2、LazyColumn和LazyRow3、Compose 中的状态4、ListView效果5、android-compose-codelabs Jetpack Compose 使用入门 Jetpack Compose 教程 Jetpack Compose 1、Column、Row 和 Box Compose 中的三个基本标准布局元素是 …

C++相关概念和易错语法(24)(map、迭代器分类)

1.map 在上篇文章中&#xff0c;我着重介绍了set&#xff0c;由于map和set同源&#xff0c;所以这次我会着重介绍map别于set的地方 &#xff08;1&#xff09;模板参数 set是以单一的key作为成员变量&#xff0c;而map是以pair作为成员变量&#xff0c;而pair的first作为key来…

使用千帆SDK压测千帆大模型平台上的服务

场景 给用户提供千帆标准的压测工具&#xff08;千帆SDK&#xff09;。满足以下使用场景&#xff1a; 测试sft模型部署到算力单元后&#xff0c;实际的性能效果 对比模型压缩后的性能效果 测试预置服务的性能 压测数据准备&#xff08;数据格式规范说明&#xff09; 可用…

DAY05 CSS

文章目录 1 CSS选择器(Selectors)8. 后代(包含)选择器9. 直接子代选择器10. 兄弟选择器11. 相邻兄弟选择器12. 属性选择器 2 伪元素3 CSS样式优先级1. 相同选择器不同样式2. 相同选择器相同样式3. 继承现象4. 选择器不同权值的计算 4 CSS中的值和单位1. 颜色表示法2. 尺寸表示法…

Try ubuntu core (by quqi99)

作者&#xff1a;张华 发表于&#xff1a;2024-07-20 版权声明&#xff1a;可以任意转载&#xff0c;转载时请务必以超链接形式标明文章原始出处和作者信息及本版权声明(http://blog.csdn.net/quqi99) try ubuntu core on qemu #ovmf is to ensure compatibility with the re…

电机线电流与转差率曲线理论推导

1.推导基础&#xff1a; #已知正转正拉电流近似为&#xff1a; curr_in_upward (im im*(rm(lml2)*2*np.pi*freq_in*1j)/(r2 l2*2*np.pi*freq_in*1j (1-s)/s*r2))#同工况同负载&#xff0c;正转反拉电流近似为&#xff1a; curr_in_downward (im im*(rm(lml2)*2*np.pi*f…

代码随想录第六十二天 | 739. 每日温度 , 496.下一个更大元素 I ,503.下一个更大元素II

先复习一下栈与队列。栈是先进后出&#xff0c;队列是先进先出。二者都属于STL容器&#xff08;版本是SGI STL&#xff09;中的容器适配器。底层容器完成其所有的工作&#xff0c;对外提供统一的接口&#xff0c;底层容器是可插拔的。如果没有指定底层实现的话&#xff0c;默认…

22-联合体与枚举

22-联合体与枚举 文章目录 22-联合体与枚举一、 联合体1.1 定义和特点1.2 语法1.3 示例1.4 联合体的使用1.5 联合体的使用&#xff1a;检查系统的字节序 二、 枚举2.1 定义和特点2.2 语法2.3 枚举常量的值可以手动修改 一、 联合体 1.1 定义和特点 联合体&#xff08;Union&a…

扩展PyTorch视觉模型

扩展PyTorch视觉模型 目录 扩展PyTorch视觉模型 一、概述 二、扩展基本视觉模型的原因 1. 性能提升 2. 功能扩展 3. 资源管理 三、扩展PyTorch视觉模型的方法 1.修改现有架构 2.应用模型集成技术 3.量化和压缩模型 四、高级技巧与实践 1.自定义训练循环 2.深度模型…

【SpringBoot】 jasypt配置文件密码加解密

目前我们对yml配置文件中的密码都是明文显示&#xff0c;显然这不安全&#xff0c;有的程序员离职了以后可能会做一些非法骚操作&#xff0c;所以我们最好要做一个加密&#xff0c;只能让领导架构师或者技术经理知道这个密码。所以这节课就需要来实现一下。 我们可以使用jasypt…

Gitops-万字保姆级教程-小白也可以轻松学会! (Part 2)

系列文章目录 本文章分为2个部分&#xff1a; Part 1主要涉及Gitlab、Gitlab-Runner、Git-Ci、Sonar-qube-CI阶段 Part 2主要涉及ArgoCD阶段 Gitops-万字保姆级教程-小白也可以轻松学会! (Part 1)-CSDN博客 Gitops-万字保姆级教程-小白也可以轻松学会! (Part 2) 文章目录 目…