大数据-57 Kafka 高级特性 消息发送相关01-基本流程与原理剖析

news2024/11/15 19:36:55

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下的内容:

  • Java添加POM依赖
  • Java操作Kafka的API、SpringBoot
  • 实现对Kafka消息发送和消息消费

在这里插入图片描述

基本流程

在这里插入图片描述

  • Producer创建时,会创建一个Sender线程并设置为守护进程
  • 生产消息时,内部其实是异步流程,生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)
  • 批次发送的条件是:缓冲区的大小达到batch.size或者linger.ms达到上限,哪个先到达就算哪个
  • 批次发送后,发往指定的分区后,然后落盘到broker
  • 如果生产者配置了retrires参数大于0并且失败原因允许重试,那么客户端内部会对该消息进行重试
  • 落盘到Broker成功 返回生产元数据给生产者
  • 元数据返回的两种方式:一种是通过阻塞直接返回,另一种是通过回调返回

Broker配置

这里是Broker的常见配置:
在这里插入图片描述

bootstrap.servers

生产者客户端与broker集群建立初始链接需要Broker的地址列表,由该初始连接发现Kafka集群中其他的所有Broker,该地址列表不需要写全部的Kafka集群地址,但也不要只写一个防止宕机不可用。

key.serializer

实现了 org.apache.kafka.common.serialization.Serializer 的key序列化类

value.serializer

实现了 org.apache.kafka.common.serialization.Serializer的value序列化类

acks

该项控制着已发消息的持久性。

  • acks=0,生产者不等待Broker的任何消息确认。
  • acks=1,Leader将记录写到它本地的地址,就相应客户端的消息,而不等待Follower的副本的确认。
  • acks=all,Leader等待所有有同步副本消息的确认,保证了只要有一个同步副本存在,消息就不会丢失。
  • acks=-1,等价于 acks=all
    默认值为1

compression.type

生产者生成数据的压缩格式,默认是none(无压缩)。
可选:

  • none
  • gzip
  • snappy
  • lz4

默认是none

Broker配置补充

额外的配置还有下图的这些内容:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

retry.backoff.ms

在向一个指定的主题分区重发消息的时候,重试之间的等待时间。
比如三次重试,每次重试之后等待时间长度,再接着重试。
long型 默认 100

retries

retries 重试次数

  • 当消息发送出现错误的时候,系统会重新发送消息,跟客户端收到错误重新发送一样。
  • 如果设置了重试,还要保证消息有序,则需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1

request.timeout.ms

客户端等待请求响应时长,如果服务器端响超时,则会请求重试,除非达到重试次数。
设置应该要大于:replica.lag.time.max.ms,以免服务器延迟时间内重发消息。

int型 默认 30000

interceptor.classes

在生产者接收到该消息,向Kafka集群传输之前,由序列化处理之前,可以通过拦截器对消息进行处理。

  • 要求实现:org.apache.kafka.clients.producer.ProducerInterceptor 接口
  • Map[String, Object] configs 中通过 List集合配置多个拦截器
    默认没有拦截器

acks

同上,不介绍了。

batch.size

当多个消息发送到同一个分区时候,生产者尝试将多个记录作为一个批处理,批处理提高了客户端和副武器的处理效率。该配置项以字节为单位控制默认批的大小。

  • 所有批小于等于该值
  • 发送给Broker的请求将包含多个批次,每个分区一个,并包含可发送的数据
  • 如果该值设置的较小,会限制吞吐量(设置为0会完全关闭批处理);若很大则会浪费内存

client.id

  • 生产者发送请求的时候传递给Broker的id字符串
  • 用于在Broker的请求日志中追踪什么应用发送什么消息
  • 一般该ID跟业务有关的字符串

compression.type

同上,不介绍了。

send.buffer.bytes

TCP发送数据的时候用的缓冲区的大小,若设置为0,则用操作系统默认的。

buffer.memory

生产者可以用来缓存等待发送到服务器的记录的总内存字节,如果记录的发送速度超过了将记录发送到服务器的速度,则生产者将阻塞 max.block.ms 的时间,此后将引发异常。
此设置应大致对应于生产者将使用的总内存,但并非生产者使用的所有内存都用于缓冲。

connections.max.idle.ms

当连接空闲时间达到这个值,就关闭连接。
long型 默认 540000

linger.ms

