Kafka 3.3.1 Kraft 多端口协议搭建

news2025/1/21 6:25:39

Kafka 3.3.1 Kraft 多端口协议搭建

Kafka 3.3.1 已经出来挺久了,很多公司还停留在 1.X/2.X 甚至 0.8 版本的 kafka,不是说不能用,但是用起来真的糟糕,况且现在 Kraft 已经正式推出了,早就该更新了。

本篇文章从实践角度出发,使用真实的搭建手册改编,带领大家搭建多网卡、多端口、多认证的 Kafka 3.3.1 on Kraft。

预计效果,内部通信的非认证、内部数据传输的 sasl-plain、外部数据消费的 sasl-ssl。

版本

Kafka 2.12-3.3.1 with KRaft

auth: huangyichun maggot

服务器环境

在搭建时,必须明确我们集群的情况。本手册使用双网卡进行配置演示,每台服务器有两个网卡(*下面 outip 为虚拟网卡)

再生产环境,搭建任何集群都应该有操作步骤,如这篇文档一样。

快速配置虚拟网卡步骤

cd /etc/sysconfig/network-scripts # 配置目录
cp ifcfg-ens33 ifcfg-ens33:0 # centos7 默认为ens33
vim ifcfg-ens33\:0 # 编辑对应虚拟网卡

# 更改以下配置:
DEVICE="ens33:0"
NAME="ens33:0"
IPADDR="192.168.111.xxx"

systemctl restart network # 重启网络服务
ifconfig # 验证

在这里插入图片描述

配置 ansible hosts

vim /etc/ansible/hosts

# 追加以下内容

[in_ip_kafka3]
192.168.111.128
192.168.111.129
192.168.111.130

[in_ip_kafka3:vars]
ansible_ssh_user='root'
ansible_ssh_pass='root'

如果还未进行 ansible 安装,请移步:

todo ansible 安装手册

环境验证

主目录

/usr/local/kafka/kafka_2.12-3.3.1

数据目录

log.dirs=/data1/kafka/data,/data2/kafka/data

当前虚机并没有这么多挂载盘,只做演示作用

查看磁盘挂载状态

ansible in_ip_kafka3 -m shell -a "df -h"
# 如果可以,在挂载盘的时候最好将kafka data盘 atime关闭。

查看相关内核参数

ansible in_ip_kafka3 -m shell -a "ulimit -a"
# 特别关心 openfiles 参数,一般在大数据中,在 102400 级别

安装 java 14.0.2

安装目录

/opt/jdk/jdk-14.0.2

下载路径记得贴一下 todo
其他 jdk 也是可以的

分发并解压:

ansible in_ip_kafka3 -m copy  -a "src=/opt/jdk/jdk-14.0.2_linux-x64_bin.tar.gz dest=/opt/jdk/"
ansible in_ip_kafka3 -m shell -a "cd /opt/jdk; tar -zxvf jdk-14.0.2_linux-x64_bin.tar.gz"

验证 java 安装:

ansible in_ip_kafka3 -m shell -a "ls -ltr /opt/jdk/"
ansible in_ip_kafka3 -m shell -a "/opt/jdk/jdk-14.0.2/bin/java -version"

在这里插入图片描述

安装 kafka3.3.1

下载地址

kafka.apache.org

分发解压

ansible in_ip_kafka3 -m copy  -a "src=kafka_2.12-3.3.1.tgz dest=/opt/kafka/"
ansible in_ip_kafka3 -m shell -a "cd /opt/kafka; tar -zxvf kafka_2.12-3.3.1.tgz"
ansible in_ip_kafka3 -m shell -a "ls -ltr /opt/kafka"

在这里插入图片描述

kraft conf 目录

/opt/kafka/kafka_2.12-3.3.1/config/kraft/server.properties

ansible in_ip_kafka3 -m shell -a "ls -ltr /opt/kafka/kafka_2.12-3.3.1/config/kraft/server.properties"

node.id 映射;node.id 是 kafka 中的节点标识,选举投票也会使用这个 id 进行判断

192.168.111.128 -> 111128

192.168.111.129 -> 111129

192.168.111.130 -> 111130

controller.quorum.voters

controller.quorum.voters=111128@192.168.111.128:9093,111129@192.168.111.129:9093,111130@192.168.111.130:9093

配置更改

vim /opt/kafka/kafka_2.12-3.3.1/config/kraft/server.properties
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
# 采用后16位作为nodeid不足十进制3位的补0,如:192.168.111.128  -> 111128 
node.id=${NODE_ID}

# The connect string for the controller quorum
controller.quorum.voters=111128@192.168.111.128:9093,111129@192.168.111.129:9093
# The address the socket server listens on.
listeners=INNER://0.0.0.0:9091,OUTTER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=CONTROLLER

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=INNER://${INNER_IP}:9091,OUTTER://${OUTTER_IP}:9092

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER


listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,INNER:PLAINTEXT,OUTTER:PLAINTEXT


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/data1/kafka/data,/data2/kafka/data

配置分发:

ansible in_ip_kafka3 -m copy -a "src=/opt/kafka/kafka_2.12-3.3.1/config/kraft/server.properties dest=/opt/kafka/kafka_2.12-3.3.1/config/kraft/server.properties"

别只分发了不改配置项… node.id & advertised.listeners

配置释义

在 Kafka3.X 中,正式开启了 Kraft 的使用,与之前的 zookeeper 模式其实基本一致,只是将元数据搬迁到了 kafka 内部中进行管理。所以新增了部分选项,需要指定每个节点其作用。

process.roles

可以配置 broker、controller,标示在 KRaft 模式下,该节点充当什么角色:

  • broker 当数据存储节点
  • controller 当元数据管理节点
  • broker,controller 两者都
  • 没有设置,假定使用 zookeeper 进行元数据原理。

controller.quorum.voters

必须所有节点都设置该配置,标识哪些节点是 Quorum 的投票节点,必须包含所有的投票节点。跟 zookeeper 配置中的 zookeeper.connect 配置一样。

controller.quorum.voters=111128@192.168.111.128:9093,111129@192.168.111.129:9093

格式为 node.id@ip:port

  • node.id

上面的映射规则映射出来的 id,一般建议使用后 255*255 补齐零,当然原则是避免重复。

  • ip:port

这个 ip:port 在双网卡时请填写 CONTROLLER 的 ip:port,使用这个进行 kafka 集群内部的通讯规则。

inter.broker.listener.name

kafka 内部通讯时使用哪个 listener。一般建议配置没有进行加密的通讯端口,然后防火墙进行相关配置,仅开放对应的网段进行访问。

advertised.listeners

对外开放访问的接口

advertised.listeners=INNER:// I N N E R I P : 9091 , O U T T E R : / / {INNER_IP}:9091,OUTTER:// INNERIP:9091,OUTTER://{OUTTER_IP}:9092

格式为 LISTENER_NAME://ip:port[,……]

  • LISTENER_NAME

listeners 中定义的访问类型

  • ip:port

这个 ip:port 在双网卡时请填写 LISTENER_NAME 的 ip:port,但如果之前绑定的是 0.0.0.0,请指定到具体的端口!

listener.security.protocol.map

配置具体的 LISTENER_NAME 对应的加密方式

listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,INNER:PLAINTEXT,OUTTER:PLAINTEXT

格式为 LISTENER_NAME : 加密方式

这里的加密方式分为:

  • PLAINTEXT
  • SSL
  • SASL_PLAINTEXT
  • SASL_SSL

这里我们都先使用 PLAINTEXT 进行配置,后续会将 OUTTER 的机密方式进行更改

controller.listener.names

进行选举等操作时占用的通讯端口,配置为 listeners 中的某一个。

log.dirs

配置数据的存储目录,在 kafka 中,log 就是数据的意思。

log.dirs=/data1/kafka/data,/data2/kafka/data

这里我们使用两个挂载盘进行配置,如果是只有一个挂载盘,那么不用这样进行配置。

JVM 相关配置

MEM 使用更改

vim /opt/kafka/kafka_2.12-3.3.1/bin/kafka-server-start.sh


# 找到 KAFKA_HEAP_OPTS, 添加下面的配置参数:
KAFKA_HEAP_OPTS="-Xmx2G -Xms2G -Xmn1G -XX:MetaspaceSize=256M"
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

在这里插入图片描述

分发到其他节点:

ansible in_ip_kafka3 -m copy -a "src=/opt/kafka/kafka_2.12-3.3.1/bin/kafka-server-start.sh dest=/opt/kafka/kafka_2.12-3.3.1/bin/kafka-server-start.sh"

日志相关配置

默认 kafka 日志输出没有指定位置,可以在这里进行配置

vim /opt/kafka/kafka_2.12-3.3.1/bin/kafka-run-class.sh

# 搜索到 LOG_DIR 处,然后添加下面一行
LOG_DIR=/data1/kafka/logs
if [ "x$LOG_DIR" = "x" ]; then
  LOG_DIR="$base_dir/logs"
fi


# 搜索到 JAVA_HOME,将 JAVA 命令指向刚刚安装14的位置
# Which java to use
if [ -z "$JAVA_HOME" ]; then
  JAVA="java"
else
  JAVA="$JAVA_HOME/bin/java"
fi

# 添加下面一行
JAVA="/opt/jdk/jdk-14.0.2/bin/java"

