Kafka命令行操作
[aa@hadoop102 ~]$ cd /opt/module/kafka/bin/
[aa@hadoop102 bin]$ ll
可以看到自带了zookeeper
主题命令行操作
- 查看操作主题命令需要的参数
[aa@hadoop102 kafka]$ bin/kafka-topics.sh
- 重要的参数如下
- 查看当前服务器中的所有topic
[aa@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
- 创建一个主题名为first的topic
[aa@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --replication-factor 3 --partitions 1 --topic first
- 查看Topic的详情
[aa@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
Topic: first TopicId: EVV4qHcSR_q0O8YyD32gFg PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 102 Replicas: 102,103,104 Isr: 102,103,104
- 修改分区数(注意:分区数只能增加,不能减少)
[aa@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
- 再次查看Topic的详情
[aa@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
Topic: first TopicId: EVV4qHcSR_q0O8YyD32gFg PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 102 Replicas: 102,103,104 Isr: 102,103,104
Topic: first Partition: 1 Leader: 103 Replicas: 103,104,102 Isr: 103,104,102
Topic: first Partition: 2 Leader: 104 Replicas: 104,102,103 Isr: 104,102,103
- 删除topic
[aa@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
生产者命令行操作
- 查看命令行生产者的参数
[aa@hadoop102 kafka]$ bin/kafka-console-producer.sh
- 重要的参数如下:
参数 描述
--bootstrap-server 连接kafka Broker主机名称和端口号
--topic 操作的topic名称
- 生产消息
[aa@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>hello world
>atguigu aa
消费者命令行操作
- 查看命令行消费者的参数
[aa@hadoop102 kafka]$ bin/kafka-console-consumer.sh - 重要的参数如下:
参数 描述
--bootstrap-server 连接kafka Broker主机名称和端口号
--topic 操作的topic名称
--from-beginning 从头开始消费
--group 指定消费者组名称
- 消费消息
[aa@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first - 从头开始消费
[aa@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
思考:再次查看当前kafka中的topic列表,发现了什么?为什么?
Kafka生产者
生产者消息发送流程
发送原理
Kafka的Producer发送消息采用的是异步发送的方式。
在消息发送的过程中,涉及到了两个线程:main线程和Sender线程,以及一个线程共享变量:RecordAccumulator。
①main线程中创建了一个双端队列RecordAccumulator,将消息发送给RecordAccumulator。
②Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。