Kafka: Windows环境-单机部署和伪集群、集群部署

news2025/1/17 23:24:39

1. kafka 单机版部署

1.1 zookeeper 安装

(1)下载安装包

官网:Apache ZooKeeper

我用的是 apache-zookeeper-3.7.1-bin.tar.gz

注意:zookeeper的安装路径不要有中文,建议也不要有空格,比如Program Files这样的路径

下载完成后,解压到本地无中文路径名的目录下,比如: D:/kafka

(2)修改配置文件

在zookeeper的conf目录下复制一份zoo_sample.cfg文件,并重命名为zoo.cfg:

修改zoo.cfg文件里面的路径(data,logs为新建目录)

# 存放内存数据库快照的目录
dataDir=D:/kafka/zookeeper/stand-alone/zookeeper/data
# 存放事务日志目录
dataLogDir=D:/kafka/zookeeper/stand-alone/zookeeper/logs
# AdminServer端口
admin.serverPort=7070
# clientport端口
clientPort=2181

重点避坑:在windows环境中,文件路径必须是 "\" 或者 "//" ,“/” 是无法识别的。 zookeeper服务启动时会启动一个AdminServer的服务,端口会占用8080,如果你有启动别的项目占了8080端口就会报错无法启动。所以在这添加配置 admin.serverPort=7070 来将启动端口修改(7070随便填的,不冲突就行)。单机集群,必须每个节点的admin.serverPort都不同。

(3)参数说明:

参数说明:
 
tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
 
initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 10 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
 
syncLimit:这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 2*2000=4 秒
 
dataDir:顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
 
clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。

(4)创建data 、logs目录

(5)启动服务

进入bin目录下,进入bin目录下,双击zkServer.cmd

 如果出现闪退,检查jdk的环境变量是否安装正确.路径中不要有中文。

(6)验证是否安装成功

在bin目录下双击zkCli.cmd,打开客户端(此时的服务端zkServer的dos窗口不要关闭),出现"欢迎"字样,说明安装成功!

1.2 kafka 安装

(1)下载安装包

在kafka官网下载安装包,并解压。我使用的是kafka_2.13-2.8.0.tgz。解压到本地目录下,这里是:D:\kafka。

(2)修改配置文件

kafka需要修改server.propertiespei文件的参数:

#节点id,单机用默认的0,集群每个节点都不一样
broker.id=0
#日志文件路径
log.dirs=D:/kafka/kafka/stand-alone/kafka/kafka-logs
#kafka运行端口,默认9092,单机可以不配置
#listeners=PLAINTEXT://:9092
#表示本地运行(默认的可以不改)
zookeeper.connect=localhost:2181

(3)启动kafka服务器

进入Kafka安装目录,新建cmd窗口:

cd D:\kafka\kafka\stand-alone\kafka

输入命令

.\bin\windows\kafka-server-start.bat .\config\server.properties

或者填写绝对路径

D:\kafka\kafka\stand-alone\kafka\bin\windows\kafka-server-start.bat D:\kafka\kafka\stand-alone\kafka\config\server.properties

注意:不要关了这个窗口,启用Kafka前请确保ZooKeeper实例已经准备好并开始运行

1.3 测试

(1)创建主题

新建cmd窗口,进入kafka的windows目录下

cd D:\kafka\kafka\stand-alone\kafka\bin\windows

输入以下命令,创建一个叫topic001的主题

.\kafka-topics.bat --create --topic test1 --bootstrap-server 127.0.0.1:9092

(2)查看状态

 .\kafka-topics.bat --describe --topic test1 --bootstrap-server 127.0.0.1:9092

 (3)停止kafka

.\kafka-server-stop.bat

2. kafka伪集群

2.1 zookeeper集群搭建

(1)创建多节点配置

在单节点的基础上,复制一份,zookeeper集群最少三个节点。三个节点的端口不能相同,分别使用2181、2182、2183。

