kafka题集 - kafka 命令行操作面试题总结

news2025/1/24 17:47:18

文章目录

      • 01. kafka 主题命令行操作
      • 02. kafka 生产者命令行操作
      • 03. kafka 消费者命令行操作
      • 04. Kafka 命令行工具有哪些常用的命令?
      • 05. 如何创建一个 Kafka 主题?
      • 06. 如何列出 Kafka 中所有的主题?
      • 07. 如何向 Kafka 主题发送消息?
      • 08. 如何从 Kafka 主题消费消息?
      • 09. Java kafka 实现消息的发送和订阅
      • 10. SpringBoot kafka 实现消息的生产和订阅

01. kafka 主题命令行操作

在这里插入图片描述

① 创建主题:

[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2182 --create --partitions 3 --replication-factor 2  --topic test
[root@localhost kafka-01]# bin/kafka-topics.sh
    # 设置连接kafka broker主机名称和端口号
    --zookeeper localhost:2182
    # 创建主题
    --create 
    # 处置分区数量
    --partitions 3 
    # 设置副本数量1,副本数量需要小于集群服务器数量
    --replication-factor 2 
    # 设置主题的名称
    --topic test

② 查看主题详细描述:

[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2182 --describe --topic test
		Topic:test      PartitionCount:3        ReplicationFactor:2     Configs:
        Topic: test     Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: test     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: test     Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0

③ 修改主题:修改分区数,分区数只能增加不能减少

[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2182 --alter --topic test --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command : The number of partitions for a topic can only be increased. Topic test currently has 3 partitions, 2 would not be an increase.
[2023-05-26 11:25:02,926] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic test currently has 3 partitions, 2 would not be an increase.
 (kafka.admin.TopicCommand$)
 
[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2182 --alter --topic test --partitions 4
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

④ 再次查看 test 主题的详情:

[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2182 --describe --topic test
Topic:test      PartitionCount:4        ReplicationFactor:2     Configs:
        Topic: test     Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: test     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: test     Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0
        Topic: test     Partition: 3    Leader: 0       Replicas: 0,2   Isr: 0,2

⑤ 查看当前服务器中的所有 topic:

[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2182 --list
test

⑥ 删除主题:

[root@localhost kafka-01]# bin/kafka-topics.sh --zookeeper localhost:2182 --delete --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

02. kafka 生产者命令行操作

在这里插入图片描述

[root@localhost kafka-01]# bin/kafka-console-producer.sh --broker-list 10.60.215.238:9092 --topic test
>hello kafa
>hello zhangsan
>hello lisi
>

03. kafka 消费者命令行操作

在这里插入图片描述

[root@localhost kafka-01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.60.215.238:9092 --topic test --from-beginning
hello lisi
hello kafa
hello zhangsan

04. Kafka 命令行工具有哪些常用的命令?

Kafka 命令行工具包括 kafka-topics.sh、kafka-console-producer.sh、kafka-console-consumer.sh、kafka-configs.sh 等。其中,kafka-topics.sh 用于管理主题,kafka-console-producer.sh 和 kafka-console-consumer.sh 用于生产和消费消息,kafka-configs.sh 用于管理 Kafka 配置。

05. 如何创建一个 Kafka 主题?

可以使用 kafka-topics.sh 工具创建一个 Kafka 主题。例如,要创建一个名为 my-topic 的主题,可以使用以下命令:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic

06. 如何列出 Kafka 中所有的主题?

可以使用 kafka-topics.sh 工具列出 Kafka 中所有的主题。例如,要列出 Kafka 中所有的主题,可以使用以下命令:

bin/kafka-topics.sh --list --zookeeper localhost:2181

07. 如何向 Kafka 主题发送消息?

可以使用 kafka-console-producer.sh 工具向 Kafka 主题发送消息。例如,要向名为 my-topic 的主题发送一条消息,可以使用以下命令:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

然后在命令行中输入要发送的消息。

08. 如何从 Kafka 主题消费消息?

可以使用 kafka-console-consumer.sh 工具从 Kafka 主题消费消息。例如,要从名为 my-topic 的主题消费消息,可以使用以下命令:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

然后命令行会输出从该主题中消费到的消息。

09. Java kafka 实现消息的发送和订阅

① 创建项目,添加依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath/>
    </parent>

    <groupId>com.hh</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.20</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

② kafka 生产者发送消息:

public class CustomProducer01 {
    public static void main(String[] args) {
        // kafka生产者属性配置
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.60.215.238:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // kafka生产者发送消息,默认是异步发送方式
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", "hello,kafka,你好,kafka");
        try{
            // 发送消息
            kafkaProducer.send(producerRecord);
        }catch (Exception e){
            e.printStackTrace();
        }
        // 关闭资源
        kafkaProducer.close();
    }
}

③ 启动报错:java.net.ConnectException: Connection timed out: no further information

关闭防火墙:systemctl stop firewalld

④ 重新启动运行,cmd控制台,消费者接收到消息:

[root@localhost kafka-01]#  bin/kafka-console-consumer.sh --bootstrap-server 10.60.215.238:9092 --topic test --from-beginning
hello zhangsan
hello lisi
hello kafa
hello,kafka,你好,kafka

③ kafka 消费者消费消息:

public class CustomConsumer01 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.60.215.238:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test-consumer-group");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        ArrayList<String> topics = new ArrayList<>();
        topics.add("test");
        consumer.subscribe(topics);

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

控制台输出:

hello,kafka,你好,kafka

10. SpringBoot kafka 实现消息的生产和订阅

① 导入依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath/>
    </parent>

    <groupId>com.hh</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
<!--        <dependency>-->
<!--            <groupId>org.apache.kafka</groupId>-->
<!--            <artifactId>kafka-clients</artifactId>-->
<!--            <version>3.0.0</version>-->
<!--        </dependency>-->

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>


        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.20</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

② 在application.properties文件中添加以下配置:

spring.kafka.bootstrap-servers=10.60.215.238:9092
spring.kafka.consumer.group-id=consumer-group

③ kafka 生产者:

@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic, content);
    }
}

④ kafka消费者:

@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record) {
        System.out.println(record.value());
    }
}

⑤ 测试:

@SpringBootTest
@RunWith(SpringRunner.class)
public class KafkaProducerTest {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void testKafka() {
        kafkaProducer.sendMessage("test", "你好");
        kafkaProducer.sendMessage("test", "在吗");

        try {
            Thread.sleep(1000 * 10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

控制台输出:

你好
在吗

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/572788.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

六种基本网络拓扑结构详解

目录 1、总线型网络拓扑结构 2、星型网络拓扑结构 3、环形网络拓扑结构 4、树型网络拓扑结构 5、网状网络拓扑结构 6、混合网络型拓扑结构 常见的网络拓扑结构有以下6种&#xff1a;1.总线型网络拓扑结构&#xff1b;2.星型网络拓扑结构&#xff1b;3.环形网络拓扑结构&a…

Oracle SQL 性能优化

向量I/O 回表 SQL 不一样&#xff0c;plan 一样的 除了统计信息&#xff0c;session 参数导致COST不对 历史执行计划 filter 不同于nest loop 会distinct 之类 放进PGA 不再是SGA中块访问了吧 sql profile fzw rman target / 慢的原因 降低驱动表的row source集 指定nl表的驱动…

EasyRecovery16绿色版安装下载及使用教程

如果你已经在下载了PC版本的EasyRecovery&#xff0c;那么该如何安装EasyRecovery呢&#xff1f;现在就呈上EasyRecovery教程&#xff0c;以便顺利完成安装。EasyRecovery不仅能够恢复多种类型的数据&#xff0c;更能够适用于不同媒体介质&#xff0c;其中包括计算机&#xff0…

Kylin从入门到精通以及案例实操系列

1、Kylin 基础知识 1.1、了解 Kylin 的基本概念、原理和架构 1.1.1、Kylin 定义 Apache Kylin是一个开源的分布式分析引擎&#xff0c;提供Hadoop/Spark之上的SQL查询接口及多维分析&#xff08;OLAP&#xff09;能力以支持超大规模数据&#xff0c;最初由eBay Inc开发并贡献…

书评 | 《新程序员005:开源深度指南 新金融背后的科技力量》

目录 书评 | 《新程序员005&#xff1a;开源深度指南 & 新金融背后的科技力量》 内容介绍 书籍优点 书评 书评 | 《新程序员005&#xff1a;开源深度指南 & 新金融背后的科技力量》 内容介绍 《新程序员005&#xff1a;开源深度指南&amp;新金融背后的科技力量》特…

八、Spring Cloud Alibaba-seata分布式事务

一、引言 1、事务简介 事务(Transaction)是访问并可能更新数据中各种数据项的一个程序执行单元(unit&#xff09;。在关系数据库中&#xff0c;一个事务由一组SQL语向组成。事务应该具有4个属性:原子性、一致性、隔离性、持久性。这四个属性通常称为ACID特性。 原子性(atomic…

用Java 的锁机制实现多线程售票案例

本文首发自「慕课网」&#xff0c;想了解更多IT干货内容&#xff0c;程序员圈内热闻&#xff0c;欢迎关注"慕课网"及“慕课网公众号”&#xff01; 作者&#xff1a;王军伟Tech | 慕课网讲师 1. 前言 本文内容主要是使用 Java 的锁机制对多线程售票案例进行实现。售…

logback高级特性使用

一、业务需求 日志级别的分类 日志的级别分为&#xff1a; trace&#xff1a;微量&#xff0c;少许的意思&#xff0c;级别最低info&#xff1a;普通的打印信息debug&#xff1a;需要调试时候的关键信息打印warn&#xff1a;警告&#xff0c;不影响使⽤&#xff0c;但需要注…

windows系统python3.7版本pyspider安装

环境&#xff1a;很多的python版本都尝试过安装pyspider&#xff0c;网上多数让python3.6安装&#xff0c;说是这个环境是最佳的环境&#xff0c;测试安装最方便快捷&#xff0c;但是一直报result_worker starting…&#xff01;&#xff01;&#xff01;&#xff0c;卡死在界面…

软件测试技术才是王道,43岁照样拿到年薪70W+,太强了...

最近挺丧的&#xff0c; 可能是之前弦绷的有点紧&#xff0c;现在有点受不了了。 所以突然就泄了气&#xff0c;每天忙完工作的事后就躺在家里打游戏。其实感觉每年都有一段时间是这样丧的。所以我自己其实并不是特别努力的类型&#xff0c;我没办法一直绷着弦的去卷&#xff0…

0基础想入门互联网选择什么好?

互联网岗位划分 研发&#xff1a;技术岗&#xff0c;需要有相关的专业知识。 测试&#xff1a;技术岗&#xff0c;通过相关的程序查找产品中相应的bug。 设计&#xff1a;需要美术素养。 产品经理&#xff1a;设计制定产品的原型&#xff0c;制定每个功能的需求以及输出相应…

论文解读 | IROS 2022:MV6D:在RGB-D图像上使用深度逐点投票网络进行多视角6D姿态估计

原创 | 文 BFT机器人 01 研究背景 在计算机视觉领域&#xff0c;6D姿态估计是一种重要的任务&#xff0c;用于确定物体在3D空间中的位置和方向。它在许多应用领域具有广泛的应用&#xff0c;如机器人操作、虚拟现实、增强现实、物体跟踪等。 然而&#xff0c;传统的6D姿态估计方…

Jmeter实现分布式并发

Jmeter实现分布式并发&#xff0c;即使用远程机执行用例。 环境&#xff1a; VMware Fusion Windows系统是win7。 操作过程 1、Master在jmeter.properties添加remote_hosts 2、Slave在jmeter.properties添加server_port 同时把remote_hosts修改为和主机&#xff08;Master…

超100篇! VAD论文梳理汇总!

GitHub的一位博主整理了上百篇语音活动检测&#xff08;VAD&#xff09;的论文&#xff0c;按照其中使用的特征方法以及适用的环境进行了分类整理&#xff0c;时间跨度为从198*年至2019年。此外&#xff0c;还提供了几个VAD代码&#xff0c;它们的性能表现较好。需要的同学可以…

我的创作纪念日---[需要更开阔的视野!]

文章目录 头绪收获日常 憧憬英语人工智能 希望 头绪 工作很长时间之后&#xff0c;才发现知识的根本&#xff0c;还是在于积累。俗话说好记性不如烂笔头。不管是特定产品相关的知识还是系统类的知识&#xff0c;又或者是语言类的知识&#xff0c;都有很多知识点需要积累。有了…

不会数据分析?无从下手?一文帮你打开数据分析思路

掌握了很多数据分析工具和技能&#xff0c;却依然做不好数据分析。 面对具体的业务问题&#xff0c;我们还是容易两眼一抹黑&#xff1f;除了数据和专业之外&#xff0c;还需要一定的方法论支撑。 文章有点长&#xff08;误区解释方法论分享&#xff09;但干货满满&#xff0c…

药用辅料数据查询网站系统-药品辅料数据

药用辅料是指在制药过程中&#xff0c;用于增加药品稳定性、改善口感、提高吸收率等功效的辅助材料。药用辅料的种类繁多&#xff0c;不同的药品需要使用不同的辅料&#xff0c;因此对于药企来说&#xff0c;了解并选用适合自己的药用辅料显得尤为重要。本文将介绍如何利用药用…

jvm之对象大小分析

写在前面 本文看下计算对象大小相关内容。 1&#xff1a;基础内容 1.1&#xff1a;对象的结构 一个对象由对象头和对象体组成&#xff0c;其中对象头包含如下内容&#xff1a; 标记字&#xff08;mark word&#xff09;&#xff1a;存放GC年龄信息&#xff0c;对象锁信息等…

Hightopo 使用心得(1)- 基本概念

Hightopo 公司 3D 可视化产品有对应的官方手册。但是这些手册内容比较多。对于想学习的新同学来说可能相对比较繁琐。这里本人根据个人使用经验做了一些总结。希望对读者有所帮助。 官方手册地址&#xff1a;Structure (hightopo.com) 本文会提到一些前端开发的概念&#xff…

pdf怎么压缩得小一点?软件压缩更高效

PDF可以在不同操作系统和设备上实现高保真的排版和格式化。然而&#xff0c;随着文档的不断增多和文件大小的增加&#xff0c;传输和存储PDF文件也变得越来越困难。为了解决这个问题&#xff0c;可以使用PDF压缩技术来减小文件大小&#xff0c;提高传输效率。本文将介绍PDF压缩…