# Kafka_深入探秘者(4):kafka 主题 topic

news2025/1/21 23:27:09

Kafka_深入探秘者(4):kafka 主题 topic

一、kafka 主题管理

1、kafka 创建主题 topic 命令

1)命令:


# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 创建一个名为 heima 的 主题
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic heima --partitions 2 --replication-factor 1

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh --zookeeper 172.18.30.110:2181 --create --topic heima --partitions 2 --replication-factor 1

2)命令 参数说明:

–zookeeper :指定了 kafka 所连接的 zookeeper 服务地址。zookeeper必传参数,多个zookeeper用’,"分开。
–topic : 指定了所要创建主题的名称
–partitions : 指定了分区个数,每个线程处理一个分区数据。
–replication-factor : 指定了副本因子,用于设置主题副本数,每个副本分布在不通节点,不能超过总结点数。如你只有一个节点,但是创建时指定副本数为2,就会报错
–create : 创建主题的动作指令。

2、查看 topic 元数据信细的方法:

topic 元数据信息保存在 Zookeeper 节点中,


# 切换到 zookeeper 安装目录
cd /usr/local/zookeeper/apache-zookeeper-3.6.3-bin

# 连接 zookeeper 查看元数据
bin/zkCli.sh -server localhost:2181

# 查看 节点 信息:
get /brokers/topics/heima

在这里插入图片描述

3、展示出当前所有主题


# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 展示出当前所有主题
bin/kafka-topics.sh --list --zookeeper localhost:2181

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh --list --zookeeper 172.18.30.110:2181

4、 查看主题详情:


# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 查看主题详情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic heima

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh --zookeeper 172.18.30.110:2181 --describe --topic heima

5、 修改 主题 topic 配置(增加配置)


# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 修改 主题 topic 配置(增加配置)
bin/kafka-topics.sh  --alter --zookeeper localhost:2181 --topic heima --config flush.messages=1

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh  --alter --zookeeper 172.18.30.110:2181 --topic heima --config flush.messages=1

6、 删除 主题 topic 配置 flush.messages=1


# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 删除 主题 topic 配置 flush.messages=1
bin/kafka-topics.sh  --alter --zookeeper localhost:2181 --topic heima --delete-config flush.messages

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh  --alter --zookeeper 172.18.30.110:2181 --topic heima --delete-config flush.messages

7、 删除 topic 主题

若 delete.topic.enable=true 直接彻底删除该 Topic 主题。

若 delete.topic.enable=false
如果当前 Topic 没有使用过,即没有传输过信息:可以彻底删除。
如果当前 Topic 有使用过,即有过传输过信息:并没有真正删除 Topic,只是把这个 Topic 标记为删除(marked fordeletion),重启 Kafka Server 后删除。


# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 删除 topic 主题名为 heima 的主题
bin/kafka-topics.sh  --delete --zookeeper localhost:2181 --topic heima 

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh  --delete --zookeeper 172.18.30.110:2181 --topic heima

在这里插入图片描述

二、kafka 分区

1、kafka 增加分区


# 切换到 kafka 安装目录
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 增加 topic 主题名为 heima 的主题的 分区为 3 个
bin/kafka-topics.sh  --alter --zookeeper localhost:2181 --topic heima --partitions 3 

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh  --alter --zookeeper 172.18.30.110:2181 --topic heima --partitions 3

# 查看 heima 主题详情:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic heima

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh --describe --zookeeper 172.18.30.110:2181 --topic heima

在这里插入图片描述

2、其他主题参数配置

见官方文档: http://kafka.apache.org/documentation/#topicconfigs


bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 1 \
  --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
  
  
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic
  --alter --add-config max.message.bytes=128000 
  
  
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe  


bin/kafka-configs.sh --bootstrap-server localhost:9092  --entity-type topics --entity-name my-topic
  --alter --delete-config max.message.bytes

三、kafka 主题 topic 总结

1、KafkaAdminClient 应用

我们都习惯使用 Kafka 中 bin 目录下的脚本工具来管理査看 Kafka,但是有些时候需要将某些管理查看的功能集成到系统 (比如Kafka Manager)中,那么就需要调用一些 API 来直接操作 Kafka 了。

2、在 kafka_learn 工程中,创建 KafkaAdminConfigOperation.java 类, KafkaAdminClient 应用


/**
 *  D:\java-test\idea2019\kafka_learn\src\main\java\djh\it\kafka\learn\chapter3\KafkaAdminConfigOperation.java
 *
 *  2024-6-22 创建 KafkaAdminConfigOperation.java 类, KafkaAdminClient 应用
 */
package djh.it.kafka.learn.chapter3;

