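Table of contents
- 01. Kafka topic command-line operations
- 02. Kafka producer command-line operations
- 03. Kafka consumer command-line operations
- 04. What are the commonly used Kafka command-line tools?
- 05. How do you create a Kafka topic?
- 06. How do you list all topics in Kafka?
- 07. How do you send messages to a Kafka topic?
- 08. How do you consume messages from a Kafka topic?
- 09. Sending and subscribing to messages with the Java Kafka client
- 10. Producing and subscribing to messages with Spring Boot and Kafka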
01. Kafka topic command-line operations
① Create a topic:
[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2182 --create --partitions 3 --replication-factor 2 --topic test
[root@localhost kafka-01]# bin/kafka-topics.sh
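# ZooKeeper host and port (not a broker address). Kafka 2.2+ also accepts --bootstrap-server <broker:port>, and Kafka 3.0 removed --zookeeper
--zookeeper localhost:2182
# Create a topic
--create
# Number of partitions
--partitions 3
# Replication factor (2 here); it cannot exceed the number of brokers in the cluster
--replication-factor 2
# Topic name
--topic test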
② Show the topic's details:
[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2182 --describe --topic test
Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0
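To make the fields in the describe output concrete, here is a minimal Java sketch that parses one partition line and flags the partition as under-replicated when its ISR (in-sync replica list) is shorter than its replica list. The class DescribeLine and its methods are hypothetical helpers, and the regular expression assumes exactly the line layout shown above; the summary line that starts with "Topic:test PartitionCount" is deliberately not handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: parses one partition line of `kafka-topics.sh --describe` output.
class DescribeLine {
    // Assumed layout: "Topic: t Partition: p Leader: l Replicas: a,b Isr: a,b"
    private static final Pattern LINE = Pattern.compile(
        "Topic:\\s*(\\S+)\\s+Partition:\\s*(\\d+)\\s+Leader:\\s*(-?\\d+)"
        + "\\s+Replicas:\\s*(\\S+)\\s+Isr:\\s*(\\S*)");

    final String topic;
    final int partition;
    final int leader;
    final List<Integer> replicas;
    final List<Integer> isr;

    private DescribeLine(String topic, int partition, int leader,
                         List<Integer> replicas, List<Integer> isr) {
        this.topic = topic;
        this.partition = partition;
        this.leader = leader;
        this.replicas = replicas;
        this.isr = isr;
    }

    static DescribeLine parse(String line) {
        Matcher m = LINE.matcher(line.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unrecognized line: " + line);
        }
        return new DescribeLine(m.group(1), Integer.parseInt(m.group(2)),
            Integer.parseInt(m.group(3)), ids(m.group(4)), ids(m.group(5)));
    }

    // Turns a comma-separated list of broker ids such as "1,2" into [1, 2].
    private static List<Integer> ids(String csv) {
        List<Integer> out = new ArrayList<>();
        for (String s : csv.split(",")) {
            if (!s.isEmpty()) {
                out.add(Integer.parseInt(s));
            }
        }
        return out;
    }

    // A partition is under-replicated when some replica is missing from the ISR.
    boolean isUnderReplicated() {
        return isr.size() < replicas.size();
    }

    public static void main(String[] args) {
        DescribeLine d = parse("Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2");
        System.out.println("partition=" + d.partition + " leader=" + d.leader
            + " replicas=" + d.replicas + " isr=" + d.isr
            + " underReplicated=" + d.isUnderReplicated());
    }
}
```

In the listing above every partition has an ISR equal to its replica list, so none of them would be flagged.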
③ Alter the topic: change the partition count. The partition count can only be increased, never decreased.
[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2182 --alter --topic test --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command : The number of partitions for a topic can only be increased. Topic test currently has 3 partitions, 2 would not be an increase.
[2023-05-26 11:25:02,926] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic test currently has 3 partitions, 2 would not be an increase.
(kafka.admin.TopicCommand$)
[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2182 --alter --topic test --partitions 4
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
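The warning printed above concerns keyed messages. A producer chooses the partition for a keyed record from a hash of the key modulo the partition count, so changing the count can move a key to a different partition and break per-key ordering. The sketch below illustrates the effect with Java's String.hashCode() as a stand-in hash; this is an assumption made for brevity (Kafka's default partitioner hashes the serialized key bytes with murmur2), and partitionFor is a hypothetical helper rather than a Kafka API.

```java
class KeyPartitionDemo {
    // Stand-in for a producer partitioner: hash(key) mod partition count.
    // Assumption: String.hashCode() replaces Kafka's murmur2 hash of the key bytes.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Compare the partition chosen with 3 partitions and with 4 partitions.
        for (String key : new String[] {"a", "b", "c", "d"}) {
            System.out.println(key + ": " + partitionFor(key, 3)
                + " -> " + partitionFor(key, 4));
        }
    }
}
```

With this stand-in hash, keys "a" and "b" stay on the same partition after the change from 3 to 4 partitions, while "c" and "d" move, so records for those keys written before and after the change end up on different partitions.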
④ Describe the test topic again:
[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2182 --describe --topic test
Topic:test PartitionCount:4 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: test Partition: 3 Leader: 0 Replicas: 0,2 Isr: 0,2
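The replica lists above are spread round-robin across the three brokers. The sketch below is a simplified model of rack-unaware round-robin replica placement, not the broker's actual code: it fixes the start index and the initial follower shift at 0 (an assumption; a real cluster may start elsewhere), and the names ReplicaAssignmentSketch and assign are hypothetical. It also rejects a replication factor larger than the broker count, which is the constraint noted for --replication-factor.

```java
import java.util.ArrayList;
import java.util.List;

class ReplicaAssignmentSketch {
    // Returns, for each partition, the ordered broker ids that hold its replicas.
    // Assumptions: broker ids are 0..brokers-1, and the start index and the
    // initial follower shift are both fixed at 0.
    static List<List<Integer>> assign(int partitions, int replicationFactor, int brokers) {
        if (brokers < 1 || replicationFactor < 1 || replicationFactor > brokers) {
            throw new IllegalArgumentException(
                "replication factor must be between 1 and the number of brokers");
        }
        List<List<Integer>> result = new ArrayList<>();
        int shift = 0;
        for (int p = 0; p < partitions; p++) {
            // After each full pass over the brokers, shift follower placement by one.
            if (p > 0 && p % brokers == 0) {
                shift++;
            }
            int first = p % brokers; // broker holding the first (preferred) replica
            List<Integer> replicas = new ArrayList<>();
            replicas.add(first);
            for (int j = 0; j < replicationFactor - 1; j++) {
                int offset = 1 + (shift + j) % (brokers - 1);
                replicas.add((first + offset) % brokers);
            }
            result.add(replicas);
        }
        return result;
    }

    public static void main(String[] args) {
        // 4 partitions, replication factor 2, 3 brokers, as in the listing above.
        System.out.println(assign(4, 2, 3)); // prints [[0, 1], [1, 2], [2, 0], [0, 2]]
    }
}
```

Under these assumptions the result matches the Replicas column for partitions 0 through 3 in the describe output.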
⑤ List all topics on the current server:
[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2182 --list
test
⑥ Delete the topic:
[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2182 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
02. Kafka producer command-line operations
[root@localhost kafka-01]# bin/kafka-console-producer.sh --broker-list 10.60.215.238:9092 --topic test
>hello kafa
>hello zhangsan
>hello lisi
>
03. Kafka consumer command-line operations
[root@localhost kafka-01]# bin/kafka-console-consumer.sh --bootstrap-server 10.60.215.238:9092 --topic test --from-beginning
hello lisi
hello kafa
hello zhangsan
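The consumer prints the three messages in a different order from the one in which they were typed. Kafka guarantees ordering only within a single partition, and the test topic has several partitions. The sketch below simulates that effect in plain Java, with in-memory lists standing in for partitions; the round-robin placement and the order in which partitions are read are assumptions chosen for illustration, and PartitionOrderDemo is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PartitionOrderDemo {
    // Spreads messages over partitions round-robin (assumed placement, for illustration).
    static List<List<String>> produce(List<String> messages, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            partitions.get(i % numPartitions).add(messages.get(i));
        }
        return partitions;
    }

    // Reads each partition from its beginning, visiting partitions in the given order.
    static List<String> consume(List<List<String>> partitions, int[] readOrder) {
        List<String> out = new ArrayList<>();
        for (int p : readOrder) {
            out.addAll(partitions.get(p));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> sent = Arrays.asList("hello kafa", "hello zhangsan", "hello lisi");
        List<List<String>> partitions = produce(sent, 3);
        // Reading partition 2 first, then 0, then 1 changes the overall order.
        System.out.println(consume(partitions, new int[] {2, 0, 1}));
        // prints [hello lisi, hello kafa, hello zhangsan]
    }
}
```

Messages that land on the same partition still come back in the order they were written; only the interleaving across partitions varies. When strict ordering matters, a single partition or a shared message key keeps related messages together.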
04. What are the commonly used Kafka command-line tools?
The Kafka command-line tools include kafka-topics.sh, kafka-console-producer.sh, kafka-console-consumer.sh, kafka-configs.sh, and others. kafka-topics.sh manages topics, kafka-console-producer.sh and kafka-console-consumer.sh produce and consume messages, and kafka-configs.sh manages Kafka configuration.
05. How do you create a Kafka topic?
Use the kafka-topics.sh tool. For example, to create a topic named my-topic with one partition and one replica, run:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
06. How do you list all topics in Kafka?
Use the kafka-topics.sh tool with the --list option:
bin/kafka-topics.sh --list --zookeeper localhost:2181
07. How do you send messages to a Kafka topic?
Use the kafka-console-producer.sh tool. For example, to send messages to a topic named my-topic, run:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
Then type the messages at the prompt; each line is sent as one message.
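08. How do you consume messages from a Kafka topic?
Use the kafka-console-consumer.sh tool. For example, to consume messages from a topic named my-topic, starting from the earliest available message, run: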
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
The console then prints the messages consumed from that topic.
09. Sending and subscribing to messages with the Java Kafka client
① Create a project and add the dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.hh</groupId>
<artifactId>kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.20</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
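② Kafka producer that sends a message:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomProducer01 {
    public static void main(String[] args) {
        // Producer configuration: broker address and key/value serializers
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.60.215.238:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", "hello,kafka,你好,kafka");
        // try-with-resources closes the producer; close() waits for buffered records to be sent
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
            // send() is asynchronous by default: it buffers the record and returns immediately
            kafkaProducer.send(producerRecord);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}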
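③ If startup fails with java.net.ConnectException: Connection timed out: no further information, the client cannot reach the broker port. In this setup the host firewall was blocking it.
Stop the firewall on the broker host (acceptable on a test machine): systemctl stop firewalld
④ Run the producer again; the console consumer receives the message: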
[root@localhost kafka-01]# bin/kafka-console-consumer.sh --bootstrap-server 10.60.215.238:9092 --topic test --from-beginning
hello zhangsan
hello lisi
hello kafa
hello,kafka,你好,kafka
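⑤ Kafka consumer that consumes messages:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CustomConsumer01 {
    public static void main(String[] args) {
        // Consumer configuration: deserializers, broker address, and consumer group
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.60.215.238:9092");
        // A group id is required when using subscribe()
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the test topic
        consumer.subscribe(Collections.singletonList("test"));
        // Poll in a loop, waiting up to one second for records on each call
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}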
Console output:
hello,kafka,你好,kafka
10. Producing and subscribing to messages with Spring Boot and Kafka
① Add the dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.hh</groupId>
<artifactId>kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- <dependency>-->
<!-- <groupId>org.apache.kafka</groupId>-->
<!-- <artifactId>kafka-clients</artifactId>-->
<!-- <version>3.0.0</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.20</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
② Add the following configuration to application.properties:
spring.kafka.bootstrap-servers=10.60.215.238:9092
spring.kafka.consumer.group-id=consumer-group
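③ Kafka producer:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Sends the content to the given topic asynchronously
    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic, content);
    }
}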
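④ Kafka consumer:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {
    // Invoked for each record received from the test topic
    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }
}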
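⑤ Test:
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class KafkaProducerTest {
    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void testKafka() {
        kafkaProducer.sendMessage("test", "你好");
        kafkaProducer.sendMessage("test", "在吗");
        try {
            // Keep the test alive long enough for the listener to receive the messages
            Thread.sleep(1000 * 10);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        }
    }
}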
Console output:
你好
在吗