Kafka-TopicPartition

news2024/11/28 22:36:20

Kafka主题与分区

主题与分区

topic & partition,是Kafka两个核心的概念,也是Kafka的基本组织单元。 主题作为消息的归类,可以再细分为一个或多个分区,分区也可以看作对消息的二次归类。 分区的划分为kafka提供了可伸缩性、水平扩展性、容错性等优势。 分区可以有一个至多个副本,每个副本对应一个日志文件,每个日志文件对应一至多个日志分段(LogSegment),每个日志分段还可以细分为索引文件、日志存储文件和快照文件等

主题的管理

主题的管理

  • 创建主题

  • 查看主题信息

  • 修改主题

  • 删除主题

上述操作可以采用Kafka提供的kafka-topics.sh脚本来完成,也可以采用Kafka提供的AdminClient来完成。 该脚本位于¥KAFKA_HOME/bin目录下 image

创建主题

创建主题的命令格式如下:

kafka-topics.sh --bootstrap-server <server:port> \
    --create --topic <topic> \
    --partitions <numPartitions> \
    --replication-factor <replicationFactor>

创建一个分区数为4、副本因子为2的主题

kafka-topics.sh --bootstrap-server localhost:9092 \
    --create --topic topic-create \
    --partitions 4 \
    --replication-factor 2

创建一个分区数为4、副本因子为2的主题,并且指定主题的配置信息

kafka-topics.sh --bootstrap-server localhost:9092 \
    --create --topic topic-create \
    --partitions 4 \
    --replication-factor 2 \
    --config max.message.bytes=128000

通过describe指令来查看分区副本的分配细节

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create

使用replica-assignment参数手动指定分区副本的分配方案

使用这种方式根据分区号的数值大小按照从小到大的顺序进行排列

例如:0:1:2,0:1:2,0:1:2,0:1:2

  • 分区与分区之间用逗号分隔

  • 分区与副本之间用冒号分隔

kafka-topics.sh --bootstrap-server localhost:9092 \
    --create --topic topic-create-same \
    --replica-assignment 0:1:2,0:1:2,0:1:2,0:1:2

注意:

  • 同一个分区内的副本不能有重复,比如0:0,1:1这样,就会报出AdminCommandFailedException异常

  • 分区之间所指定的副本数不同,比如0:0,1:1这样,就会报出AdminOperationException异常

主题命名规范

  • 主题名称只能包含ASCII字母、数字、点、减号和下划线

  • 主题名称长度不能超过249个字符

  • 主题名称不能以点开头

  • 不能以__开头,这是Kafka内部使用的主题前缀

  • 不能包含空格、单引号、双引号、逗号、分号、冒号和NULL字符

  • 主题名称应该全部小写,因为Kafka在区分主题名称时是不区分大小写的

  • 主题名称不能与Kafka保留的名称冲突,比如__consumer_offsets

  • 主题名称不能与已经存在的消费者组名称冲突

  • 主题名称不能与已经存在的主题名称冲突

查看主题信息

通过list指令来查看当前Kafka集群中所有可用的主题

kafka-topics.sh --bootstrap-server localhost:9092 --list

image

通过describe指令来查看主题的详细信息

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic-create

image

修改主题

当主题被创建之后,依然允许我们对其做一定的修改,比如修改分区数、修改副本因子、修改配置等。 通过alter指令来修改主题的配置信息

# 修改主题的最大消息字节数,配置值从10000修改为20000

kafka-topics.sh --bootstrap-server localhost:9092 \
    --alter --topic topic-config \
    --config max.message.bytes=20000

通过alter指令来修改主题的分区数

kafka-topics.sh --bootstrap-server localhost:9092 \
    --alter --topic topic-create \
    --partitions 6

删除主题

通过delete指令来删除主题

kafka-topics.sh --bootstrap-server localhost:9092 \
    --delete --topic topic-delete

通过delete-config参数来删除之前设置的配置信息

kafka-topics.sh --bootstrap-server localhost:9092 \
    --alter --topic topic-config \
    --delete-config max.message.bytes

手动删除主题

  • 主题中的元数据存储在Zookeeper中的/brokers/topics和/config/topics路径下

  • 主题中的消息数据存储在log.dir或log.dirs配置的路径下,只需要手动删除这些地方的数据即可。

配置管理

kafka-configs.sh脚本用于管理Kafka的配置信息,该脚本位于$KAFKA_HOME/bin目录下 主要包含变更配置alter和查看配置describe两个指令

# 变更主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter --entity-type topics --entity-name topic-config \
    --add-config max.message.bytes=128000

# 添加主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter --entity-type topics --entity-name topic-config \
    --add-config max.message.bytes=128000

# 查看主题的配置信息
kafka-configs.sh --bootstrap-server localhost:9092 \
    --describe --entity-type topics --entity-name topic-config    