import kafka.controller.NewPartition;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaAdminConfigOperation {

    public static void main( String[] args ) throws ExecutionException, InterruptedException {
        describeTopicConfig();   //获取主题详细信息
//        alterTopicConfig();    //添加主题
//        addTopicPartitions();  //添加分区
    }
    private static void addTopicPartitions() throws ExecutionException, InterruptedException {
        String brokerList = "172.18.30.110:9092";
        String topic = "heima";

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        AdminClient client = AdminClient.create(props);

        NewPartitions newPartitions = NewPartitions.increaseTo(4);

        Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
        newPartitionsMap.put(topic,newPartitions);
        CreatePartitionsResult result = client.createPartitions(newPartitionsMap);
        result.all().get();

        client.close();
    }

    private static void describeTopicConfig() throws ExecutionException, InterruptedException {
        String brokerList = "172.18.30.110:9092";
        String topic = "heima";

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        AdminClient client = AdminClient.create(props);

        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        DescribeConfigsResult result = client.describeConfigs(Collections.singleton(resource));
        Config config = result.all().get().get(resource);
        System.out.println(config);
        client.close();
    }
}

在这里插入图片描述

上一节关联链接请点击
# Kafka_深入探秘者(3):kafka 消费者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1857611.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java项目:基于SSM框架实现的电子竞技管理平台【ssm+B/S架构+源码+数据库+毕业论文】

一、项目简介 本项目是一套基于SSM框架实现的电子竞技管理平台 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调试&#xff0c;eclipse或者idea 确保可以运行&#xff01; 该系统功能完善、界面美观、操作简单、功能…

最新AI智能聊天对话问答系统源码(图文搭建部署教程)+AI绘画,文生图,TTS语音识别输入,文档分析

一、人工智能语言模型和AI绘画在多个领域广泛应用 人工智能语言模型和AI绘画在多个领域都有广泛的应用。以下是一些它们的主要用处&#xff1a; 人工智能语言模型 内容生成 写作辅助&#xff1a;帮助撰写文章、博客、报告、剧本等。 代码生成&#xff1a;自动生成或补全代码&…

python项目加密和增加时间许可证

1.bat&#xff0c;执行如下的命令&#xff0c;第一句是更新或增加许可证 第二句是加密draw_face.py python offer.py pyarmor obfuscate -O dist draw_face.py绘制自制人脸.py&#xff0c;调用加密的代码draw_face代码 import sys import os import cv2# 添加加密模块所在的路…

国内顶级汽车制造厂的创新实践:如何利用实时数据湖为更多业务提供新鲜数据?

使用 TapData&#xff0c;化繁为简&#xff0c;摆脱手动搭建、维护数据管道的诸多烦扰&#xff0c;轻量代替 OGG、DSG 等同步工具&#xff0c;「CDC 流处理 数据集成」组合拳&#xff0c;加速仓内数据流转&#xff0c;帮助企业将真正具有业务价值的数据作用到实处&#xff0c…

如何在服务器之间同步文件?

业务需求 因业务需求需要在多台服务器之间做文件资源的双向同步&#xff0c;选择 ownCloud davfs2 rsync 来实现 ownCloud ownCloud 是一个开源免费专业的私有云存储项目&#xff0c;它能帮你快速在个人电脑或服务器上架设一套专属的私有云文件同步网盘。 ownCloud 能让你…

ArkTS开发系列之导航 (2.5.2 页面组件导航)

