kafka的成神秘籍(java)

news2024/10/8 13:54:44

kafka的成神秘籍

kafka的简介

​ Kafka 最初是由Linkedin 即领英公司基于Scala和 Java语言开发的分布式消息发布-订阅系统,现已捐献给Apache软件基金会。Kafka 最被广为人知的是作为一个 消息队列(mq)系统存在,而事实上kafka已然成为一个流行的分布式流处理平台。其具有高吞吐、低延迟的特性,许多大数据处理系统比如storm、spark、flink等都能很好地与之集成。按照Wikipedia上的说法,kafka的核心数据结构本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”。总的来讲,kafka通常具有3重角色:

消息系统

​ Kafka和传统的消息队列比如RabbitMQ、RocketMQ、ActiveMQ类似,支持流量削锋、服务解耦、异步通信等核心功能。

流处理平台

​ Kafka 不仅能够与大多数流式计算框架完美整合,并且自身也提供了一个完整的流式处理库,即kafka Streaming。kafka Streaming提供了类似Flink中的窗口、聚合、变换、连接等功能。

存储系统

​ 通常消息队列会把消息持久化到磁盘,防止消息丢失,保证消息可靠性。Kafka的消息持久化机制和多副本机制使其能够作为通用数据存储系统来使用

​ 一句话概括:Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),在业界主要应用于大数据实时处理领域。

kafka体系结构

Topic(主题)

  • 适用于存储消息的逻辑概念,1个Topic,可以看做是消息的集合。
  • 这个消息的集合(Topic),可以接收多个生产者(Producer)推送(Push)过来的消息,也可以让多个消费者(Consumer)从中消费(Pull)消息。

分区(Partition)

  • 分区的概念,可以理解为Topic的子集:1个Topic可能只有1个分区,也可能有多个分区。这里说的多个分区,一个分区就代表磁盘的一块连续的位置,不同的分区也就是磁盘上不同的区域块。

  • 分区存在的意义:

​ 通过不同的区域块,kafka存储在Topic里的消息就可以在多个地方存储,也就是对kafka进行了水平扩展,这样可以增加kafka的 并行处理能力。

  • 对于不同分区的理解

​ 同一个Topic下,多个区域块,这些不同的区域块,存储的消息是不同的,也就是说,A、B两个区域块,不会同时存储1类消 息。

  • 对于同一个分区中的消息:

    ​ 在区块A中接收到消息时,该消息会接收它在这块区域块A中的Offset(唯一编号),这个Offset使得kafka确定了消息在区域块中的顺序。(这样就保证了磁盘的顺序读写,减少磁盘IO)

    image-20241002131656296

Log

  • 分区在逻辑上,对应一个Log,当生产者将消息写入分区的时候,实际上就是写入一个log。

  • Log:一个逻辑概念,对应着磁盘上的一个文件夹。

  • Log的组成:由多个Segment组成,每1个Segment对应1个日志文件和一个索引文件。

Broker

​ 区域块A会给他接收的消息分配一个offset,并且x会保存到A所在的磁盘区域上。而这个功能,就是由Broker完成的。

  • Broker:1个Broker就是一个单独的kafka server。

  • Broker的主要工作:接收生产者发送来的消息,分配offset,然后将包装过的数据保存到磁盘上。

  • Broker的其他作用:接收消费者Consumer和其他Broker的请求,根据请求的类型进行相应的处理然后返回响应。

  • 这里引出集群(Cluster)的概念,1个Cluster是由多个Broker构成的,也就是说,1个Broker不会对外提供服务,而是通过Cluster的形式对外提供服务:

因为,一个Cluster里,需要1个Broker担任Controller,这个Broker就是这个集群的指挥中心,负责:

  • 管理各个分区(Partition)的状态;
  • 管理每个分区(Partition)的副本的状态;
  • 监听zookeeper的数据变化。

其他的Broker均是通过这个Controller进行指挥的,完成各自相应的功能。

关于Cluster的一主多从实现:

除了担任Controller的Broker会监听其他Broker的状态,其他Broker也会监听Controller的状态,当Controller出翔了故障,就会重新选取新的Broker担任Controller

消息

kafka中最基本的消息单元。有一串字节组成,主要由key和value构成(即key、value都是字节数组)

  • key:主要作用是 根据一定策略,将这个消息路由到制定分区中 ==> 这样就保障了,包含同一个key的消息全部写入一个分区A,不会写入另外一个分区。(即实现了Partition分区中提到的“不同分区存储的消息是不一样的”)