KafkaAdminClient

KafkaAdminClient是Kafka提供的一个管理客户端,用于管理Kafka集群中的资源,比如主题、分区、消费者组等。

TopicCommand基本使用

使用KafkaAdminClient来完成TopicCommand的基本操作

查看主题信息

public class demo{
    public static void describeTopic(){

        String[ ] options = new String[ ]{

                "--bootstrap-server localhost:9092",
                "--describe",
                "--topic", "topic-create"
        };
        kafka.admin.TopicCommand.main(options);
    }
}

创建主题

public class demo{
    public static void createTopic(){

        String[ ] options = new String[ ]{

                "--bootstrap-server localhost:9092",
                "--create",
                "--replication-factor", "1",
                "--partitions", "1",
                "--topic", "topic-create-api"
        };
        kafka.admin.TopicCommand.main(options);
    }
}

查看所有可用主题

public class demo{
    public static void listTopic(){

        String[ ] options = new String[ ]{

                "--bootstrap-server localhost:9092",
                "--list"
        };
        kafka.admin.TopicCommand.main(options);
    }
}

KafkaAdminClient基本使用

KafkaAdminClient可以用来管理broker、配置和ACL(Access Control List),以及管理主题、分区和消费者组等。 KafkaAdminClient继承了org.apache.kafka.clients.admin.AdminClient,提供了一系列的API来管理Kafka集群中的资源。

AdminClient常见的方法

  • createTopics:创建主题

    • CreateTopicsResult createTopics(Collection newTopics)
  • deleteTopics:删除主题

    • DeleteTopicsResult deleteTopics(Collection topics)
  • listTopics:列出所有可用的主题

    • ListTopicsResult listTopics()
  • describeTopics:查看主题的详细信息

    • DescribeTopicsResult describeTopics(Collection topicNames)
  • describeCluster:查看集群的详细信息

    • DescribeClusterResult describeCluster()
  • describeConfigs:查看配置的详细信息

    • DescribeConfigsResult describeConfigs(Collection resources)
  • alterConfigs:修改配置信息

    • AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs)
  • describeConsumerGroups:查看消费者组的详细信息

    • DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds)
  • listConsumerGroups:列出所有可用的消费者组

    • ListConsumerGroupsResult listConsumerGroups()
  • createPartitions:创建分区

    • CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions)