创建三个节点用的配置文件 zoo.cfg复制多份,分别命名为 zoo-1.cfg、zoo-2.cfg、zoo-3.cfg。

(2)创建多节点的data目录和myid文件、logs目录

data2181、data2182、data2183的myid分别为1、2、3。

(3)三个节点的配置内容:

zoo-1.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=D:/kafka/zookeeper/colony/zookeeper/data2181

dataLogDir=D:/kafka/zookeeper/colony/zookeeper/logs1
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60

admin.serverPort=8080
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
# 集群配置
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true

 zoo-2.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=D:/kafka/zookeeper/colony/zookeeper/data2182

dataLogDir=D:/kafka/zookeeper/colony/zookeeper/logs2
# the port at which the clients will connect
clientPort=2182
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60

admin.serverPort=8081
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
# 集群配置
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true

zoo-3.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=D:/kafka/zookeeper/colony/zookeeper/data2183

dataLogDir=D:/kafka/zookeeper/colony/zookeeper/logs3
# the port at which the clients will connect
clientPort=2183
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60


admin.serverPort=8083
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
# 集群配置
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true

(4)配置三个启动脚本

zkServer-2181.cmd

@echo off
REM Licensed to the Apache Software Foundation (ASF) under one or more
REM contributor license agreements.  See the NOTICE file distributed with
REM this work for additional information regarding copyright ownership.
REM The ASF licenses this file to You under the Apache License, Version 2.0
REM (the "License"); you may not use this file except in compliance with
REM the License.  You may obtain a copy of the License at
REM
REM     http://www.apache.org/licenses/LICENSE-2.0
REM
REM Unless required by applicable law or agreed to in writing, software
REM distributed under the License is distributed on an "AS IS" BASIS,
REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
REM See the License for the specific language governing permissions and
REM limitations under the License.

setlocal
call "%~dp0zkEnv.cmd"

set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
set ZOOCFG=D:/kafka/zookeeper/colony/zookeeper/conf/zoo-1.cfg
set ZOO_LOG_FILE=zookeeper-%USERNAME%-server-%COMPUTERNAME%.log

echo on
call %JAVA% "-Dzookeeper.audit.enable=true" "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" "-Dzookeeper.log.file=%ZOO_LOG_FILE%" "-XX:+HeapDumpOnOutOfMemoryError" "-XX:OnOutOfMemoryError=cmd /c taskkill /pid %%%%p /t /f" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*

endlocal

pause

zkServer-2182.cmd

@echo off
REM Licensed to the Apache Software Foundation (ASF) under one or more
REM contributor license agreements.  See the NOTICE file distributed with
REM this work for additional information regarding copyright ownership.
REM The ASF licenses this file to You under the Apache License, Version 2.0
REM (the "License"); you may not use this file except in compliance with
REM the License.  You may obtain a copy of the License at
REM
REM     http://www.apache.org/licenses/LICENSE-2.0
REM
REM Unless required by applicable law or agreed to in writing, software
REM distributed under the License is distributed on an "AS IS" BASIS,
REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
REM See the License for the specific language governing permissions and
REM limitations under the License.

setlocal
call "%~dp0zkEnv.cmd"

set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
set ZOOCFG=D:/kafka/zookeeper/colony/zookeeper/conf/zoo-2.cfg
set ZOO_LOG_FILE=zookeeper-%USERNAME%-server-%COMPUTERNAME%.log

echo on
call %JAVA% "-Dzookeeper.audit.enable=true" "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" "-Dzookeeper.log.file=%ZOO_LOG_FILE%" "-XX:+HeapDumpOnOutOfMemoryError" "-XX:OnOutOfMemoryError=cmd /c taskkill /pid %%%%p /t /f" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*

endlocal

pause

zkServer-2183.cmd

