消息中间件 --Kafka

news2025/1/11 8:19:13

一、 Kafka

1.kafka介绍

        Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。

生产者发送消息,多个消费者只能有一个消费者接收到消息

生产者发送消息,多个消费者都可以接收到消息 

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)
  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个 代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些 已发布的消息。 
2.kafka 下载(windows下)

ksfka下载链接如下,点击链接进入官网即可下载

kafka官网:http://kafka.apach e.org/

温馨提示:JDK版本至少需要1.8,高版本也可兼容;

第一步:将kafka文件夹解压缩,

注意:一定一定要解压到自己磁盘的根目录下,不然会出现闪退情况!!!

第二步:创建一个 data 空文件夹

后续需要用来存放日志文件,只要创建完成就可以了,kafka启动后会自动生成日志文件;

第三步: 修改 zookeeper.properties 配置文件

我们点击进入config文件夹,找到 zookeeper.properties 配置文件,双击进行修改

然后,我们找到 dataDir ,将它的值修改为我们刚才创建的 data 文件的路径,还要注意一点,在后面还要多加一个 "/zk",因为一会还要配置 server.properties ,所以要用将她们两个区分开

第四步:修改 server.properties 配置文件

和刚才一样,我们双击修改 "server.properties" 配置文件

我们修改 log.dirs 的值为刚才创建的 data 文件夹的路径,在路径末尾再添加上 "/kafka" ,用来和刚才的zk做区分,kafka 文件夹用来存放kafka的日志文件,zk 文件夹用来存放zoopeeper的日志文件;

第五步:创建 "zk.cmd" windows脚本文件

以记事本的方式打开,然后加入下面这句话,

这句话的含义就是启动 Zookeeper ,并且启动文件为 "zookeeper.properties" ;

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

第六步:创建 "kfk.cmd" windows脚本文件

仍然以记事本的方式打开,然后加入下面这句话,

这句话的含义就是启动 kafka ,并且启动文件为 "server.properties" ;

call bin/windows/kafka-server-start.bat config/server.properties

注意:先启动双击 zk.cmd 启动 zookeeper双击 kafka.cmd 启动 kafka,关闭的时候,需要先关闭 kafka,再关闭 zookeeper ;

3.kafka使用:

(1)创建kafka-demo项目,导入依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

(2)生产者发送消息

/**
 * @author 生产者
 * @version 1.0
 * @since 2024/9/2
 */
public class ProducerQuickStart {
    public static void main(String[] args) {

        // 1.kafka链接配置信息
        Properties prop = new Properties();

        // kafka链接地址
        // 指定 Kafka 集群的地址。在这个例子中,Kafka 服务运行在本地主机(localhost)的 9092 端口上。
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

        // key和value的序列化
        // 配置消息的 key 和 value 序列化类。
        // Kafka 需要将 key 和 value 转换为字节数组进行传输,这里使用的是 StringSerializer,表示 key 和 value 都是字符串类型。
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        // 2.创建kafka生产者对象
        // 生产者对象依赖于前面配置的 Properties 对象来连接 Kafka 集群并发送消息。
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);

        //3.发送消息
        /**
         * 第一个参数 :topic
         * 第二个参数:消息的key
         * 第三个参数:消息的value
         */
        // topic-first:消息要发送到的 Kafka 主题(topic)。
        // key-001:消息的 key,用于分区策略或消息的唯一标识。
        // hello kafka:消息的内容(value)。
        ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("topic-first","key-001","hello kafka");
        // 同步发送消息
        // 使用 Kafka 生产者的 send 方法将消息发送到 Kafka 集群中。
        // 这是一个异步操作,意味着消息会在后台发送,不会阻塞主线程。
        producer.send(kvProducerRecord);

        //4.关闭消息通道  必须要关闭,否则消息发送不成功
        producer.close();

    }
}

(3)消费者接收消息

/**
 * @author 甜甜
 * @version 1.0
 * @since 2024/9/2
 */
public class ConsumerQuickStart1 {

    public static void main(String[] args) {

        // 1.kafka的配置信息
        Properties prop = new Properties();

        // 链接地址
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //key和value的反序列化器
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 设置消费者组
        // 指定消费者所属的消费者组(group1)。
        //
        prop.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");

        // 2.创建消费者对象
        // Kafka 使用消费者组来协调同一组中多个消费者的消息消费情况,确保每条消息只会被组内的一个消费者处理。
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop);

        //3.订阅主题
        // 消费者订阅 topic-first 主题。
        // Collections.singletonList("topic-first") 创建了一个包含 topic-first 的列表,表示消费者只订阅一个主题。
        // 对应的生产者那边的主题
        consumer.subscribe(Collections.singletonList("topic-first"));

