Kafka学习-Java使用Kafka

news2025/1/10 2:53:52

文章目录

  • 前言
  • 一、Kafka
    • 1、什么是消息队列
      • offset
    • 2、高性能
      • topic
      • partition
    • 3、高扩展
      • broker
    • 4、高可用
      • replicas、leader、follower
    • 5、持久化和过期策略
    • 6、消费者组
    • 7、Zookeeper
    • 8、架构图
  • 二、安装Zookeeper
  • 三、安装Kafka
  • 四、Java中使用Kafka
    • 1、引入依赖
    • 2、生产者
    • 3、消费者
    • 4、运行效果


前言

Kafka消息中间件

一、Kafka

1、什么是消息队列

假设我们有两个服务:生产者A每秒能生产200个消息,消费者B每秒能消费100个消息。

在这里插入图片描述

那么B服务是处理不了A这么多消息的,那么怎么使B不被压垮的同时还能处理A的消息呢,我们引入一个中间件,即Kafka。(当然着并不能使消费者的处理速度上升)

在这里插入图片描述

offset

那么我们可以在B服务中加入一个队列,也就是一个链表,链表的每个节点相当于一条消息,每个节点有一个序号即offset,记录消息的位置。

在这里插入图片描述

在这里插入图片描述

但是这样也会有个问题,还没有处理的消息是存储在内存中的,如果B服务挂掉,那么消息也就丢失了。
所以我们可以把队列移出,变成一个单独的进程,即使B服务挂掉,消息也不会丢失。

在这里插入图片描述

2、高性能

B服务由于性能差,队列中未处理的消息会越来越多,我们可以增加更多的消费者来处理消息,相对的也可以增加更多的生产者来生成消息。

在这里插入图片描述

topic

但是,生产者与消费者会争抢同一个队列,没有抢到就要等待,那么怎么解决呢?
我们可以将消息进行分类,每一类消息是一个topic,生产者按消息的类型投递到不同的topic中,消费者也按照不同的topic进行消费。

在这里插入图片描述

partition

但是单个topic的消息还是有可能过多,我们可以将单个队列拆分,每段是一个partition分区,每个消费者负责一个partition

在这里插入图片描述

3、高扩展

broker

随着partition过多,所有的partition都在同一个机器上,就可能会导致单机的cpu和内存过高,影响性能,那么我们可以使用多台机器,将partition分散部署在不同的机器上。每台机器就代表一个broker
我们可以增加broker来缓解服务器的cpu过高的性能问题。

在这里插入图片描述

4、高可用

replicas、leader、follower

假如某个broker挂了, 那么其中partition中的消息也就都丢失了,那么这个问题怎么解决呢?
我们可以给partition多加几个副本,统称replicas,并将它们分为leaderfollower
leader负责生产者和消费者的读写,follower只负责同步leader的数据。假如leader挂了,也不会影响follower,随后在follower中选出一个leader,保证消息队列的高可用。

在这里插入图片描述

5、持久化和过期策略

在上面讲述了leader挂掉的情况,如果所有的broker都挂了,消息不就都丢失了?
为了解决这个问题,就不能只把数据存在内存中,也要存在磁盘中。
但是如果所有消息一直保存在磁盘中,那磁盘也会被占满,所以引入保留策略。

6、消费者组

如果我想在原有的基础上增加一个消费者,那么它只能跟着最新的offset接着消费,如果我想从某个offset开始消费呢?
我们引入消费者组,实现不同消费者维护自己的消费进度。

在这里插入图片描述

7、Zookeeper

上面介绍了很多的组件,每个组件都有自己的状态信息,那么就需要有一个组件去统一维护这些组件的状态信息,于是引入了Zookeeper组件,它会定期与broker通信,获取Kafka集群的状态,判断哪些broker挂了,消费者组消费到哪了等等。

8、架构图

在这里插入图片描述

二、安装Zookeeper

1、官网地址

https://zookeeper.apache.org/

2、下载

在这里插入图片描述

选择稳定版本下载

在这里插入图片描述

3、解压,修改配置文件

解压后,复制 zoo_sample.cfg,重命名为 zoo.cfg

在这里插入图片描述

修改数据文件目录位置

在这里插入图片描述

4、启动

我们是在windows系统下安装的,运行 bin 目录下的 zkServer.cmd

在这里插入图片描述

三、安装Kafka

1、官网地址

https://kafka.apache.org/

2、下载

在这里插入图片描述

3、解压,修改配置文件

修改 config 目录下 server.properties 文件
修改日志文件位置,其他参数(如zookeeper端口,根据需要修改)

在这里插入图片描述

4、启动

bin\windows\kafka-server-start.bat config\server.properties

在这里插入图片描述

四、Java中使用Kafka

1、引入依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

2、生产者