副本

kafka会对消息进行冗余备份,每一个分区Partition,都可以有多个副本(每一个副本包含的消息是相同的,但是不能保证同一时刻下完全相同)。

副本类型:Leader、Follower。

副本选举策略(即选举1个Leader,其余为Follower):

  • 当分区只有1个副本,这个副本就是Leader,没有Follower。
  • 在其他不同场景,会采取不同的选举策略。
  • Leader:处理kafka中所有的读写请求
  • Follower:仅仅把数据从Leader中拉取到本地,同步更新到自己的Log中。

生产者

产生消息的对象,产生消息之后,将消息按照一定的规则推送到Topic的分区中

消费者

从Topic中拉取消息,并对消息进行消费

  • Consumer:有一个作用,维护它消费到分区(Partition)上的什么位置(即offset的值)。
  • Consumer Group: 在kafka中,多个Consumer可以组成1个Consumer Group,1个Consumer只属于1个Consumer Group.

Consumer Group的作用:保证了这个Consumer Group订阅的Topic(Partition的集合)中的每一个分区(Partition),只被Consumer Group中的一个Consumer处理。

当然,如果要实现消息的广播消费,则将同1条消息放在多个不同的Consumer Group中即可。

就上述这个Consumer和Partition的关系可以理解下面的说法:

通过向Consumer Group中动态添加适量的Consumer, 可以触发kafka的Rebalance操作(重新分配Partition和Consumer的一一对应关系,结合Topic部分的理解,这样就实现了kafka的水平扩展能力)

kafka的安装与使用

安装

本文采用docker安装需要确定linux主机上是否有docker环境

docker -v

image-20241002221756035

拉取zookeeper镜像与kafka镜像

#拉取zookeeper镜像
docker pull zookeeper
#拉取kafka镜像
docker pull wurstmeister/kafka

运行zookeeper

docker run --restart=always -d -p 2181:2181 --name zookeeper zookeeper
#-restart=always 停机不断重启
#-p 2181:2181 docker容器端口映射到主机的端口

运行kafka

docker run --restart=always -d --name kafka -p 19092:19092 -e KAFKA_ZOOKEEPER_CONNECT=localhost:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:19092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:19092 -e KAFKA_PORT=19092 wurstmeister/kafka
#KAFKA_ZOOKEEPER_CONNECT=localhost:2181 需要注册的 zookeeper的ip和端口

创建topic

  • 进入容器目录
    docker exec -it kafka bash
  • 执行创建topic命令

sh /opt/kafka_2.13-2.8.1/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:19092 --replication-factor 1 --partitions 1 --topic data-transform

image-20241002222929452

使用(java)

添加项目依赖
  <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
  </dependency>
编写配置文件
spring:
  kafka:

    bootstrap-servers: localhost:9092
    # 消费者
    consumer:
	  #分组id
      group-id: my-group
      # 从一次消费过信息的下一条开始消费
      auto-offset-reset: earliest

      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      
    #        配置kafka的信息的生产者
    producer:

      key-serializer: org.apache.kafka.common.serialization.StringSerializer

      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    template:
      # 默认的消费主题
      default-topic: data-transform
生产者

@Component
@Slf4j
public class DataProductService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    public final static String TOPIC = "data-transform";
    public void sendDataMessage(String message){
        log.info("生产者生产数据开始:", DateUtils.format(new Date()),"yyyyMMdd HH:mm:ss");
        kafkaTemplate.send(TOPIC, message).completable()
                .whenComplete((result, ex) -> {
                    if (ex == null) {
                        RecordMetadata metadata = result.getRecordMetadata();
                        log.info("生产者生产数据成功:,数据为:{}",metadata);
                    } else {
                        log.error("生产者生产数据失败:", ex);
                    }
                });
    }
}
消费者

/**
 * @Description:
 * @author:<a href="2358853434@qq.com"></a> zh
 * @Create : 2024/10/2
 **/
@Component
@Slf4j
public class DataConsumerService {