生产者发送请求传输间隔会对需要发送的消息进行累积,然后作为一个批次发送,一般情况是消息的发送速度比消息积累的速度要慢。
有时候客户端需要减少请求次数,即使在负载不大的情况下。该配置设置了一个延迟,生产者消息不会立即将消息送到Broker,而是等待这么一段时间以积累消息,然后将这段消息之类的消息作为一个批次发送,该设置是批处理的另一个上限,一旦此消息达到了batch.size指定的值,消息批会立即发送,如果积累的消息字节数达不到batch.size的值,可以设置该毫秒值,等待这么长时间之后,也会发送消息批。
默认值是0

max.block.ms

控制KafkaProducer.send()和KafkaProducer.partitionFor()阻塞时长,当缓存满了或元数据不可用的时候,这些方法阻塞。在用户提供的序列化器和分区器的阻塞时间不计入。
long型值,默认60000

max.request.size

单个请求的最大字节数,该设置会限制单个请求总消息批的个数,以免单个请求发送太多的数据,服务器有自己的限制批大小的设置,与该配置可能不一样
int 型 默认 1048576

partitioner.class

实现了接口 org.apache.kafka.clients.producer.Partitioner 的分区器实现类。默认值:org.apache.kafka.clients.producer.internals.DefaultPartitioner

receive.buffer.bytes

TCP接收缓存(SO_RECVBUF),设置为01,则使用操作系统默认的值。
int型 默认32768

security.protocol

跟 Broker 通信的协议:PLAINTEXT、SSL、SASL_PLAINTEXT、ASAL_SSL
String型 默认 PLAINTEXT

max.in.flight.requests.per.connection

单个连接上未确认请求的最大数量,达到这个数量,客户端阻塞。
如果该值大于1,则存在失败的请求,在重试的时候消息顺序不能保证。
int型 默认5

reconnect.backoff.max.ms

对于每个连续的连接失败,每台主机退避将成倍增加,直到达到此最大值。

reconnect.backoff.ms

尝试重连指定主机的基础等待时间,避免该主机的密集重连。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1974030.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

反转链表~

一:初始化 public class ListNode {public int val;public ListNode next;public ListNode(int val,ListNode next){this.val val;this.next next;}Overridepublic String toString(){StringBuilder sb new StringBuilder(64);sb.append("[");ListNod…

【系统架构设计师】二十四、安全架构设计理论与实践①

目录 一、安全架构概述 1.1 信息安全面临的威胁 1.1.1 安全威胁分类 1.1.2 常见的安全威胁 1.2 安全架构的定义和范围 二、安全模型 2.1 状态机模型 2.2 Bell-LaPadula模型 2.3 Biba模型 2.4 Clark-Wilson模型 2.5 Chinese Wall 模型 往期推荐 一、安全架构概述 1…

vue3+vue-simple-uploader +SpringBoot实现大文件分块上传

效果图 一、安装所需依赖包 npm install vue-simple-uploadernext --savenpm install spark-md5 --save二、main.ts 注册组件 import { createApp } from vue import uploader from vue-simple-uploader import vue-simple-uploader/dist/style.css import App from ./App.vu…

Java ArrayList源码阅读笔记(基于JDK17)

Java ArrayList源码阅读笔记(基于JDK17) 虽然不喜欢看源码,但是据说会让人变强啊,看别的大佬的代码也许才知道怎么处理自己的一坨吧,因此冒着秃顶的风险还是来看看吧。。。 第一遍先简单看看吧,搞不清楚的…

双阈值最大最小值筛选

问题: 如下图所示的问题,给定最小阈值、最大阈值以及一段数据队列,对数据队列中超过阈值部分的极值进行保存,即从队列中得到P1-P6 计算规则 规则类似状态机 首先定义last_type标志位: { 上一时刻大于 m a x _ t h…

win7安装mysql-installer-community-8.0.11.0

1、安装Microsoft Visual C 2019 Redistributable Package (x64) 官网下载地址:https://learn.microsoft.com/en-us/cpp/windows/latest-supported-vc-redist?viewmsvc-160#latest-microsoft-visual-c-redistributable-version 通过百度网盘分享的文件&#xff1…

VBA_MF系列技术资料1-680

MF系列VBA技术资料1-680 WORD 目录下载地址: 链接:https://pan.baidu.com/s/17TrFO37OgnjiwvACvMna_A?pwdbr3g 提取码:br3g 为了让广大学员在VBA编程中有切实可行的思路及有效的提高自己的编程技巧,我参考大量的资料&#xff…

大数据-63 Kafka 高级特性 分区 副本机制 宕机恢复 Leader选举

点一下关注吧!!!非常感谢!!持续更新!!! 目前已经更新到了: Hadoop(已更完)HDFS(已更完)MapReduce(已更完&am…

【全国大学生电子设计竞赛】2022年D题

🥰🥰全国大学生电子设计大赛学习资料专栏已开启,限时免费,速速收藏~

详解Xilinx FPGA高速串行收发器GTX/GTP(2)--什么是GTX?

GTX本质上是基于SerDes技术的高速串行收发器,它是FPGA内部的底层电路,也叫做Gigabit Transceiver(吉比特收发器,简称为GT)。其中A7系列使用的GT叫GTP,K7系列使用的GT叫GTX,V7系列使用的GT叫GTH和GTZ,它们的结构大致相同,但是线速率的关系是 GTZ>GTH>GTX>GTP,…

Android进程保活:如何让app一直运行

目录 1)为什么需要进行进程保活呢?需求是什么? 2)进程分类 3)进程的优先级 4)如何提高进程优先级 5)如何进行进程保活 一、为什么需要进行进程保活呢?需求是什么? 比如…

