kafka 命令脚本说明以及在java中使用

news2025/1/12 5:55:48

一、命令行使用

1.1、topic 命令

1、关于topic,这里用window 来示例

bin\windows\kafka-topics.bat

在这里插入图片描述

2、创建 first topic,五个分区,1个副本

bin\windows\kafka-topics.bat  --bootstrap-server localhost:9092 --create --partitions 5 --replication-factor 1 --topic first

在这里插入图片描述
3、查看当前服务器中的所有 topic

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

在这里插入图片描述

4、查看 first 主题的详情

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic first

在这里插入图片描述
5、修改分区数**(注意:分区数只能增加,不能减少)**

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic first --partitions 6

在这里插入图片描述

6、删除 topic,该操作在winodw,会出现文件授权问题,日志可以在kafka的启动命令窗口中查看,只需要修改文件权限即可,如果出现这个问题,我们需要清空之前配置的 datakafka-logs 这两个文件中的内容,再次重新启动即可。

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --delete --topic first

在这里插入图片描述

1.2、生产者命令行操作

1、关于查看操作生产者命令参数,这里用window 来示例

.\bin\windows\kafka-console-producer.bat

在这里插入图片描述

2、发送消息,这里发送了2次的数据,第一次是hello,第二次是world

.\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic first

在这里插入图片描述

1.3、消费者命令行操作

1、关于查看操作生产者命令参数,这里用window 来示例

.\bin\windows\kafka-console-consumer.bat

在这里插入图片描述
在这里插入图片描述

2、接受消息,因为前面我们在发送消息的时候,消费者没有启动,所以第一次发的数据这里是收不到的,并没有存储到topic中

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic first

在这里插入图片描述

在这里插入图片描述

3、把主题中所有的数据都读取出来(包括历史数据),可以看到我们获取到了从消费者没有上线之前到上线之后的所有数据,一共6条。

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic first

在这里插入图片描述

1.4、脚本说明

项目Value
connect-standalone.sh用于启动单节点的Standalone模式的Kafka Connect组件。
connect-distributed.sh用于启动多节点的Distributed模式的Kafka Connect组件。
kafka-acls.sh脚本用于设置Kafka权限,比如设置哪些用户可以访问Kafka的哪些TOPIC的权限。
kafka-delegation-tokens.sh用于管理Delegation Token。基于Delegation Token的认证是一种轻量级的认证机制,是对SASL认证机制的补充。
kafka-topics.sh用于管理所有TOPIC。
kafka-console-producer.sh用于生产消息。
kafka-console-consumer.sh用于消费消息。
kafka-producer-perf-test.sh用于生产者性能测试
kafka-consumer-perf-test.sh用于消费者性能测试
kafka-delete-records.sh用于删除Kafka的分区消息,由于Kafka有自己的自动消息删除策略,使用率不高。
kafka-dump-log.sh用于查看Kafka消息文件的内容,包括消息的各种元数据信息、消息体数据。
kafka-log-dirs.sh用于查询各个Broker上的各个日志路径的磁盘占用情况。
kafka-mirror-maker.sh用于在Kafka集群间实现数据镜像。
kafka-preferred-replica-election.sh用于执行Preferred Leader选举,可以为指定的主题执行更换Leader的操作。
kafka-reassign-partitions.sh用于执行分区副本迁移以及副本文件路径迁移。
kafka-run-class.sh用于执行任何带main方法的Kafka类。
kafka-server-start.sh用于启动Broker进程。
kafka-server-stop.sh用于停止Broker进程。
kafka-streams-application-reset.sh用于给Kafka Streams应用程序重设位移,以便重新消费数据。
kafka-verifiable-producer.sh用于测试验证生产者的功能。
kafka-verifiable-consumer.sh用于测试验证消费者功能。
trogdor.sh是Kafka的测试框架,用于执行各种基准测试和负载测试。
kafka-broker-api-versions.sh脚本主要用于验证不同Kafka版本之间服务器和客户端的适配性

