zookeperkafka学习

news2025/1/23 9:10:35

1、why kafka

优点   缺点
kafka
  • 吞吐量高,对批处理和异步处理做了大量的设计,因此Kafka可以得到非常高的性能。
延迟也会高,不适合电商场景。
RabbitMQ
  • 如果有大量消息堆积在队列中,性能会急剧下降
  • 每秒处理几万到几十万的消息。如果应用要求高的性能,不要选择RabbitMQ。
性能RocketMQ低
RocketMQ
  • 性能比RabbitMQ高一个数量级,适合电商场景。
  • RocketMQ主要用于有序,事务,流计算,消息推送,日志流处理,binlog分发等场景。
  • 每秒处理几十万的消息,同时响应在毫秒级。如果应用很关注响应时间,可以使用RocketMQ。

2、Broker:

缓存代理(可以把Broker理解为Kafka的服务器),Kafka 集群中的一台或多台服务器统称为 broker。kafka中支持消息持久化的,生产者生产消息后,kafka不会直接把消息传递给消费者,而是先要在broker中进行存储,持久化是保存在kafka的日志文件中。 

3、分区:

一个消费者可以对应多个分区,一个分区只能对应一个消费者。

topic分区有leader和follower。 Log的分区被分布到集群中的多个服务器上。每个服务器处理它分到的分区。 根据配置每个分区还可以复制到其它服务器作为备份容错。 每个分区有一个leader,零或多个follower。Leader处理此分区的所有的读写请求,而follower被动的复制数据。如果leader宕机,其它的一个follower会被推举为新的leader。 一台服务器可能同时是一个分区的leader,另一个分区的follower。 这样可以平衡负载,避免所有的请求都只让一台或者某几台服务器处理。

4、消费者组 :

topic实现JMS模型中消费者组中只有一个消费者,这种情况下topic的消费的offset是无序的。当单个消费者无法跟上数据生成的速度,就可以增加更多的消费者分担负载,每个消费者只处理部分partition的消息,从而实现单个应用程序的横向伸缩。但是不要让消费者的数量多于partition的数量,此时多余的消费者会空闲。此外,Kafka还允许多个应用程序从同一个Topic读取所有的消息,此时只要保证每个应用程序有自己的消费者组即可。

kafka为什么读写快?

利用零拷贝和页面缓存技术,零拷贝技术读取文件数据并发送到网络的步骤如下:

  • 将磁盘文件的数据复制到页面缓存。
  • 将数据从页面缓存直接发送到网卡从而发到网络中。

rebalance

主要是对partition的个数和group当中的consumer个数重新统计,再重新对应consumer和partition的关系。一个消费者可以对应多个分区。一个分区只能对应一个消费者。

kafka producer API

生产者的分区由key决定

我们创建消息的时候,必须要提供主题和消息的内容,而消息的key是可选的,当不指定key时默认为null。消息的key有两个重要的作用:1)提供描述消息的额外信息;2)用来决定消息写入到哪个分区,所有具有相同key的消息会分配到同一个分区中。

如果key为null,那么生产者会使用默认的分配器,该分配器使用轮询(round-robin)算法来将消息均衡到所有分区。

如果key不为null而且使用的是默认的分配器,那么生产者会对key进行哈希并根据结果将消息分配到特定的分区。

案例:

Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs.

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384); //默认是16kB, 每个Batch要存放batch.size大小的数据后,才可以发送出去。
 props.put("linger.ms", 1); //一个Batch被创建之后,最多过多久,不管这个Batch有没有写满,都必须发送出去了。
 props.put("buffer.memory", 33554432); //默认是32MB,KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的Batch,再发送到Broker上去的。
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
 Producer<String, String> producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
 
 producer.close();

kafka consumer API

案例一:手动同步提交

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);
             consumer.commitSync();
             buffer.clear();
         }
     }

案例二:每个partition手动同步提交