public static void main(String[] args) throws InterruptedException {
    Properties prop = new Properties();

    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    prop.put(ProducerConfig.ACKS_CONFIG, "all");
    prop.put(ProducerConfig.RETRIES_CONFIG, 0);
    prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    prop.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

    String topic = "hello";

    KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
    for (int i = 0; i < 100; i++) {
        producer.send(new ProducerRecord<String, String>(topic, Integer.toString(2), "hello kafka" + i));
        System.out.println("生产消息:" + i);
        Thread.sleep(1000);
    }
    producer.close();
}

3、消费者

public static void main(String[] args) {
    Properties prop = new Properties();

    prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
    prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    prop.put(ConsumerConfig.GROUP_ID_CONFIG, "con-1");    // 消费者组
    prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);    //自动提交偏移量
    prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);     //自动提交时间

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
    ArrayList<String> topics = new ArrayList<>();
    //可以订阅多个消息
    topics.add("hello");
    consumer.subscribe(topics);

    try {
        while(true) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(10));
            for (TopicPartition topicPartition : poll.partitions()) {

                //	通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息
                List<ConsumerRecord<String, String>> partitionRecords = poll.records(topicPartition);

                //	获取TopicPartition对应的主题名称
                String topic = topicPartition.topic();
                //	获取TopicPartition对应的分区位置
                int partition = topicPartition.partition();
                //	获取当前TopicPartition下的消息条数
                int size = partitionRecords.size();
                System.out.printf("--- 获取topic: %s, 分区位置:%s, 消息总数: %s%n",
                        topic,
                        partition,
                        size);

                for(int i = 0; i < size; i++) {
                    ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
                    //	实际的数据内容
                    String key = consumerRecord.key();
                    //	实际的数据内容
                    String value = consumerRecord.value();
                    //	当前获取的消息偏移量
                    long offset = consumerRecord.offset();
                    //	表示下一次从什么位置(offset)拉取消息
                    long commitOffser = offset + 1;
                    System.out.printf("消费消息 key:%s, value:%s, 消息offset: %s, 提交offset: %s%n",
                            key, value, offset, commitOffser);
                    Thread.sleep(1500);
                }

            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        consumer.close();
    }
}

4、运行效果

生产消息

在这里插入图片描述

消费消息

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1665272.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

盛邦安全荣获北京市海淀区上地街道财源建设工作表彰

近日&#xff0c;盛邦安全受邀出席上地街道2024年第一季度财源建设工作联席会暨上地人工智能产业报告发布大会并收到上地街道颁发的感谢信&#xff0c;这是对公司技术创新、管理提升、营收增长&#xff0c;持续为上地地区财源建设做出突出贡献的鼓励。 盛邦安全副总裁、董事会秘…

第四百九十九回

文章目录 1. 概念介绍2. 使用方法2.1 固定样式2.2 自定义样式 3. 示例代码4. 内容总结 我们在上一章回中介绍了"GetMaterialApp组件"相关的内容&#xff0c;本章回中将介绍使用get显示SnackBar.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. 概念介绍 我们在介…

pyqt颜色变换动画效果

pyqt颜色变换动画效果 QPropertyAnimation介绍颜色变换效果代码 QPropertyAnimation介绍 QPropertyAnimation 是 PyQt中的一个类&#xff0c;它用于对 Qt 对象的属性进行动画处理。通过使用 QPropertyAnimation&#xff0c;你可以平滑地改变一个对象的属性值&#xff0c;例如窗…

【企业必备】提升企业新质生产力,英智推出私有化大模型定制方案

市面上有很多性能不错的大模型&#xff0c;但大部分企业都不敢使用&#xff0c;担心模型训练和使用过程中出现数据泄露的风险。智能化升级是当今时代企业发展的必然趋势&#xff0c;在大模型发展迅猛的时代&#xff0c;借助AI能力的公司能够更快速提升自身的核心竞争力&#xf…

使用xtuner微调InternLM-Chat-7B

1. 安装xtuner #激活环境 source activate test_llm # 安装xtuner pip install xtuner#还有一些依赖项需要安装 future>0.6.0 cython lxml>3.1.0 cssselect mmengine 2. 创建一个ft-oasst1 数据集的工作路径&#xff0c;进入 mkdir ft-oasst1 cd ft-oasst1 3.XTune…

以Azure为例的SSO

由于文章的篇幅有限&#xff0c;无法将全部的代码贴上来&#xff0c;如想要看完整案例&#xff0c;请在公众号文章中留言(其他平台很少看…毕竟最近印度同事的UI组件库搞得我好烦) 1.关于SSO 单点登录又称之为SSO,全称为 Single Sign On &#xff0c;一般在多个应用系统中&…

LangChain连接国内大模型测试|智谱ai、讯飞星火、通义千问

智谱AI 配置参考 https://python.langchain.com/v0.1/docs/integrations/chat/zhipuai/ZHIPUAI_API_KEY从https://open.bigmodel.cn/获取 from langchain_community.chat_models import ChatZhipuAI from langchain_core.messages import AIMessage, HumanMessage, SystemMes…

【Ubuntu 安装erlang】

apt-get 安装 apt-get install erlang或 源码安装 git clone https://github.com/erlang/otp.git cd otp git checkout maint-25 # current latest stable version ./configure make make install安装完后&#xff0c;验证是否成功 # 命令行输入 erl