@echo off
REM Licensed to the Apache Software Foundation (ASF) under one or more
REM contributor license agreements.  See the NOTICE file distributed with
REM this work for additional information regarding copyright ownership.
REM The ASF licenses this file to You under the Apache License, Version 2.0
REM (the "License"); you may not use this file except in compliance with
REM the License.  You may obtain a copy of the License at
REM
REM     http://www.apache.org/licenses/LICENSE-2.0
REM
REM Unless required by applicable law or agreed to in writing, software
REM distributed under the License is distributed on an "AS IS" BASIS,
REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
REM See the License for the specific language governing permissions and
REM limitations under the License.

setlocal
call "%~dp0zkEnv.cmd"

set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
set ZOOCFG=D:/kafka/zookeeper/colony/zookeeper/conf/zoo-3.cfg
set ZOO_LOG_FILE=zookeeper-%USERNAME%-server-%COMPUTERNAME%.log

echo on
call %JAVA% "-Dzookeeper.audit.enable=true" "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" "-Dzookeeper.log.file=%ZOO_LOG_FILE%" "-XX:+HeapDumpOnOutOfMemoryError" "-XX:OnOutOfMemoryError=cmd /c taskkill /pid %%%%p /t /f" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*

endlocal

pause

2.2 kafka集群搭建

(1)复制单机版为三份

(2)配置文件

  • broker.id:节点id,在同一个集群中,不能重复
  • listeners:节点监听的端口,在同一台机器上,也不能相同
  • log.dris:存储数据的位置
  • zookeeper.connectzookeeper集群的连接地址

kafka-1: server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://127.0.0.1:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=D:\kafka\kafka\colony\kafka-1\kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

 kafka-2: server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://127.0.0.1:9093

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=D:\kafka\kafka\colony\kafka-2\kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183


# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

kafka-3: server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=3

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://127.0.0.1:9094

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=D:\kafka\kafka\colony\kafka-3\kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

(3)启动命令

说明:注意路径和写法空格之类的.

第一个节点:kafka-1

 D:\kafka\kafka\colony\kafka-1\bin\windows\kafka-server-start.bat D:\kafka\kafka\colony\kafka-1\config\server.properties

第二个节点:kafka-2

 D:\kafka\kafka\colony\kafka-2\bin\windows\kafka-server-start.bat D:\kafka\kafka\colony\kafka-2\config\server.properties

 第三个节点:kafka-3

 D:\kafka\kafka\colony\kafka-3\bin\windows\kafka-server-start.bat D:\kafka\kafka\colony\kafka-3\config\server.properties

3. kafka分布式集群

在三台服务器分别搭建 zookeeper+kafka的主从集群。

3.1 zookeeper集群搭建

在三台服务器分别搭建 zookeeper+kafka的主从集群。

主机ip
主机一192.168.126.135
主机二192.168.126.136
主机三192.168.126.137

(1)分别在三台win系统下载zookeeper安装包,并分别解压zookeeper压缩包到指定目录下

(2)分别创建zoo.cfg配置文件

复制每台windows服务器的zookeeper/conf/zoo_sample.cfg为 zoo.cfg文件。

(3)创建data、logs目录,并在data目录下新建myid用于集群服务,里面内容填写当前主机id,我是三台服务器的集群,id分别为1,2,3

(4)修改zoo.cfg配置

# 这个地方的路径就是上面创建data文件夹的地址。根据自己的实际地址填写
dataDir=D:/kafka/zookeeper/stand-alone/zookeeper/data
# 存放事务日志目录
dataLogDir=D:/kafka/zookeeper/stand-alone/zookeeper/logs
# AdminServer端口
admin.serverPort=7070
# clientport端口
clientPort=2181

并在文本最后添加节点信息:

server.1=192.168.126.135:2888:3888
server.2=192.168.126.136:2888:3888
server.3=192.168.126.137:2888:3888

节点信息里的 “server.”后面的数字就是约定该服务器的主机id。必须一致,不然集群启动会失败。

(5)启动zookeeper

在命令行执行zookeeper的 zkServer.cmd文件:

D:\kafka\zookeeper\colony\zookeeper\bin\zkServer.cmd