try {
         while(running) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                //拿到这个partition下面的所有数据
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(record.offset() + ": " + record.value());
                 }
                //通过这个partition的list获取最后一个数据的offset
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }

Kafka文件存储:

知道通过分片和索引机制找到offset的就行了。index和log文件以当前的第一条消息的offset命名。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1221676.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何用继承和多态来打印个人信息

1 问题 在python中的数据类型中&#xff0c;我们常常运用继承和多态。合理地使用继承和多态可以增强程序的可扩展性使代码更简洁。那么如何使用继承和多态来打印个人信息&#xff1f; 2 方法 打印基本信息添加子类&#xff0c;再定义一个class&#xff0c;可以直接从Person类继…

spring cloud openfeign 使用注意点

近期在做项目时给自己挖了一个坑&#xff0c;问题重现如下 使用的组件版本如下 spring boot 2.7.15&#xff0c;对应的 spring cloud 版本为 2021.0.5&#xff0c;其中 spring cloud 适配的 openfeign 版本是 3.1.5。 项目中使用的 feign 接口如下 public interface QueryApi…

高性能音乐流媒体服务Diosic

什么是 Diosic ? Diosic 是一个开源的基于网络的音乐收集服务器和流媒体。主要适合需要部署在硬件规格不高的服务器上的用户。Diosic 是使用 Rust 开发的&#xff0c;具有低内存使用率和高性能以及用于流媒体音乐的非常干净的界面。 安装 在群晖上以 Docker 方式安装。 在注…

Jenkins自动化部署一个Maven项目

Jenkins自动化部署 提示&#xff1a;本教程基于CentOS Linux 7系统下进行 Jenkins的安装 1. 下载安装jdk11 官网下载地址&#xff1a;https://www.oracle.com/cn/java/technologies/javase/jdk11-archive-downloads.html 本文档教程选择的是jdk-11.0.20_linux-x64_bin.tar.g…

赋能汽车企业数智化转型,鼎捷软件受邀出席“中国工业软件大会”

由中国国际智能产业博览会组委会、工业和信息化部、重庆市人民政府主办的“第三届中国工业软件大会”在重庆盛大召开。工业软件主管部委及政府部门、产业上下游企业代表和业内大咖、科教领域专家学者等800余位嘉宾代表齐聚&#xff0c;为加快制造业数字化转型和高质量发展建言献…

基于SpringBoot的SSMP整合案例(在Linux中发布项目的注意事项与具体步骤步骤)

前言与注意 这几天在Linux中上线之前的小项目时&#xff0c;遇到了很多的问题&#xff0c;Linux镜像的选择&#xff0c;jdk&#xff0c; mysql在linux中的下载&#xff0c;使用finallshell连接linux&#xff0c;使用tomcat连接linux中的数据库........ 在下面的注意事项中我会将…

人生阶段总结

--回顾一下我迷茫、努力、不开心又失败的阶段人生自我介绍一下&#xff0c;我是一个智力平平&#xff0c;记忆力差&#xff0c;适合自学的长睡眠者。 大专之前 国内的应试教育基本上不适合我&#xff0c;厌恶补课厌恶机械式听课刷题&#xff0c;所有的优势学科都是自学&#xf…

Sql Server 2017主从配置之:事务日志传送

使用事务日志传送模式搭建Sql Server 2017主从同步&#xff0c;该模式有一定的延迟&#xff0c;是通过3个不同的定时任务&#xff0c;将主库的日志同步到从库进行恢复来实现数据库同步操作。 环境准备 两台服务器&#xff0c;配置都是8g2核&#xff0c;50g硬盘&#xff0c;操…

CI/CD相关概念学习

文章目录 CI/CD相关概念学习前言CI/CD相关概念介绍集成地狱持续集成持续交付持续部署Devops CI/CD相关应用介绍JenkinsTekton PipelinesSpinnakerTravis CIGoCD CI/CD相关概念学习 前言 本文主要是介绍一些 CI/CD 相关的概念&#xff0c;通过阅读本文你将快速了解 CI/CD 是什么…

