【大数据之Kafka】五、Kafka生产者之生产经验

news2025/3/14 23:17:06

1 生产者如何提高吞吐量

  由于linger.ms默认为0,即缓冲区队列中一有数据就sender线程就将其拉出到Kafka集群,效率比较低,提高生产者吞吐量有四种方式:

(1)扩大批次的大小batch.size,默认为16k,当数据积累到batch.size时sender线程才拉取数据。

(2)扩大sender的等待时间linger.ms,默认为0ms,可以修改为2-100ms。

(3)对缓冲区队列中的数据进行压缩再积累由sender拉取compression.type

(4)扩大缓冲区大小RecordAccumulator,默认为32M,修改为64M。

package com.study.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerParameters {
    public static void main(String[] args) {
        //0.创建 kafka 生产者的配置对象
        Properties properties = new Properties();

        //给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // batch.size:批次大小,默认 16K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

        // linger.ms:等待时间,默认 0
        properties.put(ProducerConfig.LINGER_MS_CONFIG,1);

        // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

        // compression.type:压缩,默认 none,可配置值 gzip、snappy、 lz4 和 zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
        

        //1.创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        //2.调用 send 方法,发送消息
        for (int i = 0; i < 3; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","test"+i));
        }

        //3.关闭资源
        kafkaProducer.close();
    }
}

2 数据可靠性

ACK应答级别:
(1)acks=0:生产者发送过来的数据,不需要等待数据落盘应答。(会丢失数据
  即生产者发送数据过来就不管了,可靠性差,效率高,很少使用。
在这里插入图片描述
(2)acks=1:生产者发送过来的数据,Leader收到数据后应答。(会丢失数据
  当Leader应答完成但还没有开始同步副本时Leader挂了,新的Leader不会收到刚发来的数据,因为生产者接收到应答acks即认为发送成功了。
  即生产者发送数据过来等待Leader应答,可靠性中,效率中,一般用于传输普通数据,允许丢个别数据。
在这里插入图片描述
(3)acks=-1或all:生产者发送过来的数据,Leader和ISR队列里的所有节点收齐数据后应答,可靠性高,效率低,一般用于传输与钱有关的数据,对可靠性要求比较高的场景。
在这里插入图片描述
但会出现一种情况:
  Leader收到数据,所有Follower都开始同步数据,但有一个Follower,因为某种故障,迟迟不能与Leader进行同步

解决:
  Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。
  如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值replica.lag.time.max.ms参数设定默认30s。例如2超时,(leader:0, isr:0,1)。这样就不用等长期联系不上或者已经故障的节点。

数据可靠性分析
  如果分区副本设置为1个,或者ISR里应答的最小副本数量( min.insync.replicas 默认为1)设置为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)。

数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量>=2

数据重复分析
  acks=-1或all时,生产者发送数据给Leader,Leader接受到数据后返回确认ack给生产者并同步数据到副本,此时Leader挂了,但是生产者并没有接收到返回的ack,所以生产者重新给新的Leader发送数据,导致数据重复。
在这里插入图片描述

package com.study.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerAck {
    public static void main(String[] args) {
        //0.创建 kafka 生产者的配置对象
        Properties properties = new Properties();

        //给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 设置 acks 
        properties.put(ProducerConfig.ACKS_CONFIG, "all");

        // 重试次数retries,默认是 int 最大值,2147483647 
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);


        //1.创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        //2.调用 send 方法,发送消息
        for (int i = 0; i < 3; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","test"+i));
        }

        //3.关闭资源
        kafkaProducer.close();
    }
}

3 数据去重

数据重复分析
  acks=-1或all时,生产者发送数据给Leader,Leader接受到数据后返回确认ack给生产者并同步数据到副本,此时Leader挂了,但是生产者并没有接收到返回的ack,所以生产者重新给新的Leader发送数据,导致数据重复。
在这里插入图片描述

3.1 数据传递语义

(1)至少一次(At Least Once),可以保证数据不丢失,但是不能保证数据不重复。
  至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
(2)最多一次(At Most Once),可以保证数据不重复,但是不能保证数据不丢失
  最多一次(At Most Once)= ACK级别设置为0
(3)精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。
  Kafka 0.11版本以后,引入幂等性和事务

3.2 幂等性

  幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

精确一次(Exactly Once) = 幂等性+ 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2)

重复数据的判断标准:
  具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其
中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。所以幂等性只能保证的是在单分区单会话内不重复

使用幂等性:
  开启参数 enable.idempotence 默认为 true,false 关闭。
在这里插入图片描述

3.3 生产者事务

  开启事务必须要先开启幂等性。
  Producer 在使用事务功能前,必须先自定义一个唯一的 transactional.id。有了 transactional.id,即使客户端挂掉了,它重启后也能继续处理未完成的事务。
在这里插入图片描述
Kafka 的事务一共有如下 5 个API:

// 1 初始化事务
void initTransactions();

// 2 开启事务
void beginTransaction()throws ProducerFencedException;

// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws ProducerFencedException;

// 4 提交事务
void commitTransaction()throws ProducerFencedException;

// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction()throws ProducerFencedException;

单个 Producer,使用事务保证消息的仅一次发送。

package com.study.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerTransactions {
    public static void main(String[] args) {
        //0.创建 kafka 生产者的配置对象
        Properties properties = new Properties();

        //给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 设置事务 id(必须),事务 id 任意起名,全局唯一
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_01");

        //1.创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 初始化事务
        kafkaProducer.initTransactions();

        // 开启事务
        kafkaProducer.beginTransaction();

        try {
            //2.调用 send 方法,发送消息
            for (int i = 0; i < 3; i++) {
                kafkaProducer.send(new ProducerRecord<>("first","test"+i));
            }
            // 提交事务
            kafkaProducer.commitTransaction();
        } catch (ProducerFencedException e) {
            // 终止事务
            kafkaProducer.abortTransaction();
        } finally {
            //3.关闭资源
            kafkaProducer.close();
        }

    }
}

4 数据有序

消费者接收到的:单分区内有序,多分区间无序。
在这里插入图片描述

5 数据乱序

  生产者端中每个节点的每个队列最多缓存5个请求,在Kafka集群没有回应的情况下最多可以发送5个数据。若前有个数据发送失败,但其前面的数据发送成功,其后数据正常发送,且发送该数据会重试,导致数据到达Kafka集群时出现乱序。

解决:
  (1)kafka在1.x版本之前保证数据单分区有序,条件如下:
   max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。
  (2)kafka在1.x及以后版本保证数据单分区有序,条件如下:
   (i)未开启幂等性:max.in.flight.requests.per.connection需要设置为1。
   (ii)开启幂等性:max.in.flight.requests.per.connection需要设置小于等于5。
   原因说明:
    因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,缓存5个数据后再进行排序。故无论如何,都可以保证最近5个request的数据都是有序的。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/943642.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LeetCode-160. 相交链表

这是一道真的非常巧妙的题&#xff0c;题解思路如下&#xff1a; 如果让他们尾端队齐&#xff0c;那么从后面遍历就会很快找到第一个相交的点。但是逆序很麻烦。 于是有一个巧妙的思路诞生了&#xff0c;如果让短的先走完自己的再走长的&#xff0c;长的走完走短的&#xff0c;…

达梦数据库屏蔽关键字

一、配制位置 配置设备&#xff1a;需要连接数据库的程序运行的设备&#xff1b; 配置位置&#xff1a; 1.32 位的 DM 安装在 Win32 操作平台下&#xff0c;此文件位于%SystemRoot%\system32 目录&#xff1b; 2.64 位的 DM 安装在 Win64 操作平台下&#xff0c;此文件位于…

在线制作作息时间表

时光荏苒&#xff0c;岁月如梭&#xff0c;人们描述时光易逝的句子&#xff0c;多如星河。 一寸光阴一寸金&#xff0c;寸金难买寸光阴。 人生就是一段时间而已&#xff0c;所以我明白了一个道理 人生之中最大的浪费就是时间的浪费 因此我想我们教给我们孩子重要的一课应该也是…

PMO的秘密武器:项目评估与审计机制的深度解析

引言&#xff1a;PMO的核心价值与挑战 项目管理办公室&#xff08;PMO&#xff09;在组织中的角色日益重要。它是组织中的一个关键部门&#xff0c;负责确保项目的顺利进行&#xff0c;并确保项目与组织的整体战略保持一致。但与此同时&#xff0c;PMO也面临着许多挑战。其中最…

1.6 编写双管道ShellCode

本文将介绍如何将CMD绑定到双向管道上&#xff0c;这是一种常用的黑客反弹技巧&#xff0c;可以让用户在命令行界面下与其他程序进行交互&#xff0c;我们将从创建管道、启动进程、传输数据等方面对这个功能进行详细讲解。此外&#xff0c;本文还将通过使用汇编语言一步步来实现…

【WINAPI】文件读写操作问题

问题描述 在利用WINAPI中的WriteFile和ReadFile函数进行文件读写操作时&#xff0c;出现无法正常读写文件报错。 分析问题 查阅WINAPI源码&#xff0c;查看参数列表各个参数的数据类型。 发现其中第二个参数&#xff0c;也就是需要写进文件的真实数据&#xff0c;其数据类型…

Google Play商店优化排名因素之应用的评分评论

下载次数是应用程序受欢迎程度的指标&#xff0c;Google在对我们的应用程序进行排名时也会将其考虑在内。评级和评论会影响应用程序的转化率&#xff0c;因为许多用户在做出决定之前会根据平均评级或最近的评论来评估我们的应用程序。 1、评级的重要性。 如果我们的应用程序有…

安卓版yolo-fastest