3.2 kafka集群搭建

(1)安装kafka

在三台windows服务器分别下载、解压缩kafka压缩包到指定目录

(2)修改config/server.properties文件

修改broker.id,分别为1,2,3

#对应上面配置zk的每台节点的id
broker.id=1

 修改listeners

#本机主机的ip
listeners=PLAINTEXT://192.168.126.135:9092

修改zookeeper.connect

    #每个节点的信息
    zookeeper.connect=192.168.126.135:2181,192.168.126.136:2181,192.168.126.137:2181

修改日志目录:

log.dirs=D:/kafka/kafka/stand-alone/kafka/logs

(3)进入/bin目录下启动kafka

D:/kafka/kafka/stand-alone/kafka/bin/windows/kafka-server-start.bat D:/kafka/kafka/stand-alone/kafka/bin/config/server.properties

(4)关闭kafka

D:/kafka/kafka/stand-alone/kafka/bin/windows/kafka-server-stop.bat


 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/48267.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

移动跨平台技术方案浅析

随着互联网产品逐渐兴起,越来越多产品体验从线下搬到了线上,尤其是移动互联网产品相关,所以很多企业就会更加重视降本增效,以最快的速度推出质量满意度高、用户体验性好的产品,那么就顺势催生了很多跨端跨平台方案。 …

并发编程九 线程池Executor框架

一 线程 线程是调度CPU资源的最小单位,线程模型分为KLT模型与ULT模型,JVM使用的KLT模型; Java线程与OS线程保持1:1的映射关系,也就是说有一个java线程也会在操作系统里有一个对应的线程。Java线程有多种生命状态 NEW,新建 RUNN…

一文带你深入了解Linux IIO 子系统

【推荐阅读】 一文剖析Linux内核中内存管理 分析linux启动内核源码 关于如何快速学好,学懂Linux内核。内含学习路线 工业场合里面也有大量的模拟量和数字量之间的转换,也就是我们常说的 ADC 和 DAC。而且随着手机、物联网、工业物联网和可穿戴设备的…

[第二十二篇]——Docker 安装 MongoDB

Docker 安装 MongoDB MongoDB 是一个免费的开源跨平台面向文档的 NoSQL 数据库程序。 1、查看可用的 MongoDB 版本 访问 MongoDB 镜像库地址: 。 可以通过 Sort by 查看其他版本的 MongoDB,默认是最新版本 mongo:latest。 你也可以在下拉列表中找到…

高校社团管理系统的设计与实现

摘要 随着互联网技术的高速发展,人们生活的各方面都受到互联网技术的影响。现在的社团成员可以通过互联网技术就能实现不在学校,在家也可以查看社团信息并能进行申请加入,简单、快捷的方便了社团成员的社交生活。同样的,在人们的工…

【Spring项目中的统一处理异常】

目录 1. 统一处理异常的机制 2. 关于统一处理异常的方法 3. 关于处理异常的方法的执行特点 1. 统一处理异常的机制 Spring MVC框架提供了统一处理异常的机制!表现为每种类型的异常只需要写一段(写一次)处理此异常的代码即可,项…

需求:针对同一个表格多次导入是否要做判断(此项目是用得若依)