腾讯云4核8G服务器性能如何多少钱一年?

腾讯云服务器4核8G配置优惠价格表&#xff0c;轻量应用服务器和CVM云服务器均有活动&#xff0c;云服务器CVM标准型S5实例4核8G配置价格15个月1437.3元&#xff0c;5年6490.44元&#xff0c;轻量应用服务器4核8G12M带宽一年446元、529元15个月&#xff0c;腾讯云百科txybk.com分…

在前端开发中,什么是CDN(Content Delivery Network)?它的作用是什么?

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

Python实验项目7 :tkinter GUI编程

&#xff08;1&#xff09;利用tkinter 制作界面&#xff0c;效果图如下&#xff1a; from tkinter import * # winTk() for i in range(1,20):Button(width5,height10,bg"black" if i%20 else"white").pack(side"left") win.geometry("8…

“可信区块链运行监测服务平台TBM发展研讨会”将于11月23日在北京召开

为推动区块链治理与创新&#xff0c;积极推进信任科技生态体系建设&#xff0c;中国信息通信研究院、中国移动设计院联合区块链服务网络&#xff08;BSN&#xff09;发展联盟共同发起建立了可信区块链运行监测服务平台&#xff08;TBM&#xff09;。 TBM平台通过对区块链系统的…

idea显示pom.xml文件漂黄警告 Dependency maven:xxx:xxx is vulnerable

场景&#xff1a; idea警告某些maven依赖包有漏洞或者依赖传递有易受攻击包&#xff0c;如下&#xff1a; 解决&#xff1a; 1、打开idea设置&#xff0c;找到 File | Settings | Editor | Inspections 2、取消上述两项勾选即可

Verilog基础:仿真时x信号的产生和x信号对于各运算符的特性

相关阅读 Verilog基础https://blog.csdn.net/weixin_45791458/category_12263729.html?spm1001.2014.3001.5482 信号爆x也许是所有IC人的噩梦&#xff0c;满屏的红色波形常让人头疼不已&#xff0c;但x信号的产生原因却常常只有几种&#xff0c;只要遵循一定的代码规范&#…

开源与闭源软件的辩论:对大模型技术发展的影响

目录 前言1 开源软件的优缺点1.1 开源软件的优点1.2 开源软件的缺点和挑战 2 闭源软件的优缺点2.1 闭源软件的优点2.2 闭源软件的缺点和挑战 3 大模型发展会走向哪一边结语 前言 近期&#xff0c;特斯拉CEO马斯克公开表示&#xff1a;OpenAI不该闭源&#xff0c;自家首款聊天机…

JVM判断对象是否存活之引用计数法、可达性分析

目录 前言 引用计数法 概念 优点 缺点 可达性分析 概念 缺点&#xff1a; 扩展&#xff1a; 1.GC Roots 概念 2.STW (Stop the world) 前言 JVM有两种算法来判断对象是否存活&#xff0c;分别是引用计数法和可达性分析算法&#xff0c;针对可达性分析算法STW时间长、…

Python入门学习篇(一)——注释变量输入输出

1 注释 1.1 作用 a 方便他人和自己阅读代码 b 告诉编译器这部分内容是不用执行的。1.2 单行注释 # 注释内容1.3 多行注释(引号) 1.3.1 三对双引号 """ 注释内容 """1.3.2 三对单引号 注释内容 1.4 pycharm快捷键使用 ctrl/ 多行注释(以# …

基于SSM的中小型企业财务管理设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

【413.等差数列划分】

目录 一、题目描述二、算法原理三、代码实现 一、题目描述 二、算法原理 三、代码实现 class Solution { public:int numberOfArithmeticSlices(vector<int>& nums) {int nnums.size();if(n<3) return 0;vector<int> dp(n);dp[2]dp[1]dp[0]0;if(nums[2]-nu…