1.5、关闭kafka

1、一定要先关闭 kafka,再关闭zookeeper,否则容易出现数据错乱

如果出现数据错错乱,最简单的方法就是清空data和kafka-logs 这两个文件下的内容,重新启动即可

2、关闭

.\bin\windows\kafka-server-stop.bat
.\bin\windows\zookeeper-server-stop.bat

在这里插入图片描述

1.6、选择分区数及kafka性能测试

1、主要工具是 kafka-producer-perf-test.batkafka-consumer-perf-test.bat 两个脚本,可以参考 kafka如何选择分区数及kafka性能测试

二、java 使用

2.1、使用原生客户端

1、依赖

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>

2、发送和消费消息,具体代码如下:

public class KafkaConfig {
 
    public static void main(String[] args) {
        // 声明主题
        String topic = "first";
        // 创建消费者
        Properties consumerConfig = new Properties();
        consumerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"boot-kafka");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerConfig);
        // 订阅主题并循环拉取消息
        kafkaConsumer.subscribe(Arrays.asList(topic));
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
                    for(ConsumerRecord<String, String> record:records){
                        System.out.println(record.value());
                    }
                }
            }
        }).start();
        // 创建生产者
        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG,"boot-kafka-client");
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer<>(producerConfig);
        // 给主题发送消息
        producer.send(new ProducerRecord<>(topic, "hello,"+System.currentTimeMillis()));
    }
}

2.2、使用springBoot

1、依赖

 <!-- 不使用kafka的原始客户端,使用spring集成的,这样比较方便  -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <!-- 可以不用指定,springBoot 会帮我们选择,如果有特殊需求,可以更改 -->
            <!--            <version>3.0.2</version>-->
        </dependency>

2、配置文件

server:
  port: 7280
  servlet:
    context-path: /thermal-emqx2kafka
  shutdown: graceful

spring:
  application:
    name: thermal-api-demonstration-tdengine
  lifecycle:
    timeout-per-shutdown-phase: 30s
  mvc:
    pathmatch:
      matching-strategy: ant_path_matcher  # 不然spring boot 2.6以后的版本 和 swagger 会出现 问题,可以参考 https://blog.csdn.net/qq_41027259/article/details/125747298
  kafka:
    bootstrap-servers: 127.0.0.1:9092  # 192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094  连接的 Kafka Broker 主机名称和端口号
    #properties.key-serializer: # 用于配置客户端的附加属性,对于生产者和消费者都是通用的,。 org.apache.kafka.common.serialization.StringSerializer
    producer: # 生产者
      retries: 3 # 重试次数
      #acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      #batch-size: 16384 # 一次最多发送数据量
      #buffer-memory: 33554432 # 生产端缓冲区大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: # 消费者
      group-id: test-consumer-group #默认的消费组ID,在Kafka的/config/consumer.properties中查看和修改
      #enable-auto-commit: true # 是否自动提交offset
      #auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)
      #auto-offset-reset: latest  #earliest,latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、发送消息

package cn.jt.thermalemqx2kafka.kafka.controller;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年08月17日
 */
@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/mock")
    public String sendKafkaMessage() {
        Map<String, Object> data = new HashMap<>(2);
        data.put("id", 1);
        data.put("name", "gkj");
        kafkaTemplate.send("first", JSON.toJSONString(data));
        return "ok";
    }
}

4、接受消息

package cn.jt.thermalemqx2kafka.kafka.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年08月17日
 */
@Slf4j
@Component
public class KafkaListener {