        //4.拉取消息
        // 无限循环,用于不断地从 Kafka 中拉取消息。
        while (true) {
            // 读取数据,读取超时时间为100ms ,即每个1000ms拉取一次
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            // ConsumerRecord 是 Kafka 中表示一条消息的对象,它包含消息的 key、value 以及一些元数据信息。
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }

    }

}

使用情景:

  • 生产者发送消息,多个消费者订阅同一个主题(多个消费者都是一个组)只能有一个消费者收到消息 (一对一)
  • 生产者发送消息,多个消费者订阅同一个主题(多个消费者不是一个组)所有消费者都能收到消息 (一对多) 
4.springboot集成kafka
4.1导入spring-kafka依赖信息
<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--  kafkfa  -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.11.0-M2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
4.2在resources下创建文件application.yml
server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      retries: 10 #重试的次数
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

4.3消息生产者
4.4消息消费者 

传递消息为对象

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有 两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式

  • 发送消息
  • 接收消息

二、消息中间件对比

消息中间件对比-选择建议

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2115972.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

人工智能,语音识别也算一种人工智能。

现在挺晚了&#xff0c;还是没有去睡觉&#xff0c;自己在想什么呢&#xff0c;也不确定。 这是一篇用语音写的文章&#xff0c;先按自己的想法说出来&#xff0c;然后再适当修改&#xff0c;也许就是一个不错的文章。 看来以后就不需要打字了&#xff0c;语音识别度很高&#…

两数之和--力扣1

两数之和 题目思路C代码 题目 思路 根据题目要求&#xff0c;元素不能重复且不需要排序&#xff0c;我们这里使用哈希表unordered_map。注意题目说了只对应一种答案。 所以我们在循环中&#xff0c;使用目标值减去当前循环的nums[i]&#xff0c;得到差值&#xff0c;如果我们…

ICM20948 DMP代码详解(8)

接前一篇文章&#xff1a;ICM20948 DMP代码详解&#xff08;7&#xff09; 上一回讲解了EMP-App中的入口函数main()中重点关注的第2段代码的后一个函数inv_icm20948_register_aux_compass&#xff0c;讲解了其各个参数&#xff0c;本回对于函数代码进行解析。为了便于理解和回顾…

市场独宠大尺寸超微小间距LED显示屏COB智能会议一体机

在当今这个信息化高速发展的时代&#xff0c;大屏幕显示设备已成为企业会议、教育培训、展览展示、商业广告等多个领域不可或缺的重要工具。随着技术的不断进步&#xff0c;市场上涌现出了投影机、液晶一体机、DLP背投、小间距LED、LED会议一体机以及新兴的COB智能会议一体机等…

vulhub Thinkphp5 2-rce远程代码执行漏洞

步骤一&#xff1a; 执行以下命令启动靶场环境并在浏览器访问 cd /vulhub/thinkphp/2-rce #进入漏洞环境所在目录docker-compose up -d #启动靶场docker ps #查看容器信息 步骤二&#xff1a;访问网页 步骤三&#xff1a;?s/Index/index/L/${phpinfo()} 步骤四&#xff1a;?…

828华为云征文 | 搭建云服务器Flexus X实例,开启简单上云第一步

828华为云征文 | 搭建云服务器Flexus X实例&#xff0c;开启简单上云第一步 Flexus云服务器X实例是一个高度可扩展的云平台&#xff0c;提供了一系列的服务&#xff0c;包括数据处理、应用开发、服务器托管等&#xff1b;对于开发者来说&#xff0c;Flexus云服务器X实例提供了一…

Percona 开源监控方案 PMM 详解

文章目录 前言1. 安装部署1.1 Server 安装1.2 Client 安装 2. 监控数据库2.1 MySQL2.2 PostgreSQL 3. Dashboard 介绍总结 前言 Percona Monitoring and Management (PMM) 是 Percona 公司基于业界流行的组件 Prometheus 和 Grafana 设计开发的一体化数据库监控解决方案。本篇…

vulhub ThinPHP5 2-rce远程代码执行漏洞

1.打开环境 进入环境所在的文件 docker-compose up -d 一键启动 2.浏览器访问环境 3.构造payload ?s/index/index/L/${phpinfo()} 4.写入一句话木马 ?s/Index/index/name/${print(eval($_POST[cmd]))} 5.使用蚁剑连接 http://192.168.10.233:8080/?s/Index/index/name…

SpringMVC基于注解使用:国际化

01-国际化介绍 首先在bootstrap下载个页面 下载后把登录页面的代码粘上去 然后再登录页面代码上有些超链接需要再spring-mvc.xml里面配置下&#xff0c;登录页面才能正常显示 配置静态资源 国际化-根据浏览器语言国际化 现在是中文的情况&#xff0c;要改为英文 1.配置下属…

