大数据中间件——Kafka

news2025/1/12 10:00:02

Kafka安装配置

首先我们把kafka的安装包上传到虚拟机中:

解压到对应的目录并修改对应的文件名:

首先我们来到kafka的config目录,我们第一个要修改的文件就是server.properties文件,修改内容如下:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
# kafka在整个集群中的身份标识,集群中的id是唯一的
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
# 存储kafka数据的位置,默认存储在临时文件夹,要修改成自己的文件夹
log.dirs=/opt/model/kafka/datas

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# 连接的zookeeper集群,需要将集群中部署zookeeper的所有节点写入
# 首先,在zookeeper中,数据的存储是以目录树的方式去存储的,如果后期我们的kafka的数据要修改,在不做任何的修改的情况下,默认是存储在zookeeper根目录下的,这样我们想要单独提取出zookeeper的数据就非常的麻烦
# 所以我们将kafka的数据的单独存储在一个文件分支中,这就是我们为什么要在最后写一个[/kafka]的原因。
# 前面写多个节点是为了防止单个zookeeper节点无法连接可以使用其他的zookeeper节点
zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

主要修改三个部分,一个是唯一标识id,kafka的文件存储路径,一个是zookeeper的节点地址。

然后我们将kafka的安装包分发到其他的节点中。

注意在分发完成之后,不要忘记修改不同节点中的唯一标识id的值。

然后我们就可以启动kafka的服务了,注意在启动kafka的服务之前,我们必须要启动zookeeper的服务。

kafka和zookeeper一样,也是要在每个节点中都分别执行启动脚本,并且kafka的启动脚本需要手动指定配置文件:

./kafka-server-start.sh -daemon ../config/server.properties

注意,我的kafka的地址和你们的可能不一样,但是只需要知道启动命令在bin目录下,配置文件在conf目录下即可,我们在三台虚拟机上分别执行脚本:

当我们看到在集群中出现kafka的进程之后,就表示我们的kafka集群启动成功了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1097138.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【每日一题】只出现一次的数字 III

文章目录 Tag题目来源题目解读解题思路方法一:位运算 其他语言cpython3 写在最后 Tag 【位运算】【数组】【2023-10-16】 题目来源 260. 只出现一次的数字 III 题目解读 找出数组中恰好只出现一次的连个元素,其余的所有元素均出现两次。要求算法的时间…

获取Steam余额的几种方式,及Steam余额被红锁的几种情况

今天就跟大家聊聊买余额的话题,获取Steam余额的几种方式,及Steam余额被红锁的几种情况。 什么是买余额呢? 指的就是卖家通过steam市场里面的饰品出售功能,把steam账号里的余额转移到买家账号中。 大家都知道,自从充值…

【前段基础入门之】=>CSS3新特性 2D 变换

导语: CSS3新特性,给我们带来了很多的丰富的过渡变换效果,这些效果使我们的页面效果变得更加的生动,这一章节,就给大家带来CSS3中的第一个变换效果:2D 变换 在正式了解2D 变换之前,我们需要了解&#xff0c…

啥?PS一秒成图?Adobe的逆天黑科技大公开

在日前举行的 Adobe MAX 创意大会上,Adobe Adobe Firefly Image 2(萤火虫二代成像模型)、Firefly Vector Model(萤火虫矢量模型)和Firefly Design Model(萤火虫设计模型)。 Firefly矢量模型是世…

155M传输分析仪 优劣势分析

D240S SDH测试模块,是FT100智能网络测试平台产品家族的一部分,是一个坚固耐用、锂电池超长供电的传统PDH/SDH测试解决方案,支持155Mbps到2.048Mbps速率的传输链路测试。支持在线和离线的传输链路的安装、维护和故障排除应用测试。 同时为经验…

分享一下微信小程序怎么添加抽奖活动

微信小程序是一种无需下载安装即可使用的应用,近年来在各行各业中得到了广泛应用。对于企业或商家而言,利用微信小程序开展抽奖活动可以吸引更多的用户关注,增加用户粘性并促进品牌传播。下面就以微信小程序添加抽奖活动为主题,探…

【Linux】环境下部署Nginx服务 - 二进制部署方式

👨‍🎓博主简介 🏅云计算领域优质创作者   🏅华为云开发者社区专家博主   🏅阿里云开发者社区专家博主 💊交流社区:运维交流社区 欢迎大家的加入! 🐋 希望大家多多支…

