kafka安装说明以及在项目中使用

news2025/1/23 7:21:59

一、window 安装

1.1、下载安装包

  • 下载kafka 地址,其中官方版内置zk, kafka_2.12-3.4.0.tgz
  • 其中这个名称的意思是 kafka3.4.0 版本 ,所用语言 scala 版本为 2.12
    在这里插入图片描述

1.2、安装配置

1、解压刚刚下载的配置文件,解压后如下,其中 datakafka-logs 这两个文件是没有的

在这里插入图片描述

2、修改配置:进入到config目录,

  • 修改service.properties里面log.dirs路径未 log.dirs=F:\kafka\installSurround\kafka3.4.0\kafka-logs,该目录是kafka的数据存储目录

在这里插入图片描述

  • 修改zookeeper.properties里面dataDir路径为 dataDir=F:\kafka\installSurround\kafka3.4.0\data,该目录是 zookeeper存储的kafka的数据目录
    在这里插入图片描述

3、server.properties说明

属性说明
log.dirs指定Broker需要使用的若干个文件目录路径,没有默认值,必须指定。在生产环境中一定要为log.dirs配置多个路径,如果条件允许,需要保证目录被挂载到不同的物理磁盘上。优势在于,提升读写性能,多块物理磁盘同时读写数据具有更高的吞吐量;能够实现故障转移(Failover),Kafka 1.1版本引入Failover功能,坏掉磁盘上的数据会自动地转移到其它正常的磁盘上,而且Broker还能正常工作,基于Failover机制,Kafka可以舍弃RAID方案。
zookeeper.connectCS格式参数,可以指定值为zk1:2181,zk2:2181,zk3:2181,不同Kafka集群可以指定:zk1:2181,zk2:2181,zk3:2181/kafka1,chroot只需要写一次。
listeners设置内网访问Kafka服务的监听器。
advertised.listeners设置外网访问Kafka服务的监听器。
auto.create.topics.enable是否允许自动创建Topic。
unclean.leader.election.enable是否允许Unclean Leader 选举。
auto.leader.rebalance.enable是否允许定期进行Leader选举,生产环境中建议设置成false。
log.retention.{hoursminutes
log.retention.bytes指定Broker为消息保存的总磁盘容量大小。message.max.bytes:控制Broker能够接收的最大消息大小。

1.3、启动

1、 启动脚本都在bin目录的window目录下,一定要先启动 zookeeper,再启动kafka

如果是linux,不使用window下的命令即可,使用对应的 xxxx.sh 即可

在这里插入图片描述

2、首先启动zookeeper

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

在这里插入图片描述

3、在启动kafka

.\bin\windows\kafka-server-start.bat .\config\server.properties

在这里插入图片描述

二、linux 安装

暂略

三、docker 安装

暂略

四、docker 安装

暂略

五、命令行使用

5.1、topic 命令

1、关于topic,这里用window 来示例

bin\windows\kafka-topics.bat

在这里插入图片描述

2、创建 first topic,五个分区,1个副本

bin\windows\kafka-topics.bat  --bootstrap-server localhost:9092 --create --partitions 5 --replication-factor 1 --topic first

在这里插入图片描述
3、查看当前服务器中的所有 topic

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

在这里插入图片描述

4、查看 first 主题的详情

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic first

在这里插入图片描述
5、修改分区数**(注意:分区数只能增加,不能减少)**

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic first --partitions 6

在这里插入图片描述

6、删除 topic,该操作在winodw,会出现文件授权问题,日志可以在kafka的启动命令窗口中查看,只需要修改文件权限即可,如果出现这个问题,我们需要清空之前配置的 datakafka-logs 这两个文件中的内容,再次重新启动即可。

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --delete --topic first

在这里插入图片描述

5.2、生产者命令行操作

1、关于查看操作生产者命令参数,这里用window 来示例

.\bin\windows\kafka-console-producer.bat

在这里插入图片描述

2、发送消息,这里发送了2次的数据,第一次是hello,第二次是world

.\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic first

在这里插入图片描述

5.3、消费者命令行操作

1、关于查看操作生产者命令参数,这里用window 来示例

.\bin\windows\kafka-console-consumer.bat

在这里插入图片描述
在这里插入图片描述

2、接受消息,因为前面我们在发送消息的时候,消费者没有启动,所以第一次发的数据这里是收不到的,并没有存储到topic中

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic first

在这里插入图片描述

在这里插入图片描述

3、把主题中所有的数据都读取出来(包括历史数据),可以看到我们获取到了从消费者没有上线之前到上线之后的所有数据,一共6条。

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic first

在这里插入图片描述

5.4、脚本说明

项目Value
connect-standalone.sh用于启动单节点的Standalone模式的Kafka Connect组件。
connect-distributed.sh用于启动多节点的Distributed模式的Kafka Connect组件。
kafka-acls.sh脚本用于设置Kafka权限,比如设置哪些用户可以访问Kafka的哪些TOPIC的权限。
kafka-delegation-tokens.sh用于管理Delegation Token。基于Delegation Token的认证是一种轻量级的认证机制,是对SASL认证机制的补充。
kafka-topics.sh用于管理所有TOPIC。
kafka-console-producer.sh用于生产消息。
kafka-console-consumer.sh用于消费消息。
kafka-producer-perf-test.sh用于生产者性能测试
kafka-consumer-perf-test.sh用于消费者性能测试
kafka-delete-records.sh用于删除Kafka的分区消息,由于Kafka有自己的自动消息删除策略,使用率不高。
kafka-dump-log.sh用于查看Kafka消息文件的内容,包括消息的各种元数据信息、消息体数据。
kafka-log-dirs.sh用于查询各个Broker上的各个日志路径的磁盘占用情况。
kafka-mirror-maker.sh用于在Kafka集群间实现数据镜像。
kafka-preferred-replica-election.sh用于执行Preferred Leader选举,可以为指定的主题执行更换Leader的操作。
kafka-reassign-partitions.sh用于执行分区副本迁移以及副本文件路径迁移。
kafka-run-class.sh用于执行任何带main方法的Kafka类。
kafka-server-start.sh用于启动Broker进程。
kafka-server-stop.sh用于停止Broker进程。
kafka-streams-application-reset.sh用于给Kafka Streams应用程序重设位移,以便重新消费数据。
kafka-verifiable-producer.sh用于测试验证生产者的功能。
kafka-verifiable-consumer.sh用于测试验证消费者功能。
trogdor.sh是Kafka的测试框架,用于执行各种基准测试和负载测试。
kafka-broker-api-versions.sh脚本主要用于验证不同Kafka版本之间服务器和客户端的适配性

5.5、关闭kafka

1、一定要先关闭 kafka,再关闭zookeeper,否则容易出现数据错乱

如果出现数据错错乱,最简单的方法就是清空data和kafka-logs 这两个文件下的内容,重新启动即可

2、关闭

.\bin\windows\kafka-server-stop.bat
.\bin\windows\zookeeper-server-stop.bat

在这里插入图片描述

5.6、选择分区数及kafka性能测试

1、主要工具是 kafka-producer-perf-test.batkafka-consumer-perf-test.bat 两个脚本,可以参考 kafka如何选择分区数及kafka性能测试

六、java 使用

6.1、使用原生客户端

1、依赖

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>

2、发送和消费消息,具体代码如下:

public class KafkaConfig {
 
    public static void main(String[] args) {
        // 声明主题
        String topic = "first";
        // 创建消费者
        Properties consumerConfig = new Properties();
        consumerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"boot-kafka");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerConfig);
        // 订阅主题并循环拉取消息
        kafkaConsumer.subscribe(Arrays.asList(topic));
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
                    for(ConsumerRecord<String, String> record:records){
                        System.out.println(record.value());
                    }
                }
            }
        }).start();
        // 创建生产者
        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG,"boot-kafka-client");
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer<>(producerConfig);
        // 给主题发送消息
        producer.send(new ProducerRecord<>(topic, "hello,"+System.currentTimeMillis()));
    }
}