漫谈AI时代的手机

以chatGPT 为代表的大语言的横空出世使人们感受到AI 时代的到来&#xff0c;大语言模型技术的最大特点是机器开始”懂人话“&#xff0c;”说人话“了。如同任何一个革命性工具的出现一样&#xff0c;它必将改变人类生活和工作。 在这里。我谈谈AI时代的手机。 语音通信的历史…

【C语言—猜数字小游戏】

一、游戏规则 电脑自动生成一个1~100范围内的随机数&#xff0c;由玩家猜测本轮生成的随机数是什么&#xff0c;系统根据玩家猜测数据的⼤⼩给出猜⼤了或猜⼩了的反馈&#xff0c;直到玩家猜对&#xff0c;游戏结束。 如何生成随机数&#xff1a;【C语言】/*如何生成随机值*/-C…

es6语法总结

【1】语法 &#xff08;1&#xff09;声明变量(let-var-const) 变量提升&#xff1a; 是JavaScript引擎在代码执行前将变量的声明部分提升到作用域顶部的行为。尽管变量的声明被提升了&#xff0c;变量的赋值&#xff08;即初始化&#xff09;仍然保留在原来的位置。因此&…

C++:关于圆形鱼眼半全景图转为等距圆柱投影图

C&#xff1a;空间坐标映射到球面坐标/全景图_如何将球体坐标映射到球面uv-CSDN博客 C&#xff1a;关于360全景图像和立方体6面全景图像的相互转换_彩色全景拆解正方体6个面-CSDN博客 之前记录了立方体和360全景之间的转换&#xff0c;这次记录下鱼眼图与360全景图之间的转换…

Doris【部署 01】Linux部署MPP数据库Doris稳定版(下载+安装+连接+测试)

本次安装测试的为稳定版2.0.8官方文档 https://doris.apache.org/zh-CN/docs/2.0/get-starting/quick-start 这个简短的指南将告诉你如何下载 Doris 最新稳定版本&#xff0c;在单节点上安装并运行它&#xff0c;包括创建数据库、数据表、导入数据及查询等。 Linux部署稳定版Do…

【Python】PYQT5详细介绍

本专栏内容为&#xff1a;Python学习专栏 通过本专栏的深入学习&#xff0c;你可以了解并掌握Python。 &#x1f493;博主csdn个人主页&#xff1a;小小unicorn ⏩专栏分类&#xff1a;Python &#x1f69a;代码仓库&#xff1a;小小unicorn的代码仓库&#x1f69a; &#x1f3…

Array.map解析

map方法会创建一个新数组。该方法会循环数组中的每个值&#xff0c;如果仅仅是想循环数组不需要返回值使用数组的forEach方法就可以。原数组中的每个元素都调用一次提供的函数后的返回值组成。Array.map 它接收一个函数 这个函数可以接收三个参数 数组的每个值item 这个值的索引…

ICode国际青少年编程竞赛- Python-4级训练场-嵌套for循环入门

ICode国际青少年编程竞赛- Python-4级训练场-嵌套for循环入门 1、 for i in range(3):Dev.step(3)for j in range(3):Dev.turnLeft()Dev.step(-2)Dev.turnLeft()2、 for i in range(3):Dev.turnLeft()Dev.step(4)Dev.turnRight()Dev.step(2)for i in range(4):Dev.step(2)D…

ardupilot开发 --- opencv 篇

0. 一些概念 官网&#xff1a;https://opencv.org/ 1. 卸载 sudo apt-get --purge remove libopencv sudo apt-get --purge remove libopencv-dev sudo apt-get --purge remove libopencv* sudo apt-get --purge remove opencv sudo apt-get --purge remove *opencv* sudo a…

视频剪辑达人分享:一键批量置入随机封面,创意无限

在数字化媒体飞速发展的今天&#xff0c;视频内容已经成为我们表达创意、分享故事、传递信息的主要方式之一。而在视频制作过程中&#xff0c;封面作为视频的“脸面”&#xff0c;往往决定了观众是否愿意点击观看。因此&#xff0c;为视频选择合适的封面变得至关重要。 在大量…

3D数字化解决方案助力文博行业转型,让文物“活”起来!

博物馆是保护和传承人类文明的重要殿堂&#xff0c;是连接过去、现在、未来的桥梁&#xff0c;为了进一步加强文物及藏品保护&#xff0c;不断提高博物馆服务和科普宣传水平&#xff0c;博物馆数字化转型已是当下发展趋势。 在科技的“加持”下&#xff0c;不少博物馆凭借强大的…

LeetCode-258. 各位相加【数学 数论 模拟】

LeetCode-258. 各位相加【数学 数论 模拟】 题目描述&#xff1a;解题思路一&#xff1a;循环解题思路二&#xff1a;进阶 O(1)解题思路三&#xff1a; 题目描述&#xff1a; 给定一个非负整数 num&#xff0c;反复将各个位上的数字相加&#xff0c;直到结果为一位数。返回这个…