安卓版本yolofastest效果测试 安卓配置OPENCV4ANDROID&#xff0c;见我的博客一篇文章opencv4dandroid配置 这个不需要使用JNI&#xff0c;十分简单的配置 说真的&#xff0c;其实只调用OPENCV的函数&#xff0c;自己写的代码不多&#xff0c;使用OPENCV4ANDROID和JNI的时间差…

im6ull-uboot(2021.07)移植(一)

文章目录 声明1 获取源码1.1 从u-boot官网获取1.2 从芯片厂商获取1.3 从开发板厂商获取 2 修改顶层Makefile3 xxx_defconfig配置文件3.1 拷贝生成自己的配置文件3.2 修改defconfig文件3.2.1 查看defconfig文件3.2.2 修改defconfig文件 3.3 添加其他配置文件3.3.1 添加配置头文件…

深入了解电商API的历史和业务场景

在数字时代&#xff0c;电子商务的兴起改变了传统的商业模式&#xff0c;使得商品和服务的交易变得更加便捷和高效。作为电子商务的重要组成部分&#xff0c;电商API&#xff08;应用程序编程接口&#xff09;在过去的几十年中得到了广泛的应用和不断发展。本文将深入探讨电商A…

css 分割线中间带文字

效果图 代码块&#xff08;自适应&#xff09; <div class"line"><span class"text">我是文字</span></div>.line{height:0;border-top:1px solid #000;text-align:center;}.text{position:relative;top:-14px;background-color:#…

如何查看相机的配置信息,以及获取相机当前状态信息---deepstream

sudo apt-get install v4l2-utils v4l2-ctl --list-devicesv4l2-ctl --list-formats-ext --device0 v4l2-ctl --list-formats-ext --device1参考链接&#xff1a; https://www.elecfans.com/d/1677110.html

特斯拉自动驾驶遭质疑?面临两起 Autopilot 诉讼,均为致命事故

根据最新消息&#xff0c;两起涉及特斯拉 Autopilot 的诉讼将在今年的九月和十月分别在法庭上审理。此类诉讼对于特斯拉及其自动驾驶项目的声誉可能带来一定的负面影响。首先&#xff0c;这些诉讼案件的存在将引起公众对特斯拉 Autopilot 系统安全性。 除了以上提到的两起诉讼&…

【进阶篇】MySQL 存储引擎详解

文章目录 0.前言1.基础介绍2.1. InnoDB存储引擎底层原理InnoDB记录存储结构和索引页结构InnoDB记录存储结构&#xff1a;InnoDB索引页结构&#xff1a; 3. MVCC 详解3.1. 版本号分配&#xff1a;3.2. 数据读取&#xff1a;3.3. 数据写入&#xff1a;3.4. 事务隔离级别&#xff…

servlet内存马学习

项目配置 注意一定要添加 否则访问路径会404&#xff0c;tomcat并没有对项目生效 1.实现javax.servlet.Servlet接口的方式 Servlet.class&#xff1a; package org.test;import javax.servlet.*; import java.io.IOException;public class ServletTest implements Servle…

华为OD机试 - MELON的难题 - 动态规划(Java 2023 B卷 100分)

目录 一、题目描述二、输入描述三、输出描述四、动态规划五、解题思路六、Java算法源码七、效果展示1、输入2、输出3、说明 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 一、题目描述 MELON有一堆精美的雨花石(数量为n&#xff0c;重量各异)&#xff0c;准备送给…

为什么很多干了四五年的老IT告诉你算法没用?

文章目录 前言认知决定上限为什么学算法&#xff1f;为什么很多干了四五年的老IT告诉你算法没用&#xff1f;怎么学算法&#xff1f;算法集训 前言 英雄算法联盟八月集训 已经接近尾声&#xff0c;九月算法集训将于 09月01日 正式开始&#xff0c;目前已经提前开启报名&#xf…

学乐多光屏P90:引领儿童智慧教育新时代

随着科技的迅猛发展&#xff0c;儿童教育正经历着深刻变革。在这个引领变革的浪潮中&#xff0c;学乐多光屏P90以其卓越的特性和教育价值&#xff0c;成为了儿童智慧教育的引领者&#xff0c;为孩子们打开了通向知识世界的大门。 学乐多光屏P90的独特之处在于其融合了触摸和投影…

图像生成模型

监督学习 与 无监督学习 监督学习 数据&#xff1a;&#xff08;x, y&#xff09; X是数据&#xff0c;Y是标签 目标&#xff1a;学习一个从x到y的函数映射 样例&#xff1a;分类、回归、物体检测、语义分割、描述 无监督学习 数据&#xff1a;&#xff08;x&#xff09; 只有…

DPI 设置和获取

DPI设置与获取之前请保证程序已经打开DPI感知或者在清单文件嵌入DPI感知&#xff0c;否则API可能获取的值不准确 方法一:GetScaleFactorForMonitor 通过枚举显示器获取不同设备的DPI&#xff0c;获取的是实际DPI static BOOL CALLBACK MonitorEnumProc(HMONITOR hMonitor,HDC…