6.2、使用springBoot

1、依赖

 <!-- 不使用kafka的原始客户端,使用spring集成的,这样比较方便  -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <!-- 可以不用指定,springBoot 会帮我们选择,如果有特殊需求,可以更改 -->
            <!--            <version>3.0.2</version>-->
        </dependency>

2、配置文件

server:
  port: 7280
  servlet:
    context-path: /thermal-emqx2kafka
  shutdown: graceful

spring:
  application:
    name: thermal-api-demonstration-tdengine
  lifecycle:
    timeout-per-shutdown-phase: 30s
  mvc:
    pathmatch:
      matching-strategy: ant_path_matcher  # 不然spring boot 2.6以后的版本 和 swagger 会出现 问题,可以参考 https://blog.csdn.net/qq_41027259/article/details/125747298
  kafka:
    bootstrap-servers: 127.0.0.1:9092  # 192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094  连接的 Kafka Broker 主机名称和端口号
    #properties.key-serializer: # 用于配置客户端的附加属性,对于生产者和消费者都是通用的,。 org.apache.kafka.common.serialization.StringSerializer
    producer: # 生产者
      retries: 3 # 重试次数
      #acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      #batch-size: 16384 # 一次最多发送数据量
      #buffer-memory: 33554432 # 生产端缓冲区大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: # 消费者
      group-id: test-consumer-group #默认的消费组ID,在Kafka的/config/consumer.properties中查看和修改
      #enable-auto-commit: true # 是否自动提交offset
      #auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)
      #auto-offset-reset: latest  #earliest,latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、发送消息