上篇回顾: ArkTS开发系列之导航 (2.5.1 页面路由&#xff09; 本篇内容&#xff1a;主要学习页面内组件导航 一、 知识储备 1. Navigation 一般作为页面的根容器&#xff0c;包括单页面、分栏和自适应三种显示模式。 自适应模式 (NavigationMode.Auto) &#xff0c;需要注意…

三相变压器:应用和连接配置

变压器的功能和应用 变压器的类型和用途多种多样&#xff0c;可根据其应用、结构类型和尺寸进行分类。 一般来说&#xff0c;变压器的主要功能是改变交流电&#xff08;AC&#xff09;的电压水平&#xff0c;提高电压以供长距离传输或降低电压以供家庭和工业消费者使用。 它…

优先级队列模拟实现

目录 1.堆的概念 2.堆性质堆中的某个元素小于或大于他的左右孩子 3.小根堆实例 4.堆创建 4.1调整思路 4.2向下调整思路 4.3代码实现&#xff08;大根堆&#xff09; 5.堆的删除 6.堆的插入 7.常用接口 7.1PriorityQueue和PriorityBlockingQueue 1.堆的概念 如果有一…

常见硬件工程师面试题(二)

大家好&#xff0c;我是山羊君Goat。 对于硬件工程师&#xff0c;学习的东西主要和电路硬件相关&#xff0c;所以在硬件工程师的面试中&#xff0c;对于经验是十分看重的&#xff0c;像PCB设计&#xff0c;电路设计原理&#xff0c;模拟电路&#xff0c;数字电路等等相关的知识…

微服务(服务治理)

服务远程调用时存在的问题 注册中心原理 服务治理中的三个角色分别是什么&#xff1f; 服务提供者&#xff1a;暴露服务接口&#xff0c;供其它服务调用服务消费者&#xff1a;调用其它服务提供的接口注册中心&#xff1a;记录并监控微服务各实例状态&#xff0c;推送服务变更信…

软件工程体系概念

软件工程 软件工程是应用计算机科学、数学及 管理科学等原理开发软件的工程。它借鉴 传统工程的原则、方法&#xff0c;以提高质量&#xff0c;降 低成本为目的。 一、软件生命周期 二、软件开发模型 1.传统模型 瀑布模型、V模型、W模型、X 模型、H 模型 (1)瀑布模型 瀑布…

材料科学SCI期刊,中科院2区,影响因子4.7

一、期刊名称 Progress in Natural Science-Materials International 二、期刊简介概况 期刊类型&#xff1a;SCI 学科领域&#xff1a;材料科学 影响因子&#xff1a;4.7 中科院分区&#xff1a;2区 三、期刊征稿范围 由中国材料研究会负责的同行评议 由中国材料研究会&…

java8 将对象list中的某一个属性取出组成一个list

实体类 public class Sp {String spdm;String spmc;public Sp() {}public Sp(String spdm, String spmc) {this.spdm spdm;this.spmc spmc;}public String getSpdm() {return spdm;}public void setSpdm(String spdm) {this.spdm spdm;}public String getSpmc() {return sp…

安美数字酒店宽带运营系统——命令执行漏洞(CNVD-2021-37784)

声明&#xff1a;本文档或演示材料仅供教育和教学目的使用&#xff0c;任何个人或组织使用本文档中的信息进行非法活动&#xff0c;均与本文档的作者无关。 文章目录 漏洞描述漏洞复现测试工具 漏洞描述 安美数字酒店宽带运营系统 server_ping.php 存在远程命令执行漏洞&#…

Java-LinkedList和ArrayList的区别、Get/Add操作性能分析以及常见的遍历方式

LinkedList和ArrayList的区别、Get/Add操作性能分析以及常见的遍历方式 一、LinkedList基本特性主要方法 二、ArrayList初始化及基本操作ArrayList注意点&#xff08;待完善&#xff09;代码示例 三、ArrayList与LinkedList的区别四、Get/Add操作性能分析五、LinkedList遍历方式…

藏在十九页PPT里的“海合安之道”

6月6日&#xff0c;成立仅仅两年多的海合安集团亮相2024中国主题公园战略营销峰会&#xff0c;作为本届峰会最年轻的主题公园企业&#xff0c;备受行业关注。 海合安集团成立于2021年&#xff0c;为亚洲最大私募投资基金之一的安博凯投资基金&#xff08;MBK Partners&#xf…

Java 编程语言:过去、现在与未来

引言 自 1995 年由 Sun Microsystems 发布以来&#xff0c;Java 编程语言已经走过了漫长的道路。作为一种面向对象的编程语言&#xff0c;Java 因其“一次编写&#xff0c;到处运行”的理念而广受欢迎。本文将探讨 Java 的历史、主要特点、应用领域以及未来的发展趋势。 Java…

MacBook Pro 忘记root用户密码,重置密码步骤

一、以普通用户名登录系统&#xff0c;并打开terminal终端&#xff0c; 输入&#xff1a;sudo bash sudo bash Password:*****&#xff08;输入当前用户的密码&#xff09; 成功后进入bash-3.2#的命令模式 二、在bash-3.2#命令模式下 输入&#xff1a;sudo passwd root sud…

春招面试面经总结篇

目录 前言一&#xff0c;算法篇1.1 平拍数组1.2 括号匹配1.3 打家劫舍1.4 删除最少使字符串平衡1.5 爬楼梯 二&#xff0c;数据结构篇2.1 二叉树2.2 链表 三&#xff0c;HTML篇3.1 H5新的语义标签3.2 href和src 四&#xff0c;CSS篇4.1 居中4.2 父元素塌陷解决4.3 外边距塌陷4.…

基于SpringBoot的实习管理系统设计与实现

你好呀&#xff0c;我是计算机学姐码农小野&#xff01;如果有相关需求&#xff0c;可以私信联系我。 开发语言&#xff1a; Java 数据库&#xff1a; MySQL 技术&#xff1a; SpringBoot框架&#xff0c;B/S模式 工具&#xff1a; MyEclipse&#xff0c;Tomcat 系统展示 …