    @org.springframework.kafka.annotation.KafkaListener(topics = "first")
    private void handler(String content) {
        log.info("consumer received: {} ", content);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/957942.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

nacos import com.alibaba.nacos.consistency.entity.ReadRequest

1. 异常情况 import com.alibaba.nacos.consistency.entity.ReadRequest; import com.alibaba.nacos.consistency.entity.Response; import com.alibaba.nacos.consistency.entity.WriteRequest; 2. 解决方法 安装插件&#xff0c;然后重新编译 记住选择Java8

从零做软件开发项目系列之十——项目运维

项目结项后的运维阶段是确保软件持续稳定运行、修复问题、满足用户需求的关键时期。在这个阶段&#xff0c;需要建立有效的维护制度&#xff0c;关注各种问题&#xff0c;并采取相应措施来保障系统的可靠性和可持续性。 1 运维团队 开展服务运维工作&#xff0c;首先需要组建运…

07.Knowing When to Look

目录 前言泛读摘要Introduction小结 精讲方法Encoder-Decoder框架 for Image CaptioningSpatial Attention ModelAdaptive Attention Model Implementation DetailsEncoder-CNNDecoder-RNNTraining details Related Work实验结论 前言 本课程来自深度之眼《多模态》训练营&…

【前端】Vue2 脚手架模块化开发 -快速入门

&#x1f384;欢迎来到边境矢梦的csdn博文&#x1f384; &#x1f384;本文主要梳理Vue2 脚手架模块化开发 &#x1f384; &#x1f308;我是边境矢梦&#xff0c;一个正在为秋招和算法竞赛做准备的学生&#x1f308; &#x1f386;喜欢的朋友可以关注一下&#x1faf0;&#x…

前端项目工程化之代码规范

目录 一、前言二、ESLint三、Prettier四、项目实战4.1 环境依赖版本4.2 使用pnpm4.3 git提交规范 五、资源 收集六、源码地址 一、前言 前端项目工程化之代码规范是指在前端项目中定义一套代码规范&#xff0c;以确保项目中的代码风格和格式一致&#xff0c;提高代码的可读性和…

GaussDB数据库SQL系列-行列转换

一、前言 二、简述 1、行转列概念 2、列转行概念 三、GaussDB数据库的行列转行实验示例 1、行转列示例 1&#xff09;创建实验表&#xff08;行存表&#xff09; 2&#xff09;静态行转列 3&#xff09;行转列&#xff08;结果值&#xff1a;拼接式&#xff09; 4&…

文心一言 VS CHATGPT

由于近几天来&#xff0c;我的手机短信不断收到百度公司对于“文心一言”大模型的体验邀请&#xff08;真是不胜其烦&#xff09;&#xff01;&#xff01;所以我就抱着试试看的态度点开了文心一言的链接&#xff1a;文心一言 目前看来&#xff0c;有以下两点与chatgpt是有比较…

Plex私人影音云盘搭建教程:本地电脑使用内网穿透实现远程访问

文章目录 1.前言2. Plex网站搭建2.1 Plex下载和安装2.2 Plex网页测试2.3 cpolar的安装和注册 3. 本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4. 公网访问测试5. 结语 1.前言 用手机或者平板电脑看视频&#xff0c;已经算是生活中稀松平常的场景了&#xff0c;特别是各…

《中国人工智能人才学习白皮书》发布!

Datawhale发布 2023 中国人工智能人才学习白皮书 I 导读 日前&#xff0c;由 Datawhale 联合上海白玉兰开源开放研究院、和鲸科技、江南大学教育信息化研究中心编写的《2023中国人工智能人才学习白皮书》&#xff08;下简称“白皮书”&#xff09;于8月24日正式发布。 学界大咖…

谈谈对OceanBase单机分布式一体化的思考

关于作者&#xff1a; 杨传辉&#xff0c;OceanBase CTO。2010 年作为创始成员之一加入 OceanBase 团队&#xff0c;主导了 OceanBase 历次架构设计和技术研发&#xff0c;从无到有实现 OceanBase 在蚂蚁集团全面落地。同时&#xff0c;他也主导了两次 OceanBase TPC-C 测试并打…

北约报告:2023-2043,下一代量子技术的发展与挑战

“当今的新技术正在以令人眼花缭乱的速度发展&#xff0c;我们所有人都可以在负责任且合乎道德的方式开发和部署新技术方面发挥作用。” ——这是副秘书长Mircea Geoană在2023年3月22日、在布鲁塞尔发布《北约科学技术组织2023-2043年趋势报告》时传达的信息。 Geoană先生强调…

【Python】使用python处理excel表格数据

Python有许多库可以用于处理Excel表格数据&#xff0c;其中最常用的是pandas和openpyxl。 pandas库 pandas库是一个非常强大的用于数据分析和操作的Python库。它支持处理各种数据类型&#xff0c;包括Excel表格数据。 首先需要安装pandas库&#xff0c;可以通过以下命令在终…

项目介绍:《Online ChatRoom》网页聊天室 — Spring Boot、MyBatis、MySQL和WebSocket的奇妙融合

在当今数字化社会&#xff0c;即时通讯已成为人们生活中不可或缺的一部分。为了满足这一需求&#xff0c;我开发了一个名为"WeTalk"的聊天室项目&#xff0c;该项目基于Spring Boot、MyBatis、MySQL和WebSocket技术&#xff0c;为用户提供了一个实时交流的平台。在本…

Noah-MP模型+Python

目的使参会学员熟悉陆表过程的主要研究内容以及陆面模型在生态水文研究中的地位和作用&#xff1b;深入理解Noah-MP 5.0模型的原理&#xff0c;掌握Noah-MP模型&#xff08;2023年最新发布的5.0版本&#xff09;所需的系统环境与编译环境的搭建方法及模型实践运行&#xff0c;熟…

Can‘t connect to local MySQL server through socket ‘/tmp/mysql.sock‘

最近在用django框架开发后端时&#xff0c;在运行 $python manage.py makemigrations 命令时&#xff0c;报了以上错误&#xff0c;错误显示连接mysql数据库失败&#xff0c;查看了mysql数据库初始化配置文件my.cnf&#xff0c;我的mysql.sock文件存放路径配置在了/usr/local…

【方案】河道漂浮物检测:基于视频智能分析/AI算法智能分析技术在河道整治场景的应用

随着社会的发展和人们生活水平的进步&#xff0c;水污染问题也越来越严重&#xff0c;水资源监管和治理成为城市发展的一大困扰&#xff0c;水面上的漂浮垃圾不仅会影响河道生态安全并阻碍船舶航行&#xff0c;还会影响人们的身体健康。 TSINGSEEE青犀AI智能分析平台在环保场景…

MySql时间

一、查询 查询mysql当前时间 SELECT now();查询mysql时区 show variables like%time_zone;二、修改时区 set global time_zone 8:00; &#xff08;修改mysql全局时区为北京时间&#xff0c;也就是我们所在的东8区&#xff0c;需要root权限&#xff09; set time_zone 8:0…

cad怎么保存成jpg图片?一分钟教会你转换

将CAD文件转换成JPG图片可以帮助我们将文件在更广泛的设备和应用程序中使用&#xff0c;因为JPG格式是一种广泛支持的图像格式。这意味着&#xff0c;无论您使用的是电脑、手机还是平板电脑&#xff0c;都可以轻松地查看和编辑这些图像。另外&#xff0c;JPG格式可以通过压缩图…

C++:重载运算符

1.重载不能改变运算符运算的对象个数 2.重载不能改变运算符的优先级别 3.重载不能改变运算符的结合性 4.重载运算符必须和用户定义的自定义类型的对象一起使用&#xff0c;其参数至少应该有一个是类对象&#xff0c;或类对象的引用 5.重载运算符的功能要类似于该运算符作用…

长胜证券:个税+房贷新政出台 AI+应用落地持续推进

昨日&#xff0c;两市股指盘中弱势震荡下探。到收盘&#xff0c;沪指跌0.55%报3119.88点&#xff0c;深成指跌0.61%报10418.21点&#xff0c;创业板指跌0.69%报2102.57点&#xff1b;两市算计成交8282亿元&#xff0c;北向资金净卖出约43亿元。行业方面&#xff0c;地产、券商板…