SOLIDWORKS PDM—2024版本新增

SOLIDWORKS产品数据管理 (PDM) 解决方案可帮助您控制设计数据,并且从本质上改进您的团队就产品开发进行管理和协作的方式。使用 SOLIDWORKS PDM Professional,您的团队能够:1. 安全地存储和索引设计数据以实现快速检索;2. 打消关于…

从简单到复杂,MVI 架构定义与封装使用总结

前言 时间回到一年多前讨论度很高的 MVI 架构,现在也已尘埃落地,没有什么争议并各自都有自己的一套实现方案了,接下来我们就看看这些网上各种各样的 MVI 架构是如何从简单到复杂,从 Java 到 Kotlin 到协程再到 Compose 的各个场景…

gitee page中HTML显示乱码

参考的:静态HTML网页部署到gitee后中文乱码-CSDN博客 根据上述引用的博客做完后要记得在gitee page中更新(我就是没点更新以为用不了)

广告牌安全传感器怎么用?为城市能起到什么效果?

随着城市的迅速发展和经济的快速增长,广告牌在城市中扮演着越来越重要的角色。但是近年来广告牌缺乏修缮和维护,广告牌所带来的安全隐患逐年增加。 广告牌作为城市的明信片,出现损坏,且具有一定的安全隐患之后,给城市带…

你不一定全部知道的16种进程注入方法和注入工具(C语言版)

一、前言 提起进程注入,似乎感觉挺奇妙的,毕竟这是黑客的入门必备技术之一,互联网发展这么多年,每年都会有新的技术出现,自然也就有了很多的进程注入的方法。今天小编要和大家讲的是16种我们比较常见的进程注入方法&a…

进化策略算法

前言 进化策略 (Evolution Strategy) 后面都简称 ES,其本质就是:种群通过交叉产生后代,我们只保留较好的父代和子代,一直这样迭代下去, 我们的保留方式是: 父代产生后代,然后将后代DNA和原来的…

C++入门-引用

C入门-引用 前置知识点:函数栈帧的复用前置知识点:类型转换时产生的临时变量1.含义2.代码形式3.引用的价值1.传参数传参效率测试补充:C与Java中引用的区别 2.引用做返回值(前置知识:栈帧复用)1.传值返回2.传引用返回传引用返回并用引用接收3.静态变量传引用返回4.引用做返回值真…

Redis数据结构的奇妙世界:一窥底层存储机制【redis第一部分】

Redis数据结构的奇妙世界:一窥底层存储机制【redis第一部分】 前言第一:为什么要使用redis第二:redis的底层数据结构第三:Redis的基本数据类型1. 字符串(String)2. 列表(List)3. 集合…

Ansible的playbook编写和运行示例介绍

目录 一.yaml语法格式 1.定义: 2.yaml支持几种数据类型 (1)纯量: (2)对象 (3)数组 3.playbook-yaml书写的注意事项 二.playbook编写和运行 1.单个简单playbook示例 &#…

2023_Spark_实验二十:SparkStreaming累加计算单词频率

一、需求分析 在服务器端不断产生数据的时候,sparkstreaming客户端需要不断统计服务器端产生的相同数据出现的总数,即累计服务器端产生的相同数据的出现的次数。 二、实验环境 centos7 nc spark2.1.1 windows idea 三、思路分析 流程分析 思路分析…

BUUCTF学习(7): 随便注,固网杯

1、介绍 2、解题 11;show tables;# select * from 1919810931114514 concat(sel,ect from 1919810931114514 ) PEREPARE y from sql; ECCUTE y; -1; sEt sql CONCAt(se,lect * from 1919810931114514;); prePare stmt from sql; EXECUTE stmt; # 结束

代码随想录算法训练营第二十三天丨 回溯算法part01

回溯算法理论基础 #题目分类 #理论 #什么是回溯法 回溯法也可以叫做回溯搜索法,它是一种搜索的方式。 在二叉树系列中,不止一次提到了回溯,例如二叉树:以为使用了递归,其实还隐藏着回溯 (opens new window)。 回溯…

5款令人骄傲的国产优质软件,能让你的电脑办公更加高效

很多人都喜欢用国外软件,其实国内也有不少优秀软件。这些国产软件不输国外软件,能够提高我们的办公效率,帮助我们更好地处理日常事务。今天就给大家分享5款令人骄傲的国产优质软件,它们能让你的电脑办公更加高效。 Listary——文件…