    @KafkaListener(topics = DataProductService.TOPIC,groupId = "my-group")
    public void ConsumerData(String message){
          log.info("消费者消费数据,数据为:{}",message);
    }
}
测试用例
@SpringBootTest(classes = application.class)
public class demo {
    @Resource
    DataProductService dataProductService;
    @Resource
    DataConsumerService dataConsumerService;
    @Test
    public void sendDataMessage(){
        long l = System.currentTimeMillis();
        //模拟电商下单信息推送
        for (int i = 1000; i < 10001; i++) {
            String message = "{\"orderId\":\""+i+"\",\"userId\":\""+i+"\",\"productId\":\""+i+"\",\"productName\":\""+i+"\",\"productPrice\":\""+i+"\"}";
            dataProductService.sendDataMessage(message);
            dataConsumerService.ConsumerData(message);
        }
        System.out.println(System.currentTimeMillis()-l);
    }
}
结果

image-20241002235126191

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2196504.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【吊打面试官系列-MySQL面试题】试述视图的优点?

大家好&#xff0c;我是锋哥。今天分享关于【试述视图的优点&#xff1f;】面试题&#xff0c;希望对大家有帮助&#xff1b; 试述视图的优点&#xff1f; (1) 视图能够简化用户的操作 (2) 视图使用户能以多种角度看待同一数据&#xff1b; (3) 视图为数据库提供了一定程度的…

8年JAVA逆袭转AI之路!成功拿下offer

前段时间有一个粉丝投稿&#xff0c;他是8年老Java程序员了&#xff0c;每天两小时的碎片化学习时间&#xff0c;不仅没有陷入程序员的年龄恐慌&#xff0c;还拿到了目前薪资翻倍的offer 问到他是什么让他坚持学了6个月&#xff0c;他用了华为总裁任正非说的“今后职场上只有…

Nginx03-使用

零、文章目录 Nginx03-使用 1、Nginx服务器启停命令 对于 Nginx 的启停在 Linux 系统中也有很多种方式&#xff0c;我们介绍两种方式&#xff1a; Nginx信号控制Nginx命令行控制 &#xff08;1&#xff09;Nginx信号控制 查看Nginx 中的 master 和 worker 进程 [rootloc…

计算机进制之间的关系

计算机中常见的进制 十进制、二进制、十六进制、八进制之间对照表 进制之间的转换 通过上面的十进制对应二进制进位的表示&#xff1a; 当二进制产生增加位数时&#xff0c;相对应十进制数为2、4、8、16、32、64、128&#xff0c;也被称为二进制的位权&#xff0c;根据规律可知…

linux中缓存,在kafka上应用总结

linux中的缓存 页缓存 pagecatch&#xff08;读缓存用于提供快速读&#xff09;块缓存&#xff08;用于提供其他设备快速写&#xff09;当对读缓存读的时候&#xff0c;修改了读的数据&#xff0c;页缓存就会被标记为脏数据&#xff0c;等到写的时候它会向块缓存同步数据&…

关于7zip解压缩的下载和使用

我们有的时候下载软件&#xff0c;后缀是 ".exe" 或者 “.zip”&#xff0c;".7z"等&#xff0c;".exe"文件还好&#xff0c;打开就能进行下载&#xff0c;但是“.zip”&#xff0c;".7z“等就需要用解压缩软件进行解压了。 今天介绍的解…

No.11 笔记 | PHP学习指南:从函数到面向对象概览

一、PHP函数&#xff1a;代码复用的艺术 1. 函数的本质与魅力 函数是PHP的核心力量&#xff0c;分为内置函数和自定义函数函数名应当简洁明了&#xff0c;以字母或下划线开头 2. 函数的构成要素 function 关键字&#xff1a;函数的开始标志函数名&#xff1a;您的函数的独特…

【Git原理与使用】远程操作标签管理

远程操作&&标签管理 1.理解分布式版本控制系统2.新建远程仓库3.克隆远程仓库4.向远程仓库推送5.拉取远程仓库6.配置 Git7.配置命令别名8.标签管理8.1创建标签8.2操作标签 点赞&#x1f44d;&#x1f44d;收藏&#x1f31f;&#x1f31f;关注&#x1f496;&#x1f496;…

把当抠门程序员,遇到了免费AI大模型

这篇想和大家分享一下&#xff0c;一个抠门的程序员和一个免费的AI大模型的故事。 “抠门程序员<–>免费大模型”&#xff0c;让我看看&#xff0c;能不能擦出马内的火花。 故事的开始 不知道有没有程序员和我一样&#xff0c;付费的东西&#xff0c;都会省着点开。什…

