文章目录
- 1.启动
- 2.创建主题
- 3.发送消息
- 4.消费消息
- 5.使用kafka connect将现有的数据导入到kafka中
- 6.使用kafka streams处理kafka中的events
- 6.终止服务
- 集群配置要点
- 创建主题要点
- 主题分区变更
- 主题副本可变更吗?
- 创建生产者要点
> tar -xzf kafka_2.12-3.3.1.tgz
1.启动
启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
查看端口占用
[root@node2 ~]# netstat -ant |grep 2181
tcp6 0 0 :::2181 :::* LISTEN
或者进程
[root@node2 ~]# ps -ef |grep zookeeper
启动kafka server
bin/kafka-server-start.sh config/server.properties
2.创建主题
创建主题
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
查看主题
[root@node2 kafka_2.12-3.3.1]# bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events TopicId: 5zoNk2alQ4C1X3qhxVsndw PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
[root@node2 kafka_2.12-3.3.1]#
3.发送消息
发送消息
[root@node2 kafka_2.12-3.3.1]# bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>abc
>hello world
>
4.消费消息
消费消息
[root@node2 kafka_2.12-3.3.1]# bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootap-server localhost:9092
abc
hello world
5.使用kafka connect将现有的数据导入到kafka中
修改 config/connect-standalone.properties
plugin.path=libs/connect-file-3.3.1.jar
在xxx/kafka_2.12-3.3.1目录下创建test.txt
echo -e "foo\nbar" > test.txt
启动
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
这个命令会创建两个connector,
一个是source connector:读取test.txt中内容到topic:connect-test中,
一个是sink connector,将connect-test主题中的内容读出,并写出到test.sink.txt文件中
[root@node2 kafka_2.12-3.3.1]# more test.sink.txt
foo
bar
如果想看connect-test主题中的内容,可以启动个消费监听
[root@node2 kafka_2.12-3.3.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
且这个通道是时时的,当我们追加内容到源头的时候
echo Another line>> test.txt
新的数据又会输出到test.sink.txt 中
6.使用kafka streams处理kafka中的events
6.终止服务
0.首先将生产者和消费者停止
1.先停止kafka broker
2.再停止zookeeper
删除数据
rm -rf /tmp/kafka-logs /tmp/zookeeper
集群配置要点
启动集群,server.properties中主要要注意如下三个配置
broker.id=0 --集群的每个id都要不同
log.dirs=/tmp/kafka-logs ---默认都配置到了tmp下,要自定义路径
zookeeper.connect=localhost:2181 ---真实场景需要连接zk集群,且建议不要放在zk根节点下
如下所示,将zookeeper.connect配置到zk集群的kafka节点下
通过脚本启动集群
https://www.bilibili.com/video/BV1vr4y1677k?p=7&spm_id_from=pageDriver&vd_source=867fa98b57c5df7e6cb7e8f3ad59fd84
kafka-console-producer.sh 生产者端脚本
kafka-console-consumer.sh 消费者端脚本
创建主题要点
kafka-topics.sh 集群管理topic脚本
常参数
假设有100T的数据,分成3个分区存储,那一个分区大概是33T;每个分区中,又会将数据按照一块一块进行存储,每一块默认是1G
主题分区变更
如何对topic的分区数进行变更?
topic的partitions只能增加不能减少,更改命令:
主题副本可变更吗?
不可以通过命令的方式进行副本变更
创建生产者要点
bin/kafka-console-producer.sh 回车,就可以看到命令的使用说明