Kafka本质上就是一个消息队列的中间件的产品,主要负责消息数据的传递。也就说学习Kafka 也就是学习如何使用Kafka生产数据,以及如何使用Kafka来消费数据
topics操作
注意:
创建topic不指定分区数和副本数,默认都是1个
分区数可以后期通过alter增大,但是不能减小
副本数一旦确定,不能修改!
参数如下:
cd /export/server/kafka/bin
./kafka-topics.sh 参数说明:
--bootstrap-server: Kafka集群中broker服务器
--topic: 指定Topic名称
--partitions: 设置Topic的分区数,可以省略不写
--replication-factor: 设置Topic分区的副本数,可以省略不写
--create: 指定操作类型。这里是新建Topic
--delete: 指定操作类型。这里是删除Topic
--alter: 指定操作类型。这里是修改Topic
--list: 指定操作类型。这里是查看所有Topic列表
--describe: 指定操作类型。这里是查看详细且具体的Topic信息
-
1- 创建Topic
# 创建topic,默认1个分区,1个副本 /export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic itcast
# 注意: 如果副本数超过了集群broker节点个数,就会报错 /export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic itheima --partitions 4 --replication-factor 4
# 把replication-factor改成3以内就能创建成功了 /export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic itheima --partitions 4 --replication-factor 3
-
2- 查看Topic
# --list查看所有topic /export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --list
# --describe 可以查看详细Topic信息 /export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --describe # --describe 可以查看具体Topic信息 /export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --describe --topic itheima
当然也可使用zookeeper客户端查看
-
3- 修改Topic
# 增大topic分区 /export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic itcast --partitions 4
# 注意: partitions分区,只能增大,不能减小。而且没有数量限制 /export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic itcast --partitions 1
# 注意: 副本既不能增大,也不能减小
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic itcast --partitions 4 --replication-factor 2
-
4- 删除Topic
# 再创建一个spark主题 /export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic spark /export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --list # 删除spark主题 /export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --delete --topic spark /export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --list
producer和consumer操作
消费者要和生产者指定是同一个topic主题,才能接收到消息
参数如下:
cd /export/server/kafka/bin ./kafka-console-producer.sh 参数说明 --broker-list: Kafka集群中broker服务器 --topic: 指定Topic ./kafka-console-consumer.sh 参数说明 --bootstrap-server: Kafka集群中broker连接信息 --topic: 指定Topic latest: 消费者(默认)从最新的地方开始消费 --from-beginning: 指定该参数以后,会从最旧的地方开始消费 --max-messages: 最多消费的条数。
-
1- 模拟生产者Producer
# 为了方便演示再创建一个spark /export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic spark # 模拟生产者给spark发送消息 /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark
-
2- 模拟消费者Consumer
# 模拟消费者从spark获取消息,默认每次拿最新的 /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark # --from-beginning 会从最旧的地方开始消费 /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark --from-beginning # --max-messages x 可以设置从最旧的地方最大消费次数x /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark --from-beginning --max-messages 5
注意:
我们有时候发现消费者打印出来的消息和生产者生产的顺序不一致,是乱序的。原因如下:
topic有多个分区,底层是多线程来读取数据并进行打印输出。因此会存在乱序现象
bootstrap-server和zookeeper以及broker-list的区别:
旧版(<v2.2): kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --create --topic .. 注意: 旧版用--zookeeper参数,主机名(或IP)和端口用ZooKeeper的2181,也就是server.properties文件中zookeeper.connect属性的配置值. 新版(>v2.2): kafka-topics.sh --bootstrap-server node1:9092 --create --topic .. 注意: 新版用--bootstrap-server参数,主机名(或IP)和端口用某个节点的即可,即主机名(或主机IP):9092。9092是Kafka的监听端口 broker-list:broker指的是kafka的服务端,可以是一个服务器也可以是一个集群。producer和consumer都相当于这个服务端的客户端。一般我们再使用console producer的时候,broker-list参数是必备参数,另外一个必备的参数是topic bootstrap-servers: 指的是kafka集群的服务器地址,这个和broker-list功能是一样的,只不过我们在console producer要求用broker-list,其他地方都采用bootstrap-servers。