package cn.jt.thermalemqx2kafka.kafka.controller;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年08月17日
 */
@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/mock")
    public String sendKafkaMessage() {
        Map<String, Object> data = new HashMap<>(2);
        data.put("id", 1);
        data.put("name", "gkj");
        kafkaTemplate.send("first", JSON.toJSONString(data));
        return "ok";
    }
}

4、接受消息

package cn.jt.thermalemqx2kafka.kafka.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年08月17日
 */
@Slf4j
@Component
public class KafkaListener {

    @org.springframework.kafka.annotation.KafkaListener(topics = "first")
    private void handler(String content) {
        log.info("consumer received: {} ", content);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/892196.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Nginx负载均衡下的webshell连接与过滤绕过以及LD_PROLOAD利用

目录 一、Nginx负载均衡下的webshell连接 1.环境搭建以及webshell连接 2.出现的问题 3.解决方案 二、Webshell的过滤绕过 1.异或操作绕过 ​2.取反操作绕过 3.PHP语法绕过 三、LD_PRELOAD的利用 1.初识LD_PRELOAD 2.利用LD_PRELOAD 2.1.制作linux后门 2.2.绕过PHP…

Python“牵手”京东商品评论数据采集方法,京东API申请指南

京东平台API接口是为开发电商类应用程序而设计的一套完整的、跨浏览器、跨平台的接口规范&#xff0c;京东API接口是指通过编程的方式&#xff0c;让开发者能够通过HTTP协议直接访问京东平台的数据&#xff0c;包括商品信息、店铺信息、物流信息等&#xff0c;从而实现京东平台…

Redis消息传递:发布订阅模式详解

目录 1.Redis发布订阅简介 2.发布/订阅使用 2.1 基于频道(Channel)的发布/订阅 2.2 基于模式(pattern)的发布/订阅 3.深入理解Redis的订阅发布机制 3.1 基于频道(Channel)的发布/订阅如何实现的&#xff1f; 3.2 基于模式(Pattern)的发布/订阅如何实现的&#xff1f; 3.3 Sp…

Maven官网下载配置新仓库

1.Maven的下载 Maven的官网地址&#xff1a;Maven – Download Apache Maven 点击Download&#xff0c;查找 Files下的版本并下载如下图&#xff1a; 2.Maven的配置 自己在D盘或者E盘创建一个文件夹&#xff0c;作为本地仓库&#xff0c;存放项目依赖。 将下载好的zip文件进行解…

模型预测笔记(一):数据清洗及可视化、模型搭建、模型训练和预测代码一体化和对应结果展示(可作为baseline)

模型预测 一、导入关键包二、如何载入、分析和保存文件三、修改缺失值3.1 众数3.2 平均值3.3 中位数3.4 0填充 四、修改异常值4.1 删除4.2 替换 五、数据绘图分析5.1 饼状图5.1.1 绘制某一特征的数值情况&#xff08;二分类&#xff09; 5.2 柱状图5.2.1 单特征与目标特征之间的…

seller_info-获得淘宝店铺详情

一、接口参数说明&#xff1a; seller_info-获得淘宝店铺详情&#xff0c;点击更多API调试&#xff0c;请移步注册API账号点击获取测试key和secret 公共参数 请求地址: https://api-gw.onebound.cn/taobao/seller_info 名称类型必须描述keyString是调用key&#xff08;点击获…

JVM——类的生命周期

文章目录 类加载过程加载验证准备解析初始化 卸载 一个类的完整生命周期如下&#xff1a; 类加载过程 Class 文件需要加载到虚拟机中之后才能运行和使用&#xff0c;那么虚拟机是如何加载这些 Class 文件呢&#xff1f; 系统加载 Class 类型的文件主要三步:加载->连接->…

从xxl-job源码看Scheduler定时任务的原始实现

一、背景 因为xxl-job本身是统一的分布式任务调度框架&#xff0c;所以在实现定时任务的时候&#xff0c;就断不能再去依赖别人了。 其次&#xff0c;它尽可能只依赖spring框架&#xff0c;或者说spring boot/cloud。 也就是说&#xff0c;它会尽少地使用spring外的三方框架。…

AutoSAR系列讲解(深入篇)13.3-Mcal Dio配置

目录 一、Dio port配置 二、Dio pin配置 一、Dio port配置 同之前的Port一样,双击进入Dio配置界面后会看到几乎差不多的配置界面。General和Port类似,我们不再赘述,主要讲解Dio的配置 1. 其实Dio并没有什么实质的作用,主要起到了一个重命名的功能。双击DioConfig_0进入下…

小黑子—JavaWeb:第七章 - Vue 与 Element 综合案例

JavaWeb入门7.0 1. VUE1.1 Vue 快速入门1.2 Vue 常用指令1.2.1 v-bind1.2.2 v-model1.2.3 v-on1.2.4 v-if1.2.5 v-show1.2.6 v-for 1.3 Vue 生命周期1.4 Vue案例1.4.1 查询所有1.4.2 新增品牌 2. Element2.1 Element快速入门2.2 Element布局2.3 Element 常用组件2.3.1 表格2.3.…

京东API简介及应用实例

京东API&#xff08;Application Programming Interface&#xff09;&#xff0c;即京东开放平台的接口&#xff0c;是京东提供给开发者的一组规定格式和约定的接口&#xff0c;用于实现与京东电商平台进行数据交互和功能调用。通过使用京东API&#xff0c;开发者可以实现商品查…

Unity的TimeScale的影响范围分析

大家好&#xff0c;我是阿赵。 这期来说一下Unity的TimeScale。 一、前言 Unity提供了Time这个类&#xff0c;来控制时间。其实我自己倒是很少使用这个Time&#xff0c;因为做网络同步的游戏&#xff0c;一般是需要同步服务器时间&#xff0c;所以我比较多是在使用System.Date…

【运筹优化】贪心启发式算法和蜘蛛猴优化算法求解连续选址问题 + Java代码实现

文章目录 一、问题描述二、思路分析三、解决方案3.1 贪心启发式算法3.2 群体智能算法&#xff08;蜘蛛猴优化算法&#xff09; 四、总结 一、问题描述 选址问题是指在规划区域里选择一个或多个设施的位置&#xff0c;使得目标最优。 按照规划区域的结构划分&#xff0c;可以将…

QT的network的使用

一个简单的双向的网络连接和信息发送。效果如下图所示&#xff1a; 只需要配置这个主机的IP和端口号&#xff0c;由客户端发送链接请求。即可进行连接。 QT的network模块是一个用于网络编程的模块&#xff0c;它提供了一系列的类和函数&#xff0c;可以让您使用TCP/IP协议来创…

pdf格式文件下载不预览,云存储的跨域解决

需求背景 后端接口中返回的是pdf文件路径比如&#xff1a; pdf文件路径 &#xff08;https://wangzhendongsky.oss-cn-beijing.aliyuncs.com/wzd-test.pdf&#xff09; 前端适配是这样的 <ahref"https://wangzhendongsky.oss-cn-beijing.aliyuncs.com/wzd-test.pdf&…

Spring框架之AOP详解

目录 一、前言 1.1.Spring简介 1.2.使用Spring的优点 二、Spring之AOP详解 2.1.什么是AOP 2.2.AOP在Spring的作用 2.3.AOP案例讲解 三、AOP案例实操 3.0.代理小故事&#xff08;方便理解代理模式&#xff09; 3.1.代码演示 3.2.前置通知 3.3.后置通知 3.3.环绕通知…

聚观早报|俞敏洪东方甄选带货北京特产;京东物流上半年盈利

【聚观365】8月17日消息 俞敏洪东方甄选直播间带货北京特产 京东物流上半年实现盈利 百度CTO称大语言模型为人工智能带来曙光 腾讯控股第二季度盈利262亿元 2023中国家庭智慧大屏消费白皮书 俞敏洪东方甄选直播间带货北京特产 近日&#xff0c;东方甄选在北京平谷区开播&…

Linux:shell脚本:基础使用(5)《正则表达式-sed工具》

sed是一种流编辑器&#xff0c;它是文本处理中非常中的工具&#xff0c;能够完美的配合正则表达式使用&#xff0c;功能不同凡响。 处理时&#xff0c;把当前处理的行存储在临时缓冲区中&#xff0c;称为“模式空间”&#xff08;pattern space&#xff09;&#xff0c;接着用s…

数据库MySQL 创建表INSERT

创建表 常见数据类型(列类型) 列类型之整型 unsigned的用法 列类型之bit 二进制表示 bit&#xff08;8&#xff09;表示一个字节 列类型之小数 1.单精度float 双精度double 2.decimal 自定义 M为小数点前面有多少位 D是小数点后面有多少位 列类型之字符串 1.char( 字符 )…

实现简单的element-table的拖拽效果

第一步&#xff0c;先随便创建element表格 <el-table ref"dragTable" :data"tableData" style"width: 100%" border fit highlight-current-row><el-table-column label"日期" width"180"><template slot-sc…