秒懂:进程优先级

1.概念 简单来说&#xff0c;进程优先级是对于资源访问顺序来说的&#xff0c;谁先访问资源&#xff0c;谁的优先级就高。 注意&#xff1a;这和权限概念不一样&#xff0c;权限是能不能访问。 2.情景引入 进程的运行&#xff0c;是在CPU上执行&#xff0c;每次执行只能执行CP…

JavaWeb【day09】--(Mybatis)

1. Mybatis基础操作 学习完mybatis入门后&#xff0c;我们继续学习mybatis基础操作。 1.1 需求 需求说明&#xff1a; 根据资料中提供的《tlias智能学习辅助系统》页面原型及需求&#xff0c;完成员工管理的需求开发。 通过分析以上的页面原型和需求&#xff0c;我们确定了…

寒冬下,你不知道的赛道!

见字如面&#xff0c;技术人雄起&#xff01; 现在的环境怎样没有比技术人更有体会的了吧&#xff0c;但是我也不禁要问&#xff0c;还有那些赛道过的不错&#xff0c;还有谁在挣钱。因为能量是守恒的&#xff0c;西边不亮东边亮。真巧还真让我找到一些。 中药 2023年中国中药…

文件名管理器,一款免费的文件名管理工具,支持文件整理功能

文件名管理器是一款可以批量修改文件名的工具&#xff0c;但是相较于其他工具又有不同。除了批量重命名功能外&#xff0c;软件同时提供一些特色功能&#xff1a;把文件名插入到文本文件中、根据文件名写入音乐ID3信息&#xff0c;整理下载的视频资源、音乐分类整理等。软件提供…

大圣也得靠AI?深扒《黑神话:悟空》后的AI技术!

要说最近爆火刷屏的顶流热词&#xff0c;非《黑神话&#xff1a;悟空》莫属。 图片来源&#xff1a;新华社 《黑神话&#xff1a;悟空》游戏背景设定在一个宏大而神秘的神话宇宙中&#xff0c;融合了古代中国的神话传说与虚构的奇幻元素&#xff0c;构建了一个绚丽多彩的开放世…

019、JOptionPane类的常用静态方法详解

目录 JOptionPane类的常用静态方法详解 1. showInputDialog()方法 1.1基本用法 1.2带有默认值的输入框 1.3带有选项的输入对话框 1.4自定义图标的输入对话框 2. showConfirmDialog()方法 2.1基本用法 2.2自定义按钮和图标 2.3带有自定义组件的确认对话框 3. showMes…

Pipeline流水线通过git拉取Jenkinsfile报错 error: RPC failed; result=22, HTTP code = 404

Pipeline流水线通过git拉取Jenkinsfile报错 error: RPC failed; result22, HTTP code 404 在学习共享库时使用通过git拉取jenkinsfile时&#xff0c;报错在排查gitlab服务状态&#xff0c;网络通讯&#xff0c;防火墙规则以及Jenkins凭据均可以正常使用&#xff0c;最后发现的…

HTB-Tactics(Impacket工具集合 和 smb特殊共享)

前言 各位师傅大家好&#xff0c;我是qmx_07&#xff0c;今天给大家讲解Tactics靶机 渗透过程 信息搜集 服务器开启了smb协议 连接smb服务器 Admin$ 对应的是 C:\Windows 目录,是 Windows 自动创建的共享,主要用于执行一些与系统管理相关的操作&#xff0c;例如远程文件管…

Spring Boot3.x 启动自动执行sql脚本

1 引言 某些项目在首次启动时&#xff0c;需要先手动创建数据库表&#xff0c;然后再手动写入初始数据才能正常使用。为了省去这个手动操作过程&#xff0c;我们可以使用Spring Boot启动时执行sql脚本的配置&#xff0c;全自动完成这个过程。 2 配置 具体配置如下&#xff1…

Redis访问工具

使用Redis存储缓存数据&#xff0c;如何通过Java去访问Redis&#xff1f; 防止后面看晕&#xff0c;先来张图。 1. Redis的客户端库 Redis的客户端库是Redis官方提供的&#xff0c;用于让Java等编程语言与Redis服务器进行通信的工具包。常见的Redis客户端库有多个&#xff0c…

828华为云征文|Flexus X实例C#/.Net Core 结合(git代码管理、docker自定义镜像)快速发布部署-让你的项目飞起来~

目录 前言 环境准备 购买服务器配置 项目部署 项目准备&#xff08;Dockerfile文件&#xff09; Git部署并拉取代码 安装Git 配置Git用户信息 SSH密钥 安装docker 添加Docker GPG密钥 添加Docker官方软件源 再次更新软件包列表 安装docker 项目运行 构建镜像 …