用单机搭建kafka伪分布式集群,其实集群的概念并不复杂
先说明一下,以下的每个服务启动后都需要新开一个终端来启动另外的服务(因为是集群,自然会用多个终端)
首先下载kafka
提取码:dvz4
或者直接去官网下载kafka_2.11-1.0.0.tgz
tar -zxvf kafka_2.11-1.0.0.tgz
cd 进入kafka_2.11-1.0.0里面
1.mkdir etc
2.cp config/zookeeper.properties etc
//它是zookeeper的配置文件
3.cp config/server.properties etc
//不要直接执行这条命令,它是用来配置kafka的配置文件, 由于我们需要3个broker实例,所以需要拷贝三份, 把这个命令修改为
cp config/server.properties etc/server_0.properties
cp config/server.properties etc/server_1.properties
cp config/server.properties etc/server_2.properties
4.进入这三个配置文件,分别把broker.id的值更改为0,1,2. 把listeners=PLAINTEXT://:9092中的 9092分别改为9092,9093,9094, 再把log.dirs=/tmp/kafka-logs中的logs分别改为logs-0, logs-1, logs-2
kafka是用到了zookeeper的,zookeeper的作用在文末有介绍,每个kafka的实例都需要连接到zookeeper的,注意看这三个配置文件里面都有zookeeper.connect=localhost:2181
, 因为zookeeper就在本机,所以不用特殊配置,若真正的多机上集群自然就需要配置了。
5.更改好之后去bin目录启动zookeeper, 执行
./zookeeper-server-start.sh ../etc/zookeeper.properties
对于为什么要用zookeeper在文末有介绍。
启动zookeeper过程报错问题
如果java版本不支持,当前这个kafka需要的java版本是8之前, 而我的是11
出现类似Kafka 无法识别的 VM 选项“PrintGCDateStamps”的报错
在bin/kafka-run-class.sh中把
JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([^.-]*).*"/\1/p')
换成
JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([^.-]*).*/\1/p')
区别就是少了一个"
6.接着启动三个kafka的实例(分别打开三个新的终端)
./kafka-server-start.sh ../etc/server-0.properties
./kafka-server-start.sh ../etc/server-1.properties
./kafka-server-start.sh ../etc/server-2.properties
7.接下来是创建kafka的主题
./kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 3 --replication-factor 2
解释一下以上字段,
--zookeeper是必须的,因为zookeeper是在本地,所以写的是localhost,2181表示zookeeper监听的端口号
--create表示要创建主题了,可以把它改为--describe表示查看主题的分区情况
--topic 表示要创建的主题名
--partitions 表示分区数量
--replication-factor 表示每个分区有多少份(主本+副本)
8.创建好了之后查看一下刚才创建的主题情况
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
介绍一下主要字段:
partition表示分区号,
leader 表示分区的主本在哪个终端(broker)上
replicas 表示partition表示的这个分区在哪些终端(broker)上
Isr 表示当前正常同步的终端(broker)有哪些
9.kafka利用控制台模拟消费者消费数据
./kafka-console-consumer.sh --bootstrap-server localhost:9092, localhost:9093, localhost:9094 --topic test
此时已经在等待消费数据了,所以需要一个生产者
10.kafka利用控制台模拟生产者生产数据
./kafka-console-producer.sh --broker-list localhost:9092, localhost:9093, localhost:9094 --topic test
此时在当前终端输入任意内容, 消费者那里就能接受到消息了。
拓展:zookeeper的作用, 因为kafka用到了zookeeper
先说明一点:zookeeper主要就是通过znode的节点类型 + 监听机制 来实现很多实用的功能。
因为kafka用到了zookeper,简单介绍一下,可以用它来做统一配置管理、统一命名服务、分布式锁、集群管理。
zookeper的节点成为znode
znode有两种类型: 短暂(当客户端和服务端断开连接后,所创建的znode会自动删除), 持久(连接断开后也不会删除。) , 它们有一个共同特点,可以把节点的名字弄成顺序的(与做分布式锁有关)
zookeeper和redis一样都是C/S架构(分客户端和服务端)
理解了zookeeper的结构之后,还需要知道zookeeper需要配合监听器才能做这么多事,常见的监听场景有一下两种: 1.监听znode节点的数据变化 2.监听子节点的增减变化。
一、用zookeeper做统一配置管理
比如把一个程序做成集群的形式,每个机器上都有相同的配置文件,如果需要修改,那么就需要在每个机器上都进行修改, 所以可以把这些公共的配置文件放到zookeeper进行管理,同时会落盘数据库, 同时会对应用开启配置实时监听,如果zookeeper配置文件一旦被修改,应用就可以实时监听到并获取。
二、用zookeeper做统一命名服务,理解上和域名一样,我们给一部分资源(多个ip地址)起一个名字,把这个名字挂到znode节点上
三、用zookeeper做分布式锁
举个例子:
系统A拿到/locks节点下的所有子节点,经过比较,发现自己(id_000000),是所有子节点最小的。所以得到锁
系统B拿到/locks节点下的所有子节点,经过比较,发现自己(id_000002),不是所有子节点最小的。所以监听比自己小1的节点id_000001的状态
系统C拿到/locks节点下的所有子节点,经过比较,发现自己(id_000001),不是所有子节点最小的。所以监听比自己小1的节点id_000000的状态
四、用zookeeper来管理集群
在zookeeper中创建一个groupMember节点,同时创建3个子节点表示三台不同机器上的服务,如果谁挂了,另外两台就可以感知到。
zookeeper可以实现动态选举master的功能,对于主从的选择, 可以把代表不同机器的znode节点弄成带顺序号的临时节点,zookeeper每次选举最小编号的znode对应的机器作为master,如果master挂了,对应的znode就会删除,然后让新的最小编号的znode对应的机器做master。