远程访问服务是什么?如何通过节点小宝远程访问办公室电脑?

在家办公若能各安其位、高效完成任务&#xff0c;实为美事。然而&#xff0c;现实往往不尽如人意&#xff0c;偶尔需用到办公室电脑上的资料&#xff0c;这时便需依赖远程访问服务的助力。那么&#xff0c;远程访问服务究竟是何方神圣&#xff1f;又该如何借助节点小宝实现对办…

解锁空间距离计算的多种方式-含前端、空间数据库、后端

目录 前言 一、空间数据库求解 1、PostGIS实现 二、GIS前端组件求解 1、Leaflet.js距离测算 2、Turf.js前端计算 三、后台距离计算生成 1、欧式距离 2、Haversice球面距离 3、GeoTools距离计算 4、Gdal距离生成 5、geodesy距离计算 四、成果与生成对比 1、Java不…

CSRF | POST 型 CSRF 漏洞攻击

关注这个漏洞的其他相关笔记&#xff1a;CSRF 漏洞 - 学习手册-CSDN博客 0x01&#xff1a;POST 型 CSRF 漏洞攻击 —— 理论篇 POST 型 CSRF 漏洞是指攻击者通过构造恶意的 HTTP POST 请求&#xff0c;利用用户的登录状态&#xff0c;在用户不知情的情况下&#xff0c;诱使浏览…

Mythical Beings:Web3游戏如何平衡创造内容、关注度与实现盈利的不可能三角

Web3游戏自其诞生以来&#xff0c;以去中心化和独特的代币经济体系迅速引起关注。然而&#xff0c;如何在创造内容、吸引用户和实现盈利之间达到平衡&#xff0c;始终是Web3游戏面临的核心挑战。Mythical Beings作为一款Web3卡牌游戏&#xff0c;通过创新设计和独特机制&#x…

java集合框架都有哪些

Java集合框架&#xff08;Java Collections Framework&#xff09;是Java提供的一套设计良好的支持对一组对象进行操作的接口和类。这些接口和类定义了如何添加、删除、遍历和搜索集合中的元素。Java集合框架主要包括以下几个部分&#xff1a; 接口&#xff1a; Collection&…

昆虫分类与检测系统源码分享

昆虫分类与检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Computer Visio…

成都睿明智科技有限公司抖音电商服务佼佼者

在当今这个数字化浪潮汹涌的时代&#xff0c;抖音电商以其独特的魅力迅速崛起&#xff0c;成为众多商家竞相追逐的新蓝海。而在这场电商盛宴中&#xff0c;专业的服务商如同灯塔一般&#xff0c;为迷茫的商家指引方向。今天&#xff0c;我们就来深入探讨一家备受瞩目的服务商—…

Qt-QSpacerItem布局相关控件(45)

目录 描述 属性 使用 控件小结 描述 使⽤布局管理器的时候,可能需要在控件之间,添加⼀段空⽩.就可以使⽤ QSpacerItem 来表⽰ 属性 width宽度height⾼度hData⽔平⽅向的 sizePolicy • QSizePolicy::Ignored : 忽略控件的尺⼨&#xff0c;不对布局产⽣影响。 • QSizePol…

业务封装与映射 -- FlexE

什么是FlexE FlexE&#xff08;灵活以太网技术&#xff0c;Flexible Ethernet&#xff09;是由OIF 定义的灵活以太客户端接口标准&#xff0c; 是承载网实现业务隔离和网络分片的一种接口技术&#xff0c;支持路由器和光传输设备之间的灵活以太网连接&#xff0c;实现接口侧业务…

牛顿法、L-M算法

在进行解方程的时候&#xff0c;如下所示方程 其中&#xff0c;相应的k11、k12、k21、k22都是已知常量&#xff0c;可以见到其是一个非线性方程。关于非线程方程的求解&#xff0c;我看到网上有两种方法&#xff0c;牛顿法与L-M算法。 1.牛顿法 之前貌似学过&#xff0c;学过…

基于SSM的服装自销电商平台设计

文未可获取一份本项目的java源码和数据库参考。 一、选题背景 在当今这个信息时代&#xff0c;“网上购物”这种购物方式已经为越来越多的人所接受&#xff0c;越来越多的人选择在网络上购买衣服&#xff0c;方便快捷且实惠。在这种背景之下&#xff0c;一个安全稳定并且强大…