【Kafka】Kafka提高生产者吞吐量、数据可靠性-06

news2024/11/25 7:10:33

【Kafka】Kafka提高生产者吞吐量-06

  • 1. 提高生产者吞吐量
  • 2.数据可靠性
    • 2.1 回顾数据的发送流程
    • 2.2 ack应答级别
      • 2.2.1 acks:0
      • 2.2.2 acks:1
      • 2.2.2 acks:-1(all)
        • 2.2.2.1 数据可靠性分析
        • 2.2.2.2 数据完全可靠
    • 2.3 可靠性总结
    • 2.4 可靠性代码配置

1. 提高生产者吞吐量

在这里插入图片描述

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class CustomProducerParameters {
    public static void main(String[] args) throws InterruptedException {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "hadoop102:9092");

        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // batch.size:批次大小,默认 16K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // linger.ms:等待时间,默认 0
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<String, String>(properties);
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new
                    ProducerRecord<>("first", "atguigu " + i));
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

测试:
①在 hadoop102 上开启 Kafka 消费者

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4

2.数据可靠性

2.1 回顾数据的发送流程

在这里插入图片描述

2.2 ack应答级别

在这里插入图片描述

2.2.1 acks:0

在这里插入图片描述
当应答级别为0时,生产者发送过来的数据,不需要等数据落盘应答。
工作:生产者将数据发送到leader中,如果leader突然挂掉了,leader还没有与follower同步,那么整个数据就全部都丢了。

2.2.2 acks:1

在这里插入图片描述
当应答级别为1时,生产者发送过来的数据,Leader收到数据后应答。

工作:如果应答完毕之后,leader还未与follower同步,leader挂了,新的leader会产生,原来的一条数据不会再次发送,造成了数据的丢失。

2.2.2 acks:-1(all)

在这里插入图片描述
当应答级别为-1时,生产者发送过来的数据,Leader+和isr队列
里面的所有节点收齐数据后应答。-1和all等价。

工作:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader同步,那这个问题怎么解决呢?

Leader维护了一个动态的in-sync replica set (ISR),意为和Leader保持问步的Folower + Leader集合(leader:0, isr:0,1,2)

如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。例如2超时,(leader:0, isr:0,1)。

这样就不用等长期联系不上或者己经故障的节点。

2.2.2.1 数据可靠性分析

如果分区副本设置为1个,或者ISR里应答的最小副本数量(min.insync.replicas 默认为1)设置为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)


2.2.2.2 数据完全可靠

数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

2.3 可靠性总结

可靠性总结:

  1. acks=0,生产者发送过来数据就不管了,可靠性差,效率高

  2. acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等

  3. acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低

  4. 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

2.4 可靠性代码配置

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class CustomProducerAck {
    public static void main(String[] args) throws
            InterruptedException {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 设置 acks
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // 重试次数 retries,默认是 int 最大值,2147483647
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i));
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1828614.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[C++] vector list 等容器的迭代器失效问题

标题&#xff1a;[C] 容器的迭代器失效问题 水墨不写bug 正文开始&#xff1a; 什么是迭代器&#xff1f; 迭代器是STL提供的六大组件之一&#xff0c;它允许我们访问容器&#xff08;如vector、list、set等&#xff09;中的元素&#xff0c;同时提供一个遍历容器的方法。然而…

Vue26-内置指令03:v-cloak指令

一、需求 将引入本地JS的代码&#xff0c;换成引入外部JS&#xff0c;且引入的外部JS要等待5S。 【备注】&#xff1a;浏览器也能调节网速 二、js阻塞 <body>的最下方也能引入JS&#xff1a; 此时&#xff0c;用户能在5S内看到root容器未编译的部分。 解决该问题&#x…

工程设计问题---滚动轴承问题

参考文献&#xff1a; [1]李煜,梁晓,刘景森,等.基于改进平衡优化器算法求解工程优化问题[J/OL].计算机集成制造系统,1-34[2024-06-16].

高级人工智能复习 题目整理 中科大

题目整理 填空 1.准确性&#xff0c;复杂性&#xff0c;验证集 2. 3 2 n 3^{2^n} 32n 3 C 2 n m 3^{C^m_{2n}} 3C2nm​ 3 m 3^m 3m n 1 n1 n1 3. 状态 从状态s采取行动a后继续采用策略 π \pi π的收益 环境 4. 语法 语义 推理规则 5. 参与者&#xff0c;策略集&#xff…

FreeRTOS简单内核实现5 阻塞延时

文章目录 0、思考与回答0.1、思考一0.2、思考二0.3、思考三 1、创建空闲任务2、实现阻塞延时3、修改任务调度策略4、提供延时时基4.1、SysTick4.2、xPortSysTickHandler( )4.3、xTaskIncrementTick( ) 5、实验5.1、测试5.2、待改进 0、思考与回答 0.1、思考一 为什么 FreeRTO…

MySQL基础——多表查询和事务

目录 1多表关系 2多表查询概述 3连接查询 3.1内连接 3.2左外连接 3.3右外连接 3.4自连接 4联合查询 5子查询 5.1标量子查询(子查询结果为单个值) 5.2列子查询(子查询结果为一列) 5.3行子查询(子查询结果为一行) 5.4表子查询(子查询结果为多行多列) 6事务简介和操…

Axios进阶

目录 axios实例 axios请求配置 拦截器 请求拦截器 响应拦截器 取消请求 axios不仅仅是简单的用基础请求用法的形式向服务器请求数据&#xff0c;一旦请求的端口与次数变多之后&#xff0c;简单的请求用法会有些许麻烦。所以&#xff0c;axios允许我们进行创建axios实例、ax…