在这里插入图片描述

分发 kafka-run-class.sh 到其他节点

ansible in_ip_kafka3 -m copy -a "src=/opt/kafka/kafka_2.12-3.3.1/bin/kafka-run-class.sh dest=/opt/kafka/kafka_2.12-3.3.1/bin/kafka-run-class.sh"

初始化 kafka 集群

严禁在已经生产中进行此步骤,会清空所有已经存在数据!

随机集群标识

随机一个 uuid 作为 kafka 集群标识:

bash /opt/kafka/kafka_2.12-3.3.1/bin/kafka-storage.sh random-uuid

输出一个结果:

M6ZxYfO3TByOQQoc1YdBQw

那么我们在此集群使用 M6ZxYfO3TByOQQoc1YdBQw 为整个集群的标识。

节点 format 操作

此操作会格式化 server.properties 中的 log.dir 路径,严禁在已经生产中使用!

/opt/kafka/kafka_2.12-3.3.1/bin/kafka-storage.sh format -t M6ZxYfO3TByOQQoc1YdBQw -c /opt/kafka/kafka_2.12-3.3.1/config/kraft/server.properties

ansible 操作:

ansible in_ip_kafka3  -m shell -a "rm -rf /data*/kafka"
ansible in_ip_kafka3  -m shell -a "/opt/kafka/kafka_2.12-3.3.1/bin/kafka-storage.sh format -t M6ZxYfO3TByOQQoc1YdBQw -c /opt/kafka/kafka_2.12-3.3.1/config/kraft/server.properties"

在这里插入图片描述

启动命令

ansible in_ip_kafka3  -m shell -a "/opt/kafka/kafka_2.12-3.3.1/bin/kafka-server-start.sh -daemon /opt/kafka/kafka_2.12-3.3.1/config/kraft/server.properties"

# 查看是否有对应日志,且正常
ansible in_ip_kafka3  -m shell -a "ls -ltr /data1/kafka/logs"

ansible in_ip_kafka3  -m shell -a "tail /data1/kafka/logs/*"
ansible in_ip_kafka3  -m shell -a "jps"

排查错误