国标GB28181视频平台LntonCVS视频融合共享平台视频汇聚应用方案

近年来,国内视频监控应用迅猛发展,系统接入规模不断扩大,导致了大量平台提供商的涌现。然而,不同平台的接入协议千差万别,使得终端制造商不得不为每款设备维护多个不同平台的软件版本,造成了资源的严重浪费…

工业大数据通过哪些方式实现价值?详解实施工业大数据的难点!

在数字化转型的浪潮中,工业大数据正成为推动制造业革新的核心动力。它不仅重塑了生产流程,还为企业带来了前所未有的洞察力和竞争优势。本文将深入探讨工业大数据的类别、价值实现方式,以及在实施过程中存在的挑战和解决方案。 更多详细内容&…

JavaScript和vue实现左右两栏,中间拖动按钮可以拖动左右两边的宽度

JavaScript实现&#xff1a; <!DOCTYPE html> <html lang"en"> <head><title>拖动效果</title><style> body, html {margin: 0;padding: 0;height: 100%;font-family: Arial, sans-serif; }.container {display: flex;height: …

pytest测试框架之http协议接口测试

1 接口测试 日常测试中接口测试是一项重要的工作&#xff0c;尤其是http协议的接口测试更加普遍,比如一些常用的测试框架或者工具&#xff08;robotframework框架&#xff0c;testng框架&#xff0c;postman等&#xff09;都支持http接口的测试&#xff0c;而这节内容主要介绍…

函数:全局,局部和静态变量

文章目录 &#x1f34a;自我介绍&#x1f34a;全局变量&#x1f34a;局部变量&#x1f34a;静态局部变量 你的点赞评论就是对博主最大的鼓励 当然喜欢的小伙伴可以&#xff1a;点赞关注评论收藏&#xff08;一键四连&#xff09;哦~ &#x1f34a;自我介绍 Hello,大家好&#x…

力扣SQL50 餐馆营业额变化增长 子连接

Problem: 1321. 餐馆营业额变化增长 &#x1f468;‍&#x1f3eb; 参考题解 Code select a.visited_on,sum(b.amount) as amount, round(sum(b.amount) / 7,2) as average_amount from (select distinct visited_on from customer) a join customer bon datediff(a.visited…

window安装elasticsearch和可视化界面kibana

ElasticSearch 官网下载zip安装包并解压 Elasticsearch&#xff1a;官方分布式搜索和分析引擎 | Elastic 修改配置文件 改选项是指定ssl访问还是普通http访问 不改的话使用http访问不了&#xff0c;得使用https 浏览器访问 localhost:9200 Kibana Download Kibana Free |…

MySQL 将文件导入数据库(load data Statement)

前面我们介绍过如何用select…into outfile语句将SQL查询结果导出到文件&#xff1a; MySQL 将查询结果导出到文件&#xff08;select … into Statement&#xff09; MySQL同时也提供互补的功能&#xff0c;可以使用load data infile语句将文件中的数据加载到数据库中&#x…

Robot Operating System——Action通信机制的服务端

大纲 回调接受或者拒绝请求执行任务的回调终止任务回调 创建服务完整代码总结 在《Robot Operating System——Action通信机制概念及Client端》一文中&#xff0c;我们介绍了Action客户端的主要流程。本文我们将介绍Action服务端的编写。 回顾下Action的构成: 目标&#xff0…