每次上传表时,将上传的表名与以往上传的表名做对比,如果相同,则提示表已经有记录,是否上传相同表并结束此方法。 实现思路: 首先,每次上传表都要把表明记录一下,可以新建一个表(数…

天宇优配|离岸人民币狂拉逾千点!中概股暴涨!B站涨22%

当地时间周二,美股三大指数收盘涨跌纷歧。道指涨0.01%,标普500指数跌0.16%,纳指跌0.59%。 抢手中概股领涨,纳斯达克我国金龙指数大涨5.04%,哔哩哔哩(B站)涨超22%。大型科技股多数跌落&#xff0…

小程序中的confirm-type设置键盘的确认按钮

详情: confirm-type是很多小程序组件中的一种设置,用于改变输入键盘右下角的确认按钮。比如说,正常情况下,键盘上的默认提示可能是完成,但是你可以通过confirm-type将其设置为发送,搜索等,在特…

间隔不到一年开两店,温州鸿雁全屋智能经销商透露了他的生意经

作者 | 牧之 编辑 | 小沐 出品 | 智哪儿 zhinaer.cn编者按:间隔不到一年,连续开设了两家全屋智能体验店。这是发生在温州的渠道商故事。本期专访,「智哪儿」对话浙江林上智能科技有限公司总经理朱飞隆先生。他为何做智能家居?为何…

翻转单词序列、按之字形顺序打印二叉树、二叉搜索树的第k个节点

1、翻转单词序列 本题考点:子串划分,子串逆置 牛客链接 题目描述: 牛客最近来了一个新员工Fish,每天早晨总是会拿着一本英文杂志,写些句子在本子上。同事Cat对Fish写的内容颇感兴趣,有一天他向Fish借来翻…

Python数据库编程之关系数据库API规范

Python关系数据库API规范 对于关系数据库的访问,Python社区已经制定出一个标准,称为Python Database API Specification。Mysql,Oracal等特定数据库模块遵从这一规范,而且可以添加更多特性。 高级数据库API定义了一组用于连接数…

三十六、Java 泛型

Java 泛型 Java 泛型(generics)是 JDK 5 中引入的一个新特性, 泛型提供了编译时类型安全检测机制,该机制允许程序员在编译时检测到非法的类型。 泛型的本质是参数化类型,也就是说所操作的数据类型被指定为一个参数。 假定我们有这…

火山引擎 DataLeap 的 Data Catalog 系统公有云实践

Data Catalog 通过汇总技术和业务元数据,解决大数据生产者组织梳理数据、数据消费者找数和理解数的业务场景。本篇内容源自于火山引擎大数据研发治理套件 DataLeap 中的 Data Catalog 功能模块的实践,主要介绍 Data Catalog 在公有云部署和发布中遇到挑战…

5. LSTM的C++实现

[C 基于Eigen库实现CRN前向推理] 第三部分:TransposedConv2d实现 (含dilation) 前言:(Eigen库使用记录)第一部分:WavFile.class (实现读取wav/pcm,实现STFT)第二部分:Conv2d实现第三部分:Tran…

你知道不同U盘在ARM+Linux下的读写速率吗?

优秀的产品离不开完善的测试,即使一个简单的USB接口也要确保稳定性及兼容性。不同的U盘在ARMLinux板卡下的兼容性、速率怎么样呢?本文将为大家提供测试参考数据及详细测试步骤! 1. 测试准备 主控选用最近发布的64位Cortex-A55核心板&#xff…

设计模式-day01

1,设计模式概述 1.1 软件设计模式的产生背景 "设计模式"最初并不是出现在软件设计中,而是被用于建筑领域的设计中。 1977年美国著名建筑大师、加利福尼亚大学伯克利分校环境结构中心主任克里斯托夫亚历山大(Christopher Alexand…

深入理解SR-IOV和IO虚拟化

一、背景 SR-IOV(Single Root I/O Virtualization)是由PCI-SIG组织定义的PCIe规范的扩展规范《Single Root I/O Virtualization and Sharing Specification》,目的是通过提供一种标准规范,为VM(虚拟机)提供…

springboot+vue职称评审管理系统

开发语言:Java 框架:Springbootssm(SpringSpringMVCMyBatis) JDK版本:JDK1.8 前端框架:vue.js 服务器:tomcat 数据库:mysql 数据库工具:Navicat11 开发软件:eclipse/idea都支持 Mave…

前端页面的性能测试

介绍 随着 Web 应用的空前发展,前端业务逐渐复杂,为了处理这些复杂业务,前后端分离,出现了专门应对这种分离架构的应用开发框架,比如 Angular,React,Vue 等,从而也导致 Web 应用的复…