Connection to node 111130 (/192.168.111.129:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

在这里插入图片描述

如果发现有这个问题,请查看是否关闭掉了防火墙。

ansible in_ip_kafka3  -m shell -a "systemctl status firewalld"

关闭步骤:

ansible in_ip_kafka3  -m shell -a "systemctl stop firewalld; systemctl mask firewalld"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/69111.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

276 t230 二叉搜索树第k小的元素

题解 思路:定义一个全局变量,初值赋予k.中序遍历,每次访问一个,变量–,当该变量为0时,就把val值赋予另一个全局变量返回. class Solution {int resKthSmallest0;int countKthSmallest;// 276 t230 二叉搜索树第k小的元素public int kthSmallest(TreeNode root, int k) {count…

大数据人工智能实验室-大数据培训方案

随着计算机和信息技术的迅猛发展和普及应用,行业应用系统的规模迅速扩大,行业应用所产生的数据呈爆炸性增长。企业需要将隐藏在大批杂乱无章的数据中进行信息的集中、萃取和提炼,找出所研究对象的内在规律,从而对决策提供帮助&…

预训练模型-代码补全(二):Copilot(GitHub)

​ GitHub Copilot是一个为开发者提供的突破性的AI编程辅助工具,但这才是开始。 昨天,Copilot团队推出了一个名为GitHub Copilot Labs的VS Code配套扩展。它独立于(并依赖于)GitHub Copilot扩展。它可以用来解释代码和翻译代码。…

【元胞自动机】格子波尔兹曼模型研究(Matlab代码实现)

👨‍🎓个人主页:研学社的博客 💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜…

【JNA】java springboot 动态读取动态库

【JNA】java springboot 动态读取动态库创建名为dynamic-lib-load.xml的文件资源目录结构如下工具类LibraryLoadDynamicParseUtil工具类调用我们在使用第三方动态库 时长出现动态库无法读取jar包内的动态库文件,以下代码希望对大家有帮助 废话不多说,上代…

【王道计算机网络笔记】数据链路层-局域网广域网

文章目录局域网局域网拓扑结构局域网传输介质局域网介质访问控制方法局域网的分类以太网以太网提供无连接、不可靠的服务以太网传输介质与拓扑结构的发展10BASE-T以太网适配器与MAC地址以太网的MAC帧高速以太网IEEE802标准MAC子层和LLC子层IEEE802.11有固定基础设施无线局域网无…

嵌入式:ARM体系结构详解

文章目录指令集与指令集架构主要计算机指令集架构PC及服务器领域嵌入式领域新生代ARM体系结构的演变ARM发展的历程指令集与指令集架构 指令:就是指挥计算机工作的命令,程序就是一系列按一定顺序排列的指令,计算机就是通过执行程序中的指令来…

11. softmax回归的简洁实现

通过深度学习框架的高级API也能更方便地实现softmax回归模型。 继续使用Fashion-MNIST数据集,并保持批量大小为256。 import torch from torch import nn from d2l import torch as d2lbatch_size 256 train_iter, test_iter d2l.load_data_fashion_mnist(batch…

在IDEA中配置Maven

文章目录Maven 简介Maven 下载与安装修改Maven配置文件Maven文件目录的含义配置IDEA的MavenMaven 简介 Maven 项目对象模型(POM),可以通过一小段描述信息来管理项目的构建,报告和文档的项目管理工具软件。 Maven 除了以程序构建能力为特色之外&#x…

数据库审核工具SQLE接口调用

点击上方蓝字关注我接上文数据库审核接口SQLE的探索使用,本次自定义接口进行调用,实现需求。1、创建自定义审核接口因直接调用SQLE的审核接口,会出现token过期,且审核及结果查询接口是分开的,因此,出于以上…

Mock模拟数据动态字节码编译插件优化

模块介绍 dmc-plugin-java 动态编译字节码 关于动态编译字节码技术参考: https://blog.csdn.net/huxiang19851114/article/details/127881616 优化如下: 动态文本类改为界面配置及数据库保存 数据库表结构: DROP TABLE IF EXISTS compi…

leetcode刷题 log day56(编辑距离总结篇~

583. 两个字符串的删除操作 【思路】这道题只有删除操作,两个字符串相等时,步数不变,不相等时,只能做删除操作,删除有三种情况:删除 word1 或删除 word2 或者两个字符串都删除,取三种情况的最小…

sytem clock for ctrl ms task and us/ms delay

Cortex-M3 的内核中包含一个 SysTick 时钟。SysTick 为一个 24 位递减计数器,SysTick 设定初值并使能后,每经过 1 个系统时钟周期,计数值就减 1。计数到 0 时,SysTick 计数器自动重装初值并继续计数,同时内部的 COUNTF…

网页性能优化

网页性能优化 文章目录网页性能优化[TOC](文章目录)前言1.前端脚本优化1.1 减少重绘、回流1.2 缓存dom选择与计算1.3 使用事件委托而不是批量绑定2.渲染优化2.1 使用CSS3开启GPU硬件加速提升网站动画渲染性能2.2 touchmove、scroll、input事件添加防抖3.加载优化3.1 合并小图片…

怎么修改图片分辨率提高清晰度?如何调整图片dpi分辨率?

下载的图片有时候会比较模糊,想要改变图片清晰度的话就需要调整图片分辨率,很多小伙伴都不知道怎么去修改分辨率(在线修改照片分辨率(DPI) 调整图片DPI 照片dpi修改工具-压缩图)。今天小编就教大家一个非常…

关系抽取(三)实体关系联合抽取:TPlinker

参考: NLP系列之封闭域联合抽取:CasRel、TPLinker、PRGC、PURE、OneRel,实在是太卷了! - 知乎 (zhihu.com)NLP 关系抽取 — 概念、入门、论文、总结 TPlinker 论文:PLinker: Single-stage Joint Extraction of Entit…

乐观锁思想在 JAVA 中的实现——CAS

前言 生活中我们看待一个事物总有不同的态度,比如半瓶水,悲观的人会觉得只有半瓶水了,而乐观的人则会认为还有半瓶水呢。很多技术思想往往源于生活,因此在多个线程并发访问数据的时候,有了悲观锁和乐观锁。 悲观锁认为…

FinClip11月产品更新:FIDE 插件开发功能优化;开发者文档英文版上线

不知不觉 22 年进入尾声,通过一年的不断打磨,FinClip 也在不断成长,现在,让我们看看过去的 11 月,FinClip 又有了哪些新的变化。 产品方面的相关动向👇👇👇 FIDE 插件开发功能优化…

【LeetCode每日一题:1775. 通过最少操作次数使数组的和相等~~~贪心+思维题】

题目描述 给你两个长度可能不等的整数数组 nums1 和 nums2 。两个数组中的所有值都在 1 到 6 之间(包含 1 和 6)。 每次操作中,你可以选择 任意 数组中的任意一个整数,将它变成 1 到 6 之间 任意 的值(包含 1 和 6&a…

另一种在ARM/x86架构处理器上部署WebDAV服务器的方法

引言 最近搞了个矿渣,处理器是国产的RK3328,四核A53架构,64位的,性能太好了,装了个OpenWRT,想用来当nas用,但是我发现,竟然没有合适的文件服务器,局域网内用SMB确实可以…