MongoDB~分片数据存储Chunk;其迁移原理、影响,以及避免手段

分片数据存储&#xff1a;Chunk存储 Chunk&#xff08;块&#xff09; 是 MongoDB 分片集群的一个核心概念&#xff0c;其本质上就是由一组 Document 组成的逻辑数据单元。每个 Chunk 包含一定范围片键的数据&#xff0c;互不相交且并集为全部数据。 分片集群不会记录每条数据…

计算机组成原理之定点除法

文章目录 定点除法运算原码恢复余数法原码不恢复余数法&#xff08;加减交替法&#xff09;运算规则 习题 定点除法运算 注意 &#xff08;1&#xff09;被除数小于除数的时候&#xff0c;商0 &#xff08;2&#xff09;接下来&#xff0c;有一个除数再原来的基础上&#xff0c…

Vue主要使用-03

组件通讯 组件通讯也是我们需要了解的,在我们的实际开发中,我们使用的非常多,比如父组件内的数据传入到子组件,子组件的数据传入到父组件,什么是父组件什么是子组件&#xff1f;父组件内包含着我们的子组件,我们的父组件可以有多个子组件,父组件就是我们使用子组件拼接的。 …

【字符串】65. 有效数字

本文涉及知识点 字符串 LeetCode65. 有效数字 给定一个字符串 s &#xff0c;返回 s 是否是一个 有效数字。 例如&#xff0c;下面的都是有效数字&#xff1a;“2”, “0089”, “-0.1”, “3.14”, “4.”, “-.9”, “2e10”, “-90E3”, “3e7”, “6e-1”, “53.5e93”,…

举例说明 如何通过SparkUI和日志定位任务莫名失败?

有一个Task OOM&#xff1a; 通过概览信息&#xff0c;发现Stage 10的Task 36失败了4次导致Job失败。概览信息中显示最后一次失败的退出代码&#xff08;exit code&#xff09;是143&#xff0c;意味着发生了内存溢出&#xff08;OOM&#xff0c;即Out of Memory&#xff09;。…

漏洞分析|PHP-CGI Windows平台远程代码执行漏洞(CVE-2024-4577)

1.漏洞描述 CVE-2024-4577导致漏洞产生的本质其实是Windows系统内字符编码转换的Best-Fit特性导致的&#xff0c;相对来说PHP在这个漏洞里更像是一个受害者。 PHP语言在设计时忽略了Windows系统内部对字符编码转换的Best-Fit特性&#xff0c;当PHP运行在Window平台且使用了如…

一键自动粘贴,高效处理邮箱地址,让你的工作效率翻倍提升!

在信息爆炸的时代&#xff0c;邮箱地址已成为我们日常工作和生活中的必备元素。无论是商务沟通、报名注册还是信息传递&#xff0c;邮箱地址都扮演着至关重要的角色。然而&#xff0c;手动复制粘贴邮箱地址的繁琐操作往往让人头疼不已&#xff0c;不仅效率低下&#xff0c;还容…

锁存器的工作原理及其在FPGA设计中的注意事项

锁存器&#xff08;Latch&#xff09;是数字电子中常用的一种基本元件&#xff0c;用于在特定的时间点或条件下“锁存”或保存输入的数据值。锁存器对脉冲电平敏感&#xff0c;它只在输入脉冲的高电平&#xff08;或低电平&#xff09;期间对输入信号敏感并改变状态。在数字电路…

【原理图PCB专题】案例:为什么要把Cadence原理图符号库设计好

Cadence 原理图符号库设计对于提高设计质量、效率和可维护性具有重要意义。一份好的原理图符合库能够帮助我们更好的达成设计目标: 提高设计效率:拥有一个完善的符号库可以让设计师直接调用所需的符号,避免重复绘制,从而节省时间。 确保设计准确性:统一的符号库可以保证符…

快速压缩前端项目

背景 作为前端开发工程师难免会遇到需要把项目压缩成压缩文件来传送的情况&#xff0c;这时候需要压缩软件进行压缩文件处理 问题 项目中的依赖包文件非常庞大&#xff0c;严重影响压缩速度&#xff0c;即使想先删除再压缩&#xff0c;删除文件也不会很快完成 解决 首先要安…

一键分析Bulk转录组数据

我们前面介绍了经典的转录组分析流程&#xff1a;Hisat2 Stringtie&#xff0c;可以帮助用户快速获得基因的表达量矩阵。 云上生信&#xff0c;未来已来 | 转录组标准分析流程重磅上线&#xff01; RNA STAR 也是一款非常流行的转录组数据分析工具。它不仅可以将测序 Reads 比…

博通加速向Nvidia发起进攻 为何连iPhone 15都不能用“苹果智能”?

博通加速向Nvidia发起进攻 博通强调的一项优势是其 XPU 的能效。其功耗不到 600 瓦&#xff0c;是业内功耗最低的 AI 加速器之一。 Nvidia 的许多竞争对手都想抢占其市场主导地位。其中一个不断出现的名字是 Broadcom。仔细观察就会知道原因。其 XPU 功耗不到 600 瓦&#xff…

JavaScript-转换成布尔型

学习目标&#xff1a; 掌握转换成布尔型 学习内容&#xff1a; 显示转换隐式转换 显示转换&#xff1a; Boolean&#xff08;内容&#xff09; 记忆&#xff1a;、0、underfined、null、false、NaN转换成布尔值后都是false&#xff0c;其余则为true。 console.log(Boolean(p…