本次kafka环境主要针对kafka2.x版本,运行kafka服务之前,需要先搭建zookeeper服务,因为kafka服务依赖zookeeper,kafka3.x版本后可以不需要手动搭建zookeeper了。
本文主要是介绍怎样搭建kafka2.8.1,关于kafka的操作,后续其他文章更新。
1.本人环境
(1)centos7环境如下:
NAME="CentOS Linux"
VERSION="7 (Core)"
ID="centos"
ID_LIKE="rhel fedora"
VERSION_ID="7"
PRETTY_NAME="CentOS Linux 7 (Core)"
ANSI_COLOR="0;31"
CPE_NAME="cpe:/o:centos:centos:7"
HOME_URL="https://www.centos.org/"
BUG_REPORT_URL="https://bugs.centos.org/"CENTOS_MANTISBT_PROJECT="CentOS-7"
CENTOS_MANTISBT_PROJECT_VERSION="7"
REDHAT_SUPPORT_PRODUCT="centos"
REDHAT_SUPPORT_PRODUCT_VERSION="7"
(2)java JDK
安装方式:
安装命令:sudo yum install -y java-1.8.0-openjdk >> java -version openjdk version "1.8.0_362" OpenJDK Runtime Environment (build 1.8.0_362-b08) OpenJDK 64-Bit Server VM (build 25.362-b08, mixed mode)
(3)kafka包
kafka_2.13-2.8.1.tgz
下载路径:Apache Kafka
(4)两台虚拟机
我环境上两台虚拟机的ip地址分别为:
kafka1:192.168.75.128
kafka2:192.168.75.131
2.解压kafka_2.13-2.8.1.tgz到/data目录下(两台虚拟机上都是这么操作)
mkdir /data
tar xf kafka_2.13-2.8.1.tgz -C /data
3.修改配置文件
(1)修改/etc/hosts (两台虚拟机都这么改)
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.75.128 kafka1 zoo1
192.168.75.131 kafka2 zoo2
(2)修改/data/kafka_2.13-2.8.1/config/zookeeper.properties(两台虚拟机都这么改)
dataDir=/data/kafka_2.13-2.8.1/zookeeper_data
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
tickTime=2000
initLimit=5
syncLimit=2
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
(3)修改修改/data/kafka_2.13-2.8.1/config/server.properties
修改kafka1虚拟机的/data/kafka_2.13-2.8.1/config/server.properties文件
broker.id=1
listeners=PLAINTEXT://zoo1:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=zoo1:2181,zoo2:2181
修改kafka2虚拟机的/data/kafka_2.13-2.8.1/config/server.properties文件
broker.id=2
listeners=PLAINTEXT://zoo2:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=zoo1:2181,zoo2:2181
4.创建数据目录和集群ID
(1)在kafka1机子上:
cd /data/kafka_2.13-2.8.1
mkdir zookeeper_data
echo 1 > zookeeper_data/myid
(2)在kafka2机子上:
cd /data/kafka_2.13-2.8.1
mkdir zookeeper_data
echo 2 > zookeeper_data/myid
5.启动zookeeper
(1)启动kafka1上的zookeeper
cd /data/kafka_2.13-2.8.1
./bin/zookeeper-server-start.sh config/zookeeper.properties
注释:这里可能会报错,是因为kafka2的zookeeper没有启动还有可能是防火墙把端口拦截了
这里给出的方法是关闭防火墙
1.查看防火墙状态
systemctl status firewalld.service
2.临时关闭防火墙
systemctl stop firewalld.service
3.永久关闭
systemctl disable firewalld.service
(2)启动kafka2上的zookeeper
cd /data/kafka_2.13-2.8.1
./bin/zookeeper-server-start.sh config/zookeeper.properties
6.启动kafka
cd /data/kafka_2.13-2.8.1
bin/zookeeper-server-start.sh config/zookeeper.properties
注释:可能会报错
如果报错:
kafka.common.InconsistentClusterIdException: The Cluster ID C4wRULTzSGqNoEAInvubIw doesn’t match stored clusterId Some(eA5rD8rZSUm3EXr2glib2w) in meta.properties. The broker is trying tojoin the wrong cluster. Configured zookeeper.connect may be wrong.这个时候需要删除kafka的/tmp/kafka-logs(上面kafka配置的目录)目录,让程序重新生成
命令:rm -rf /tmp/kafka-logs
7.kafka操作
cd /data/kafka_2.13-2.8.1
# 创建topic
bin/kafka-topics.sh --create --zookeeper zoo1:2181 --replication-factor 1 --partitions 1 --topic testtopic
# 查看topic list
bin/kafka-topics.sh --zookeeper zoo1:2181 --list
# 控制台生产消息
bin/kafka-console-producer.sh --bootstrap-server kafka1:9092 --topic testtopic
# 控制台消费消息
bin/kafka-console-consumer.sh --bootstrap-server kafka2:9092 --topic testtopic --from-beginning
8.创建后台服务
(1)创建zookeeper.service
vim /etc/systemd/system/zookeeper.service
[Unit]
Description=Zookeeper service
After=network.target
[Service]
Type=simple
User=root
Group=root
ExecStart=/data/kafka_2.13-2.8.1/bin/zookeeper-server-start.sh /data/kafka_2.13-2.8.1/config/zookeeper.properties
ExecStop=/data/kafka_2.13-2.8.1/bin/zookeeper-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
(2)创建kafka.service
vim /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service
[Service]
Type=simple
User=root
Group=root
ExecStart=/data/kafka_2.13-2.8.1/bin/kafka-server-start.sh /data/kafka_2.13-2.8.1/config/server.properties
ExecStop=/data/kafka_2.13-2.8.1/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
(3)systemd配置文件的操作
# 启动zookeeper服务
systemctl start zookeeper.service
# 查看zookeeper服务状态
systemctl status zookeeper.service
# 停止zookeeper服务
systemctl stop zookeeper.service
===================================================================
# 启动kafka服务
systemctl start kafka.service
# 查看kafka服务状态
systemctl status kafka.service
# 停止kafka服务
systemctl stop kafka.service