来一段舞蹈
前提回顾
下载kafka
wget https://archive.apache.org/dist/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz
或者
curl -O https://archive.apache.org/dist/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz
解压缩
tar -vxf kafka_2.8.0-0.8.0.tar.gz
修改配置
修改conf/server.properties
- 【host.name=10.10.224.12】:修改为主机ip,不然服务器返回给客户端的是主机的hostname,客户端并不一定能够识别。
修改conf/zookeeper.properties
- 【dataDir=/usr/local/tmp/zookeeper】:zookeeper临时数据文件。
启动zookeeper和kafka
启动zookeeper
- 【./zookeeper-server-start.sh …/config/zookeeper.properties &】(&推出命令行,服务守护执行)
启动kafka
- 【./kafka-server-start.sh …/config/server.properties &】:启动相关的kafka服务,指定相关的配置文件
验证是否成功
创建主题
-
不过我们一般不建议将auto.create.topics.enable参数设置为true,因为这个参数会影响topic的管理与维护。
-
kafka的0.10版本之前,可以使用内置的kafka-admin包,后续提供了专门的类AdminClient API来进行API层面上的topic创建。
-
通过kafka提供的kafka-topics.sh脚本来创建,并且我们也建议通过这种方式(或者相关的变种方式)来创建topic。
./kafka-create-topic.sh --partition 1 --replica 1 --zookeeper localhost:2181 --topic test
或者 低版本
bin/kafka-topics.sh --create --zookeeper 192.168.0.2:2181/kafka100 --topic topic-test1 --replication-factor 2 --partitions 4
- 2181:是zookeeper 端口
- partion:topic的数据内容被划分为几块存储
- replication-factor: 物理存储topic的内容采用几个副本的容错策略
自动创建主题
如果kafka broker中的config/server.properties配置文件中配置了auto.create.topics.enable参数为true(默认值就是true),那么当生产者向一个尚未创建的topic发送消息时,会自动创建一个num.partitions(默认值为1)个分区和default.replication.factor(默认值为1)个副本的对应topic。
命名规则
topic的命名不推荐(虽然可以这样做)使用双下划线__开头,因为以双下划线开头的topic一般看作是kafka的内部topic,比如__consumer_offsets和__transaction_state。
topic的名称必须满足如下规则:
- 由大小写字母、数字、.、-、_组成
- 不能为空、不能为.、不能为…
- 长度不能超过249
创建topic时除了需要zookeeper的地址参数外,还需要指定topic的名称、副本因子replication-factor以及分区个数partitions等必选参数 ,还可以包括disable-rack-aware、config、if-not-exists等可选参数。
检查是否创建主题成功
./kafka-list-topic.sh --zookeeper localhost:2181
或者低版本可以使用
./kafka-topics.sh --list --zookeeper localhost:2181
启动produce
Kafka的console-producer在topic test1 生产消息
./bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test1
然后输入想要产生的消息内容(如 hello world),回车:
启动consumer
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
运行命令:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning
9092是kafka单机启动的端口;–bootstrap-server 新旧kafka版本不一样,这个是新版本的命令
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-h3DNJV4M-1671089678445)(https://oscimg.oschina.net/oscnet/up-e65bd3baa9ba10bbdb9bd7321c7d8a38398.png)]
关闭kafka和zookeeper
./kafka-server-stop.sh ../config/server.properties
./zookeeper-server-stop.sh
心得总结
- produce启动的时候参数使用的是kafka的端口而consumer启动的时候使用的是zookeeper的端口;
- 必须先创建topic才能使用;
- topic本质是以文件的形式储存在zookeeper上的。
kafka与zookeeper间的关联
一个典型的Kafka集群中包含若干Produce,若干broker(一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。
-
Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。
-
Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
-
Producer端直接连接broker.list列表,从列表中返回TopicMetadataResponse,该Metadata包含Topic下每个partition leader建立socket连接并发送消息。
-
Broker端使用zookeeper用来注册broker信息,以及监控partition leader存活性。
-
Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息。
Zookeeper作用:
管理broker、consumer
-
创建Broker后,向zookeeper注册新的broker信息,实现在服务器正常运行下的水平拓展。具体的,通过注册watcher,获取partition的信息。
-
Topic的注册,zookeeper会维护topic与broker的关系,通/brokers/topics/topic.name节点来记录。
-
Producer向zookeeper中注册watcher,了解topic的partition的消息,以动态了解运行情况,实现负载均衡。Zookeepr不管理producer,只是能够提供当前broker的相关信息。
-
Consumer可以使用group形式消费kafka中的数据。
- 所有的group将以轮询的方式消费broker中的数据,具体的按照启动的顺序。
- Zookeeper会给每个consumer group一个ID,即同一份数据可以被不同的消费者ID多次消费。
单播与多播的实现
-
以单个消费者还是以组别的方式去消费数据,由用户自己去定义。
-
Zookeeper管理consumer的offset跟踪当前消费的offset(新版本已经放在topic去维护了)。