使用KafkaAdminClient创建主题
public class KafkaAdminClientCreateTopic {
    /**
     * 使用AdminClient创建Topic
     *
     * 创建完成之后使用如下脚本进行检查
     * 进入KAFKA_HOME/bin
     * 执行 ./kafka-topics.sh --bootstrap-server localhost:9092 --list
     */
    public static void createTopic(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(props);
        NewTopic newTopic = new NewTopic("topic-create-api", 1, (short) 1);
        // 创建主题的方法内部是通过发送CreateTopicRequest请求来完成的
        CreateTopicsResult result = adminClient.createTopics(Arrays.asList(newTopic));
        try {
            result.all().get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // 使用完之后需要关闭AdminClient,释放资源
        adminClient.close();
    }


    public static void main(String[ ] args) {

        createTopic();
    }
}
使用KafkaAdminClient查看主题信息
public class KafkaAdminClientDescribeTopic {
    /**
     * 使用AdminClient查看Topic信息
     */
    public static void describeTopic(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(props);
        DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("topic-create-api"));
        try {
            Map<String, TopicDescription> map = result.all().get();
            for (Map.Entry<String, TopicDescription> entry : map.entrySet()) {
                System.out.println(entry.getKey() + " : " + entry.getValue());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // 使用完之后需要关闭AdminClient,释放资源
        adminClient.close();
    }


    public static void main(String[ ] args) {

        describeTopic();
    }
}
使用KafkaAdminClient查看所有可用的主题
public class KafkaAdminClientListTopic {
    /**
     * 使用AdminClient查看所有可用的Topic
     */
    public static void listTopic(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(props);
        ListTopicsResult result = adminClient.listTopics();
        try {
            Set<String> set = result.names().get();
            for (String s : set) {
                System.out.println(s);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // 使用完之后需要关闭AdminClient,释放资源
        adminClient.close();
    }


    public static void main(String[ ] args) {

        listTopic();
    }
}
使用KafkaAdminClient创建分区
public class KafkaAdminClientCreatePartition {
    /**
     * 使用AdminClient创建分区
     */
    public static void createPartition(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(props);
        Map<String, NewPartitions> map = new HashMap<>();
        NewPartitions newPartitions = NewPartitions.increaseTo(2);
        map.put("topic-create-api", newPartitions);
        CreatePartitionsResult result = adminClient.createPartitions(map);
        try {
            result.all().get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // 使用完之后需要关闭AdminClient,释放资源
        adminClient.close();
    }


    public static void main(String[ ] args) {

        createPartition();
    }
}
使用KafkaAdminClient删除主题
public class KafkaAdminClientDeleteTopic {
    /**
     * 使用AdminClient删除Topic
     */
    public static void deleteTopic(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(props);
        DeleteTopicsResult result = adminClient.deleteTopics(Arrays.asList("topic-create-api"));
        try {
            result.all().get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // 使用完之后需要关闭AdminClient,释放资源
        adminClient.close();
    }


    public static void main(String[ ] args) {

        deleteTopic();
    }
}
使用KafkaAdminClient修改主题配置
public class KafkaAdminClientAlterTopic {
    /**
     * 使用AdminClient修改Topic配置
     */
    public static void alterTopic(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(props);
        ConfigEntry configEntry = new ConfigEntry("max.message.bytes", "128000");
        Config config = new Config(Arrays.asList(configEntry));
        Map<ConfigResource, Config> map = new HashMap<>();
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-create-api");
        map.put(configResource, config);
        AlterConfigsResult result = adminClient.alterConfigs(map);
        try {
            result.all().get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // 使用完之后需要关闭AdminClient,释放资源
        adminClient.close();
    }


    public static void main(String[ ] args) {

        alterTopic();
    }
}
使用KafkaAdminClient查看主题配置
public class KafkaAdminClientDescribeTopicConfig {
    /**
     * 使用AdminClient查看Topic配置
     */
    public static void describeTopicConfig(){
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(props);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "topic-create-api");
        DescribeConfigsResult result = adminClient.describeConfigs(Arrays.asList(configResource));
        try {
            Map<ConfigResource, Config> map = result.all().get();
            for (Map.Entry<ConfigResource, Config> entry : map.entrySet()) {
                System.out.println(entry.getKey() + " : " + entry.getValue());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        // 使用完之后需要关闭AdminClient,释放资源
        adminClient.close();
    }


    public static void main(String[ ] args) {

        describeTopicConfig();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1254139.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RocketMq 主题(TOPIC)生产级应用

RocketMq是阿里出品&#xff08;基于MetaQ&#xff09;的开源中间件&#xff0c;已捐赠给Apache基金会并成为Apache的顶级项目。基于java语言实现&#xff0c;十万级数据吞吐量&#xff0c;ms级处理速度&#xff0c;分布式架构&#xff0c;功能强大&#xff0c;扩展性强。 官方…

Vue框架学习笔记——事件scroll和wheel的区别

文章目录 前文提要滚动条滚动事件 scroll鼠标滚动事件 wheel二者不同点 前文提要 本人仅做个人学习记录&#xff0c;如有错误&#xff0c;请多包涵 滚动条滚动事件 scroll scroll事件绑定html页面中的指定滚动条&#xff0c;无论你拖拽滚动条&#xff0c;选中滚动条之后按键盘…

学生信息管理系统程序Python

系统主界面 在该界面中可以选择要使用功能对应的菜单进行不同的操作。在选择功能菜单时&#xff0c;有两种方法&#xff0c; 一种是输入1&#xff0c;另一种是按下键盘上的↑或↓方向键进行选择。这两种方法的结果是一样的&#xff0c;所以使用哪种方法都可以。 &#xff08;…

【Docker】安装MySQL 通俗易懂 亲测没有任何问题

目录 1.拉取镜像 2.运行容器 3.创建mysql配置文件 4.测试 1.拉取镜像 dockerhub官网&#xff1a;Docker 如果需要其他版本mysql docker pull mysql:xxx&#xff08;版本&#xff09; docker pull mysql #默认拉取最新版本 latest 2.运行容器 docker run -d -p 3306:33…

《大江大河2》最触动我的两段经典对话

第一部分 默默的关注了《大江大河》&#xff0c;第2部依旧保持了和第1部的水准&#xff0c;难得看到这么良心的电视剧。 相比于原剧本来说&#xff0c;我更喜欢改编后的内容&#xff0c;剧情更加凝重&#xff0c;深刻&#xff0c;关键是真实。 宋运辉开始是不认识路司长的&…

前端web开发学习笔记

JavaWeb 前端Web开发HTMLCSSjavaScript1.JS引入2.JS基础语法3.JS函数4.JS对象 BOMDOM文档对象模型JS事件监听VueVue常用指令Vue的生命周期 AjaxAxios 前端工程化环境准备NodeJS安装和Vue-cli安装vue项目Vue组件库Element组件的使用 Vue路由Nginx打包部署 前端Web开发 HTML 负…

4、浏览器插件配置使用

文章目录 一、Hackbar1. Load和Execute功能的使用2. Split功能的使用3. Post功能的使用4. 编码功能的使用 二、FoxyProxy1、设置Burpsuite的代理服务端口2、FoxyProxy插件的简单使用 三、User-Agent Switcher 一、Hackbar 火狐浏览器中按下F12键启用hackbar。 1. Load和Execut…

bit_set位图|布隆过滤器

位图 对于海量整形数据的处理&#xff0c;通常是上百个G的代码。 通常有如下的应用&#xff1a; 1. 快速查找某个数据是否在一个集合中 2. 排序 去重 3. 求两个集合的交集、并集等 4. 操作系统中磁盘块标记 如果将数据加载到内存中&#xff0c;运用基本数据结构处理&…

foobar2000 突然无法正常输出DSD信号

之前一直在用foobar2000加外置dac听音乐&#xff0c;有一天突然发现听dsd的时候&#xff0c;dac面板显示输出的是PCM格式信号&#xff0c;而不是DSD信号&#xff0c;这让我觉得很奇怪&#xff0c;反复折腾了几次&#xff0c;卸载安装驱动什么的&#xff0c;依然如此&#xff0c…

Kubernetes技术与架构-配置

一般情况下&#xff0c;Kubernetes使用yaml文件格式定义配置文件&#xff0c;配置文件须指定对应的API稳定版本号&#xff0c;将配置文件进行版本控制、在发布新版本的过程中出问题时可以执行版本回滚操作&#xff0c;将相关联的对象定义在同一个配置文件中、从而更容易地管理&…

springboot宠物店管理系统-计算机毕设 附源码 32041

SpringBoot宠物店管理系统 摘 要 随着科学技术的飞速发展&#xff0c;社会的方方面面、各行各业都在努力与现代的先进技术接轨&#xff0c;通过科技手段来提高自身的优势&#xff0c;宠物行业当然也不例外。宠物店管理系统是以实际运用为开发背景&#xff0c;运用软件工程原理…

Failed to initialize NVML: Driver/library version mismatch

做一个项目的时候&#xff0c;发现vscode一开始训练就报如下错误 RuntimeError: CUDA out of memory. Tried to allocate 916.00 MiB (GPU 0; 6.00 GiB total capacity; 4.47 GiB already allocated; 186.44 MiB free; 4.47 GiB reserved in total by PyTorch) 正常来说这是…

2023-3年CSDN创作纪念日

机缘 今天开开心心出门去上班&#xff0c;就收到了一个csdn私信&#xff0c;打开一看说是给我惊喜来着&#xff0c;我心想csdn还能给惊喜&#xff1f;以为是有什么奖品或者周边之类的&#xff0c;结果什么也没有&#xff0c;打开就是一份信&#x1f602;。 也挺不错的&#xf…

【理解ARM架构】 散列文件 | 重定位

&#x1f431;作者&#xff1a;一只大喵咪1201 &#x1f431;专栏&#xff1a;《理解ARM架构》 &#x1f525;格言&#xff1a;你只管努力&#xff0c;剩下的交给时间&#xff01; 目录 &#x1f3d3;引出重定位&#x1f3d3;散列文件&#x1f3d3;可读可写数据段重定位&#…

Kibana部署

服务器 安装软件主机名IP地址系统版本配置KibanaElk10.3.145.14centos7.5.18042核4G软件版本&#xff1a;nginx-1.14.2、kibana-7.13.2-linux-x86_64.tar.gz 1. 安装配置Kibana &#xff08;1&#xff09;安装 [rootelk ~]# tar zxf kibana-7.13.2-linux-x86_64.tar.gz -C…

leetcode算法之链表

目录 1.两数相加2.两两交换链表中的节点3.重排链表4.合并K个升序链表5.K个一组翻转链表 1.两数相加 两数相加 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(…

针对c语言的scanf读取字符和字符串解析

在scanf函数中&#xff0c;格式字符串里的空格字符有特定的作用。 当你在格式字符串里放置一个空格时&#xff0c;scanf会尝试匹配并消耗输入中的一个或多个空白字符&#xff08;包括空格、制表符或换行符&#xff09;。换句话说&#xff0c;它会跳过任何空白字符&#xff0c;…

针对哈希冲突的解决方法

了解哈希表和哈希冲突是什么 哈希表&#xff1a;是一种实现关联数组抽象数据类型的数据结构&#xff0c;这种结构可以将关键码映射到给定值。简单来说哈希表&#xff08;key-value&#xff09;之间存在一个映射关系&#xff0c;是键值对的关系&#xff0c;一个键对应一个值。 …

蓝桥杯第四场双周赛(1~6)

1、水题 2、模拟题&#xff0c;写个函数即可 #define pb push_back #define x first #define y second #define int long long #define endl \n const LL maxn 4e057; const LL N 5e0510; const LL mod 1e097; const int inf 0x3f3f; const LL llinf 5e18;typedef pair…

java_基础_关键字

1.关键字的字母全部都是小写. 2.常用的代码编辑器(Notepad),针对关键字有特殊的颜色标记,非常的直观.